開源Chaperone中Uber是如何對(duì)Kafka進(jìn)行端到端審計(jì)的,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
成都創(chuàng)新互聯(lián)公司長期為1000+客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為興平企業(yè)提供專業(yè)的網(wǎng)站設(shè)計(jì)制作、網(wǎng)站建設(shè),興平網(wǎng)站改版等技術(shù)服務(wù)。擁有十多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。
隨著Uber業(yè)務(wù)規(guī)模不斷增長,我們的系統(tǒng)也在持續(xù)不斷地產(chǎn)生更多的事件、服務(wù)間的消息和日志。這些數(shù)據(jù)在得到處理之前需要經(jīng)過Kafka。那么我們的平臺(tái)是如何實(shí)時(shí)地對(duì)這些數(shù)據(jù)進(jìn)行審計(jì)的呢?
為了監(jiān)控Kafka數(shù)據(jù)管道的健康狀況并對(duì)流經(jīng)Kafka的每個(gè)消息進(jìn)行審計(jì),我們完全依賴我們的審計(jì)系統(tǒng)Chaperone。Chaperone自2016年1月成為Uber的跨數(shù)據(jù)中心基礎(chǔ)設(shè)施以來,每天處理萬億的消息量。下面我們會(huì)介紹它的工作原理,并說明我們?yōu)槭裁磿?huì)構(gòu)建Chaperone。
Uber的Kafka數(shù)據(jù)管道概覽
Uber的服務(wù)以雙活的模式運(yùn)行在多個(gè)數(shù)據(jù)中心。Apache Kafka和uReplicator是連接Uber生態(tài)系統(tǒng)各個(gè)部分的消息總線。
截止2016年11月份,Uber的Kafka數(shù)據(jù)管道概覽。數(shù)據(jù)從兩個(gè)數(shù)據(jù)中心聚合到一個(gè)Kafka集群上。
要讓Uber的Kafka對(duì)下游的消費(fèi)者做出即時(shí)響應(yīng)是很困難的。為了保證吞吐量,我們盡可能地使用批次,并嚴(yán)重依賴異步處理。服務(wù)使用自家的客戶端把消息發(fā)布到Kafka代理,代理把這些消息分批轉(zhuǎn)發(fā)到本地的Kafka集群上。有些Kafka的主題會(huì)被本地集群直接消費(fèi),而剩下的大部分會(huì)跟來自其他數(shù)據(jù)中心的數(shù)據(jù)一起被組合到一個(gè)聚合Kafka集群上,我們使用uReplicator來完成這種面向大規(guī)模流或批處理的工作。
Uber的Kafka數(shù)據(jù)管道可以分為四層,它們跨越了多個(gè)數(shù)據(jù)中心。Kafka代理和它的客戶端分別是第二層和***層。它們被作為消息進(jìn)入第三層的網(wǎng)關(guān),也就是每個(gè)數(shù)據(jù)中心的本地Kafka集群。本地集群的部分?jǐn)?shù)據(jù)會(huì)被復(fù)制到聚合集群,也就是數(shù)據(jù)管道的***一層。
Kafka數(shù)據(jù)管道的數(shù)據(jù)都會(huì)經(jīng)過分批和確認(rèn)(發(fā)送確認(rèn)):
Kafka數(shù)據(jù)管道的數(shù)據(jù)流經(jīng)的路徑概覽。
Uber的數(shù)據(jù)從代理客戶端流向Kafka需要經(jīng)過幾個(gè)階段:
應(yīng)用程序通過調(diào)用代理客戶端的produce方法向代理客戶端發(fā)送消息。
代理客戶端把收到的消息放到客戶端的緩沖區(qū)中,并讓方法調(diào)用返回。
代理客戶端把緩沖區(qū)里的消息進(jìn)行分批并發(fā)送到代理服務(wù)器端。
代理服務(wù)器把消息放到生產(chǎn)者緩沖區(qū)并對(duì)代理客戶端進(jìn)行確認(rèn)。這時(shí),消息批次已經(jīng)被分好區(qū),并根據(jù)不同的主題名稱放在了相應(yīng)的緩沖區(qū)里。
代理服務(wù)器對(duì)緩沖區(qū)里的消息進(jìn)行分批并發(fā)送到本地Kafka服務(wù)器上。
本地Kafka服務(wù)器把消息追加到本地日志并對(duì)代理服務(wù)器進(jìn)行確認(rèn)(acks=1)。
uReplicator從本地Kafka服務(wù)器獲取消息并發(fā)送到聚合服務(wù)器上。
聚合服務(wù)器把消息追加到本地日志并對(duì)uReplicator進(jìn)行確認(rèn)(acks=1)。
我們?yōu)榱俗孠afka支持高吞吐量,做出了一些權(quán)衡。數(shù)以千計(jì)的微服務(wù)使用Kafka來處理成百上千的并發(fā)業(yè)務(wù)流量(而且還在持續(xù)增長)會(huì)帶來潛在的問題。Chaperone的目標(biāo)是在數(shù)據(jù)流經(jīng)數(shù)據(jù)管道的每個(gè)階段,能夠抓住每個(gè)消息,統(tǒng)計(jì)一定時(shí)間段內(nèi)的數(shù)據(jù)量,并盡早準(zhǔn)確地檢測(cè)出數(shù)據(jù)的丟失、延遲和重復(fù)情況。
Chaperone概覽
Chaperone由四個(gè)組件組成:AuditLibrary、ChaperoneService、ChaperoneCollector和WebService。
Chaperone架構(gòu):AuditLibrary、ChaperoneService、ChaperoneCollector和WebService,它們會(huì)收集數(shù)據(jù),并進(jìn)行相關(guān)計(jì)算,自動(dòng)檢測(cè)出丟失和延遲的數(shù)據(jù),并展示審計(jì)結(jié)果。
AuditLibrary實(shí)現(xiàn)了審計(jì)算法,它會(huì)定時(shí)收集并打印統(tǒng)計(jì)時(shí)間窗。這個(gè)庫被其它三個(gè)組件所依賴。它的輸出模塊是可插拔的(可以使用Kafka、HTTP等)。在代理客戶端,審計(jì)度量指標(biāo)被發(fā)送到Kafka代理。而在其它層,度量指標(biāo)直接被發(fā)送到專門的Kafka主題上。
審計(jì)算法是AuditLibrary的核心,Chaperone使用10分鐘的滾動(dòng)時(shí)間窗來持續(xù)不斷地從每個(gè)主題收集消息。消息里的事件時(shí)間戳被用來決定該消息應(yīng)該被放到哪個(gè)時(shí)間窗里。對(duì)于同一個(gè)時(shí)間窗內(nèi)的消息,Chaperone會(huì)計(jì)算它們的數(shù)量和p99延遲。Chaperone會(huì)定時(shí)把每個(gè)時(shí)間窗的統(tǒng)計(jì)信息包裝成審計(jì)消息發(fā)送到可插拔的后端,它們可能是Kafka代理或者之前提到的Kafka服務(wù)器。
Chaperone根據(jù)消息的事件時(shí)間戳把消息聚合到滾動(dòng)時(shí)間窗內(nèi)。
審計(jì)消息里的tier字段很重要,通過它可以知道審計(jì)是在哪里發(fā)生的,也可以知道消息是否到達(dá)了某一個(gè)地方。通過比較一定時(shí)間段內(nèi)不同層之間的消息數(shù)量,我們可以知道這段時(shí)間內(nèi)所生成的消息是否被成功送達(dá)。
ChaperoneService是工作負(fù)載***的一個(gè)組件,而且總是處在饑餓的狀態(tài)。它消費(fèi)Kafka的每一個(gè)消息并記錄時(shí)間戳。ChaperoneService是基于uReplicator的HelixKafkaConsumer構(gòu)建的,這個(gè)消費(fèi)者組件已經(jīng)被證明比Kafka自帶的消費(fèi)者組件(Kafka 0.8.2)更可靠,也更好用。ChaperoneService通過定時(shí)向特定的Kafka主題生成審計(jì)消息來記錄狀態(tài)。
ChaperoneCollector監(jiān)聽特定的Kafka主題,并獲取所有的審計(jì)消息,然后把它們存到數(shù)據(jù)庫。同時(shí),它還會(huì)生產(chǎn)多個(gè)儀表盤:
Chaperone創(chuàng)建的儀表盤,從上面我們看出數(shù)據(jù)的丟失情況。
從上圖可以看出每個(gè)層的主題消息總量,它們是通過聚合所有數(shù)據(jù)中心的消息得出的。如果沒有數(shù)據(jù)丟失,所有的線會(huì)***地重合起來。如果層之間有數(shù)據(jù)丟失,那么線與線之間會(huì)出現(xiàn)裂縫。例如,從下圖可以看出,Kafka代理丟掉了一些消息,不過在之后的層里沒有消息丟失。從儀表盤可以很容易地看出數(shù)據(jù)丟失的時(shí)間窗,從而可以采取相應(yīng)的行動(dòng)。
從儀表盤上還能看出消息的延遲情況,借此我們就能夠知道消息的及時(shí)性以及它們是否在某些層發(fā)生了傳輸延遲。用戶可以直接從這一個(gè)儀表盤上看出主題的健康狀況,而無需去查看Kafka服務(wù)器或uReplicator的儀表盤:
Chaperone提供一站式的儀表盤來查看每個(gè)數(shù)據(jù)中心的主題狀態(tài)。
WebService提供了REST接口來查詢Chaperone收集到的度量指標(biāo)。通過這些接口,我們可以準(zhǔn)確地計(jì)算出數(shù)據(jù)丟失的數(shù)量。在知道了數(shù)據(jù)丟失的時(shí)間窗后,我們可以從Chaperone查到確切的數(shù)量:
Chaperone的Web界面。
Chaperone的兩個(gè)設(shè)計(jì)目標(biāo)
在設(shè)計(jì)Chaperone時(shí),為了能夠做到準(zhǔn)確的審計(jì),我們把注意力集中在兩個(gè)必須完成的任務(wù)上:
1)每個(gè)消息只被審計(jì)一次
為了確保每個(gè)消息只被審計(jì)一次,ChaperoneService使用了預(yù)寫式日志(WAL)。ChaperoneService每次在觸發(fā)Kafka審計(jì)消息時(shí),會(huì)往審計(jì)消息里添加一個(gè)UUID。這個(gè)帶有相關(guān)偏移量的消息在發(fā)送到Kafka之前被保存在WAL里。在得到Kafka的確認(rèn)之后,WAL里的消息被標(biāo)記為已完成。如果ChaperoneService崩潰,在重啟后它可以重新發(fā)送WAL里未被標(biāo)記的審計(jì)消息,并定位到最近一次的審計(jì)偏移量,然后繼續(xù)消費(fèi)。WAL確保了每個(gè)Kafka消息只被審計(jì)一次,而且每個(gè)審計(jì)消息至少會(huì)被發(fā)送一次。
接下來,ChaperoneCollector使用ChaperoneService之前添加過的UUID來移除重復(fù)消息。有了UUID和WAL,我們可以確保審計(jì)的一次性。在代理客戶端和服務(wù)器端難以實(shí)現(xiàn)一次性保證,因?yàn)檫@樣會(huì)給它們帶來額外的開銷。我們依賴它們的優(yōu)雅關(guān)閉操作,這樣它們的狀態(tài)才會(huì)被沖刷出去。
2)在層間使用一致性的時(shí)間戳
因?yàn)镃haperone可以在多個(gè)層里看到相同的Kafka消息,所以為消息內(nèi)嵌時(shí)間戳是很有必要的。如果沒有這些時(shí)間戳,在計(jì)數(shù)時(shí)會(huì)發(fā)生時(shí)間錯(cuò)位。在Uber,大部分發(fā)送到Kafka的數(shù)據(jù)要么使用avro風(fēng)格的schema編碼,要么使用JSON格式。對(duì)于使用schema編碼的消息,可以直接獲取時(shí)間戳。而對(duì)于JSON格式的消息,需要對(duì)JSON數(shù)據(jù)進(jìn)行解碼才能拿到時(shí)間戳。為了加快這個(gè)過程,我們實(shí)現(xiàn)了一個(gè)基于流的JSON消息解析器,這個(gè)解析器無需預(yù)先解碼整個(gè)消息就可以掃描到時(shí)間戳。這個(gè)解析器用在ChaperoneService里是很高效的,不過對(duì)代理客戶端和服務(wù)器來說仍然需要付出很高代價(jià)。所以在這兩個(gè)層里,我們使用的是消息的處理時(shí)間戳。因?yàn)闀r(shí)間戳的不一致造成的層間計(jì)數(shù)差異可能會(huì)觸發(fā)錯(cuò)誤的數(shù)據(jù)丟失警告。我們正在著手解決時(shí)間戳不一致問題,之后也會(huì)把解決方案公布出來。
Chaperone在Uber的兩大用途
1. 檢測(cè)數(shù)據(jù)丟失
在Chaperone之前,數(shù)據(jù)丟失的***個(gè)征兆來自數(shù)據(jù)消費(fèi)者,他們會(huì)出來抱怨數(shù)據(jù)的丟失情況。但是等他們出來抱怨已經(jīng)為時(shí)已晚,而且我們無法知道是數(shù)據(jù)管道的哪一部分出現(xiàn)了問題。有了Chaperone之后,我們創(chuàng)建了一個(gè)用于檢測(cè)丟失數(shù)據(jù)的作業(yè),它會(huì)定時(shí)地從Chaperone拉取度量指標(biāo),并在層間的消息數(shù)量出現(xiàn)不一致時(shí)發(fā)出告警。告警包含了Kafka數(shù)據(jù)管道端到端的信息,從中可以看出那些管道組件的度量指標(biāo)無法告訴我們的問題。檢測(cè)作業(yè)會(huì)自動(dòng)地發(fā)現(xiàn)新主題,并且你可以根據(jù)數(shù)據(jù)的重要性配置不同的告警規(guī)則和閾值。數(shù)據(jù)丟失的通知會(huì)通過多種通道發(fā)送出去,比如頁式調(diào)度系統(tǒng)、企業(yè)聊天系統(tǒng)或者郵件系統(tǒng),總之會(huì)很快地通知到你。
2. 在Kafka里通過偏移量之外的方式讀取數(shù)據(jù)
我們生產(chǎn)環(huán)境的大部分集群仍然在使用Kafka 0.8.x,這一版本的Kafka對(duì)從時(shí)間戳到偏移量的索引沒有提供原生支持。于是我們?cè)贑haperone里自己構(gòu)建了這樣的索引。這種索引可以用來做基于時(shí)間區(qū)間的查詢,所以我們不僅限于使用Kafka的偏移量來讀取數(shù)據(jù),我們可以使用Chaperone提供的時(shí)間戳來讀取數(shù)據(jù)。
Kafka對(duì)數(shù)據(jù)的保留是有期限的,不過我們對(duì)消息進(jìn)行了備份,并把消息的偏移量也原封不動(dòng)地保存起來。借助Chaperone提供的索引,用戶可以基于時(shí)間區(qū)間讀取這些備份數(shù)據(jù),而不是僅僅局限于Kafka現(xiàn)存的數(shù)據(jù),而且使用的訪問接口跟Kafka是一樣的。有了這個(gè)特性,Kafka用戶可以通過檢查任意時(shí)間段里的消息來對(duì)他們的服務(wù)進(jìn)行問題診斷,在必要時(shí)可以回填消息。當(dāng)下游系統(tǒng)的審計(jì)結(jié)果跟Chaperone出現(xiàn)不一致,我們可以把一些特定的消息導(dǎo)出來進(jìn)行比較,以便定位問題的根源。
看完上述內(nèi)容,你們掌握開源Chaperone中Uber是如何對(duì)Kafka進(jìn)行端到端審計(jì)的的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!