真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

【深入淺出 Yarn 架構(gòu)與實(shí)現(xiàn)】2-2 Yarn 基礎(chǔ)庫(kù)

RPC(Remote Procedure Call) 是 Hadoop 服務(wù)通信的關(guān)鍵庫(kù),支撐上層分布式環(huán)境下復(fù)雜的進(jìn)程間(Inter-Process Communication, IPC)通信邏輯,是分布式系統(tǒng)的基礎(chǔ)。允許運(yùn)行于一臺(tái)計(jì)算機(jī)上的程序像調(diào)用本地方法一樣,調(diào)用另一臺(tái)計(jì)算機(jī)的子程序。
由于 RPC 服務(wù)整體知識(shí)較多,本節(jié)僅針對(duì)對(duì) Yarn RPC 進(jìn)行簡(jiǎn)略介紹,詳細(xì)內(nèi)容會(huì)后續(xù)開(kāi)專欄介紹。

創(chuàng)新互聯(lián)秉承實(shí)現(xiàn)全網(wǎng)價(jià)值營(yíng)銷的理念,以專業(yè)定制企業(yè)官網(wǎng),成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),小程序開(kāi)發(fā),網(wǎng)頁(yè)設(shè)計(jì)制作,手機(jī)網(wǎng)站制作,全網(wǎng)整合營(yíng)銷推廣幫助傳統(tǒng)企業(yè)實(shí)現(xiàn)“互聯(lián)網(wǎng)+”轉(zhuǎn)型升級(jí)專業(yè)定制企業(yè)官網(wǎng),公司注重人才、技術(shù)和管理,匯聚了一批優(yōu)秀的互聯(lián)網(wǎng)技術(shù)人才,對(duì)客戶都以感恩的心態(tài)奉獻(xiàn)自己的專業(yè)和所長(zhǎng)。

一、RPC 通信模型介紹

為什么會(huì)有 RPC 框架?
在分布式或微服務(wù)情境下,會(huì)有大量的服務(wù)間交互,如果用傳統(tǒng)的 HTTP 協(xié)議端口來(lái)通信,需要耗費(fèi)大量時(shí)間處理網(wǎng)絡(luò)數(shù)據(jù)交換上,還要考慮編解碼等問(wèn)題。如下圖所示。

  • 客戶端通過(guò) RPC 框架的動(dòng)態(tài)代理得到一個(gè)代理類實(shí)例,稱為 Stub(樁)
  • 客戶端調(diào)用接口方法(實(shí)際是 Stub 對(duì)應(yīng)的方法),Stub 會(huì)構(gòu)造一個(gè)請(qǐng)求,包括函數(shù)名和參數(shù)
  • 服務(wù)端收到這個(gè)請(qǐng)求后,先將服務(wù)名(函數(shù))解析出來(lái),查找是否有對(duì)應(yīng)的服務(wù)提供者
  • 服務(wù)端找到對(duì)應(yīng)的實(shí)現(xiàn)類后,會(huì)傳入?yún)?shù)調(diào)用
  • 服務(wù)端 RPC 框架得到返回結(jié)果后,再進(jìn)行封裝返回給客戶端
  • 客戶端的 Stub 收到返回值后,進(jìn)行解析,返回給調(diào)用者,完成 RPC 調(diào)用。

二、Hadoop RPC 介紹

一)簡(jiǎn)介

Hadoop RPC 是 Hadoop 自己實(shí)現(xiàn)的一個(gè) RPC 框架,主要有以下幾個(gè)特點(diǎn):

  • 透明性:像調(diào)用本地方法一樣調(diào)用遠(yuǎn)程方法。
  • 高性能:Hadoop 各個(gè)系統(tǒng)均采用 Master/Slave 結(jié)構(gòu),Master 是一個(gè) RPC Server 用于處理各個(gè) Slave 節(jié)點(diǎn)發(fā)送的請(qǐng)求,需要有高性能。
  • 可控性:由于 JDK 中的 RPC 框架 RMI 重量級(jí)過(guò)大,且封裝度太高,不方便控制和修改。因此實(shí)現(xiàn)了自己的 RPC 框架,以保證輕量級(jí)、高性能、可控性。

