作者: 張皓
10余年建站經(jīng)驗, 成都網(wǎng)站設(shè)計、網(wǎng)站建設(shè)客戶的見證與正確選擇。創(chuàng)新互聯(lián)建站提供完善的營銷型網(wǎng)頁建站明細報價表。后期開發(fā)更加便捷高效,我們致力于追求更美、更快、更規(guī)范。G7主要通過在貨車上的傳感器感知車輛的軌跡、油耗、點熄火、載重、溫度等數(shù)據(jù),將車輛、司機、車隊、貨主連接到一起,優(yōu)化貨物運輸?shù)臅r效、安全、成本等痛點問題。
整個數(shù)據(jù)是通過車載的傳感器設(shè)備采集,比如公司的Smart盒子,CTBox盒子,油感設(shè)備,溫度探頭等,將車輛數(shù)據(jù)上報到后端平臺,在后端平臺計算和處理,最后展示到用戶面前。
G7的業(yè)務(wù)場景是典型的IoT場景:
傳感器數(shù)據(jù)
數(shù)據(jù)種類多
數(shù)據(jù)質(zhì)量差
數(shù)據(jù)低延遲
其中,數(shù)據(jù)質(zhì)量差的原因是整個鏈條會非常的長,從傳感器采集的車輛的數(shù)據(jù),通過網(wǎng)絡(luò)運營商將數(shù)據(jù)上報到后端服務(wù)器,再經(jīng)過解析,mq,過濾,調(diào)用三方接口,業(yè)務(wù)處理,入庫,整個過程非常的長,造成數(shù)據(jù)在傳輸過程中出現(xiàn)數(shù)據(jù)重復(fù),數(shù)據(jù)缺失等。另外一點,IoT場景需要數(shù)據(jù)傳輸?shù)难舆t非常低,比如進出區(qū)域報警,當車輛進入到某個電子圍欄中的時候需要觸發(fā)報警,這個時候需要快速產(chǎn)生報警事件,通常不能超過30s,否則時間太長車輛已經(jīng)通過了某個電子圍欄區(qū)域再報警就沒有價值了。再一個,數(shù)據(jù)量也是非常大的,現(xiàn)在每天產(chǎn)生軌跡點20億+,每天產(chǎn)生數(shù)據(jù)量100億+,對計算性能的要求非常高。
從上面的場景我們可以感知到,在G7的IoT場景需要的是一個低延遲,處理速度快的實時計算引擎。最開始我們的一些架構(gòu)是基于Lambda架構(gòu)的,比如軌跡點計算,會使用實時計算引擎計算出實時數(shù)據(jù),這份數(shù)據(jù)延遲比較低,但是數(shù)據(jù)不是很準確,另外需要用離線批量再計算一遍,這份數(shù)據(jù)通常比較準確,可以用來修復(fù)實時數(shù)據(jù)。這樣做的缺點也比較明顯,一是程序需要維護兩套代碼:實時程序和離線程序,二是實時數(shù)據(jù)不準確,準確的數(shù)據(jù)延遲又太高。后來我們驚喜的發(fā)現(xiàn)一種基于實時處理的架構(gòu)體系Kappa。
Kappa的架構(gòu)是強調(diào)數(shù)據(jù)的實時性,為了保證數(shù)據(jù)的實時性有些延遲太多的數(shù)據(jù)它會建議丟棄,所有的計算邏輯只有在實時計算中,整個計算只有一套邏輯,數(shù)據(jù)從MQ中獲取,經(jīng)過數(shù)據(jù)處理層計算和加工,最后落入到數(shù)據(jù)存儲層,對外提供數(shù)據(jù)查詢功能。相對Lambda架構(gòu),Kappa架構(gòu)更加適合IoT領(lǐng)域。
針對Kappa架構(gòu),我們對行業(yè)主流的實時流計算框架進行了對比:
分別對主流的流計算框架:Storm,Storm Trident,Spark Streaming,Google Cloud Dataflow,F(xiàn)link做了對比?;谖⑴康腟park Streaming和Storm Trident延遲比較高,從這點就不適合我們的場景。Storm的延遲很低,但是數(shù)據(jù)一致性是At Least once,容錯機制比較復(fù)雜,流控會比較抖動,這些方面都是不太適合。其中,F(xiàn)link的一致性保證(1.4版本還支持了end-to-end一致性),延遲比較低,容錯機制的開銷是比較小的(基于Chandy-Lamport的分布式快照),流控是比較優(yōu)雅的(算子之間的數(shù)據(jù)傳輸是基于分布式內(nèi)存阻塞隊列),應(yīng)用邏輯與容錯是分離的(算子處理和分布式快照checkpoint),基于以上我們認為flink是比較適合IoT這個場景的。
實時計算
實時ETL
下面分別介紹下以上三個場景的使用。
在G7的場景中,有很多業(yè)務(wù)都屬于實時計算的范疇,比如進出區(qū)域事件,超速事件,怠速事件,超速事件,疲勞報警事件,危險駕駛報警,油耗計算,里程計算等。其中疲勞報警計算是最早開始嘗試使用flink來落地的。
這是G7針對客戶推出的G7大屏,其中風險相關(guān)的部分是根據(jù)疲勞計算得出。
根據(jù)G7的大數(shù)據(jù)計算,因為疲勞駕駛造成貨車事故的比重占到整個事故的20%。對疲勞駕駛進行報警和預(yù)警就顯得特別重要,可以有效降低事故發(fā)生的可能性。
根據(jù)車輛行駛的里程,駕駛員行駛的里程,駕駛時長,判斷是否存在疲勞駕駛。如果超過報警閥值則報警,如果在報警閥值下面在預(yù)警閥值上面則預(yù)警。報警和預(yù)警都是下發(fā)語音到貨車駕駛室提醒司機。
這個業(yè)務(wù)場景中面臨的大挑戰(zhàn)是實時性,穩(wěn)定性。只有用最短的時間、最穩(wěn)定的方式將告警下發(fā)到相關(guān)人員才能大程度減少風險。
在整個處理流程中,首先會去獲取疲勞配置,根據(jù)車輛的狀態(tài)信息和司機打卡信息與疲勞配置結(jié)合,判斷是否出現(xiàn)預(yù)警和報警。計算過程中會把疲勞駕駛開始的狀態(tài)緩存起來,疲勞駕駛結(jié)束的時候獲取之前的狀態(tài)數(shù)據(jù),匹配成功之后會生成一條完整的疲勞事件。中間會調(diào)用一些接口服務(wù)比如dubbo獲取車輛的配置數(shù)據(jù)、狀態(tài)數(shù)據(jù),產(chǎn)生的疲勞報警則會調(diào)用下發(fā)語音的接口,疲勞事件結(jié)果也會存儲到hbase、mysql、kafka等。
最后開發(fā)成Flink的程序,從頭到到尾分別由以下算子構(gòu)成:消費kafka算子、類型轉(zhuǎn)換算子、數(shù)據(jù)過濾算子、異步調(diào)用第三方接口算子,窗口排序算子,疲勞處理業(yè)務(wù)邏輯算子,數(shù)據(jù)入庫算子組成。
這個過程,也是踩了不少坑,我們也有一些心得體會:
算子表達盡量單一
每個算子盡量內(nèi)聚,算子間盡量低耦合
算子打散,異步+多線程的性能發(fā)揮更好
單獨設(shè)置每個算子單元的并行度,性能更優(yōu)
hash和balance根據(jù)情況選擇:只有需要使用keyby和valuestate地方才使用hash重新分布數(shù)據(jù)。其他地方盡量使用balance并且上下游并行度一致,會將task串聯(lián)成一個線程,不會走網(wǎng)絡(luò)IO性能更高
有部分場景是數(shù)據(jù)簡單采集、處理,入庫,也就是實時ETL,包括從Kafka采集數(shù)據(jù)到HDFS、DB、HBase、ES、Kafka等,這部分工作可以抽象成Flink的算子表達:Source -> Transformation -> Sink。
這部分通??梢訤linkKafkaConumser、MapFunction、JDBCAppendTableSink這類代碼。如下:
有部分場景需要有一些實時的統(tǒng)計分析,比如統(tǒng)計最近一小時內(nèi)全國各城市,車輛總數(shù),司機總數(shù),疲勞事件,進出區(qū)域事件,打卡次數(shù),點熄火事件等。這種場景,通??梢允褂肍link SQl的做實時分析,sql+窗口函數(shù)(固定窗口,滑動窗口)。代碼大致如下:
在業(yè)務(wù)上的成功落地,我們也希望能把打造一個實時計算平臺,服務(wù)各條業(yè)務(wù)線,經(jīng)過差不多3個月的打磨,內(nèi)部代號為Glink的實時計算平臺上線,大致的架構(gòu)如下:
Glink主要由以下部分組成:
HDFS分布式文件系統(tǒng)。用來存儲flink任務(wù)中產(chǎn)生的checkpoint/savepoint數(shù)據(jù),任務(wù)報、第三方依賴包的存儲和分發(fā),任務(wù)運行中產(chǎn)生的臨時數(shù)據(jù)等;
Yarn統(tǒng)一計算資源平臺。用來提供統(tǒng)一的分布式計算資源平臺,任務(wù)提交,任務(wù)調(diào)度,任務(wù)執(zhí)行,資源隔離功能。目前所有的flink任務(wù)都是通過yarn進行統(tǒng)一的計算資源管理;
性能監(jiān)控AMP工具。使用點評開源的Cat,在此基礎(chǔ)上做二次開發(fā)并取名“天樞系統(tǒng)”??梢蕴峁┏绦虻暮臅r95、99線、平均耗時、大耗時、java GC監(jiān)控、線程監(jiān)控、堆棧信息等;
集群監(jiān)控管理。機器資源監(jiān)控使用zabbix,提供cpu、內(nèi)存、磁盤io、網(wǎng)絡(luò)io、連接數(shù)、句柄監(jiān)控。集群資源監(jiān)控和管理使用開源Ambari,提供自動化安裝、配置、集群整體任務(wù)、內(nèi)存、cpu資源、hdfs空間、yarn資源大小監(jiān)控報警;
任務(wù)監(jiān)控報警。使用flink提供的statsD reporter將數(shù)據(jù)上傳導時序數(shù)據(jù)庫InfluxDB,通過掃描Infludb數(shù)據(jù)繪制出task的處理流量,通過監(jiān)控流量閥值低于預(yù)期值報警;
診斷調(diào)試。使用成熟的日志查詢系統(tǒng) es+logstash+kibana,通過采集每個節(jié)點的日志寫入到es中, 可以在kibana中查詢關(guān)鍵信息獲取日志內(nèi)存,提供診斷和調(diào)優(yōu)程序的線索;
Flink APP 程序應(yīng)用層。具體開發(fā)的flink應(yīng)用程序,通常解決實時etl,統(tǒng)計分析,業(yè)務(wù)計算的場景;
平臺的部分功能介紹:
任務(wù)管理功能。提供任務(wù)發(fā)布,修改,升級,停止,申請資源,資源審核,啟動日志查看功能;
以上Glink實時計算平臺的功能,基本上滿足用戶獨立完成從程序開發(fā),發(fā)布,調(diào)優(yōu),上線,運維的工作。
除了提供相應(yīng)的平臺功能,還需要在flink的生態(tài)上提供比較好的封裝和工具類,因此我們提供了開發(fā)工具的腳手架:Glink-Framework框架。
Glink-Framework提供以下封裝:
簡化pom文件,減少大量的依賴、插件配置;
三方調(diào)用集成:dubbo,zuul;
三方數(shù)據(jù)庫集成:mysql,redis;
多環(huán)境管理;
依賴版本管理;
另外一方面,我們認為flink是有一定的技術(shù)門檻,特別對于之前沒有并發(fā)編程、集群開發(fā)經(jīng)驗的小伙伴,需要有一段時間的學習才能上手,針對這個痛點,我們提出了技術(shù)BP的技術(shù)合作方式。我們會根據(jù)業(yè)務(wù)的復(fù)雜度,平臺指派一至多名技術(shù)人員參與到業(yè)務(wù)方的整個開發(fā)和運維工作中,從需求分析到上線落地全程參與,后期還會有持續(xù)的技術(shù)分享和培訓幫助業(yè)務(wù)方學習開發(fā)能力。
在整個平臺化,以及業(yè)務(wù)開發(fā)的過程中,flink也踩坑不少,比較典型的下面一些。
并行度太多造成barrier對齊的花費時間更長,有個并行度28的子任務(wù)的對齊時間超過50s;
Valuestate不能跨算子共享;
flink1.3 kafka connector不支持partition增加;
與spring整合,出現(xiàn)handler匹配的問題;
其中比較有意思的是并行度太多,造成barrier對齊花費時間太多的問題。要理解這個問題首先要了解flink在生成checkpoint的過程中,會在source的插入barrier與正常消息一起往下游發(fā)射,算子中等到指定的brrier后會觸發(fā)checkpoint。如下圖所:
這是在一個流的情況下,如果有多個流同時進入一個算子處理就會復(fù)雜一點。flink在做checkpoint的時候,發(fā)現(xiàn)有多個流進入一個算子,先進入這個算子的barrier對應(yīng)的那段消息就會buffer到算子中等待另外的流對應(yīng)的barrier也到達才會觸發(fā)checkpoint,這個buffer再等待的過程稱為checkpoint alignment(barrier對齊),如下圖:
在線上運行的某個程序的一些算子因為barrier對齊的時間超過50s,造成程序 checkpoint超時失敗。對于這個問題,我們的調(diào)優(yōu)策略是兩種,一是盡量減少并行度,就是讓流入一個算子的流盡量少,如果在4個以內(nèi)barrier對齊的時間是比較少的。另外一種方式,使用at least once的語義替換exactly once的語義,這樣checkpoint的時候不會去做barrier對齊,數(shù)據(jù)到了算子馬上做checkpoint并發(fā)送下游。目前 我們的解決辦法是根據(jù)不同的業(yè)務(wù)場景來區(qū)分,如果使用at least once數(shù)據(jù)保證就能滿足業(yè)務(wù)需求的盡量用at least once語義。如果不支持的,就減少并行度以此減少barrier對齊的數(shù)據(jù)量和時間。
通過近段時間的平臺化建設(shè),在”降本增效“方面的收益主要體現(xiàn)在以下幾個方面:
資源利用率提高。目前通過對整個集群的監(jiān)控,在混合部署的情況下平均cpu利用率在20%左右,在某些cpu密集計算的業(yè)務(wù)cpu利用率會更高一些;
開發(fā)效率提升。比如ETL采集程序的開發(fā),傳統(tǒng)開發(fā)采集數(shù)據(jù)、轉(zhuǎn)化、入庫大概需要1天左右時間,通過平臺化的方式開發(fā)簡單的ETL程序在1小時內(nèi)完成開發(fā);
數(shù)據(jù)處理量大。平均每天處理數(shù)據(jù)量在80億條以上;
未來對于flink的規(guī)劃,我們主要還是會圍繞“降本增效,提供統(tǒng)一的計算平臺”為目標,主要聚焦在以下幾個方面:
1 .資源隔離更徹底。目前的資源隔離使用yarn的默認隔離方式只是對內(nèi)存隔離,后續(xù)需要使用yarn+cgroup對內(nèi)存和cpu都做隔離。另外會考慮使用yarn的node label做徹底機器級別隔離,針對不同的業(yè)務(wù)劃分不同類型的機器資源,例如高CPU的任務(wù)對應(yīng)CPU密集型的機器,高IO的任務(wù)對應(yīng)IO比較好的機器;
平臺易用性提高。平臺包括代碼發(fā)布、debug、調(diào)試、監(jiān)控、問題排查,一站式解決問題;
減少Code。通過使用Flink SQL+UDF函數(shù)的方式,將常用的方法和函數(shù)進行封裝,盡量用SQL表達業(yè)務(wù),提高開發(fā)效率。另外也會考慮CEP的模式匹配支持,目前很多業(yè)務(wù)都可以用動態(tài)CEP去支持;
此篇文章,摘自于張皓在 「Flink China社區(qū)線下 Meetup·成都站」 的技術(shù)分享
更多資訊請訪問 Apache Flink 中文社區(qū)網(wǎng)站
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。