本篇內(nèi)容介紹了“Flink的rpc組件有哪些”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
新洲網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站,新洲網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為新洲數(shù)千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設(shè)公司要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的新洲做網(wǎng)站的公司定做!
Flink采用akka來實(shí)現(xiàn)rpc服務(wù)。其中有這幾個(gè)重要組件:RpcServer、RpcService、AkkaRpcActor、RpcEndpoint。
這幾個(gè)組件作用如下:
(1)RpcEndpoint
提供具體rpc服務(wù)。主要實(shí)現(xiàn)有 ResourceManager 和 TaskExecutor,
①YarnResourceManager為AM容器中啟動(dòng)的服務(wù),持有ResourceManager和NodeManager的客戶端
②TaskExecutor為NM容器中啟動(dòng)taskmanager的類
(2)AkkaRpcService
提供rpc的服務(wù)類。該類內(nèi)部持有ActorSystem實(shí)例和Supervisor實(shí)例。Supervisor中含有SupervisorActor實(shí)例,SupervisorActor用于創(chuàng)建其他Actor,可以理解為根Actor。RpcEndpoint在構(gòu)造時(shí),通過AkkaRpcService的startServer()方法,獲取RpcServer實(shí)例。
publicRpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint); final ActorRef actorRef = actorRegistration.getActorRef(); final CompletableFuture actorTerminationFuture = actorRegistration.getTerminationFuture(); LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; Option host = actorRef.path().address().host(); if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); } Set > implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); final InvocationHandler akkaInvocationHandler; if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint>) rpcEndpoint)::getFencingToken, captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); @SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class>[implementedRpcGateways.size()]), akkaInvocationHandler); return server; }
先創(chuàng)建RpcEndpoint對(duì)應(yīng)的ActorRef,然后創(chuàng)建RpcServer的代理類AkkaInvocationHandler或FencedAkkaInvocationHandler,并將ActorRef實(shí)例賦給其成員屬性 rpcEndpoint:ActorRef。這里的ActorRef即為AkkaRpcActor或FencedAkkaRpcActor實(shí)例
(3)RpcServer
用來啟動(dòng)rpc服務(wù),通常不直接調(diào)用,而是調(diào)用其動(dòng)態(tài)代理類AkkaInvocationHandler或FencedAkkaInvocationHandler的start()方法
(4)AkkaInvocationHandler或FencedAkkaInvocationHandler
RpcServer的動(dòng)態(tài)代理類。start()方法用來啟動(dòng)服務(wù):
public void start() { rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender()); }
這里向rpcEndpoint,即AkkaRpcActor或FencedAkkaRpcActor實(shí)例發(fā)送一條ControlMessages.START消息
(5)AkkaRpcActor
響應(yīng)rpc消息的actor。其createReceive():
public Receive createReceive() { return ReceiveBuilder.create() .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) .match(ControlMessages.class, this::handleControlMessage) .matchAny(this::handleMessage) .build(); }
當(dāng)消息為ControlMessages.START,調(diào)用StoppedState 的start()方法
public State start(AkkaRpcActor> akkaRpcActor) { akkaRpcActor.mainThreadValidator.enterMainThread(); try { akkaRpcActor.rpcEndpoint.internalCallOnStart(); } catch (Throwable throwable) { akkaRpcActor.stop( RpcEndpointTerminationResult.failure( new AkkaRpcException( String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()), throwable))); } finally { akkaRpcActor.mainThreadValidator.exitMainThread(); } return StartedState.STARTED; }
在start()方法中調(diào)用具體提供服務(wù)的RpcEndpoint實(shí)現(xiàn)類internalCallOnStart()方法來啟動(dòng)服務(wù)。internalCallOnStart()方法中會(huì)調(diào)用onStart()方法。
“Flink的rpc組件有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!