這篇文章主要介紹“Hadoop中RPC機(jī)制分析Server端”,在日常操作中,相信很多人在Hadoop中RPC機(jī)制分析Server端問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Hadoop中RPC機(jī)制分析Server端”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新新互聯(lián),憑借10多年的成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)經(jīng)驗(yàn),本著真心·誠(chéng)心服務(wù)的企業(yè)理念服務(wù)于成都中小企業(yè)設(shè)計(jì)網(wǎng)站有1000多家案例。做網(wǎng)站建設(shè),選創(chuàng)新互聯(lián)。
1. Server.Listener
RPC Client 端的 RPC 請(qǐng)求發(fā)送到 Server 端后, 首先由 Server.Listener 接收
Server.Listener 類繼承自 Thread 類, 監(jiān)聽了 OP_READ 和 OP_ACCEPT 事件
Server.Listener 接收 RPC 請(qǐng)求, 在 Server.Listener.doRead() 方法中讀取數(shù)據(jù), 在 doRead() 方法中又調(diào)用了Server.Connection.readAndProcess() 方法,
最后會(huì)調(diào)用 Server.Connection.processRpcRequest() 方法, 源碼如下:
private void processRpcRequest(RpcRequestHeaderProto header, DataInputStream dis) throws WrappedRpcServerException, InterruptedException { ... Writable rpcRequest; // 從成員變量dis中反序列化出Client端發(fā)送來的RPC請(qǐng)求( WritableRpcEngine.Invocation對(duì)象 ) try { //Read the rpc request rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf); rpcRequest.readFields(dis); } catch (Throwable t) { // includes runtime exception from newInstance ... } // 構(gòu)造Server端Server.Call實(shí)例對(duì)象 Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header .getClientId().toByteArray()); // 將Server.Call實(shí)例對(duì)象放入調(diào)用隊(duì)列中 callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count }
調(diào)用隊(duì)列 callQueue 是 Server 的成員變量, Server.Listener 和 Server.Handler 是典型的生產(chǎn)者, 消費(fèi)者模型,
Server.Listener( 生產(chǎn)者 )的doRead()方法最終調(diào)用Server.Connection.processRpcRequest() 方法,
而Server.Handler( 消費(fèi)者 )處理RPC請(qǐng)求
2. Server.Handler 繼承 Thread 類, 其主要工作是處理 callQueue 中的調(diào)用, 都在 run() 方法中完成. 在 run() 的主循環(huán)中, 每次處理一個(gè)從 callQueue 中出隊(duì)的請(qǐng)求, Server.call() 是一個(gè)抽象方法, 實(shí)際是調(diào)用了 RPC.Server.call()方法, 最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用
/** Handles queued calls . */ private class Handler extends Thread { ... @Override public void run() { ... ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { ... final Call call = callQueue.take(); // 獲取一個(gè)RPC調(diào)用請(qǐng)求 ... Writable value = null; value = call.connection.user.doAs(new PrivilegedExceptionAction() { @Override public Writable run() throws Exception { // 調(diào)用RPC.Server.call()方法 // call.rpcKind : RPC調(diào)用請(qǐng)求的類型, 一般為Writable // call.connection.protocolName : RPC協(xié)議接口的類名 // call.rpcRequest : Invocation實(shí)例對(duì)象, 包括方法名, 參數(shù)列表, 參數(shù)列表的Class對(duì)象數(shù)組 // call.timestamp : 調(diào)用時(shí)間戳 return call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); } }); } ... } }
RPC.Server.call() 方法如下:
@Override public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); }
最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用, 代碼如下:
@Override public Writable call(org.apache.hadoop.ipc.RPC.Server server, String protocolName, Writable rpcRequest, long receivedTime) throws IOException, RPC.VersionMismatch { Invocation call = (Invocation)rpcRequest; // 將RPC請(qǐng)求強(qiáng)制轉(zhuǎn)成WritableRpcEngine.Invocation對(duì)象 ... long clientVersion = call.getProtocolVersion(); final String protoName; ProtoClassProtoImpl protocolImpl; // Server端RPC協(xié)議接口的實(shí)現(xiàn)類的實(shí)例對(duì)象 ... // Invoke the protocol method try { ... // 獲取RPC請(qǐng)求中調(diào)用的方法對(duì)象Method Method method = protocolImpl.protocolClass.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); ... // 在Server端RPC協(xié)議接口的實(shí)現(xiàn)類的實(shí)例對(duì)象 protocolImpl 上調(diào)用具體的方法 Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters()); ... // 調(diào)用正常結(jié)束, 返回調(diào)用結(jié)果 return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) { // 調(diào)用出現(xiàn)異常, 用IOException包裝異常, 最后拋出該異常 Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException)target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; } } catch (Throwable e) { ... } } }
在 WritableRpcEngine.call() 方法中, 傳入的 rpcRequest 會(huì)被強(qiáng)制轉(zhuǎn)換成 WritableRpcEngine.Invocation 類型的對(duì)象 call , 并通過 call 這個(gè)對(duì)象包含的方法名(getMethodName()方法)和參數(shù)列表的 Class對(duì)象數(shù)組(getParameterClasses())獲取 Method 對(duì)象, 最終通過 Method 對(duì)象的invoke() 方法, 調(diào)用實(shí)現(xiàn)類的實(shí)例對(duì)象 protocolImpl 上的方法, 完成 Hadoop 的遠(yuǎn)程過程調(diào)用
好了, 現(xiàn)在 Server 端的具體方法已經(jīng)被調(diào)用了, 調(diào)用結(jié)果分兩種情況:
1) 調(diào)用正常結(jié)束, 則將方法的返回值和調(diào)用結(jié)果封裝成一個(gè) ObjectWritable 類型的對(duì)象, 并返回
2) 調(diào)用出現(xiàn)異常, 拋出 IOException 類型的異常
3. Server.Responder
這個(gè)類的功能: 發(fā)送 Hadoop 遠(yuǎn)程過程調(diào)用的應(yīng)答給 Client 端, Server.Responder 類繼承自 Thread 類, 監(jiān)聽了 OP_WRITE 事件, 即通道可寫. 具體細(xì)節(jié)寫不下去了
到此,關(guān)于“Hadoop中RPC機(jī)制分析Server端”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!