
Kafka Data Source: FlinkKafkaConsumer Analysis

This article walks through the internals of the Flink Kafka Consumer as a Kafka data source: when its lifecycle methods are called, how data is fetched from Kafka, and how records are written out to downstream operators. The explanations are kept simple and clear; follow along step by step.


1. When is the open() method called?

FlinkKafkaConsumer is a RichFunction (it extends RichParallelSourceFunction via FlinkKafkaConsumerBase) and therefore has the lifecycle method open(). So when does Flink actually call FlinkKafkaConsumer's open() method?
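
For orientation, here is a minimal job that wires a FlinkKafkaConsumer into a stream (a sketch; the topic name, broker address, and group id are placeholders):

	import java.util.Properties;

	import org.apache.flink.api.common.serialization.SimpleStringSchema;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

	public class KafkaSourceJob {
		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

			Properties props = new Properties();
			props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
			props.setProperty("group.id", "demo-group");              // placeholder group id

			// the runtime calls this consumer's open() before run(), as traced below
			FlinkKafkaConsumer<String> consumer =
				new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

			env.addSource(consumer).print();
			env.execute("kafka-source-demo");
		}
	}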

Before invoking the operator logic, StreamTask executes its beforeInvoke() method, which initializes each operator's state and calls its open() method:

	operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

initializeStateAndOpenOperators() loops over all operators and initializes each one:

	protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
		for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
			StreamOperator<?> operator = operatorWrapper.getStreamOperator();
			operator.initializeState(streamTaskStateInitializer);
			operator.open();
		}
	}

The operator backing the Kafka source is StreamSource; its open() method (inherited from AbstractUdfStreamOperator) is:

	public void open() throws Exception {
		super.open();
		FunctionUtils.openFunction(userFunction, new Configuration());
	}

FunctionUtils.openFunction() simply calls the function's open() method, provided the function implements RichFunction:

	public static void openFunction(Function function, Configuration parameters) throws Exception {
		if (function instanceof RichFunction) {
			RichFunction richFunction = (RichFunction) function;
			richFunction.open(parameters);
		}
	}
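
As a quick illustration of the lifecycle, any user-defined rich function sees its open() invoked through exactly this path. A minimal sketch (the class and field names here are made up):

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

	// toy source: the runtime calls open() via FunctionUtils.openFunction() before run()
	public class LifecycleSource extends RichParallelSourceFunction<Long> {

		private volatile boolean running = true;
		private transient long startMillis;

		@Override
		public void open(Configuration parameters) throws Exception {
			// one-time initialization, e.g. creating clients or registering metrics
			startMillis = System.currentTimeMillis();
		}

		@Override
		public void run(SourceContext<Long> ctx) throws Exception {
			while (running) {
				synchronized (ctx.getCheckpointLock()) {
					ctx.collect(System.currentTimeMillis() - startMillis);
				}
				Thread.sleep(1000L);
			}
		}

		@Override
		public void cancel() {
			running = false;
		}
	}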

2. When is the RuntimeContext assigned?

Along the chain StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(), the OperatorChain constructor creates each StreamOperator through the factory class StreamOperatorFactory. For the Kafka source the factory is SimpleOperatorFactory, whose createStreamOperator() method calls the StreamOperator's setup() method:

	public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
		if (operator instanceof AbstractStreamOperator) {
			((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
		}
		if (operator instanceof SetupableStreamOperator) {
			((SetupableStreamOperator) operator).setup(
				parameters.getContainingTask(),
				parameters.getStreamConfig(),
				parameters.getOutput());
		}
		return (T) operator;
	}

The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method is defined in the parent class AbstractUdfStreamOperator:

	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
		super.setup(containingTask, config, output);
		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
	}

FunctionUtils.setFunctionRuntimeContext() assigns the RuntimeContext to the function. That RuntimeContext is created in AbstractStreamOperator's setup() method and is a StreamingRuntimeContext:

		this.runtimeContext = new StreamingRuntimeContext(
			environment,
			environment.getAccumulatorRegistry().getUserMap(),
			getMetricGroup(),
			getOperatorID(),
			getProcessingTimeService(),
			null,
			environment.getExternalResourceInfoProvider());
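
The practical consequence of this ordering (setup() runs before open()) is that getRuntimeContext() is already usable inside open(). A small hedged example (the class and field names are made up):

	import org.apache.flink.api.common.functions.RichMapFunction;
	import org.apache.flink.configuration.Configuration;

	public class TaggingMapper extends RichMapFunction<String, String> {

		private transient String subtaskTag;

		@Override
		public void open(Configuration parameters) {
			// setup() has already injected the StreamingRuntimeContext at this point
			subtaskTag = "subtask-" + getRuntimeContext().getIndexOfThisSubtask();
		}

		@Override
		public String map(String value) {
			return subtaskTag + ": " + value;
		}
	}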

3. FlinkKafkaConsumer's run() method

Flink calls FlinkKafkaConsumer's run() method to produce data. The run() method's logic is as follows.

① Create a KafkaFetcher to pull the data:

		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				watermarkStrategy,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

② KafkaFetcher's runFetchLoop() starts a KafkaConsumerThread that polls Kafka in a loop. KafkaConsumerThread pulls records through a KafkaConsumer and hands them over to the Handover:

				if (records == null) {
					try {
						records = consumer.poll(pollTimeout);
					}
					catch (WakeupException we) {
						continue;
					}
				}

				try {
					handover.produce(records);
					records = null;
				}
				catch (Handover.WakeupException e) {
					// fall through the loop
				}

On the other side, KafkaFetcher retrieves the pulled Kafka records from the Handover:

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each topic partition
				for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle());

					partitionConsumerRecordsHandler(partitionRecords, partition);
				}
			}
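
The Handover is essentially a one-slot blocking exchange between the consumer thread and the fetcher. The following is a simplified model of the idea, not Flink's actual Handover class (which additionally propagates exceptions and supports wakeups):

	import java.util.Objects;

	import org.apache.kafka.clients.consumer.ConsumerRecords;

	public final class SimpleHandover {

		private final Object lock = new Object();
		private ConsumerRecords<byte[], byte[]> next;

		// called by the consumer thread; blocks while the previous batch is unconsumed
		public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
			Objects.requireNonNull(records);
			synchronized (lock) {
				while (next != null) {
					lock.wait();
				}
				next = records;
				lock.notifyAll();
			}
		}

		// called by the fetcher; blocks until the consumer thread publishes a batch
		public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
			synchronized (lock) {
				while (next == null) {
					lock.wait();
				}
				ConsumerRecords<byte[], byte[]> records = next;
				next = null;
				lock.notifyAll();
				return records;
			}
		}
	}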

③ Send data to the next operator through the Output<StreamRecord<T>> held by the SourceContext:

		public void collect(T element) {
			synchronized (lock) {
				output.collect(reuse.replace(element));
			}
		}

The SourceContext is created in StreamSource's run() method via StreamSourceContexts.getSourceContext(); the lock taken in collect() is the checkpoint lock. The Output<StreamRecord<T>> is created in, and returned by, OperatorChain's createOutputCollector():

		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
			@SuppressWarnings("unchecked")
			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

			allOutputs.add(new Tuple2<>(output, outputEdge));
		}

With a single output, the collector is a RecordWriterOutput; with multiple outputs, it is a CopyingDirectedOutput or a DirectedOutput.

④ In the single-output case, RecordWriterOutput writes through its RecordWriter member. The RecordWriter is created by StreamTask's createRecordWriterDelegate(); RecordWriterDelegate is a proxy class for RecordWriter that holds the RecordWriter instance(s) internally:

	public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
			StreamConfig configuration,
			Environment environment) {
		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
			configuration,
			environment);
		if (recordWrites.size() == 1) {
			return new SingleRecordWriter<>(recordWrites.get(0));
		} else if (recordWrites.size() == 0) {
			return new NonRecordWriter<>();
		} else {
			return new MultipleRecordWriters<>(recordWrites);
		}
	}

	private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
			StreamConfig configuration,
			Environment environment) {
		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

		for (int i = 0; i < outEdgesInOrder.size(); i++) {
			StreamEdge edge = outEdgesInOrder.get(i);
			recordWriters.add(
				createRecordWriter(
					edge,
					i,
					environment,
					environment.getTaskInfo().getTaskName(),
					edge.getBufferTimeout()));
		}
		return recordWriters;
	}

outEdgesInOrder comes from the List<StreamEdge> outEdges of the StreamNode in the StreamGraph.

When a RecordWriter is created, the isBroadcast() method of the StreamEdge's StreamPartitioner (outputPartitioner) decides between a BroadcastRecordWriter and a ChannelSelectorRecordWriter:

	public RecordWriter<T> build(ResultPartitionWriter writer) {
		if (selector.isBroadcast()) {
			return new BroadcastRecordWriter<>(writer, timeout, taskName);
		} else {
			return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
		}
	}

If no partitioner was set explicitly, the outputPartitioner is chosen based on whether the upstream and downstream nodes have the same parallelism:

			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
				partitioner = new ForwardPartitioner<Object>();
			} else if (partitioner == null) {
				partitioner = new RebalancePartitioner<Object>();
			}
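
These defaults can also be overridden explicitly on the DataStream API; broadcast() is what drives the isBroadcast() branch above. A small sketch, assuming source is any DataStream<String>:

	import org.apache.flink.streaming.api.datastream.DataStream;

	public static void choosePartitioners(DataStream<String> source) {
		source.forward().print();   // ForwardPartitioner (parallelism must match)
		source.rebalance().print(); // RebalancePartitioner (round-robin)
		source.broadcast().print(); // BroadcastPartitioner -> BroadcastRecordWriter
	}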

Both BroadcastRecordWriter and ChannelSelectorRecordWriter ultimately call flush() on their ResultPartitionWriter member (targetPartition) to emit data. The ResultPartitionWriter array is produced in ConsumableNotifyingResultPartitionWriterDecorator's decorate(), which inspects each ResultPartitionDeploymentDescriptor to decide between wrapping the writer in a ConsumableNotifyingResultPartitionWriterDecorator and using the partitionWriter passed in as-is. The decorator hands records straight on for the next node to consume, using a ResultPartitionConsumableNotifier to send the notification:

	public static ResultPartitionWriter[] decorate(
			Collection<ResultPartitionDeploymentDescriptor> descs,
			ResultPartitionWriter[] partitionWriters,
			TaskActions taskActions,
			JobID jobId,
			ResultPartitionConsumableNotifier notifier) {

		ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
		int counter = 0;
		for (ResultPartitionDeploymentDescriptor desc : descs) {
			if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
				consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
					taskActions,
					jobId,
					partitionWriters[counter],
					notifier);
			} else {
				consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
			}
			counter++;
		}
		return consumableNotifyingPartitionWriters;
	}
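
In shape this is a plain decorator: wrap the real writer and fire a one-time notification when output starts. A generic sketch of the idea with hypothetical interfaces (Flink's decorator works on ResultPartitionWriter and notifies the JobMaster via RPC):

	interface Writer { void write(byte[] data); }
	interface Notifier { void notifyConsumable(); }

	final class NotifyingWriter implements Writer {
		private final Writer delegate;
		private final Notifier notifier;
		private boolean notified;

		NotifyingWriter(Writer delegate, Notifier notifier) {
			this.delegate = delegate;
			this.notifier = notifier;
		}

		@Override
		public void write(byte[] data) {
			delegate.write(data); // pass through to the wrapped writer
			if (!notified) {
				notified = true;
				notifier.notifyConsumable(); // tell the scheduler this partition has data
			}
		}
	}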

The partitionWriters are created via NettyShuffleEnvironment's createResultPartitionWriters() -> ResultPartitionFactory's create(). A ResultPartition writes out through its member field ResultSubpartition[] subpartitions, which is populated in ResultPartitionFactory's createSubpartitions():

	private void createSubpartitions(
			ResultPartition partition,
			ResultPartitionType type,
			BoundedBlockingSubpartitionType blockingSubpartitionType,
			ResultSubpartition[] subpartitions) {
		// Create the subpartitions.
		if (type.isBlocking()) {
			initializeBoundedBlockingPartitions(
				subpartitions,
				partition,
				blockingSubpartitionType,
				networkBufferSize,
				channelManager);
		} else {
			for (int i = 0; i < subpartitions.length; i++) {
				subpartitions[i] = new PipelinedSubpartition(i, partition);
			}
		}
	}

For streaming jobs, the ResultSubpartition is a PipelinedSubpartition.

4. Writing data out

4.1 Notification via ResultPartitionConsumableNotifier

The ResultPartitionConsumableNotifier is created in TaskExecutor's associateWithJobManager():

	private JobTable.Connection associateWithJobManager(
			JobTable.Job job,
			ResourceID resourceID,
			JobMasterGateway jobMasterGateway) {
		......
        ......

		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
			jobMasterGateway,
			getRpcService().getExecutor(),
			taskManagerConfiguration.getTimeout());

		......
        ......
	}

RpcResultPartitionConsumableNotifier makes a remote call to JobMaster's scheduleOrUpdateConsumers() method, passing the ResultPartitionID partitionId.

4.1.1 JobMaster's scheduleOrUpdateConsumers()

JobMaster notifies the downstream consuming operators through ExecutionGraph's scheduleOrUpdateConsumers().

There are two key pieces of code here:

① From the ExecutionVertex's member Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions, look up the production and consumption information for this partition; it is stored in the IntermediateResultPartition:

	void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {

		.......

		final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());

		.......

		if (partition.getIntermediateResult().getResultType().isPipelined()) {
			// Schedule or update receivers of this partition
			execution.scheduleOrUpdateConsumers(partition.getConsumers());
		}
		else {
			throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
					"pipelined partitions.");
		}
	}

From the IntermediateResultPartition, take the consumer list List<List<ExecutionEdge>> allConsumers;

from each ExecutionEdge's ExecutionVertex target, take the task to run from its Execution currentExecution.

② Execution's sendUpdatePartitionInfoRpcCall() method calls TaskExecutor's updatePartitions() via RPC, which lets the downstream consumer tasks start consuming:

	private void sendUpdatePartitionInfoRpcCall(
			final Iterable<PartitionInfo> partitionInfos) {

		final LogicalSlot slot = assignedResource;

		if (slot != null) {
			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

			CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

			updatePartitionsResultFuture.whenCompleteAsync(
				(ack, failure) -> {
					// fail if there was a failure
					if (failure != null) {
						fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
							"] on TaskManager " + taskManagerLocation + " failed", failure));
					}
				}, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
		}
	}

4.1.2 TaskExecutor's updatePartitions()

TaskExecutor's updatePartitions() updates the partition information. If the corresponding InputChannel was previously unknown, it gets upgraded. SingleInputGate's updateInputChannel():

	public void updateInputChannel(
			ResourceID localLocation,
			NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
		synchronized (requestLock) {
			if (closeFuture.isDone()) {
				// There was a race with a task failure/cancel
				return;
			}

			IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

			InputChannel current = inputChannels.get(partitionId);

			if (current instanceof UnknownInputChannel) {
				UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
				boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
				InputChannel newChannel;
				if (isLocal) {
					newChannel = unknownChannel.toLocalInputChannel();
				} else {
					RemoteInputChannel remoteInputChannel =
						unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
					remoteInputChannel.assignExclusiveSegments();
					newChannel = remoteInputChannel;
				}
				LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

				inputChannels.put(partitionId, newChannel);
				channels[current.getChannelIndex()] = newChannel;

				if (requestedPartitionsFlag) {
					newChannel.requestSubpartition(consumedSubpartitionIndex);
				}

				for (TaskEvent event : pendingEvents) {
					newChannel.sendTaskEvent(event);
				}

				if (--numberOfUninitializedChannels == 0) {
					pendingEvents.clear();
				}
			}
		}
	}

4.2 Writing out via PipelinedSubpartition

Records are first written into the ArrayDeque<BufferConsumer> buffers cache, and availability is then signalled via PipelinedSubpartitionView readView's notifyDataAvailable() -> BufferAvailabilityListener availabilityListener's notifyDataAvailable().
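
The underlying pattern is "enqueue, then notify on the empty-to-non-empty transition". A stripped-down model (a hypothetical class, not Flink's implementation, which additionally manages Buffer lifecycles and backlog counts):

	import java.util.ArrayDeque;

	public final class NotifyingQueue<T> {

		public interface AvailabilityListener {
			void notifyDataAvailable();
		}

		private final ArrayDeque<T> buffers = new ArrayDeque<>();
		private final AvailabilityListener listener;

		public NotifyingQueue(AvailabilityListener listener) {
			this.listener = listener;
		}

		public void add(T element) {
			boolean becameNonEmpty;
			synchronized (buffers) {
				becameNonEmpty = buffers.isEmpty();
				buffers.add(element);
			}
			if (becameNonEmpty) {
				listener.notifyDataAvailable(); // only fire on the empty -> non-empty edge
			}
		}

		public T poll() {
			synchronized (buffers) {
				return buffers.poll();
			}
		}
	}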

4.2.1 When is the BufferAvailabilityListener created?

① When TaskManagerServices creates the ShuffleEnvironment, the chain NettyShuffleServiceFactory's createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer's initChannel() -> NettyProtocol's getServerChannelHandlers() installs PartitionRequestServerHandler as the Netty server-side handler:

	public ChannelHandler[] getServerChannelHandlers() {
		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
			partitionProvider,
			taskEventPublisher,
			queueOfPartitionQueues);

		return new ChannelHandler[] {
			messageEncoder,
			new NettyMessage.NettyMessageDecoder(),
			serverHandler,
			queueOfPartitionQueues
		};
	}

② When PartitionRequestServerHandler receives a PartitionRequest message from a client, it creates a CreditBasedSequenceNumberingViewReader and registers it via requestSubpartitionView() -> ResultPartitionManager's createSubpartitionView() -> ResultPartition's createSubpartitionView(), which installs the reader as the BufferAvailabilityListener.

③ CreditBasedSequenceNumberingViewReader's notifyDataAvailable() calls PartitionRequestQueue's notifyReaderNonEmpty() to notify the downstream operator:

	void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
		// The notification might come from the same thread. For the initial writes this
		// might happen before the reader has set its reference to the view, because
		// creating the queue and the initial notification happen in the same method call.
		// This can be resolved by separating the creation of the view and allowing
		// notifications.

		// TODO This could potentially have a bad performance impact as in the
		// worst case (network consumes faster than the producer) each buffer
		// will trigger a separate event loop task being scheduled.
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
	}
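
For reference, the Netty mechanism used here is the user-event hook: firing a user event on the pipeline re-enters the event loop, and an inbound handler (PartitionRequestQueue in Flink's case) picks it up in userEventTriggered(). A minimal hedged sketch of such a handler (the class name and event type are made up):

	import io.netty.channel.ChannelHandlerContext;
	import io.netty.channel.ChannelInboundHandlerAdapter;

	public class AvailabilityEventHandler extends ChannelInboundHandlerAdapter {

		@Override
		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
			if (evt instanceof String) {
				// here Flink would enqueue the reader and try to write buffers out
				System.out.println("reader became available: " + evt);
			} else {
				ctx.fireUserEventTriggered(evt); // pass other events along the pipeline
			}
		}
	}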

Thanks for reading. That wraps up this analysis of the Flink Kafka Consumer as a Kafka data source; hopefully the walkthrough gives you a deeper picture of how it works, though the details are best confirmed by reading and running the code yourself.

