反壓(backpressure)是實(shí)時(shí)計(jì)算應(yīng)用開發(fā)中,特別是流式計(jì)算中,十分常見的問題。反壓意味著數(shù)據(jù)管道中某個(gè)節(jié)點(diǎn)成為瓶頸,處理速率跟不上上游發(fā)送數(shù)據(jù)的速率,而需要對(duì)上游進(jìn)行限速。由于實(shí)時(shí)計(jì)算應(yīng)用通常使用消息隊(duì)列來進(jìn)行生產(chǎn)端和消費(fèi)端的解耦,消費(fèi)端數(shù)據(jù)源是 pull-based 的,所以反壓通常是從某個(gè)節(jié)點(diǎn)傳導(dǎo)至數(shù)據(jù)源并降低數(shù)據(jù)源(比如 Kafka consumer)的攝入速率。
關(guān)于 Flink 的反壓機(jī)制,網(wǎng)上已經(jīng)有不少博客介紹,中文博客推薦這兩篇1。簡(jiǎn)單來說,F(xiàn)link 拓?fù)渲忻總€(gè)節(jié)點(diǎn)(Task)間的數(shù)據(jù)都以阻塞隊(duì)列的方式傳輸,下游來不及消費(fèi)導(dǎo)致隊(duì)列被占滿后,上游的生產(chǎn)也會(huì)被阻塞,最終導(dǎo)致數(shù)據(jù)源的攝入被阻塞。而本文將著重結(jié)合官方的博客[4]分享筆者在實(shí)踐中分析和處理 Flink 反壓的經(jīng)驗(yàn)。
反壓的影響
反壓并不會(huì)直接影響作業(yè)的可用性,它表明作業(yè)處于亞健康的狀態(tài),有潛在的性能瓶頸并可能導(dǎo)致更大的數(shù)據(jù)處理延遲。通常來說,對(duì)于一些對(duì)延遲要求不太高或者數(shù)據(jù)量比較小的應(yīng)用來說,反壓的影響可能并不明顯,然而對(duì)于規(guī)模比較大的 Flink 作業(yè)來說反壓可能會(huì)導(dǎo)致嚴(yán)重的問題。
這是因?yàn)?Flink 的 checkpoint 機(jī)制,反壓還會(huì)影響到兩項(xiàng)指標(biāo): checkpoint 時(shí)長(zhǎng)和 state 大小。
前者是因?yàn)?checkpoint barrier 是不會(huì)越過普通數(shù)據(jù)的,數(shù)據(jù)處理被阻塞也會(huì)導(dǎo)致 checkpoint barrier 流經(jīng)整個(gè)數(shù)據(jù)管道的時(shí)長(zhǎng)變長(zhǎng),因而 checkpoint 總體時(shí)間(End to End Duration)變長(zhǎng)。
后者是因?yàn)闉楸WC EOS(Exactly-Once-Semantics,準(zhǔn)確一次),對(duì)于有兩個(gè)以上輸入管道的 Operator,checkpoint barrier 需要對(duì)齊(Alignment),接受到較快的輸入管道的 barrier 后,它后面數(shù)據(jù)會(huì)被緩存起來但不處理,直到較慢的輸入管道的 barrier 也到達(dá),這些被緩存的數(shù)據(jù)會(huì)被放到state 里面,導(dǎo)致 checkpoint 變大。
這兩個(gè)影響對(duì)于生產(chǎn)環(huán)境的作業(yè)來說是十分危險(xiǎn)的,因?yàn)?checkpoint 是保證數(shù)據(jù)一致性的關(guān)鍵,checkpoint 時(shí)間變長(zhǎng)有可能導(dǎo)致 checkpoint 超時(shí)失敗,而 state 大小同樣可能拖慢 checkpoint 甚至導(dǎo)致 OOM (使用 Heap-based StateBackend)或者物理內(nèi)存使用超出容器資源(使用 RocksDBStateBackend)的穩(wěn)定性問題。
因此,我們?cè)谏a(chǎn)中要盡量避免出現(xiàn)反壓的情況(順帶一提,為了緩解反壓給 checkpoint 造成的壓力,社區(qū)提出了 FLIP-76: Unaligned Checkpoints[4] 來解耦反壓和 checkpoint)。
定位反壓節(jié)點(diǎn)
要解決反壓首先要做的是定位到造成反壓的節(jié)點(diǎn),這主要有兩種辦法:
通過 Flink Web UI 自帶的反壓監(jiān)控面板;
通過 Flink Task Metrics。
前者比較容易上手,適合簡(jiǎn)單分析,后者則提供了更加豐富的信息,適合用于監(jiān)控系統(tǒng)。因?yàn)榉磯簳?huì)向上游傳導(dǎo),這兩種方式都要求我們從 Source 節(jié)點(diǎn)到 Sink 的逐一排查,直到找到造成反壓的根源原因[4]。下面分別介紹這兩種辦法。
反壓監(jiān)控面板
Flink Web UI 的反壓監(jiān)控提供了 SubTask 級(jí)別的反壓監(jiān)控,原理是通過周期性對(duì) Task 線程的棧信息采樣,得到線程被阻塞在請(qǐng)求 Buffer(意味著被下游隊(duì)列阻塞)的頻率來判斷該節(jié)點(diǎn)是否處于反壓狀態(tài)。默認(rèn)配置下,這個(gè)頻率在 0.1 以下則為 OK,0.1 至 0.5 為 LOW,而超過 0.5 則為 HIGH。
圖1. Flink 1.8 的 Web UI 反壓面板(來自官方博客)
該節(jié)點(diǎn)的發(fā)送速率跟不上它的產(chǎn)生數(shù)據(jù)速率。這一般會(huì)發(fā)生在一條輸入多條輸出的 Operator(比如 flatmap)。
下游的節(jié)點(diǎn)接受速率較慢,通過反壓機(jī)制限制了該節(jié)點(diǎn)的發(fā)送速率。
如果是第一種狀況,那么該節(jié)點(diǎn)則為反壓的根源節(jié)點(diǎn),它是從 Source Task 到 Sink Task 的第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn)。如果是第二種情況,則需要繼續(xù)排查下游節(jié)點(diǎn)。
值得注意的是,反壓的根源節(jié)點(diǎn)并不一定會(huì)在反壓面板體現(xiàn)出高反壓,因?yàn)榉磯好姘灞O(jiān)控的是發(fā)送端,如果某個(gè)節(jié)點(diǎn)是性能瓶頸并不會(huì)導(dǎo)致它本身出現(xiàn)高反壓,而是導(dǎo)致它的上游出現(xiàn)高反壓??傮w來看,如果我們找到第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn),那么反壓根源要么是就這個(gè)節(jié)點(diǎn),要么是它緊接著的下游節(jié)點(diǎn)。
那么如果區(qū)分這兩種狀態(tài)呢?很遺憾只通過反壓面板是無法直接判斷的,我們還需要結(jié)合 Metrics 或者其他監(jiān)控手段來定位。此外如果作業(yè)的節(jié)點(diǎn)數(shù)很多或者并行度很大,由于要采集所有 Task 的棧信息,反壓面板的壓力也會(huì)很大甚至不可用。
Task Metrics
Flink 提供的 Task Metrics 是更好的反壓監(jiān)控手段,但也要求更加豐富的背景知識(shí)。
首先我們簡(jiǎn)單回顧下 Flink 1.5 以后的網(wǎng)路棧,熟悉的讀者可以直接跳過。
TaskManager 傳輸數(shù)據(jù)時(shí),不同的 TaskManager 上的兩個(gè) Subtask 間通常根據(jù) key 的數(shù)量有多個(gè) Channel,這些 Channel 會(huì)復(fù)用同一個(gè) TaskManager 級(jí)別的 TCP 鏈接,并且共享接收端 Subtask 級(jí)別的 Buffer Pool。
在接收端,每個(gè) Channel 在初始階段會(huì)被分配固定數(shù)量的 Exclusive Buffer,這些 Buffer 會(huì)被用于存儲(chǔ)接受到的數(shù)據(jù),交給 Operator 使用后再次被釋放。Channel 接收端空閑的 Buffer 數(shù)量稱為 Credit,Credit 會(huì)被定時(shí)同步給發(fā)送端被后者用于決定發(fā)送多少個(gè) Buffer 的數(shù)據(jù)。
在流量較大時(shí),Channel 的 Exclusive Buffer 可能會(huì)被寫滿,此時(shí) Flink 會(huì)向 Buffer Pool 申請(qǐng)剩余的 Floating Buffer。這些 Floating Buffer 屬于備用 Buffer,哪個(gè) Channel 需要就去哪里。而在 Channel 發(fā)送端,一個(gè) Subtask 所有的 Channel 會(huì)共享同一個(gè) Buffer Pool,這邊就沒有區(qū)分 Exclusive Buffer 和 Floating Buffer。
圖2. Flink Credit-Based 網(wǎng)絡(luò)
我們?cè)诒O(jiān)控反壓時(shí)會(huì)用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有關(guān),最為有用的是以下幾個(gè) Metrics:
Metris描述outPoolUsage發(fā)送端 Buffer 的使用率inPoolUsage接收端 Buffer 的使用率floatingBuffersUsage(1.9 以上)接收端 Floating Buffer 的使用率exclusiveBuffersUsage (1.9 以上)接收端 Exclusive Buffer 的使用率
其中 inPoolUsage 等于 floatingBuffersUsage 與 exclusiveBuffersUsage 的總和。
分析反壓的大致思路是:如果一個(gè) Subtask 的發(fā)送端 Buffer 占用率很高,則表明它被下游反壓限速了;如果一個(gè) Subtask 的接受端 Buffer 占用很高,則表明它將反壓傳導(dǎo)至上游。反壓情況可以根據(jù)以下表格進(jìn)行對(duì)號(hào)入座(圖片來自官網(wǎng)):
outPoolUsage 和 inPoolUsage 同為低或同為高分別表明當(dāng)前 Subtask 正?;蛱幱诒幌掠畏磯?,這應(yīng)該沒有太多疑問。而比較有趣的是當(dāng) outPoolUsage 和 inPoolUsage 表現(xiàn)不同時(shí),這可能是出于反壓傳導(dǎo)的中間狀態(tài)或者表明該 Subtask 就是反壓的根源。
如果一個(gè) Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。如果一個(gè) Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。因?yàn)橥ǔ7磯簳?huì)傳導(dǎo)至其上游,導(dǎo)致上游某些 Subtask 的 outPoolUsage 為高,我們可以根據(jù)這點(diǎn)來進(jìn)一步判斷。值得注意的是,反壓有時(shí)是短暫的且影響不大,比如來自某個(gè) Channel 的短暫網(wǎng)絡(luò)延遲或者 TaskManager 的正常 GC,這種情況下我們可以不用處理。
對(duì)于 Flink 1.9 及以上版本,除了上述的表格,我們還可以根據(jù) floatingBuffersUsage/exclusiveBuffersUsage 以及其上游 Task 的 outPoolUsage 來進(jìn)行進(jìn)一步的分析一個(gè) Subtask 和其上游 Subtask 的數(shù)據(jù)傳輸。
通常來說,floatingBuffersUsage 為高則表明反壓正在傳導(dǎo)至上游,而 exclusiveBuffersUsage 則表明了反壓是否存在傾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低為有傾斜,因?yàn)樯贁?shù) channel 占用了大部分的 Floating Buffer)。
至此,我們已經(jīng)有比較豐富的手段定位反壓的根源是出現(xiàn)在哪個(gè)節(jié)點(diǎn),但是具體的原因還沒有辦法找到。另外基于網(wǎng)絡(luò)的反壓 metrics 并不能定位到具體的 Operator,只能定位到 Task。特別是 embarrassingly parallel(易并行)的作業(yè)(所有的 Operator 會(huì)被放入一個(gè) Task,因此只有一個(gè)節(jié)點(diǎn)),反壓 metrics 則派不上用場(chǎng)。
分析具體原因及處理
定位到反壓節(jié)點(diǎn)后,分析造成原因的辦法和我們分析一個(gè)普通程序的性能瓶頸的辦法是十分類似的,可能還要更簡(jiǎn)單一點(diǎn),因?yàn)槲覀円^察的主要是 Task Thread。
在實(shí)踐中,很多情況下的反壓是由于數(shù)據(jù)傾斜造成的,這點(diǎn)我們可以通過 Web UI 各個(gè) SubTask 的 Records Sent 和 Record Received 來確認(rèn),另外 Checkpoint detail 里不同 SubTask 的 State size 也是一個(gè)分析數(shù)據(jù)傾斜的有用指標(biāo)。
此外,最常見的問題可能是用戶代碼的執(zhí)行效率問題(頻繁被阻塞或者性能問題)。最有用的辦法就是對(duì) TaskManager 進(jìn)行 CPU profile,從中我們可以分析到 Task Thread 是否跑滿一個(gè) CPU 核:如果是的話要分析 CPU 主要花費(fèi)在哪些函數(shù)里面,比如我們生產(chǎn)環(huán)境中就偶爾遇到卡在 Regex 的用戶函數(shù)(ReDoS);如果不是的話要看 Task Thread 阻塞在哪里,可能是用戶函數(shù)本身有些同步的調(diào)用,可能是 checkpoint 或者 GC 等系統(tǒng)活動(dòng)導(dǎo)致的暫時(shí)系統(tǒng)暫停。
當(dāng)然,性能分析的結(jié)果也可能是正常的,只是作業(yè)申請(qǐng)的資源不足而導(dǎo)致了反壓,這就通常要求拓展并行度。值得一提的,在未來的版本 Flink 將會(huì)直接在 WebUI 提供 JVM 的 CPU 火焰圖[5],這將大大簡(jiǎn)化性能瓶頸的分析。
另外 TaskManager 的內(nèi)存以及 GC 問題也可能會(huì)導(dǎo)致反壓,包括 TaskManager JVM 各區(qū)內(nèi)存不合理導(dǎo)致的頻繁 Full GC 甚至失聯(lián)。推薦可以通過給 TaskManager 啟用 G1 垃圾回收器來優(yōu)化 GC,并加上 -XX:+PrintGCDetails 來打印 GC 日志的方式來觀察 GC 的問題。
總結(jié)
反壓是 Flink 應(yīng)用運(yùn)維中常見的問題,它不僅意味著性能瓶頸還可能導(dǎo)致作業(yè)的不穩(wěn)定性。定位反壓可以從 Web UI 的反壓監(jiān)控面板和 Task Metric 兩者入手,前者方便簡(jiǎn)單分析,后者適合深入挖掘。定位到反壓節(jié)點(diǎn)后我們可以通過數(shù)據(jù)分布、CPU Profile 和 GC 指標(biāo)日志等手段來進(jìn)一步分析反壓背后的具體原因并進(jìn)行針對(duì)性的優(yōu)化。
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
新聞標(biāo)題:如何分析及處理Flink反壓?
分享URL:
http://weahome.cn/article/jpceis.html