框架原理和整體執(zhí)行流程與第一節(jié)介紹的 RPC 框架一致,感興趣可深入源碼進(jìn)行了解。

二)總體架構(gòu)

Hadoop RPC 架構(gòu)底層依靠 Java 的 nio、反射、動(dòng)態(tài)代理等功能實(shí)現(xiàn)「客戶端 - 服務(wù)器(C/S)」通信模型。
上層封裝供程序調(diào)用的 RPC 接口。

三、案例 demo

下面兩個(gè)案例的 demo 已上傳至 github。有幫助的話點(diǎn)個(gè)??。
https://github.com/Simon-Ace/hadoop_rpc_demo

一)RPC Writable 案例實(shí)現(xiàn)

1、新建一個(gè) maven 工程,添加依賴


    org.apache.hadoop
    hadoop-common
    2.8.5

2、定義 RPC 協(xié)議

public interface BusinessProtocol {
    void mkdir(String path);
    String getName(String name);
    long versionID = 345043000L;
}

3、定義協(xié)議實(shí)現(xiàn)

public class BusinessIMPL implements BusinessProtocol {
    @Override
    public void mkdir(String path) {
        System.out.println("成功創(chuàng)建了文件夾 :" + path);
    }

    @Override
    public String getName(String name) {
        System.out.println("成功打了招呼: hello :" + name);
        return "bigdata";
    }
}

4、通過(guò) Hadoop RPC 構(gòu)建一個(gè) RPC 服務(wù)端

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

