這篇文章主要為大家展示了“Flink執(zhí)行引擎中流批一體的示例分析”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Flink執(zhí)行引擎中流批一體的示例分析”這篇文章吧。
左權網站制作公司哪家好,找成都創(chuàng)新互聯(lián)!從網頁設計、網站建設、微信開發(fā)、APP開發(fā)、響應式網站等網站項目制作,到程序開發(fā),運營維護。成都創(chuàng)新互聯(lián)從2013年創(chuàng)立到現(xiàn)在10年的時間,我們擁有了豐富的建站經驗和運維經驗,來保證我們的工作的順利進行。專注于網站建設就選成都創(chuàng)新互聯(lián)。
隨著互聯(lián)網和移動互聯(lián)網的不斷發(fā)展,各行各業(yè)都積累海量的業(yè)務數(shù)據。
而企業(yè)為了改善用戶體驗,提升產品在市場上的競爭力,都采取了實時化方式來處理大數(shù)據。
社交媒體的實時大屏、電商的實時推薦、城市大腦的實時交通預測、金融行業(yè)的實時反欺詐,這些產品的成功都在說明大數(shù)據處理的實時化已經成為一個勢不可擋的潮流。
在實時化的大趨勢下,F(xiàn)link 已經成為實時計算行業(yè)的事實標準。我們看到,不光是阿里巴巴,國內外各個領域的頭部廠商,都把 Flink 做為實時計算的技術底座,國內有字節(jié)跳動、騰訊、華為,國外有 Netflix、Uber 等等。而業(yè)務實時化只是一個起點,F(xiàn)link 的目標之一就是給用戶提供實時離線一體化的用戶體驗。其實很多用戶不僅需要實時的數(shù)據統(tǒng)計,為了確認運營或產品的策略的效果,用戶同時還需要和歷史(昨天,甚至是去年的同期)數(shù)據比較。而從用戶的角度來看,原有的流、批獨立方案存在一些痛點:- 人力成本比較高。由于流和批是兩套系統(tǒng),相同的邏輯需要兩個團隊開發(fā)兩遍。
- 數(shù)據鏈路冗余。在很多的場景下,流和批計算內容其實是一致,但是由于是兩套系統(tǒng),所以相同邏輯還是需要運行兩遍,產生一定的資源浪費。
- 數(shù)據口徑不一致。這個是用戶遇到的最重要的問題。兩套系統(tǒng)、兩套算子,兩套 UDF,一定會產生不同程度的誤差,這些誤差給業(yè)務方帶來了非常大的困擾。這些誤差不是簡單依靠人力或者資源的投入就可以解決的。
2020 年的雙十一,在實時洪峰到達 40 億的歷史新高的同時,F(xiàn)link 團隊與 DT 團隊一起推出了基于 Flink 的全鏈路流批一體的數(shù)倉架構,很好解決了 Lambda 的架構所帶來的一系列問題:流批作業(yè)使用同一 SQL,使研發(fā)效率提升了 3~4 倍;一套引擎確保了數(shù)據口徑天然一致;流批作業(yè)在同一集群運行,削峰填谷大幅提升了資源效率。Flink 流批一體的成功,離不開 Flink 開源社區(qū)的健康蓬勃發(fā)展。從 Apache 軟件基金會 2020 年度報告可以看出,在反映開源社區(qū)繁榮情況的三個關鍵指標上 Flink 都名列前茅:用戶郵件列表活躍度,F(xiàn)link 排名第一;開發(fā)者提交次數(shù) Flink 排名第二,Github 用戶訪問量排名第二。這些數(shù)據并不局限于大數(shù)據領域,而是 Apache 開源基金會下屬的所有項目。2020 年也是 Blink 反哺社區(qū)的第二年,這兩年我們把 Blink 在集團內積累的經驗逐步貢獻回社區(qū),讓 Flink 成為真正意義上的流批一體平臺。我希望通過這篇文章給大家分享下這兩年 Flink 在執(zhí)行引擎流批融合方面做了哪些工作。同時也希望 Flink 的老用戶和新朋友可以進一步了解 Flink 流批一體架構的“前世今生”。總體來說,F(xiàn)link 的核心引擎主要分為如下三層:- SDK 層。Flink 的 SDK 主要有兩類,第一類是關系型 Relational SDK 也就是 SQL/Table,第二類是物理型 Physical SDK 也就是 DataStream。這兩類 SDK 都是流批統(tǒng)一,即不管是 SQL 還是 DataStream,用戶的業(yè)務邏輯只要開發(fā)一遍,就可以同時在流和批的兩種場景下使用;
- 執(zhí)行引擎層。執(zhí)行引擎提供了統(tǒng)一的 DAG,用來描述數(shù)據處理流程 Data Processing Pipeline(Logical Plan)。不管是流任務還是批任務,用戶的業(yè)務邏輯在執(zhí)行前,都會先轉化為此 DAG 圖。執(zhí)行引擎通過 Unified DAG Scheduler 把這個邏輯 DAG 轉化成在分布式環(huán)境下執(zhí)行的Task。Task 之間通過 Shuffle 傳輸數(shù)據,我們通過 Pluggable Unified Shuffle 架構,同時支持流批兩種 Shuffle 方式;
- 狀態(tài)存儲。狀態(tài)存儲層負責存儲算子的狀態(tài)執(zhí)行狀態(tài)。針對流作業(yè)有開源 RocksdbStatebackend、MemoryStatebackend,也有商業(yè)化的版本的GemniStateBackend;針對批作業(yè)我們在社區(qū)版本引入了 BatchStateBackend。
- 流批一體的 DataStream 介紹了如何通過流批一體的 DataStream 來解決 Flink SDK 當前面臨的挑戰(zhàn);
- 流批一體的 DAG Scheduler 介紹了如何通過統(tǒng)一的 Pipeline Region 機制充分挖掘流式引擎的性能優(yōu)勢;如何通過動態(tài)調整執(zhí)行計劃的方式來改善引擎的易用性,提高系統(tǒng)的資源利用率;
- 流批一體的 Shuffle 架構介紹如何通過一套統(tǒng)一的 Shuffle 架構既可以滿足不同 Shuffle 在策略上的定制化需求,同時還能避免在共性需求上的重復開發(fā);
- 流批一體的容錯策略介紹了如何通過統(tǒng)一的容錯策略既滿足批場景下容錯又可以提升流場景下的容錯效果。
SDK 分析以及面臨的挑戰(zhàn)
如上圖所示,目前 Flink 提供的 SDK 主要有三類:- Table/SQL 是一種 Relational 的高級 SDK,主要用在一些數(shù)據分析的場景中,既可以支持 Bounded 也可以支持 Unbounded 的輸入。由于 Table/SQL 是 Declarative 的,所以系統(tǒng)可以幫助用戶進行很多優(yōu)化,例如根據用戶提供的Schema,可以進行 Filter Push Down 謂詞下推、按需反序列二進制數(shù)據等優(yōu)化。目前 Table/SQL 可以支持 Batch 和 Streaming 兩種執(zhí)行模式。[1]
- DataStream 屬于一種 Physical SDK。Relatinal SDK 功能雖然強大,但也存在一些局限:不支持對 State、Timer 的操作;由于 Optimizer 的升級,可能導致用相同的 SQL 在兩個版本中出現(xiàn)物理執(zhí)行計劃不兼容的情況。而 DataStream SDK,既可以支持 State、Timer 維度 Low Level 的操作,同時由于 DataStream 是一種 Imperative SDK,所以對物理執(zhí)行計劃有很好的“掌控力”,從而也不存在版本升級導致的不兼容。DataStream 目前在社區(qū)仍有很大用戶群,例如目前未 Closed 的 DataStream issue 依然有近 500 個左右。雖然 DataStream 即可以支持 Bounded 又可以支持 Unbounded Input 用 DataStream 寫的 Application,但是在 Flink-1.12 之前只支持 Streaming 的執(zhí)行模式。
- DataSet 是一種僅支持 Bounded 輸入的 Physical SDK,會根據 Bounded 的特性對某些算子進行做一定的優(yōu)化,但是不支持 EventTime 和 State 等操作。雖然 DataSet 是 Flink 提供最早的一種 SDK,但是隨著實時化和數(shù)據分析場景的不斷發(fā)展,相比于 DataStream 和 SQL,DataSet 在社區(qū)的影響力在逐步下降。
目前 Table/SQL 對于流批統(tǒng)一的場景支持已經比較成熟,但是對于 Phyiscal SDK 來說還面臨的一些挑戰(zhàn),主要有兩個方面:- 利用已有 Physical SDK 無法寫出一個真正生產可以用的流批一體的 Application。例如用戶寫一個程序用來處理 Kafka 中的實時數(shù)據,那么利用相同的程序來處理存儲在 OSS/S3/HDFS 上的歷史數(shù)據也是非常自然的事情。但是目前不管是 DataSet 還是 DataStream 都無法滿足用戶這個“簡單”的訴求。大家可能覺得奇怪,DataStream 不是既支持 Bounded 的 Input 又支持 Unbounded 的 Input,為什么還會有問題呢?其實“魔鬼藏在細節(jié)中”,我會在 Unified DataStream 這一節(jié)中會做進一的闡述。
- 學習和理解的成本比較高。隨著 Flink 不斷壯大,越來越多的新用戶加入 Flink 社區(qū),但是對于這些新用戶來說就要學習兩種 Physical SDK。和其他引擎相比,用戶入門的學習成本是相對比較高的;兩種 SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對于用戶來說,理解兩套機制的門檻也不??;由于這兩 SDK 還不兼容,一個新用戶一旦選擇錯誤,將會面臨很大的切換成本。
Unified Physical SDK
為了解決上述 Physical SDK 所面臨的挑戰(zhàn),我們把 Unified DataStream SDK 作為 Flink 統(tǒng)一的 Physical SDK。這個部分主要解決兩個問題:為什么選擇 DataStream 作為 Unified Physical SDK?
Unified DataStream 比“老”的 DataStream 提供了哪些能力讓用戶可以寫出一個真正生產可以用的流批一體 Application?
為什么不是 Unified DataSet
為了解決學習和理解成本比較高的問題,最自然最簡單的方案就是從 DataStream 和 DataSet 中選擇一個作為 Flink 的唯一的 Physical SDK。那為什么我們選擇了 DataStream 而不是 DataSet 呢?主要有兩個原因:- 用戶收益。在前邊已經分析過,隨著 Flink 社區(qū)的發(fā)展,目前 DataSet 在社區(qū)的影響力逐漸下降。如果選擇使用 DataSet 作為 Unified Physical SDK,那么用戶之前在 DataStream 大量“投資”就會作廢。而選擇 DataStream,可以讓許多用戶的已有 DataStream “投資”得到額外的回報;
- 開發(fā)成本。DataSet 過于古老,缺乏大量對于現(xiàn)代實時計算引擎基本概念的支持,例如 EventTime、Watermark、State、Unbounded Source 等。另外一個更深層的原因是現(xiàn)有 DataSet 算子的實現(xiàn),在流的場景完全無法復用,例如 Join 等。而對于 DataStream 則不然,可以進行大量的復用。那么如何在流批兩種場景下復用 DataStream 的算子呢?
Unified DataStream
很多對 Flink 有一定了解的用戶可能會問:DataStream 是同時支持 Bounded/Unbounded 的輸入,為什么我們會說:用 DataStream 無法寫出一個真正生產可以用的流批一體 Application 呢?簡單來說,DataStream 原本主要設計目標是給 Unbounded 場景使用的,所以導致在 Bounded 的場景下在效率、可用性、易用性方面和傳統(tǒng)的批引擎還有一定距離。具體來說體現(xiàn)在如下兩個方面:先給大家看一個例子,下邊是一個跑同樣規(guī)模的 WordCount 的作業(yè),DataStream 和 DataSet 的性能對比圖。從這個例子可以看出,DataSet 的性能是 DataStream 將近 5 倍。很明顯,要讓 DataStream 在生產中既可以支持流的場景又要支持批的場景,就一定要大幅提高 DataStream 在 Bounded 場景下效率。那么為什么 DataStream 的效率要比 DataSet 的效率低呢?前面我們已經提到,DataStream 原本主要設計目標是給 Unbounded 的場景下使用的,而 Unounded 場景下一個主要的特點就是亂序,也就是說任何一個 DataStream 的算子無法假設處理的 Record 是按照什么順序進行的,所以許多算子會用一個 K/V 存儲來緩存這些亂序的數(shù)據,等到合適的時候再從 K/V 存儲中取出這些數(shù)據進行處理并且進行輸出。一般情況下,算子對 K/V 存儲訪問涉及大量的序列化和反序列化,同時也會引發(fā)隨機磁盤 I/O;而在 DataSet 中,假設數(shù)據是有界的,也就是可以通過優(yōu)化來避免隨機的磁盤 I/O 訪問,同時也對序列化和反序列化做了相關優(yōu)化。這也是為什么用 DataSet 寫的 WorkerCount 要比用 DataStream 寫的 WordCount 快 5 倍主要原因。 知道到了原因,是不是要把所有的 DataStream 的算子,都重寫一遍就可以了呢?理論上沒問題,但是 DataStream 有大量的算子需要重寫,有些算子還比較復雜,例如與 Window 相關的一系列算子。可以想象到,如果都全部重寫,工程量是非常之巨大的。所以我們通過單 Key 的 BatchStateBackend 幾乎完全避免了對所有算子重寫,同時還得到了非常不錯的效果。對于 Flink 有一定了解的同學應該都知道,原來用 DataStream 寫的 Application 都采用 Streaming 的執(zhí)行模式,在這種模式下是通過 Checkpoint 的方式保持端到端的 Exactly Once 的語義,具體來說一個作業(yè)的 Sink 只有當全圖的所有算子(包括 Sink 自己)都做完各自的 Snapshot 之后,Sink 才會把數(shù)據 Commit 到外部系統(tǒng)中,這是一個典型的依賴 Flink Checkpoint 機制的 2PC 協(xié)議。而在 Bounded 的場景下雖然也可以采用 Streaming 的方式但是對于用戶來說可能會存在一些問題:- 資源消耗大: 使用 Streaming 方式,需要同時拿到所有的資源。在某些情況下,用戶可能沒有這么多資源;
- 容錯成本高: 在 Bounded 場景下,為了效率一些算子可能無法支持 Snapshot 操作,一旦出錯可能需要重新執(zhí)行整個作業(yè)。
所以在 Bounded 場景下,用戶希望 Application 可以采用 Batch 執(zhí)行模式,因為利用 Batch 執(zhí)行的模式可以非常自然的解決上述兩個問題。在 Bounded 場景下支持 Batch 的執(zhí)行模式是比較簡單的,但是卻引入了一個非常棘手的問題——利用已有的 Sink API 無法保證端到端的 Exactly Once 語義。這是由于 Bounded 場景下是沒有 Checkpoint 的,而原有 Sink 都是依賴 Checkpoint 保證端到端的 ExactlyOnce 的。同時我們不希望開發(fā)者針對 Sink 在不同模式下開發(fā)兩套不同的實現(xiàn),因為這樣非常不利用 Flink 和其他生態(tài)的對接。實際上,一個 Transactional 的 Sink 主要解決如下 4 個問題:
而 Flink 應該讓 Sink 開發(fā)者提供 What to commit 和 How to commit,而系統(tǒng)應該根據不同的執(zhí)行模式,選擇 Where to commit 和 When to commit 來保證端到端的 Exactly Once。最終我們提出了一個全新 Unified Sink API,從而讓開發(fā)者只開發(fā)一套 Sink 就可以同時運行在 Streaming 和 Batch 執(zhí)行模式下。這里介紹的只是主要思路,在有限流的場景下如何保證 End to End 的一致性;如何對接 Hive、Iceberg 等外部生態(tài),實際上還是存在一定挑戰(zhàn)。Unified DAG Scheduler 要解決什么問題
- 一種是流的調度模式,在這種模式下,Scheduler 會申請到一個作業(yè)所需要的全部資源,然后同時調度這個作業(yè)的全部 Task,所有的 Task 之間采取 Pipeline 的方式進行通信。批作業(yè)也可以采取這種方式,并且在性能上也會有很大的提升。但是對于運行比較長的 Batch 作業(yè)來說來說,這種模式還是存在一定的問題:規(guī)模比較大的情況下,同時消耗的資源比較多,對于某些用戶來說,他可能沒有這么多的資源;容錯代價比較高,例如一旦發(fā)生錯誤,整個作業(yè)都需要重新運行。
- 一種是批的調度模式。這種模式和傳統(tǒng)的批引擎類似,所有 Task 都是可以獨立申請資源,Task 之間都是通過 Batch Shuffle 進行通訊。這種方式的好處是容錯代價比較小。但是這種運行方式也存在一些短板。例如,Task 之間的數(shù)據都是通過磁盤來進行交互,引發(fā)了大量的磁盤 IO。
總的來說,有了這兩種調度方式是可以基本滿足流批一體的場景需求,但是也存在著很大的改進空間,具體來說體現(xiàn)在三個方面:- 架構不一致、維護成本高。調度的本質就是進行資源的分配,換句話說就是要解決 When to deploy which tasks to where 的問題。原有兩種調度模式,在資源分配的時機和粒度上都有一定的差異,最終導致了調度架構上無法完全統(tǒng)一,需要開發(fā)人員維護兩套邏輯。例如,流的調度模式,資源分配的粒度是整個物理執(zhí)行計劃的全部 Task;批的調度模式,資源分配的粒度是單個任務,當 Scheduler 拿到一個資源的時候,就需要根據作業(yè)類型走兩套不同的處理邏輯;
- 性能。傳統(tǒng)的批調度方式,雖然容錯代價比較小,但是引入大量的磁盤 I/O,并且性能也不是最佳,無法發(fā)揮出 Flink 流式引擎的優(yōu)勢。實際上在資源相對充足的場景下,可以采取“流”的調度方式來運行 Batch 作業(yè),從而避免額外的磁盤 I/O,提高作業(yè)的執(zhí)行效率。尤其是在夜間,流作業(yè)可以釋放出一定資源,這就為批作業(yè)按照“Streaming”的方式運行提供了可能。
- 自適應。目前兩種調度方式的物理執(zhí)行計劃是靜態(tài)的,靜態(tài)生成物理執(zhí)行計劃存在調優(yōu)人力成本高、資源利用率低等問題。
基于 Pipeline Region 的統(tǒng)一調度
為了既能發(fā)揮流引擎的優(yōu)勢,同時避免全圖同時調度存在的一些短板,我們引入 Pipeline Region 的概念。Unified DAG Scheduler 允許在一個 DAG 圖中,Task 之間既可以通過 Pipeline 通訊,也可以通過 Blocking 方式進行通訊。這些由 Pipeline 的數(shù)據交換方式連接的 Task 被稱為一個 Pipeline Region。基于以上概念,F(xiàn)link 引入 Pipeline Region 的概念,不管是流作業(yè)還是批作業(yè),都是按照 Pipeline Region 粒度來申請資源和調度任務。細心的讀者可以發(fā)現(xiàn),其實原有的兩種模式都是 Pipeline Region 調度的特例。即便可以資源上滿足“流”的調度模式,那么哪些任務可以采取“流”的方式調度呢?有同學還是會擔心采取“流”的調度方式容錯代價會比較高,因為在“流”的調度方式下,一個 Task 發(fā)生錯誤,和他聯(lián)通的所有 Task 都會 Fail,然后重新運行。在 Flink 中,不同 Task 之間有兩種連接方式[2],一種是 All-to-All 的連接方式,上游 Task 會和下游的所有的 Task 進行連接;一種是 PointWise 的鏈接方式,上游的 Task 只會和下游的部分 Task 進行連接。如果一個作業(yè)的所有 Task 之間都是通過 All-to-All 方式進行連接,一旦采取“流”的調度方式,那么整個物理拓撲都需要同時被調度,那么確實存在 FailOver 代價比較高的問題[3]。但是在實際 Batch 作業(yè)的拓撲中,Task 之間不都是通過 All-to-All 的邊連接,Batch 作業(yè)中存在的大量 Task 通過 PointWise 的邊連接,通過“流”的方式調度由 PointWise 連接的 Task 連通圖,在減少作業(yè)的容錯成本的同時,可以提高作業(yè)的執(zhí)行效率,如下圖所示在,在全量的 10T TPC-DS 測試中,開啟所有 PointWise 邊都采用 Pipeline 的鏈接方式就可以讓整性能有 20% 以上的性能提升。上述只是 Schduler 提供的劃分 Pipeline Region 的 4 種策略中的一種[4],實際上 Planner 可以根據實際運行場景,定制哪些 Task 之間采取 Pipeline 的傳輸方式,哪些 Task 之間采取 Batch 的傳輸方式方式。
自適應調度
調度的本質是給物理執(zhí)行計劃進行資源分配的決策過程。Pipeline Region 解決了物理執(zhí)行計劃確定之后,流作業(yè)和批作業(yè)可以統(tǒng)一按照 Pipeline Region 的粒度進行調度。對于批作業(yè)來說靜態(tài)生成物理執(zhí)行計劃存在一些問題[5]:- 配置人力成本高。對于批作業(yè)來說,雖然理論上可以根據統(tǒng)計信息推斷出物理執(zhí)行計劃中每個階段的并發(fā)度,但是由于存在大量的 UDF 或者統(tǒng)計信息的缺失等問題,導致靜態(tài)決策結果可能會出現(xiàn)嚴重不準確的情況;為了保障業(yè)務作業(yè)的 SLA,在大促期間,業(yè)務的同學需要根據大促的流量估計,手動調整高優(yōu)批作業(yè)的并發(fā)度,由于業(yè)務變化快,一旦業(yè)務邏輯發(fā)生變化,又要不斷的重復這個過程。整個調優(yōu)過程都需要業(yè)務的同學手動操作,人力成本比較高,即便這樣也可能會出現(xiàn)誤判的情況導致無法滿足用戶 SLA;
- 資源利用率低。由于人工配置并發(fā)度成本比較高,所以不可能對所有的作業(yè)都手動配置并發(fā)度。對于中低優(yōu)先級的作業(yè),業(yè)務同學會選取一些默認值作為并發(fā)度,但是在大多數(shù)情況下這些默認值都偏大,造成資源的浪費;而且雖然高優(yōu)先級的作業(yè)可以進行手工并發(fā)配置,由于配置方式比較繁瑣,所以大促過后,雖然流量已經下降但是業(yè)務方仍然會使用大促期間的配置,也造成大量的資源浪費現(xiàn)象;
- 穩(wěn)定性差。資源浪費的情況最終導致資源的超額申請現(xiàn)象。目前大多數(shù)批作業(yè)都是采取和流作業(yè)集群混跑的方式,具體來說申請的資源都是非保障資源,一旦資源緊張或者出現(xiàn)機器熱點,這些非保障資源都是優(yōu)先被調整的對象。
為了解決靜態(tài)生成物理執(zhí)行存在這些問題,我們?yōu)榕鳂I(yè)引入了自適應調度功能[6],和原來的靜態(tài)物理執(zhí)行計劃相比,利用這個特性可以大幅提高用戶資源利用率。 Adaptive Scheduler 可以根據一個 JobVertex 的上游 JobVertex 的執(zhí)行情況,動態(tài)決定當前 JobVertex 的并發(fā)度。在未來,我們也可以根據上游 JobVertex 產出的數(shù)據,動態(tài)決定下游采用什么樣的算子。Flink 是一個流批一體的平臺,因此引擎對于不同的執(zhí)行模式要分別提供 Streaming 和Batch 兩種類型的 Shuffle。雖然 Streaming Shuffle 和 Batch Shuffle 在具體的策略上存在一定的差異,但是本質上都是為了對數(shù)據進行重新劃分(re-partition),因此不同的 Shuffle 之間還存在一定的共性。所以我們的目標是提供一套統(tǒng)一的 Shuffle 架構,既可以滿足不同 Shuffle 在策略上的定制,同時還能避免在共性需求上進行重復開發(fā)。總體來說,Shuffle 架構可以劃分成如下圖所示的四個層次。流和批的 Shuffle 需求在各層上有一定差異,也有大量的共性,下邊我做了一些簡要分析。流批 Shuffle 之間的差異
大家都知道,批作業(yè)和流作業(yè)對 Shuffle 的需求是有差異的,具體可以體現(xiàn)在如下 3 個方面:- Shuffle 數(shù)據的生命周期。流作業(yè)的 Shuffle 數(shù)據和 Task 的生命周期基本是一致的;而批作業(yè)的 Shuffle 數(shù)據和 Task 生命周期是解耦的;
- Shuffle 數(shù)據的存儲介質。因為流作業(yè)的 Shuffle 數(shù)據生命周期比較短,所以可以把流作業(yè)的 Shuffle 數(shù)據存儲在內存中;而批作業(yè)的 Shuffle 數(shù)據生命周期有一定的不確定性,所以需要把批作業(yè)的 Shuffle 數(shù)據存儲在磁盤中;
- Shuffle 部署方式[7]。把 Shuffle 服務和計算節(jié)點部署在一起,對流作業(yè)來說這種部署方式是有優(yōu)勢的,因為這樣會減少不必要網絡開銷,從而減少 Latency。但對于批作業(yè)來說,這種部署方式在資源利用率、性能、穩(wěn)定性上都存在一定的問題。[8]
流批 Shuffle 之間的共性
批作業(yè)和流作業(yè)的 Shuffle 有差異也有共性,共性主要體現(xiàn)在:- 數(shù)據的 Meta 管理。所謂 Shuffle Meta 是指邏輯數(shù)據劃分到數(shù)據物理位置的映射。不管是流還是批的場景,在正常情況下都需要從 Meta 中找出自己的讀取或者寫入數(shù)據的物理位置;在異常情況下,為了減少容錯代價,通常也會對 Shuffle Meta 數(shù)據進行持久化;
- 數(shù)據傳輸。從邏輯上講,流作業(yè)和批作業(yè)的 Shuffle 都是為了對數(shù)據進行重新劃分(re-partition/re-distribution)。在分布式系統(tǒng)中,對數(shù)據的重新劃分都涉及到跨線程、進程、機器的數(shù)據傳輸。
流批一體的 Shuffle 架構
Unified Shuffle 架構抽象出三個組件[9]: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通過和這三個組件交互完成算子間的數(shù)據的重新劃分。通過這三個組件可以滿足不同Shuffle插件在具體策略上的差異:- Shuffle Master 資源申請和資源釋放。也就是說插件需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it;
- Shuffle Writer 上游的算子利用 Writer 把數(shù)據寫入 Shuffle Service——Streaming Shuffle 會把數(shù)據寫入內存;External/Remote Batch Shuffle 可以把數(shù)據寫入到外部存儲中;
- Shuffle Reader 下游的算子可以通過 Reader 讀取 Shuffle 數(shù)據;
同時,我們也為流批 Shuffle 的共性——Meta 管理、數(shù)據傳輸、服務部署[10]——提供了架構層面的支持,從而避免對復雜組件的重復開發(fā)。高效穩(wěn)定的數(shù)據傳輸,是分布式系統(tǒng)最復雜的子系統(tǒng)之一,例如在傳輸中都要解決上下游反壓、數(shù)據壓縮、內存零拷貝等問題,在新的架構中只要開發(fā)一遍,就可以同時在流和批兩種場景下共同使用,大大減少了開發(fā)和維護的成本。
Flink 原有容錯策略是以檢查點為前提的,具體來說無論是單個 Task 出現(xiàn)失敗還是JobMaster 失敗,F(xiàn)link 都會按照最近的檢查點重啟整個作業(yè)。雖然這種策略存在一定的優(yōu)化空間,但是總的來說對于流的場景是基本是接受的。目前,F(xiàn)link Batch 運行模式下不會開啟檢查點[11],這也意味一旦出現(xiàn)任何錯誤,整個作業(yè)都要從頭執(zhí)行。雖然原有策略在理論上可以保證最終一定會產出正確的結果,但是明顯大多數(shù)客戶都無法接受這種容錯策略所付出的代價。為了解決這些問題,我們分別對 Task 和 JM 的容錯都做了相應的改進。Pipeline Region Failover
雖然在 Batch 執(zhí)行模式下沒有定時的 Checkpoint,但是在 Batch 執(zhí)行模式下,F(xiàn)link允許 Task 之間通過 Blocking Shuffle 進行通信。對于讀取 Blocking Shuffle 的 Task 發(fā)生失敗之后,由于 Blocking Shuffle 中存儲了這個 Task 所需要的全部數(shù)據,所以只需要重啟這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務即可,而不需要重啟整個作業(yè)。總的來說,Pipeline Region Failover 策略和 Scheduler 在進行正常調度的時候一樣,都是把一個 DAG 拆分成由若干 Pipeline shuffle 連接的一些 Pipeline Region,每當一個 Task 發(fā)生 FailOver 的時候,只會重啟這個 Task 所在的 Region 即可。JM Failover
JM 是一個作業(yè)的控制中心,包含了作業(yè)的各種執(zhí)行狀態(tài)。Flink 利用這些狀態(tài)對任務進行調度和部署。一旦 JM 發(fā)生錯誤之后,這些狀態(tài)將會全部丟失。如果沒有這些信息,即便所有的工作節(jié)點都沒有發(fā)生故障,新 JM 仍然無法繼續(xù)調度原來的作業(yè)。例如,由于任務的結束信息都已丟失,一個任務結束之后,新 JM 無法判斷現(xiàn)有的狀態(tài)是否滿足調度下游任務的條件——所有的輸入數(shù)據都已經產生。從上邊的分析可以看出,JM Failover 的關鍵就是如何讓一個 JM“恢復記憶”。在 VVR[12] 中我們通過基于 Operation Log 機制恢復 JM 的關鍵狀態(tài)。細心的同學可能已經發(fā)現(xiàn)了,雖然這兩個改進的出發(fā)點是為了批的場景,但是實際上對于流的作業(yè)容也同樣有效。上邊只是簡要的介紹了兩種容錯策略的思路,實際上還有很多值得思考的內容。例如 Blocking 上游數(shù)據丟失了我們應該如何處理?JM 中有哪些關鍵的狀態(tài)需要恢復?以上是“Flink執(zhí)行引擎中流批一體的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
當前題目:Flink執(zhí)行引擎中流批一體的示例分析
標題網址:
http://weahome.cn/article/goipdp.html