作者:陳守元、戴資力
專注于為中小企業(yè)提供成都網站設計、網站建設服務,電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)齊齊哈爾免費做網站提供優(yōu)質的服務。我們立足成都,凝聚了一批互聯(lián)網行業(yè)人才,有力地推動了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網站建設實現(xiàn)規(guī)模擴充和轉變。
Apache Flink 是一個分布式大數(shù)據(jù)處理引擎,可對有限數(shù)據(jù)流和無限數(shù)據(jù)流進行有狀態(tài)或無狀態(tài)的計算,能夠部署在各種集群環(huán)境,對各種規(guī)模大小的數(shù)據(jù)進行快速計算。
了解 Flink 應用開發(fā)需要先理解 Flink 的 Streams、State、Time 等基礎處理語義以及 Flink 兼顧靈活性和方便性的多層次 API。
Streams:流,分為有限數(shù)據(jù)流與無限數(shù)據(jù)流,unbounded stream 是有始無終的數(shù)據(jù)流,即無限數(shù)據(jù)流;而 bounded stream 是限定大小的有始有終的數(shù)據(jù)集合,即有限數(shù)據(jù)流,二者的區(qū)別在于無限數(shù)據(jù)流的數(shù)據(jù)會隨時間的推演而持續(xù)增加,計算持續(xù)進行且不存在結束的狀態(tài),相對的有限數(shù)據(jù)流數(shù)據(jù)大小固定,計算最終會完成并處于結束的狀態(tài)。
State,狀態(tài)是計算過程中的數(shù)據(jù)信息,在容錯恢復和 Checkpoint 中有重要的作用,流計算在本質上是 Incremental Processing,因此需要不斷查詢保持狀態(tài);另外,為了確保 Exactly- once 語義,需要數(shù)據(jù)能夠寫入到狀態(tài)中;而持久化存儲,能夠保證在整個分布式系統(tǒng)運行失敗或者掛掉的情況下做到 Exactly- once,這是狀態(tài)的另外一個價值。
Time,分為 Event time、Ingestion time、Processing time,F(xiàn)link 的無限數(shù)據(jù)流是一個持續(xù)的過程,時間是我們判斷業(yè)務狀態(tài)是否滯后,數(shù)據(jù)處理是否及時的重要依據(jù)。
在架構部分,主要分為以下四點:
cdn.xitu.io/2019/7/2/16bb2c32e9736d17?w=960&h=540&f=jpeg&s=98000">
第一, Flink 具備統(tǒng)一的框架處理有界和×××兩種數(shù)據(jù)流的能力
第二, 部署靈活,F(xiàn)link 底層支持多種資源調度器,包括 Yarn、Kubernetes 等。Flink 自身帶的 Standalone 的調度器,在部署上也十分靈活。
第三, 極高的可伸縮性,可伸縮性對于分布式系統(tǒng)十分重要,阿里巴巴雙11大屏采用 Flink 處理海量數(shù)據(jù),使用過程中測得 Flink 峰值可達 17 億/秒。
第四, 極致的流式處理性能。Flink 相對于 Storm 最大的特點是將狀態(tài)語義完全抽象到框架中,支持本地狀態(tài)讀取,避免了大量網絡 IO,可以極大提升狀態(tài)存取的性能。
后面會有專門課程講解,此處簡單分享 Flink 關于運維及業(yè)務監(jiān)控的內容:
Flink 具備 7 X 24 小時高可用的 SOA(面向服務架構),原因是在實現(xiàn)上 Flink 提供了一致性的 Checkpoint。Checkpoint 是 Flink 實現(xiàn)容錯機制的核心,它周期性的記錄計算過程中 Operator 的狀態(tài),并生成快照持久化存儲。當 Flink 作業(yè)發(fā)生故障崩潰時,可以有選擇的從 Checkpoint 中恢復,保證了計算的一致性。
Data Pipeline 的核心場景類似于數(shù)據(jù)搬運并在搬運的過程中進行部分數(shù)據(jù)清洗或者處理,而整個業(yè)務架構圖的左邊是 Periodic ETL,它提供了流式 ETL 或者實時 ETL,能夠訂閱消息隊列的消息并進行處理,清洗完成后實時寫入到下游的 Database 或 File system 中。場景舉例:
當下游要構建實時數(shù)倉時,上游則可能需要實時的 Stream ETL。這個過程會進行實時清洗或擴展數(shù)據(jù),清洗完成后寫入到下游的實時數(shù)倉的整個鏈路中,可保證數(shù)據(jù)查詢的時效性,形成實時數(shù)據(jù)采集、實時數(shù)據(jù)處理以及下游的實時 Query。
搜索引擎這塊以淘寶為例,當賣家上線新商品時,后臺會實時產生消息流,該消息流經過 Flink 系統(tǒng)時會進行數(shù)據(jù)的處理、擴展。然后將處理及擴展后的數(shù)據(jù)生成實時索引,寫入到搜索引擎中。這樣當淘寶賣家上線新商品時,能在秒級或者分鐘級實現(xiàn)搜索引擎的搜索。
Data Analytics,如圖,左邊是 Batch Analytics,右邊是 Streaming Analytics。Batch Analysis 就是傳統(tǒng)意義上使用類似于 Map Reduce、Hive、Spark Batch 等,對作業(yè)進行分析、處理、生成離線報表,Streaming Analytics 使用流式分析引擎如 Storm,F(xiàn)link 實時處理分析數(shù)據(jù),應用較多的場景如實時大屏、實時報表。
從某種程度上來說,所有的實時的數(shù)據(jù)處理或者是流式數(shù)據(jù)處理都是屬于 Data Driven,流計算本質上是 Data Driven 計算。應用較多的如風控系統(tǒng),當風控系統(tǒng)需要處理各種各樣復雜的規(guī)則時,Data Driven 就會把處理的規(guī)則和邏輯寫入到Datastream 的 API 或者是 ProcessFunction 的 API 中,然后將邏輯抽象到整個 Flink 引擎中,當外面的數(shù)據(jù)流或者是事件進入就會觸發(fā)相應的規(guī)則,這就是 Data Driven 的原理。在觸發(fā)某些規(guī)則后,Data Driven 會進行處理或者是進行預警,這些預警會發(fā)到下游產生業(yè)務通知,這是 Data Driven 的應用場景,Data Driven 在應用上更多應用于復雜事件的處理。
傳統(tǒng)批處理方法是持續(xù)收取數(shù)據(jù),以時間作為劃分多個批次的依據(jù),再周期性地執(zhí)行批次運算。但假設需要計算每小時出現(xiàn)事件轉換的次數(shù),如果事件轉換跨越了所定義的時間劃分,傳統(tǒng)批處理會將中介運算結果帶到下一個批次進行計算;除此之外,當出現(xiàn)接收到的事件順序顛倒情況下,傳統(tǒng)批處理仍會將中介狀態(tài)帶到下一批次的運算結果中,這種處理方式也不盡如人意。
第一點,要有理想方法,這個理想方法是引擎必須要有能力可以累積狀態(tài)和維護狀態(tài),累積狀態(tài)代表著過去歷史中接收過的所有事件,會影響到輸出。
第二點,時間,時間意味著引擎對于數(shù)據(jù)完整性有機制可以操控,當所有數(shù)據(jù)都完全接受到后,輸出計算結果。
第三點,理想方法模型需要實時產生結果,但更重要的是采用新的持續(xù)性數(shù)據(jù)處理模型來處理實時數(shù)據(jù),這樣才最符合 continuous data 的特性。
流式處理簡單來講即有一個無窮無盡的數(shù)據(jù)源在持續(xù)收取數(shù)據(jù),以代碼作為數(shù)據(jù)處理的基礎邏輯,數(shù)據(jù)源的數(shù)據(jù)經過代碼處理后產生出結果,然后輸出,這就是流式處理的基本原理。
假設 Input Streams 有很多個使用者,每個使用者都有自己的 ID,如果計算每個使用者出現(xiàn)的次數(shù),我們需要讓同一個使用者的出現(xiàn)事件流到同一運算代碼,這跟其他批次需要做 group by 是同樣的概念,所以跟 Stream 一樣需要做分區(qū),設定相應的 key,然后讓同樣的 key 流到同一個 computation instance 做同樣的運算。
如圖,上述代碼中定義了變數(shù) X,X 在數(shù)據(jù)處理過程中會進行讀和寫,在最后輸出結果時,可以依據(jù)變數(shù) X 決定輸出的內容,即狀態(tài) X 會影響最終的輸出結果。這個過程中,第一個重點是先進行了狀態(tài) co-partitioned key by,同樣的 key 都會流到 computation instance,與使用者出現(xiàn)次數(shù)的原理相同,次數(shù)即所謂的狀態(tài),這個狀態(tài)一定會跟同一個 key 的事件累積在同一個 computation instance。
相當于根據(jù)輸入流的 key 重新分區(qū)的 狀態(tài),當分區(qū)進入 stream 之后,這個 stream 會累積起來的狀態(tài)也變成 copartiton 了。第二個重點是 embeded local state backend。有狀態(tài)分散式流式處理的引擎,狀態(tài)可能會累積到非常大,當 key 非常多時,狀態(tài)可能就會超出單一節(jié)點的 memory 的負荷量,這時候狀態(tài)必須有狀態(tài)后端去維護它;在這個狀態(tài)后端在正常狀況下,用 in-memory 維護即可。
當我們考慮狀態(tài)容錯時難免會想到精確一次的狀態(tài)容錯,應用在運算時累積的狀態(tài),每筆輸入的事件反映到狀態(tài),更改狀態(tài)都是精確一次,如果修改超過一次的話也意味著數(shù)據(jù)引擎產生的結果是不可靠的。
如何確保狀態(tài)擁有精確一次(Exactly-once guarantee)的容錯保證?
如何在分散式場景下替多個擁有本地狀態(tài)的運算子產生一個全域一致的快照(Global consistent snapshot)?
還是以使用者出現(xiàn)次數(shù)來看,如果某個使用者出現(xiàn)的次數(shù)計算不準確,不是精確一次,那么產生的結果是無法作為參考的。在考慮精確的容錯保證前,我們先考慮最簡單的使用場景,如無限流的數(shù)據(jù)進入,后面單一的 Process 進行運算,每處理完一筆計算即會累積一次狀態(tài),這種情況下如果要確保 Process 產生精確一次的狀態(tài)容錯,每處理完一筆數(shù)據(jù),更改完狀態(tài)后進行一次快照,快照包含在隊列中并與相應的狀態(tài)進行對比,完成一致的快照,就能確保精確一次。
Flink 作為分布式的處理引擎,在分布式的場景下,進行多個本地狀態(tài)的運算,只產生一個全域一致的快照,如需要在不中斷運算值的前提下產生全域一致的快照,就涉及到分散式狀態(tài)容錯。
關于 Global consistent snapshot,當 Operator 在分布式的環(huán)境中,在各個節(jié)點做運算,首先產生 Global consistent snapshot 的方式就是處理每一筆數(shù)據(jù)的快照點是連續(xù)的,這筆運算流過所有的運算值,更改完所有的運算值后,能夠看到每一個運算值的狀態(tài)與該筆運算的位置,即可稱為 consistent snapshot,當然,Global consistent snapshot 也是簡易場景的延伸。
首先了解一下 Checkpoint,上面提到連續(xù)性快照每個 Operator 運算值本地的狀態(tài)后端都要維護狀態(tài),也就是每次將產生檢查點時會將它們傳入共享的 DFS 中。當任何一個 Process 掛掉后,可以直接從三個完整的 Checkpoint 將所有的運算值的狀態(tài)恢復,重新設定到相應位置。Checkpoint 的存在使整個 Process 能夠實現(xiàn)分散式環(huán)境中的 Exactly-once。
關于 Flink 如何在不中斷運算的狀況下持續(xù)產生 Global consistent snapshot,其方式是基于用 simple lamport 演算法機制下延伸的。已知的一個點 Checkpoint barrier, Flink 在某個 Datastream 中會一直安插 Checkpoint barrier,Checkpoint barrier 也會 N — 1等等,Checkpoint barrier N 代表著所有在這個范圍里面的數(shù)據(jù)都是Checkpoint barrier N。
舉例:假設現(xiàn)在需要產生 Checkpoint barrier N,但實際上在 Flink 中是由 job manager 觸發(fā) Checkpoint,Checkpoint 被觸發(fā)后開始從數(shù)據(jù)源產生 Checkpoint barrier。當 job 開始做 Checkpoint barrier N 的時候,可以理解為 Checkpoint barrier N 需要逐步填充左下角的表格。
如圖,當部分事件標為紅色,Checkpoint barrier N 也是紅色時,代表著這些數(shù)據(jù)或事件都由 Checkpoint barrier N 負責。Checkpoint barrier N 后面白色部分的數(shù)據(jù)或事件則不屬于 Checkpoint barrier N。
在以上的基礎上,當數(shù)據(jù)源收到 Checkpoint barrier N 之后會先將自己的狀態(tài)保存,以讀取 Kafka 資料為例,數(shù)據(jù)源的狀態(tài)就是目前它在 Kafka 分區(qū)的位置,這個狀態(tài)也會寫入到上面提到的表格中。下游的 Operator 1 會開始運算屬于 Checkpoint barrier N 的數(shù)據(jù),當 Checkpoint barrier N 跟著這些數(shù)據(jù)流動到 Operator 1 之后,Operator 1 也將屬于 Checkpoint barrier N 的所有數(shù)據(jù)都反映在狀態(tài)中,當收到 Checkpoint barrier N 時也會直接對 Checkpoint 去做快照。
當快照完成后繼續(xù)往下游走,Operator 2 也會接收到所有數(shù)據(jù),然后搜索 Checkpoint barrier N 的數(shù)據(jù)并直接反映到狀態(tài),當狀態(tài)收到 Checkpoint barrier N 之后也會直接寫入到 Checkpoint N 中。以上過程到此可以看到 Checkpoint barrier N 已經完成了一個完整的表格,這個表格叫做 Distributed Snapshots,即分布式快照。分布式快照可以用來做狀態(tài)容錯,任何一個節(jié)點掛掉的時候可以在之前的 Checkpoint 中將其恢復。繼續(xù)以上 Process,當多個 Checkpoint 同時進行,Checkpoint barrier N 已經流到 job manager 2,F(xiàn)link job manager 可以觸發(fā)其他的 Checkpoint,比如 Checkpoint N + 1,Checkpoint N + 2 等等也同步進行,利用這種機制,可以在不阻擋運算的狀況下持續(xù)地產生 Checkpoint。
狀態(tài)維護即用一段代碼在本地維護狀態(tài)值,當狀態(tài)值非常大時需要本地的狀態(tài)后端來支持。
如圖,在 Flink 程序中,可以采用 getRuntimeContext().getState(desc); 這組 API 去注冊狀態(tài)。Flink 有多種狀態(tài)后端,采用 API 注冊狀態(tài)后,讀取狀態(tài)時都是通過狀態(tài)后端來讀取的。Flink 有兩種不同的狀態(tài)值,也有兩種不同的狀態(tài)后端:
Flink 目前支持以上兩種狀態(tài)后端,一種是純 memory 的狀態(tài)后端,另一種是有資源磁盤的狀態(tài)后端,在維護狀態(tài)時可以根據(jù)狀態(tài)的數(shù)量選擇相應的狀態(tài)后端。
3.1不同時間種類
在 Flink 及其他進階的流式處理引擎出現(xiàn)之前,大數(shù)據(jù)處理引擎一直只支持 Processing-time 的處理。假設定義一個運算 windows 的窗口,windows 運算設定每小時進行結算。以 Processing-time 進行運算時可以發(fā)現(xiàn)數(shù)據(jù)引擎將 3 點至 4 點間收到的數(shù)據(jù)做結算。實際上在做報表或者分析結果時是想了解真實世界中 3 點至 4 點之間實際產生數(shù)據(jù)的輸出結果,了解實際數(shù)據(jù)的輸出結果就必須采用 Event – Time 了。
如圖,Event - Time 相當于事件,它在數(shù)據(jù)最源頭產生時帶有時間戳,后面都需要用時間戳來進行運算。用圖來表示,最開始的隊列收到數(shù)據(jù),每小時對數(shù)據(jù)劃分一個批次,這就是 Event - Time Process 在做的事情。
3.2Event - Time 處理
Event - Time 是用事件真實產生的時間戳去做 Re-bucketing,把對應時間 3 點到 4 點的數(shù)據(jù)放在 3 點到 4 點的 Bucket,然后 Bucket 產生結果。所以 Event - Time 跟 Processing - time 的概念是這樣對比的存在。
Event - Time 的重要性在于記錄引擎輸出運算結果的時間。簡單來說,流式引擎連續(xù) 24 小時在運行、搜集資料,假設 Pipeline 里有一個 windows Operator 正在做運算,每小時能產生結果,何時輸出 windows 的運算值,這個時間點就是 Event - Time 處理的精髓,用來表示該收的數(shù)據(jù)已經收到。
3.3Watermarks
Flink 實際上是用 watermarks 來實現(xiàn) Event - Time 的功能。Watermarks 在 Flink 中也屬于特殊事件,其精髓在于當某個運算值收到帶有時間戳“ T ”的 watermarks 時就意味著它不會接收到新的數(shù)據(jù)了。使用 watermarks 的好處在于可以準確預估收到數(shù)據(jù)的截止時間。舉例,假設預期收到數(shù)據(jù)時間與輸出結果時間的時間差延遲 5 分鐘,那么 Flink 中所有的 windows Operator 搜索 3 點至 4 點的數(shù)據(jù),但因為存在延遲需要再多等5分鐘直至收集完 4:05 分的數(shù)據(jù),此時方能判定 4 點鐘的資料收集完成了,然后才會產出 3 點至 4 點的數(shù)據(jù)結果。這個時間段的結果對應的就是 watermarks 的部分。
流式處理應用無時無刻不在運行,運維上有幾個重要考量:
更改應用邏輯/修 bug 等,如何將前一執(zhí)行的狀態(tài)遷移到新的執(zhí)行?
如何重新定義運行的平行化程度?
Checkpoint 完美符合以上需求,不過 Flink 中還有另外一個名詞保存點(Savepoint),當手動產生一個 Checkpoint 的時候,就叫做一個 Savepoint。Savepoint 跟 Checkpoint 的差別在于檢查點是 Flink 對于一個有狀態(tài)應用在運行中利用分布式快照持續(xù)周期性的產生 Checkpoint,而 Savepoint 則是手動產生的 Checkpoint,Savepoint 記錄著流式應用中所有運算元的狀態(tài)。
如圖,Savepoint A 和 Savepoint B,無論是變更底層代碼邏輯、修 bug 或是升級 Flink 版本,重新定義應用、計算的平行化程度等,最先需要做的事情就是產生 Savepoint。
Savepoint 產生的原理是在 Checkpoint barrier 流動到所有的 Pipeline 中手動插入從而產生分布式快照,這些分布式快照點即 Savepoint。Savepoint 可以放在任何位置保存,當完成變更時,可以直接從 Savepoint 恢復、執(zhí)行。
從 Savepoint 的恢復執(zhí)行需要注意,在變更應用的過程中時間在持續(xù),如 Kafka 在持續(xù)收集資料,當從 Savepoint 恢復時,Savepoint 保存著 Checkpoint 產生的時間以及 Kafka 的相應位置,因此它需要恢復到最新的數(shù)據(jù)。無論是任何運算,Event - Time 都可以確保產生的結果完全一致。
假設恢復后的重新運算用 Process Event - Time,將 windows 窗口設為 1 小時,重新運算能夠在 10 分鐘內將所有的運算結果都包含到單一的 windows 中。而如果使用 Event – Time,則類似于做 Bucketing。在 Bucketing 的狀況下,無論重新運算的數(shù)量多大,最終重新運算的時間以及 windows 產生的結果都一定能保證完全一致。
本文首先從 Apache Flink 的定義、架構、基本原理入手,對×××計算相關的基本概念進行辨析,在此基礎上簡單回顧了大數(shù)據(jù)處理方式的歷史演進以及有狀態(tài)的流式數(shù)據(jù)處理的原理,最后從目前有狀態(tài)的流式處理面臨的挑戰(zhàn)分析 Apache Flink 作為業(yè)界公認為最好的流計算引擎之一所具備的天然優(yōu)勢。希望有助于大家厘清×××式處理引擎涉及的基本概念,能夠更加得心應手的使用 Flink。