真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Hadoop中RPC機(jī)制分析Server端

這篇文章主要介紹“Hadoop中RPC機(jī)制分析Server端”,在日常操作中,相信很多人在Hadoop中RPC機(jī)制分析Server端問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Hadoop中RPC機(jī)制分析Server端”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

創(chuàng)新新互聯(lián),憑借10多年的成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)經(jīng)驗(yàn),本著真心·誠(chéng)心服務(wù)的企業(yè)理念服務(wù)于成都中小企業(yè)設(shè)計(jì)網(wǎng)站有1000多家案例。做網(wǎng)站建設(shè),選創(chuàng)新互聯(lián)。

1. Server.Listener

RPC Client 端的 RPC 請(qǐng)求發(fā)送到 Server 端后, 首先由 Server.Listener 接收

Server.Listener 類繼承自 Thread 類, 監(jiān)聽了 OP_READ 和 OP_ACCEPT 事件

Server.Listener 接收 RPC 請(qǐng)求, 在 Server.Listener.doRead() 方法中讀取數(shù)據(jù), 在 doRead() 方法中又調(diào)用了Server.Connection.readAndProcess() 方法, 

最后會(huì)調(diào)用 Server.Connection.processRpcRequest() 方法, 源碼如下:

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
      ...
      Writable rpcRequest;
      // 從成員變量dis中反序列化出Client端發(fā)送來的RPC請(qǐng)求( WritableRpcEngine.Invocation對(duì)象 )
      try { //Read the rpc request
        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
        rpcRequest.readFields(dis);
      } catch (Throwable t) { // includes runtime exception from newInstance
        ...
      }
      // 構(gòu)造Server端Server.Call實(shí)例對(duì)象
      Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
              .getClientId().toByteArray());
      // 將Server.Call實(shí)例對(duì)象放入調(diào)用隊(duì)列中
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

調(diào)用隊(duì)列 callQueue 是 Server 的成員變量, Server.Listener 和 Server.Handler 是典型的生產(chǎn)者, 消費(fèi)者模型, 

Server.Listener( 生產(chǎn)者 )的doRead()方法最終調(diào)用Server.Connection.processRpcRequest() 方法, 

而Server.Handler( 消費(fèi)者 )處理RPC請(qǐng)求

2. Server.Handler 繼承 Thread 類, 其主要工作是處理 callQueue 中的調(diào)用, 都在 run() 方法中完成. 在 run() 的主循環(huán)中, 每次處理一個(gè)從 callQueue 中出隊(duì)的請(qǐng)求, Server.call() 是一個(gè)抽象方法, 實(shí)際是調(diào)用了 RPC.Server.call()方法, 最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用

/** Handles queued calls . */
  private class Handler extends Thread {
    ...
    @Override
    public void run() {
      ...
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
          ...
          final Call call = callQueue.take();    // 獲取一個(gè)RPC調(diào)用請(qǐng)求
          ...
          Writable value = null;

          value = call.connection.user.doAs(new PrivilegedExceptionAction() {
                     @Override
                     public Writable run() throws Exception {
                       // 調(diào)用RPC.Server.call()方法
                       // call.rpcKind : RPC調(diào)用請(qǐng)求的類型, 一般為Writable
                       // call.connection.protocolName : RPC協(xié)議接口的類名
                       // call.rpcRequest : Invocation實(shí)例對(duì)象, 包括方法名, 參數(shù)列表, 參數(shù)列表的Class對(duì)象數(shù)組
                       // call.timestamp : 調(diào)用時(shí)間戳
                       return call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);
                     }
                   });
      }
      ...
    }
}

RPC.Server.call() 方法如下:

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
	Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
	  receiveTime);
}

最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用, 代碼如下:

@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
	  String protocolName, Writable rpcRequest, long receivedTime)
	  throws IOException, RPC.VersionMismatch {

	Invocation call = (Invocation)rpcRequest;	// 將RPC請(qǐng)求強(qiáng)制轉(zhuǎn)成WritableRpcEngine.Invocation對(duì)象
	...

	long clientVersion = call.getProtocolVersion();
	final String protoName;
	ProtoClassProtoImpl protocolImpl;	// Server端RPC協(xié)議接口的實(shí)現(xiàn)類的實(shí)例對(duì)象
	...
	// Invoke the protocol method
    try {
	  ...
	  // 獲取RPC請(qǐng)求中調(diào)用的方法對(duì)象Method
	  Method method = 
		  protocolImpl.protocolClass.getMethod(call.getMethodName(),
		  call.getParameterClasses());
	  method.setAccessible(true);
	  ...
	  // 在Server端RPC協(xié)議接口的實(shí)現(xiàn)類的實(shí)例對(duì)象 protocolImpl 上調(diào)用具體的方法
	  Object value = 
		  method.invoke(protocolImpl.protocolImpl, call.getParameters());
	  ...
	  // 調(diào)用正常結(jié)束, 返回調(diào)用結(jié)果
	  return new ObjectWritable(method.getReturnType(), value);

	} catch (InvocationTargetException e) {	// 調(diào)用出現(xiàn)異常, 用IOException包裝異常, 最后拋出該異常
	  Throwable target = e.getTargetException();
	  if (target instanceof IOException) {
		throw (IOException)target;
	  } else {
		IOException ioe = new IOException(target.toString());
		ioe.setStackTrace(target.getStackTrace());
		throw ioe;
	  }
	} catch (Throwable e) {
	  ...
	}
  }
}

在 WritableRpcEngine.call() 方法中, 傳入的 rpcRequest 會(huì)被強(qiáng)制轉(zhuǎn)換成 WritableRpcEngine.Invocation 類型的對(duì)象 call , 并通過 call 這個(gè)對(duì)象包含的方法名(getMethodName()方法)和參數(shù)列表的 Class對(duì)象數(shù)組(getParameterClasses())獲取 Method 對(duì)象, 最終通過 Method 對(duì)象的invoke() 方法, 調(diào)用實(shí)現(xiàn)類的實(shí)例對(duì)象 protocolImpl 上的方法, 完成 Hadoop 的遠(yuǎn)程過程調(diào)用

好了, 現(xiàn)在 Server 端的具體方法已經(jīng)被調(diào)用了, 調(diào)用結(jié)果分兩種情況:

    1) 調(diào)用正常結(jié)束, 則將方法的返回值和調(diào)用結(jié)果封裝成一個(gè) ObjectWritable 類型的對(duì)象, 并返回

    2) 調(diào)用出現(xiàn)異常, 拋出 IOException 類型的異常

3. Server.Responder

這個(gè)類的功能: 發(fā)送 Hadoop 遠(yuǎn)程過程調(diào)用的應(yīng)答給 Client 端, Server.Responder 類繼承自 Thread 類, 監(jiān)聽了 OP_WRITE 事件, 即通道可寫. 具體細(xì)節(jié)寫不下去了

到此,關(guān)于“Hadoop中RPC機(jī)制分析Server端”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!


標(biāo)題名稱:Hadoop中RPC機(jī)制分析Server端
分享地址:http://weahome.cn/article/giogoo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部