真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink的rpc組件有哪些

本篇內(nèi)容介紹了“Flink的rpc組件有哪些”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

新洲網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站,新洲網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為新洲數(shù)千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設(shè)公司要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的新洲做網(wǎng)站的公司定做!

Flink采用akka來實(shí)現(xiàn)rpc服務(wù)。其中有這幾個(gè)重要組件:RpcServer、RpcService、AkkaRpcActor、RpcEndpoint。

Flink的rpc組件有哪些

這幾個(gè)組件作用如下:

(1)RpcEndpoint

提供具體rpc服務(wù)。主要實(shí)現(xiàn)有 ResourceManager 和 TaskExecutor,

①YarnResourceManager為AM容器中啟動(dòng)的服務(wù),持有ResourceManager和NodeManager的客戶端

②TaskExecutor為NM容器中啟動(dòng)taskmanager的類

(2)AkkaRpcService

提供rpc的服務(wù)類。該類內(nèi)部持有ActorSystem實(shí)例和Supervisor實(shí)例。Supervisor中含有SupervisorActor實(shí)例,SupervisorActor用于創(chuàng)建其他Actor,可以理解為根Actor。RpcEndpoint在構(gòu)造時(shí),通過AkkaRpcService的startServer()方法,獲取RpcServer實(shí)例。

	public  RpcServer startServer(C rpcEndpoint) {
		checkNotNull(rpcEndpoint, "rpc endpoint");

		final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
		final ActorRef actorRef = actorRegistration.getActorRef();
		final CompletableFuture actorTerminationFuture = actorRegistration.getTerminationFuture();

		LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

		final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
		final String hostname;
		Option host = actorRef.path().address().host();
		if (host.isEmpty()) {
			hostname = "localhost";
		} else {
			hostname = host.get();
		}

		Set> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

		implementedRpcGateways.add(RpcServer.class);
		implementedRpcGateways.add(AkkaBasedEndpoint.class);

		final InvocationHandler akkaInvocationHandler;

		if (rpcEndpoint instanceof FencedRpcEndpoint) {
			// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
			akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				((FencedRpcEndpoint) rpcEndpoint)::getFencingToken,
				captureAskCallstacks);

			implementedRpcGateways.add(FencedMainThreadExecutable.class);
		} else {
			akkaInvocationHandler = new AkkaInvocationHandler(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				captureAskCallstacks);
		}

		// Rather than using the System ClassLoader directly, we derive the ClassLoader
		// from this class . That works better in cases where Flink runs embedded and all Flink
		// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
		ClassLoader classLoader = getClass().getClassLoader();

		@SuppressWarnings("unchecked")
		RpcServer server = (RpcServer) Proxy.newProxyInstance(
			classLoader,
			implementedRpcGateways.toArray(new Class[implementedRpcGateways.size()]),
			akkaInvocationHandler);

		return server;
	}

先創(chuàng)建RpcEndpoint對(duì)應(yīng)的ActorRef,然后創(chuàng)建RpcServer的代理類AkkaInvocationHandler或FencedAkkaInvocationHandler,并將ActorRef實(shí)例賦給其成員屬性 rpcEndpoint:ActorRef。這里的ActorRef即為AkkaRpcActor或FencedAkkaRpcActor實(shí)例

(3)RpcServer

用來啟動(dòng)rpc服務(wù),通常不直接調(diào)用,而是調(diào)用其動(dòng)態(tài)代理類AkkaInvocationHandler或FencedAkkaInvocationHandler的start()方法

(4)AkkaInvocationHandler或FencedAkkaInvocationHandler

RpcServer的動(dòng)態(tài)代理類。start()方法用來啟動(dòng)服務(wù):

	public void start() {
		rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
	}

這里向rpcEndpoint,即AkkaRpcActor或FencedAkkaRpcActor實(shí)例發(fā)送一條ControlMessages.START消息

(5)AkkaRpcActor

響應(yīng)rpc消息的actor。其createReceive():

	public Receive createReceive() {
		return ReceiveBuilder.create()
			.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
			.match(ControlMessages.class, this::handleControlMessage)
			.matchAny(this::handleMessage)
			.build();
	}

當(dāng)消息為ControlMessages.START,調(diào)用StoppedState 的start()方法

		public State start(AkkaRpcActor akkaRpcActor) {
			akkaRpcActor.mainThreadValidator.enterMainThread();

			try {
				akkaRpcActor.rpcEndpoint.internalCallOnStart();
			} catch (Throwable throwable) {
				akkaRpcActor.stop(
					RpcEndpointTerminationResult.failure(
						new AkkaRpcException(
							String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
							throwable)));
			} finally {
				akkaRpcActor.mainThreadValidator.exitMainThread();
			}

			return StartedState.STARTED;
		}

在start()方法中調(diào)用具體提供服務(wù)的RpcEndpoint實(shí)現(xiàn)類internalCallOnStart()方法來啟動(dòng)服務(wù)。internalCallOnStart()方法中會(huì)調(diào)用onStart()方法。

“Flink的rpc組件有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!


標(biāo)題名稱:Flink的rpc組件有哪些
當(dāng)前路徑:http://weahome.cn/article/ighpdo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部