public class MyServer {
    public static void main(String[] args) {
        try {
            // 構(gòu)建一個(gè) RPC server 端,提供了一個(gè) BussinessProtocol 協(xié)議的 BusinessIMPL 服務(wù)實(shí)現(xiàn)
            RPC.Server server = new RPC.Builder(new Configuration())
                    .setProtocol(BusinessProtocol.class)
                    .setInstance(new BusinessIMPL())
                    .setBindAddress("localhost")
                    .setPort(6789)
                    .build();

            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5、構(gòu)建一個(gè) RPC 客戶端

import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.net.InetSocketAddress;

public class MyClient {
    public static void main(String[] args) {
        try {
        	// 獲取代理類實(shí)例,也就是 Stub
            BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
                    new InetSocketAddress("localhost", 6789), new Configuration());

            // 通過(guò) Stub 發(fā)送請(qǐng)求,實(shí)際使用就像調(diào)用本地方法一樣
            proxy.mkdir("/tmp/ABC");
            String res = proxy.getName("Simon");
            System.out.println("從 RPC 服務(wù)端接收到的返回值:" + res);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

6、測(cè)試,先啟動(dòng)服務(wù)端,再啟動(dòng)客戶端
服務(wù)端輸出

成功創(chuàng)建了文件夾 :/tmp/ABC
成功打了招呼: hello :Simon

客戶端輸出

從 RPC 服務(wù)端接收到的返回值:bigdata

二)RPC Protobuf 案例實(shí)現(xiàn)

項(xiàng)目結(jié)構(gòu)如下

對(duì) proto 文件格式不熟悉的同學(xué),參考上一篇文章《2-1 Yarn 基礎(chǔ)庫(kù)概述》

MyResourceTrackerMessage.proto 定義數(shù)據(jù)格式

syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

message MyRegisterNodeManagerRequestProto {
    string hostname = 1;
    int32 cpu = 2;
    int32 memory = 3;
}

message MyRegisterNodeManagerResponseProto {
    string flag = 1;
}

MyResourceTracker.proto 定義 rpc 接口

syntax = "proto3";

import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

service MyResourceTrackerService {
    rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}

2、對(duì) proto 文件編譯,生成 java 類

# 在項(xiàng)目根目錄執(zhí)行,路徑按照自己的進(jìn)行修改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto

protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto

3、定義調(diào)用方法接口 MyResourceTracker

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;

public interface MyResourceTracker {
    MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}

4、對(duì)調(diào)用方法接口的實(shí)現(xiàn)(服務(wù)端)

import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;

public class MyResourceTrackerImpl implements MyResourceTracker {
    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {

        // 輸出注冊(cè)的消息
        String hostname = request.getHostname();
        int cpu = request.getCpu();
        int memory = request.getMemory();
        System.out.println("NodeManager 的注冊(cè)消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);

        // 省略處理邏輯
        // 構(gòu)建一個(gè)響應(yīng)對(duì)象,用于返回
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =
                MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();
        // 直接返回 True
        builder.setFlag("true");
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();
        return response;
    }
}

5、編寫(xiě) proto 的協(xié)議接口

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;

@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {
}

6、編寫(xiě) proto 的協(xié)議接口實(shí)現(xiàn)(服務(wù)端)

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {
    final private MyResourceTracker server;

    public MyResourceTrackerServerSidePB(MyResourceTracker server) {
        this.server = server;
    }

    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {
        try {
            return server.registerNodeManager(request);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

7、RPC Server 的實(shí)現(xiàn)

import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;

import java.io.IOException;

public class ProtobufRpcServer {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // 構(gòu)建 Rpc Server
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(MyResourceTrackerPB.class)
                .setInstance(MyResourceTrackerProto.MyResourceTrackerService
                        .newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))
                .setBindAddress("localhost")
                .setPort(9998)
                .setNumHandlers(1)
                .setVerbose(true)
                .build();

        // Rpc Server 啟動(dòng)
        server.start();
    }
}

8、RPC Client 的實(shí)現(xiàn)

import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

import java.io.IOException;
import java.net.InetSocketAddress;

public class ProtobufRpcClient {
    public static void main(String[] args) throws IOException {
        // 設(shè)置 RPC 引擎為 ProtobufRpcEngine
        Configuration conf = new Configuration();
        String hostname = "localhost";
        int port = 9998;
        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // 獲取代理
        MyResourceTrackerPB protocolProxy = RPC
                .getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);

        // 構(gòu)建請(qǐng)求對(duì)象
        MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =
                MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();
        MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =
                builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();

        // 發(fā)送 RPC 請(qǐng)求,獲取響應(yīng)
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;
        try {
            response = protocolProxy.registerNodeManager(null, bigdata02);
        } catch (ServiceException e) {
            e.printStackTrace();
        }

        // 處理響應(yīng)
        String flag = response.getFlag();
        System.out.println("最終注冊(cè)結(jié)果: flag = " + flag);
    }
}

9、測(cè)試
先啟動(dòng)服務(wù)端,在啟動(dòng)客戶端。

四、總結(jié)

本節(jié)介紹了 Hadoop 底層通信庫(kù) RPC。首先介紹了 RPC 的框架和原理,之后對(duì) Hadoop 自己實(shí)現(xiàn)的 RPC 進(jìn)行了介紹,并給出了兩個(gè) demo 實(shí)踐。
強(qiáng)烈建議了解基礎(chǔ)知識(shí)后,跟著 demo 實(shí)現(xiàn)一個(gè)案例出來(lái),可以更好的幫助你理解。
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo


參考文章:
YARN-RPC網(wǎng)絡(luò)通信架構(gòu)設(shè)計(jì)
YARN-高并發(fā)RPC源碼實(shí)現(xiàn)
Hadoop3.2.1 【 HDFS 】源碼分析 : RPC原理 [八] Client端實(shí)現(xiàn)&源碼
Hadoop RPC機(jī)制詳解
Hadoop2源碼分析-RPC探索實(shí)戰(zhàn)
《Hadoop 技術(shù)內(nèi)幕 - 深入解析 Yarn 結(jié)構(gòu)設(shè)計(jì)與實(shí)現(xiàn)原理》3.3 節(jié)


分享題目:【深入淺出 Yarn 架構(gòu)與實(shí)現(xiàn)】2-2 Yarn 基礎(chǔ)庫(kù)
網(wǎng)站路徑:http://weahome.cn/article/dsoioep.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部