This article walks through how Flink's Kafka source, the FlinkKafkaConsumer, works internally: how it is opened, how it pulls records from Kafka, and how those records are handed to downstream operators.
FlinkKafkaConsumer is a RichFunction (via RichParallelSourceFunction) and therefore has the lifecycle method open(). So when does Flink actually call FlinkKafkaConsumer's open()?
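Before diving into the runtime internals, here is a minimal job that uses this source; it is only a sketch assuming the Flink 1.11/1.12-era DataStream API and the universal Kafka connector, and the topic name, bootstrap address and group id are placeholders. Everything below traces what the runtime does with the consumer registered here.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "demo-group");              // placeholder group id

        // FlinkKafkaConsumer is the RichFunction whose open()/run() lifecycle
        // the rest of this article traces through StreamTask and StreamSource.
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("kafka source demo");
    }
}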
Before the operator code starts running, StreamTask executes its beforeInvoke() method, which initializes the operators' state and calls their open() methods:
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
initializeStateAndOpenOperators() loops over all operators in the chain and initializes each one:
protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
For the Kafka source the operator is a StreamSource, whose open() method is:
public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, new Configuration());
}
FunctionUtils.openFunction() invokes the open() method of the user function, provided it implements RichFunction:
public static void openFunction(Function function, Configuration parameters) throws Exception {
    if (function instanceof RichFunction) {
        RichFunction richFunction = (RichFunction) function;
        richFunction.open(parameters);
    }
}
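Any user function that implements RichFunction gets its open() invoked through exactly this path. As an illustration, a hypothetical custom source (not part of the Flink code being analyzed) would look like this:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Hypothetical user source: its open() is reached through the call chain above
// (StreamTask -> StreamSource.open() -> FunctionUtils.openFunction()).
public class CountingSource extends RichSourceFunction<Long> {

    private volatile boolean running = true;
    private long counter;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One-time initialization, e.g. opening client connections.
        counter = 0L;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            ctx.collect(counter++);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}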
Along the call path StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(), the OperatorChain constructor uses a StreamOperatorFactory to create the StreamOperator. For the Kafka source that factory is SimpleOperatorFactory, and its createStreamOperator() calls the operator's setup() method:
public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
    if (operator instanceof AbstractStreamOperator) {
        ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
    }
    if (operator instanceof SetupableStreamOperator) {
        ((SetupableStreamOperator) operator).setup(
            parameters.getContainingTask(),
            parameters.getStreamConfig(),
            parameters.getOutput());
    }
    return (T) operator;
}
The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method is defined in the parent class AbstractUdfStreamOperator:
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    super.setup(containingTask, config, output);
    FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}
FunctionUtils.setFunctionRuntimeContext() hands the user function its RuntimeContext. That context is created in AbstractStreamOperator.setup() and is a StreamingRuntimeContext:
this.runtimeContext = new StreamingRuntimeContext(
    environment,
    environment.getAccumulatorRegistry().getUserMap(),
    getMetricGroup(),
    getOperatorID(),
    getProcessingTimeService(),
    null,
    environment.getExternalResourceInfoProvider());
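With the StreamingRuntimeContext injected, the user function can query its runtime environment inside open(). Continuing the hypothetical CountingSource above, an illustrative open() might read the subtask index and register a metric (fragment only, assuming the class shown earlier):

@Override
public void open(Configuration parameters) throws Exception {
    // Works because setFunctionRuntimeContext() injected the StreamingRuntimeContext before open().
    int subtask = getRuntimeContext().getIndexOfThisSubtask();
    int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    System.out.printf("opening source subtask %d of %d%n", subtask + 1, parallelism);
    getRuntimeContext().getMetricGroup().counter("emittedRecords");
}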
Flink calls FlinkKafkaConsumer's run() method to produce data. run() proceeds in the following steps:
① Create a KafkaFetcher to pull the data:
this.kafkaFetcher = createFetcher(
    sourceContext,
    subscribedPartitionsToStartOffsets,
    watermarkStrategy,
    (StreamingRuntimeContext) getRuntimeContext(),
    offsetCommitMode,
    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
    useMetrics);
② KafkaFetcher.runFetchLoop() starts a KafkaConsumerThread that loops, pulling records from Kafka through a KafkaConsumer and handing them over to a Handover:
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    } catch (WakeupException we) {
        continue;
    }
}

try {
    handover.produce(records);
    records = null;
} // exception handling omitted in this excerpt
The KafkaFetcher then takes the pulled records from the Handover:
while (running) {
    // this blocks until we get the next records
    // it automatically re-throws exceptions encountered in the consumer thread
    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

    // get the records for each topic partition
    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
            records.records(partition.getKafkaPartitionHandle());
        partitionConsumerRecordsHandler(partitionRecords, partition);
    }
}
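The Handover is essentially a one-slot exchange point between the KafkaConsumerThread and the fetcher's emitting thread. Below is a stripped-down sketch of that pattern; Flink's actual Handover additionally forwards exceptions from the consumer thread and supports wakeup and close.

import org.apache.kafka.clients.consumer.ConsumerRecords;

// Simplified hand-over: one producer thread deposits a batch, one consumer thread takes it.
public final class SimpleHandover {

    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;

    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait(); // wait until the previous batch was taken
            }
            next = records;
            lock.notifyAll();
        }
    }

    public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait(); // block until a batch is available
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }
}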
③ Records are emitted downstream through the Output held inside the SourceContext:
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}
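The lock taken in collect() is the checkpoint lock, so a user source that wants to emit a record and advance its own state atomically with respect to checkpoints synchronizes on the same object. A hypothetical run() for the CountingSource shown earlier (fragment only):

@Override
public void run(SourceContext<Long> ctx) throws Exception {
    while (running) {
        synchronized (ctx.getCheckpointLock()) {
            // Emitting the record and advancing internal state happen atomically with respect
            // to checkpoints, because collect() synchronizes on the same lock.
            ctx.collect(counter);
            counter++;
        }
    }
}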
The SourceContext is created in StreamSource.run() via StreamSourceContexts.getSourceContext(). The Output it wraps is created while the OperatorChain is being built:
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
    @SuppressWarnings("unchecked")
    RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
    allOutputs.add(new Tuple2<>(output, outputEdge));
}
When there is a single output, the Output is a RecordWriterOutput; with multiple outputs it is a CopyingDirectedOutput or a DirectedOutput.
④ In the single-output case, the RecordWriterOutput emits records through its RecordWriter member. The RecordWriter is created by StreamTask.createRecordWriterDelegate(); the returned RecordWriterDelegate is a proxy that holds the actual RecordWriter instance(s):
public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
        configuration,
        environment);
    if (recordWrites.size() == 1) {
        return new SingleRecordWriter<>(recordWrites.get(0));
    } else if (recordWrites.size() == 0) {
        return new NonRecordWriter<>();
    } else {
        return new MultipleRecordWriters<>(recordWrites);
    }
}

private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge edge = outEdgesInOrder.get(i);
        recordWriters.add(
            createRecordWriter(
                edge,
                i,
                environment,
                environment.getTaskInfo().getTaskName(),
                edge.getBufferTimeout()));
    }
    return recordWriters;
}
outEdgesInOrder comes from the List<StreamEdge> of outgoing edges on the StreamNode in the StreamGraph.
When the RecordWriter is created, the isBroadcast() method of the StreamEdge's StreamPartitioner<?> outputPartitioner decides whether a BroadcastRecordWriter or a ChannelSelectorRecordWriter is built:
public RecordWriter<T> build(ResultPartitionWriter writer) {
    if (selector.isBroadcast()) {
        return new BroadcastRecordWriter<>(writer, timeout, taskName);
    } else {
        return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
    }
}
The outputPartitioner itself is chosen according to whether the upstream and downstream nodes have the same parallelism:
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}
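Seen from the DataStream API, this is the partitioner chosen for each edge between two operators. A small self-contained sketch, where the operator bodies and parallelisms are arbitrary examples:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionerSelectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c");

        // Same parallelism, no explicit partitioning, chaining disabled on this edge
        // -> ForwardPartitioner.
        source.map(v -> v.toUpperCase()).returns(String.class).setParallelism(1).disableChaining();

        // Explicit rebalance() (or differing parallelism) -> RebalancePartitioner.
        source.rebalance().map(v -> v.trim()).returns(String.class).setParallelism(4);

        // broadcast() -> isBroadcast() == true -> BroadcastRecordWriter on the sending side.
        source.broadcast().map(v -> v.toLowerCase()).returns(String.class).setParallelism(4);

        env.execute("partitioner selection demo");
    }
}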
Both BroadcastRecordWriter and ChannelSelectorRecordWriter ultimately call flush() on their ResultPartitionWriter targetPartition member to push the data out. The ResultPartitionWriter is produced in ConsumableNotifyingResultPartitionWriterDecorator.decorate(), which decides per ResultPartitionDeploymentDescriptor whether to wrap the writer in a ConsumableNotifyingResultPartitionWriterDecorator or to use the passed-in partitionWriters directly. The decorator makes the data immediately consumable by the next node, announcing it through a ResultPartitionConsumableNotifier:
public static ResultPartitionWriter[] decorate(
        Collection<ResultPartitionDeploymentDescriptor> descs,
        ResultPartitionWriter[] partitionWriters,
        TaskActions taskActions,
        JobID jobId,
        ResultPartitionConsumableNotifier notifier) {

    ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
    int counter = 0;
    for (ResultPartitionDeploymentDescriptor desc : descs) {
        if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
            consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
                taskActions,
                jobId,
                partitionWriters[counter],
                notifier);
        } else {
            consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
        }
        counter++;
    }
    return consumableNotifyingPartitionWriters;
}
The partitionWriters are created along NettyShuffleEnvironment.createResultPartitionWriters() -> ResultPartitionFactory.create(). A ResultPartition writes its output through the ResultSubpartition[] subpartitions member, which is populated in ResultPartitionFactory.createSubpartitions():
private void createSubpartitions(
        ResultPartition partition,
        ResultPartitionType type,
        BoundedBlockingSubpartitionType blockingSubpartitionType,
        ResultSubpartition[] subpartitions) {
    // Create the subpartitions.
    if (type.isBlocking()) {
        initializeBoundedBlockingPartitions(
            subpartitions,
            partition,
            blockingSubpartitionType,
            networkBufferSize,
            channelManager);
    } else {
        for (int i = 0; i < subpartitions.length; i++) {
            subpartitions[i] = new PipelinedSubpartition(i, partition);
        }
    }
}
For streaming jobs the ResultSubpartition is a PipelinedSubpartition.
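Conceptually, a PipelinedSubpartition appends buffers to an in-memory queue and notifies a listener when data becomes available; that notification is what the later part of this article follows all the way to the downstream task. Below is a toy model of the idea under those assumptions, not Flink's implementation (which also tracks backlog, credits and flush requests):

import java.util.ArrayDeque;

// Toy model: buffers go into a queue, and the listener is only notified
// on the empty -> non-empty transition.
public final class ToySubpartition {

    /** Hypothetical stand-in for Flink's availability listener. */
    public interface AvailabilityListener {
        void notifyDataAvailable();
    }

    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();
    private final AvailabilityListener listener;

    public ToySubpartition(AvailabilityListener listener) {
        this.listener = listener;
    }

    public synchronized void add(byte[] buffer) {
        boolean wasEmpty = buffers.isEmpty();
        buffers.addLast(buffer);
        if (wasEmpty) {
            // Mirrors the chain described later: notifyDataAvailable() -> notifyReaderNonEmpty().
            listener.notifyDataAvailable();
        }
    }

    public synchronized byte[] poll() {
        return buffers.pollFirst();
    }
}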
The ResultPartitionConsumableNotifier is created in TaskExecutor.associateWithJobManager():
private JobTable.Connection associateWithJobManager(
        JobTable.Job job,
        ResourceID resourceID,
        JobMasterGateway jobMasterGateway) {
    ......
    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
        new RpcResultPartitionConsumableNotifier(
            jobMasterGateway,
            getRpcService().getExecutor(),
            taskManagerConfiguration.getTimeout());
    ......
}
RpcResultPartitionConsumableNotifier makes a remote call to JobMaster.scheduleOrUpdateConsumers(), passing the ResultPartitionID partitionId.
The JobMaster then notifies the downstream consuming operators through ExecutionGraph.scheduleOrUpdateConsumers().
Two pieces of code are key here:
① From this operator's ExecutionVertex, the partition is looked up in its Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions member:
void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
    .......
    final IntermediateResultPartition partition =
        resultPartitions.get(partitionId.getPartitionId());
    .......
    if (partition.getIntermediateResult().getResultType().isPipelined()) {
        // Schedule or update receivers of this partition
        execution.scheduleOrUpdateConsumers(partition.getConsumers());
    } else {
        throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
            "pipelined partitions.");
    }
}
The consumers are taken from the IntermediateResultPartition as List<List<ExecutionEdge>> allConsumers;
from each ExecutionEdge's ExecutionVertex target, the Execution currentExecution, i.e. the running task to notify, is retrieved.
② Execution.sendUpdatePartitionInfoRpcCall() makes an RPC call to TaskExecutor.updatePartitions() so the downstream consumer task learns about the partition:
private void sendUpdatePartitionInfoRpcCall(final Iterable<PartitionInfo> partitionInfos) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

        CompletableFuture<Acknowledge> updatePartitionsResultFuture =
            taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

        updatePartitionsResultFuture.whenCompleteAsync(
            (ack, failure) -> {
                // fail if there was a failure
                if (failure != null) {
                    fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
                        "] on TaskManager " + taskManagerLocation + " failed", failure));
                }
            }, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
    }
}
TaskExecutor.updatePartitions() updates the partition information. If the corresponding InputChannel was previously an UnknownInputChannel, it is replaced in SingleInputGate.updateInputChannel():
public void updateInputChannel(
        ResourceID localLocation,
        NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
    synchronized (requestLock) {
        if (closeFuture.isDone()) {
            // There was a race with a task failure/cancel
            return;
        }

        IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

        InputChannel current = inputChannels.get(partitionId);

        if (current instanceof UnknownInputChannel) {
            UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
            boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
            InputChannel newChannel;
            if (isLocal) {
                newChannel = unknownChannel.toLocalInputChannel();
            } else {
                RemoteInputChannel remoteInputChannel =
                    unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                remoteInputChannel.assignExclusiveSegments();
                newChannel = remoteInputChannel;
            }
            LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

            inputChannels.put(partitionId, newChannel);
            channels[current.getChannelIndex()] = newChannel;

            if (requestedPartitionsFlag) {
                newChannel.requestSubpartition(consumedSubpartitionIndex);
            }

            for (TaskEvent event : pendingEvents) {
                newChannel.sendTaskEvent(event);
            }

            if (--numberOfUninitializedChannels == 0) {
                pendingEvents.clear();
            }
        }
    }
}
Records are first written into the PipelinedSubpartition's in-memory queue, the ArrayDeque<BufferConsumer> buffers, and the consuming side is then told that data is available. The notification path to the downstream task is as follows:
① When TaskManagerServices creates the ShuffleEnvironment, the Netty server-side handler PartitionRequestServerHandler is obtained along NettyShuffleServiceFactory.createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer.initChannel() -> NettyProtocol.getServerChannelHandlers():
public ChannelHandler[] getServerChannelHandlers() {
    PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
    PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
        partitionProvider, taskEventPublisher, queueOfPartitionQueues);

    return new ChannelHandler[] {
        messageEncoder,
        new NettyMessage.NettyMessageDecoder(),
        serverHandler,
        queueOfPartitionQueues
    };
}
② When PartitionRequestServerHandler receives a PartitionRequest message from the client, it creates a CreditBasedSequenceNumberingViewReader and, through requestSubpartitionView() -> ResultPartitionManager.createSubpartitionView() -> ResultPartition.createSubpartitionView(), wires the subpartition view up to that reader.
③ CreditBasedSequenceNumberingViewReader.notifyDataAvailable() calls PartitionRequestQueue.notifyReaderNonEmpty() to notify the downstream operator:
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
    // The notification might come from the same thread. For the initial writes this
    // might happen before the reader has set its reference to the view, because
    // creating the queue and the initial notification happen in the same method call.
    // This can be resolved by separating the creation of the view and allowing
    // notifications.

    // TODO This could potentially have a bad performance impact as in the
    // worst case (network consumes faster than the producer) each buffer
    // will trigger a separate event loop task being scheduled.
    ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
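On the receiving side of fireUserEventTriggered(), the PartitionRequestQueue reacts in Netty's userEventTriggered() callback and starts draining the reader's buffers to the channel. The general Netty pattern looks roughly like this; it is a generic illustration with a hypothetical event type, not Flink's actual handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Generic shape of the receiving side: the Runnable scheduled by notifyReaderNonEmpty()
// surfaces as a user event on the pipeline, and the handler reacts to it.
public class AvailabilityEventHandler extends ChannelInboundHandlerAdapter {

    /** Hypothetical marker for "this reader now has data". */
    public interface ReaderAvailable { }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof ReaderAvailable) {
            // In Flink's PartitionRequestQueue this is where the reader is enqueued and its
            // buffers are written and flushed to the downstream channel.
            System.out.println("reader has data, start draining it");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}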
Thanks for reading. That concludes the analysis of the Flink Kafka Consumer as a Kafka data source; the details above are best verified in practice against the Flink version you actually run.