這篇文章主要介紹Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
站在用戶(hù)的角度思考問(wèn)題,與客戶(hù)深入溝通,找到黃陵網(wǎng)站設(shè)計(jì)與黃陵網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶(hù)體驗(yàn)好的作品,建站類(lèi)型包括:成都網(wǎng)站制作、成都做網(wǎng)站、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、主機(jī)域名、網(wǎng)絡(luò)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋黃陵地區(qū)。
用戶(hù)提交的代碼最終被封裝成了org.apache.flink.runtime.taskmanager.Task,Task是一個(gè)Runnable因此核心代碼就在run方法,run方法調(diào)用了doRun方法,在doRun中調(diào)用了invokable.invoke(),Task的整個(gè)處理流程其實(shí)就在這里面。org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable是一個(gè)抽象類(lèi),它的子類(lèi)是不同類(lèi)型的Task,這里我們主要關(guān)注流處理任務(wù)相關(guān)的org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask的invoke方法執(zhí)行了runMailboxLoop()方法。
runMailboxLoop()方法就是執(zhí)行org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor的runMailboxLoop方法。MailboxProcessor是一種線(xiàn)程模型,runMailboxLoop就是在while輪詢(xún)中不斷執(zhí)行任務(wù)和默認(rèn)動(dòng)作,其中默認(rèn)動(dòng)作就是StreamTask的processInput方法,該方法調(diào)用了StreamInputProcessor的inputProcessor方法,在這個(gè)方法中獲取并處理了流數(shù)據(jù)。StreamInputProcessor的子類(lèi)StreamOneInputProcessor和StreamTwoInputProcessor分別用來(lái)處理有1個(gè)和2個(gè)入度的Task(StreamMultipleInputProcessor先不管)。StreamOneInputProcessor中有1個(gè)StreamTaskInput用來(lái)獲取數(shù)據(jù),1個(gè)DataOutput用來(lái)收集從StreamTaskInput獲取的數(shù)據(jù);同理,StreamTwoInputProcessor有2個(gè)StreamTaskInput和2個(gè)DataOutput。StreamTaskInput的子類(lèi)StreamTaskNetworkInput用來(lái)從網(wǎng)絡(luò)中獲取流數(shù)據(jù),通過(guò)調(diào)用他它的emitNext不僅處理流數(shù)據(jù)還處理了checkpoint barrier,本篇文章只關(guān)注數(shù)據(jù)流的處理流程。StreamTaskNetworkInput從反序列化器中獲取到完整流數(shù)據(jù)后把數(shù)據(jù)交給DataOutput。DataOutput也有處理1個(gè)入度和2個(gè)入度的子類(lèi),它們都持有OperatorChain中第一個(gè)operator的引用,稱(chēng)為headOperator,DataOutput從StreamTaskInput那里獲取到數(shù)據(jù)后會(huì)交給headOperator來(lái)處理。到此為止,流數(shù)據(jù)被獲取并傳入了OperatorChain。 這里總結(jié)一下:StreamTask的processInput方法在MailboxProcessor中被反復(fù)調(diào)用,在processInput方法中StreamTask使用StreamInputProcessor來(lái)獲取并處理流數(shù)據(jù)。StreamInputProcessor中的StreamTaskInput用來(lái)獲取數(shù)據(jù),獲取的數(shù)據(jù)交給DataOutput,DataOutput將數(shù)據(jù)傳入OperatorChain的第一個(gè)operator。其中StreamTask,StreamInputProcessor和DataOutput都有處理1個(gè)入度和2個(gè)入度的子類(lèi)。
OperatorChain的第一個(gè)operator獲取數(shù)據(jù)后,數(shù)據(jù)是怎樣在OperatorChain中流動(dòng)的呢?首先說(shuō)說(shuō)OperatorChain,StreamOperatorWrapper是chain的每個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都有指向下一個(gè)或上一個(gè)節(jié)點(diǎn)的引用,因此OperatorChain是一個(gè)雙向鏈表。但是數(shù)據(jù)的流動(dòng)并不依靠這個(gè)鏈?zhǔn)浇Y(jié)構(gòu)。上文我們提到DataOutput將數(shù)據(jù)交給了headOperator,OperatorChain的第一個(gè)節(jié)點(diǎn)都是StreamOperator的子類(lèi),我們編寫(xiě)的filer算子,map算子等最終都會(huì)被封裝成StreamOperator,例如子類(lèi)StreamFlatMap就是執(zhí)行flatMap方法,StreamFilter就是執(zhí)行fliter方法等。這些方法執(zhí)行的時(shí)候用org.apache.flink.streaming.api.operators.Output對(duì)處理后的結(jié)果進(jìn)行收集。例如StreamFilter當(dāng)FilterFunction返回true時(shí)收集數(shù)據(jù),而StreamFlatMap將Output傳入flatMap方法中由用戶(hù)代碼進(jìn)行收集數(shù)據(jù)。收集的數(shù)據(jù)是怎樣向OperatorChain的下一個(gè)節(jié)點(diǎn)傳遞的呢?原來(lái)Output中持有OneInputStreamOperator變量指向了chain中下一個(gè)節(jié)點(diǎn)的算子,調(diào)用Output的collect方法會(huì)調(diào)用下一個(gè)算子的processElement,數(shù)據(jù)就這樣在整個(gè)OperatorChain中傳遞了。
當(dāng)數(shù)據(jù)傳到OperatorChain的最后一個(gè)算子時(shí)數(shù)據(jù)是怎樣發(fā)向下個(gè)Task的呢?最后一個(gè)算子擁有的Output實(shí)現(xiàn)類(lèi)是org.apache.flink.streaming.runtime.io.RecordWriterOutput。RecordWriterOutput的collect方法會(huì)調(diào)用的org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit方法用來(lái)發(fā)送數(shù)據(jù),該方法會(huì)將序列化器中的數(shù)據(jù)復(fù)制到BufferBuilder中。BufferBuilder維護(hù)了一個(gè)內(nèi)存片段MemorySegment并且可以創(chuàng)建相應(yīng)的消費(fèi)者。RecordWriter有2個(gè)實(shí)現(xiàn)類(lèi)ChannelSelectorRecordWriter和BroadcastRecordWriter。Task向下游節(jié)點(diǎn)的多個(gè)并行度發(fā)送數(shù)據(jù),每個(gè)并行度都對(duì)應(yīng)一個(gè)channel。ChannelSelectorRecordWriter為每個(gè)chanel都保存一個(gè)BufferBuilder并分別添加BufferConsumer:
BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);//按channel獲取BufferBuilder addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);//按channel添加BufferConsumer bufferBuilders[targetChannel] = bufferBuilder;
BroadcastRecordWriter只有一個(gè)BufferBuilder,使用同一個(gè)BufferBuilder給所有的channel添加BufferConsumer:
try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) { for (int channel = 0; channel < numberOfChannels; channel++) { addBufferConsumer(bufferConsumer.copy(), channel);//所有channel用同一個(gè)BufferBuilder達(dá)到廣播的目的 } }
RecordWriter#requestNewBufferBuilder方法會(huì)獲取BufferBuilder,如果獲取失敗會(huì)導(dǎo)致Task執(zhí)行線(xiàn)程阻塞造成反壓。
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);//嘗試獲取,獲取不到返回null if (builder == null) { long start = System.currentTimeMillis(); builder = targetPartition.getBufferBuilder(targetChannel);//阻塞獲取,導(dǎo)致反壓 idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start); } return builder; }
BufferBuilder最終來(lái)自L(fǎng)ocalBufferPool,LocalBufferPool有幾個(gè)重要的屬性:
//taskmanager的網(wǎng)絡(luò)緩存池,MemorySegment從這里獲取 private final NetworkBufferPool networkBufferPool; //已經(jīng)獲取的MemorySegment被組織成一個(gè)隊(duì)列 private final ArrayDequeavailableMemorySegments = new ArrayDeque (); //當(dāng)前l(fā)ocalBufferPool的大小 private int currentPoolSize; //已經(jīng)獲取的MemorySegment private int numberOfRequestedMemorySegments; //每個(gè)channel能同時(shí)獲取的最大BufferBuilder數(shù) private final int maxBuffersPerChannel; //subpartition就是channel,數(shù)組存儲(chǔ)了每個(gè)channel同時(shí)使用的BufferBuilder數(shù) private final int[] subpartitionBuffersCount;
BufferBuilder由requestMemorySegment方法和requestMemorySegmentBlocking方法獲取,requestMemorySegmentBlocking方法也是調(diào)用requestMemorySegment方法并在沒(méi)有獲取到MemorySegment時(shí)通過(guò)AvailableFuture的get方法來(lái)阻塞直到獲取成功為止,AvailableFuture是一個(gè)用CompletableFuture表示的狀態(tài)位,這里用到了CompletableFuture的get方法會(huì)阻塞直到complete的特性,沒(méi)有完成的future表示unavailable,完成了的表示available。requestMemorySegment方法中如果已經(jīng)獲取的MemorySegment(numberOfRequestedMemorySegments)大于了localBufferPool的大小(currentPoolSize)需要將多余的MemorySegment先歸還給networkBufferPool。之后獲取MemorySegment,如果獲取不到就設(shè)置AvailableFuture為不可用,否則記錄channel使用的MemorySegment數(shù)量,如果大于maxBuffersPerChannel,也設(shè)置AvailableFuture為不可用。
@Nullable private MemorySegment requestMemorySegment(int targetChannel) throws IOException { MemorySegment segment = null; synchronized (availableMemorySegments) { returnExcessMemorySegments();//將多余的segment歸還給networkBufferPool if (availableMemorySegments.isEmpty()) { segment = requestMemorySegmentFromGlobal();//全局獲取 } // segment may have been released by buffer pool owner if (segment == null) { segment = availableMemorySegments.poll();//局部獲取 } if (segment == null) { availabilityHelper.resetUnavailable();//獲取不到設(shè)置為不可用 } //記錄channel正在使用segment數(shù),如果超了設(shè)置為不可用 if (segment != null && targetChannel != UNKNOWN_CHANNEL) { if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) { unavailableSubpartitionsCount++; availabilityHelper.resetUnavailable(); } } } return segment; }
上面說(shuō)的AvailableFuture設(shè)置為不可用其實(shí)和反壓有關(guān),Task的isBackPressured方法返回了該Task是否產(chǎn)生了反壓。
public boolean isBackPressured() { if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) { return false; } //獲取所有的AvailableFuture,如果有沒(méi)完成了則有反壓 final CompletableFuture>[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length]; for (int i = 0; i < outputFutures.length; ++i) { outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture(); } return !CompletableFuture.allOf(outputFutures).isDone(); }
以上是“Apache Flink Task執(zhí)行之?dāng)?shù)據(jù)流如何處理”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!