這篇文章主要介紹“如何理解微博基于Flink的實時計算平臺建設(shè)”,在日常操作中,相信很多人在如何理解微博基于Flink的實時計算平臺建設(shè)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何理解微博基于Flink的實時計算平臺建設(shè)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
大同網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、自適應(yīng)網(wǎng)站建設(shè)等網(wǎng)站項目制作,到程序開發(fā),運營維護(hù)。創(chuàng)新互聯(lián)成立于2013年到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。
相比于 Spark,目前 Spark 的生態(tài)總體更為完善一些,且在機器學(xué)習(xí)的集成和應(yīng)用性暫時領(lǐng)先。但作為下一代大數(shù)據(jù)引擎的有力競爭者-Flink 在流式計算上有明顯優(yōu)勢,F(xiàn)link 在流式計算里屬于真正意義上的單條處理,每一條數(shù)據(jù)都觸發(fā)計算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協(xié)。Flink 的容錯機制較為輕量,對吞吐量影響較小,而且擁有圖和調(diào)度上的一些優(yōu)化,使得 Flink 可以達(dá)到很高的吞吐量。而 Strom 的容錯機制需要對每條數(shù)據(jù)進(jìn)行 ack,因此其吞吐量瓶頸也是備受詬病。
這里引用一張圖來對常用的實時計算框架做個對比。
cdn.com/3da0ac542030556f0def525ccf6ec7ee9eec5b1f.jpeg">
Flink 是一個開源的分布式實時計算框架。Flink 是有狀態(tài)的和容錯的,可以在維護(hù)一次應(yīng)用程序狀態(tài)的同時無縫地從故障中恢復(fù);它支持大規(guī)模計算能力,能夠在數(shù)千個節(jié)點上并發(fā)運行;它具有很好的吞吐量和延遲特性。同時,F(xiàn)link 提供了多種靈活的窗口函數(shù)。
Flink 檢查點機制能保持 exactly-once 語義的計算。狀態(tài)保持意味著應(yīng)用能夠保存已經(jīng)處理的數(shù)據(jù)集結(jié)果和狀態(tài)。
Flink 支持流處理和窗口事件時間語義。事件時間可以很容易地通過事件到達(dá)的順序和事件可能的到達(dá)延遲流中計算出準(zhǔn)確的結(jié)果。
Flink 支持基于時間、數(shù)目以及會話的非常靈活的窗口機制(window)??梢远ㄖ?window 的觸發(fā)條件來支持更加復(fù)雜的流模式。
Flink 高效的容錯機制允許系統(tǒng)在高吞吐量的情況下支持 exactly-once 語義的計算。Flink 可以準(zhǔn)確、快速地做到從故障中以零數(shù)據(jù)丟失的效果進(jìn)行恢復(fù)。
Flink 具有高吞吐量和低延遲(能快速處理大量數(shù)據(jù))特性。下圖展示了 Apache Flink 和 Apache Storm 完成分布式項目計數(shù)任務(wù)的性能對比。
初期架構(gòu)僅為計算與存儲兩層,新來的計算需求接入后需要新開發(fā)一個實時計算任務(wù)進(jìn)行上線。重復(fù)模塊的代碼復(fù)用率低,重復(fù)率高,計算任務(wù)間的區(qū)別主要是集中在任務(wù)的計算指標(biāo)口徑上。
在存儲層,各個需求方所需求的存儲路徑都不相同,計算指標(biāo)可能在不通的存儲引擎上有重復(fù),有計算資源以及存儲資源上的浪費情況。并且對于指標(biāo)的計算口徑也是僅局限于單個任務(wù)需求里的,不通需求任務(wù)對于相同的指標(biāo)的計算口徑?jīng)]有進(jìn)行統(tǒng)一的限制于保障。各個業(yè)務(wù)方也是在不同的存儲引擎上開發(fā)數(shù)據(jù)獲取服務(wù),對于那些專注于數(shù)據(jù)應(yīng)用本身的團(tuán)隊來說,無疑當(dāng)前模式存在一些弊端。
隨著數(shù)據(jù)體量的增加以及業(yè)務(wù)線的擴(kuò)展,前期架構(gòu)模式的弊端逐步開始顯現(xiàn)。從當(dāng)初單需求單任務(wù)的模式逐步轉(zhuǎn)變?yōu)橥ㄓ玫臄?shù)據(jù)架構(gòu)模式。為此,我們開發(fā)了一些基于 Flink 框架的通用組件來支持?jǐn)?shù)據(jù)的快速接入,并保證代碼模式的統(tǒng)一性和維護(hù)性。在數(shù)據(jù)層,我們基于 Clickhouse 來作為我們數(shù)據(jù)倉庫的計算和存儲引擎,利用其支持多維 OLAP 計算的特性,來處理在多維多指標(biāo)大數(shù)據(jù)量下的快速查詢需求。在數(shù)據(jù)分層上,我們參考與借鑒離線數(shù)倉的經(jīng)驗與方法,構(gòu)建多層實時數(shù)倉服務(wù),并開發(fā)多種微服務(wù)來為數(shù)倉的數(shù)據(jù)聚合,指標(biāo)提取,數(shù)據(jù)出口,數(shù)據(jù)質(zhì)量,報警監(jiān)控等提供支持。
整體架構(gòu)分為五層:
1)接入層:接入原始數(shù)據(jù)進(jìn)行處理,如 Kafka、RabbitMQ、File 等。
2)計算層:選用 Flink 作為實時計算框架,對實時數(shù)據(jù)進(jìn)行清洗,關(guān)聯(lián)等操作。
3)存儲層:對清洗完成的數(shù)據(jù)進(jìn)行數(shù)據(jù)存儲,我們對此進(jìn)行了實時數(shù)倉的模型分層與構(gòu)建,將不同應(yīng)用場景的數(shù)據(jù)分別存儲在如 Clickhouse,Hbase,redis,MySQL 等存儲。服務(wù)中,并抽象公共數(shù)據(jù)層與維度層數(shù)據(jù),分層處理壓縮數(shù)據(jù)并統(tǒng)一數(shù)據(jù)口徑。
4)服務(wù)層:對外提供統(tǒng)一的數(shù)據(jù)查詢服務(wù),支持從底層明細(xì)數(shù)據(jù)到聚合層數(shù)據(jù) 5min/10min/1hour 的多維計算服務(wù)。同時最上層特征指標(biāo)類數(shù)據(jù),如計算層輸入到Redis、Mysql 等也從此數(shù)據(jù)接口進(jìn)行獲取。
5)應(yīng)用層:以統(tǒng)一查詢服務(wù)為支撐對各個業(yè)務(wù)線數(shù)據(jù)場景進(jìn)行支撐。
監(jiān)控報警:對 Flink 任務(wù)的存活狀態(tài)進(jìn)行監(jiān)控,對異常的任務(wù)進(jìn)行郵件報警并根據(jù)設(shè)定的參數(shù)對任務(wù)進(jìn)行自動拉起與恢復(fù)。根據(jù)如 Kafka 消費的 offset 指標(biāo)對消費處理延遲的實時任務(wù)進(jìn)行報警提醒。
數(shù)據(jù)質(zhì)量:監(jiān)控實時數(shù)據(jù)指標(biāo),對歷史的實時數(shù)據(jù)與離線 hive 計算的數(shù)據(jù)定時做對比,提供實時數(shù)據(jù)的數(shù)據(jù)質(zhì)量指標(biāo),對超過閾值的指標(biāo)數(shù)據(jù)進(jìn)行報警。
整體數(shù)據(jù)從原始數(shù)據(jù)接入后經(jīng)過 ETL 處理, 進(jìn)入實時數(shù)倉底層數(shù)據(jù)表,經(jīng)過配置化聚合微服務(wù)組件向上進(jìn)行分層數(shù)據(jù)的聚合。根據(jù)不同業(yè)務(wù)的指標(biāo)需求也可通過特征抽取微服務(wù)直接配置化從數(shù)倉中抽取到如 Redis、ES、Mysql 中進(jìn)行獲取。大部分的數(shù)據(jù)需求可通過統(tǒng)一數(shù)據(jù)服務(wù)接口進(jìn)行獲取。
原始日志數(shù)據(jù)因為各業(yè)務(wù)日志的不同,所擁有的維度或指標(biāo)數(shù)據(jù)并不完整。所以需要進(jìn)行實時的日志的關(guān)聯(lián)才能獲取不同維度條件下的指標(biāo)數(shù)據(jù)查詢結(jié)果。并且關(guān)聯(lián)日志的回傳周期不同,有在 10min 之內(nèi)完成 95% 以上回傳的業(yè)務(wù)日志,也有類似于激活日志等依賴第三方回傳的有任務(wù)日志,延遲窗口可能大于1天。
并且最大日志關(guān)聯(lián)任務(wù)的日均數(shù)據(jù)量在 10 億級別以上,如何快速處理與構(gòu)建實時關(guān)聯(lián)任務(wù)的問題首先擺在我們面前。對此我們基于 Flink 框架開發(fā)了配置化關(guān)聯(lián)組件。對于不同關(guān)聯(lián)日志的指標(biāo)抽取,我們也開發(fā)了配置化指標(biāo)抽取組件用于快速提取復(fù)雜的日志格式。以上兩個自研組件會在后面的內(nèi)容里再做詳細(xì)介紹。
對于回傳晚的日志,我們在關(guān)聯(lián)窗口內(nèi)未取得關(guān)聯(lián)結(jié)果。我們采用實時+離線的方式進(jìn)行數(shù)據(jù)回刷補全。實時處理的日志我們會將未關(guān)聯(lián)的原始日志輸出到另外一個暫存地(Kafka),同時不斷消費處理這個未關(guān)聯(lián)的日志集合,設(shè)定超時重關(guān)聯(lián)次數(shù)與超時重關(guān)聯(lián)時間,超過所設(shè)定任意閾值后,便再進(jìn)行重關(guān)聯(lián)。離線部分,我們采用 Hive 計算昨日全天日志與 N 天內(nèi)的全量被關(guān)聯(lián)日志表進(jìn)行關(guān)聯(lián),將最終的結(jié)果回寫進(jìn)去,替換實時所計算的昨日關(guān)聯(lián)數(shù)據(jù)。
① Operator Chain
為了更高效地分布式執(zhí)行,F(xiàn)link 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個 task 在一個線程中執(zhí)行。將 operators 鏈接成 task 是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時提高整體的吞吐量。
Flink 會在生成 JobGraph 階段,將代碼中可以優(yōu)化的算子優(yōu)化成一個算子鏈(Operator Chains)以放到一個 task(一個線程)中執(zhí)行,以減少線程之間的切換和緩沖的開銷,提高整體的吞吐量和延遲。下面以官網(wǎng)中的例子進(jìn)行說明。
圖中棕色的長條表示等待時間,可以發(fā)現(xiàn)網(wǎng)絡(luò)等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以并發(fā)地處理多個請求和回復(fù)。也就是說,你可以連續(xù)地向數(shù)據(jù)庫發(fā)送用戶 a、b、c 等的請求,與此同時,哪個請求的回復(fù)先返回了就處理哪個回復(fù),從而連續(xù)的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現(xiàn)原理。
③ Checkpoint 優(yōu)化
Flink 實現(xiàn)了一套強大的 checkpoint 機制,使它在獲取高吞吐量性能的同時,也能保證 Exactly Once 級別的快速恢復(fù)。
首先提升各節(jié)點 checkpoint 的性能考慮的就是存儲引擎的執(zhí)行效率。Flink 官方支持的三種 checkpoint state 存儲方案中,Memory 僅用于調(diào)試級別,無法做故障后的數(shù)據(jù)恢復(fù)。其次還有 Hdfs 與 Rocksdb,當(dāng)所做 Checkpoint 的數(shù)據(jù)大小較大時,可以考慮采用 Rocksdb 來作為 checkpoint 的存儲以提升效率。
其次的思路是資源設(shè)置,我們都知道 checkpoint 機制是在每個 task 上都會進(jìn)行,那么當(dāng)總的狀態(tài)數(shù)據(jù)大小不變的情況下,如何分配減少單個 task 所分的 checkpoint 數(shù)據(jù)變成了提升 checkpoint 執(zhí)行效率的關(guān)鍵。
最后,增量快照. 非增量快照下,每次 checkpoint 都包含了作業(yè)所有狀態(tài)數(shù)據(jù)。而大部分場景下,前后 checkpoint 里,數(shù)據(jù)發(fā)生變更的部分相對很少,所以設(shè)置增量 checkpoint,僅會對上次 checkpoint 和本次 checkpoint 之間狀態(tài)的差異進(jìn)行存儲計算,減少了 checkpoint 的耗時。
在任務(wù)執(zhí)行過程中,會遇到各種各樣的問題,導(dǎo)致任務(wù)異常甚至失敗。所以如何做好異常情況下的恢復(fù)工作顯得異常重要。
① 設(shè)定重啟策略
Flink 支持不同的重啟策略,以在故障發(fā)生時控制作業(yè)如何重啟。集群在啟動時會伴隨一個默認(rèn)的重啟策略,在沒有定義具體重啟策略時會使用該默認(rèn)策略。如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認(rèn)策略。
默認(rèn)的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數(shù) restart-strategy 定義了哪個策略被使用。
常用的重啟策略:
固定間隔(Fixed delay);
失敗率(Failure rate);
無重啟(No restart)。
② 設(shè)置 HA
Flink 在任務(wù)啟動時指定 HA 配置主要是為了利用 Zookeeper 在所有運行的 JobManager 實例之間進(jìn)行分布式協(xié)調(diào) .Zookeeper 通過 leader 選取和輕量級一致性的狀態(tài)存儲來提供高可用的分布式協(xié)調(diào)服務(wù)。
③ 任務(wù)監(jiān)控報警平臺
在實際環(huán)境中,我們遇見過因為集群狀態(tài)不穩(wěn)定而導(dǎo)致的任務(wù)失敗。在 Flink 1.6 版本中,甚至遇見過任務(wù)出現(xiàn)假死的情況,也就是 Yarn 上的 job 資源依然存在,而 Flink 任務(wù)實際已經(jīng)死亡。為了監(jiān)測與恢復(fù)這些異常的任務(wù),并且對實時任務(wù)做統(tǒng)一的提交、報警監(jiān)控、任務(wù)恢復(fù)等管理,我們開發(fā)了任務(wù)提交與管理平臺。通過 Shell 拉取 Yarn 上 Running 狀態(tài)與 Flink Job 狀態(tài)的列表進(jìn)行對比,心跳監(jiān)測平臺上的所有任務(wù),并進(jìn)行告警、自動恢復(fù)等操作。
④ 作業(yè)指標(biāo)監(jiān)控
Flink 任務(wù)在運行過程中,各 Operator 都會產(chǎn)生各自的指標(biāo)數(shù)據(jù),例如,Source 會產(chǎn)出 numRecordIn、numRecordsOut 等各項指標(biāo)信息,我們會將這些指標(biāo)信息進(jìn)行收集,并展示在我們的可視化平臺上。指標(biāo)平臺如下圖:
⑤ 任務(wù)運行節(jié)點監(jiān)控
我們的 Flink 任務(wù)都是運行在 Yarn 上,針對每一個運行的作業(yè),我們需要監(jiān)控其運行環(huán)境。會收集 JobManager 及 TaskManager 的各項指標(biāo)。收集的指標(biāo)有 jobmanager-fullgc-count、jobmanager-younggc-count、jobmanager-fullgc-time、jobmanager-younggc-time、taskmanager-fullgc-count、taskmanager-younggc-count、taskmanager-fullgc-time、taskmanager-younggc-time 等,用于判斷任務(wù)運行環(huán)境的健康度,及用于排查可能出現(xiàn)的問題。監(jiān)控界面如下:
從 Flink 的官方文檔,我們知道 Flink 的編程模型分為四層,sql 是最高層的 api, Table api 是中間層,DataSteam/DataSet Api 是核心,stateful Streaming process 層是底層實現(xiàn)。
剛開始我們直接使用 Flink Table 做為數(shù)據(jù)關(guān)聯(lián)的方式,直接將接入進(jìn)來的 DataStream 注冊為 Dynamic Table 后進(jìn)行兩表關(guān)聯(lián)查詢,如下圖:
但嘗試后發(fā)現(xiàn)在做那些日志數(shù)據(jù)量大的關(guān)聯(lián)查詢時往往只能在較小的時間窗口內(nèi)做查詢,否則會超過 datanode 節(jié)點單臺內(nèi)存限制,產(chǎn)生異常。但為了滿足不同業(yè)務(wù)日志延遲到達(dá)的情況,這種實現(xiàn)方式并不通用。
之后,我們直接在 DataStream 上進(jìn)行處理,在 CountWindow 窗口內(nèi)進(jìn)行關(guān)聯(lián)操作,將被關(guān)聯(lián)的數(shù)據(jù) Hash 打散后存儲在各個 datanode 節(jié)點的 Rocksdb 中,利用 Flink State 原生支持 Rocksdb 做 Checkpoint 這一特性進(jìn)行算子內(nèi)數(shù)據(jù)的備份與恢復(fù)。這種方式是可行的,但受制于 Rocksdb 集群物理磁盤為非 SSD 的因素,這種方式在我們的實際線上場景中關(guān)聯(lián)耗時較高。
如 Redis 類的 KV 存儲的確在查詢速度上提升不少,但類似廣告日志數(shù)據(jù)這樣單條日志大小較大的情況,會占用不少寶貴的機器內(nèi)存資源。經(jīng)過調(diào)研后,我們選取了 Hbase 作為我們?nèi)罩娟P(guān)聯(lián)組件的關(guān)聯(lián)數(shù)據(jù)存儲方案。
為了快速構(gòu)建關(guān)聯(lián)任務(wù),我們開發(fā)了基于 Flink 的配置化組件平臺,提交配置文件即可生成數(shù)據(jù)關(guān)聯(lián)任務(wù)并自動提交到集群。下圖是任務(wù)執(zhí)行的處理流程。
示意圖如下:
下圖是關(guān)聯(lián)組件內(nèi)的執(zhí)行流程圖:
隨著日志量的增加,某些需要進(jìn)行關(guān)聯(lián)的日志數(shù)量可能達(dá)到日均十幾億甚至幾十億的量級。前期關(guān)聯(lián)組件的配置化生成任務(wù)的方式的確解決了大部分線上業(yè)務(wù)需求,但隨著進(jìn)一步的關(guān)聯(lián)需求增加,Hbase 面臨著巨大的查詢壓力。在我們對 Hbase 表包括 rowkey 等一系列完成優(yōu)化之后,我們開始了對關(guān)聯(lián)組件的迭代與優(yōu)化。
第一步,減少 Hbase 的查詢。我們使用 Flink Interval Join 的方式,先將大部分關(guān)聯(lián)需求在程序內(nèi)部完成,只有少部分仍需查詢的日志會去查詢外部存儲(Hbase). 經(jīng)驗證,以請求日志與實驗日志關(guān)聯(lián)為例,對于設(shè)置 Interval Join 窗口在 10s 左右即可減少 80% 的 hbase 查詢請求。
① Interval Join 的語義示意圖
數(shù)據(jù) JOIN 的區(qū)間 - 比如時間為 3 的 EXP 會在 IMP 時間為[2, 4]區(qū)間進(jìn)行JOIN;
WaterMark - 比如圖示 EXP 一條數(shù)據(jù)時間是 3,IMP 一條數(shù)據(jù)時間是 5,那么WaterMark是根據(jù)實際最小值減去 UpperBound 生成,即:Min(3,5)-1 = 2;
過期數(shù)據(jù) - 出于性能和存儲的考慮,要將過期數(shù)據(jù)清除,如圖當(dāng) WaterMark 是 2 的時候時間為 2 以前的數(shù)據(jù)過期了,可以被清除。
② Interval Join 內(nèi)部實現(xiàn)邏輯
③ Interval Join 改造
因 Flink 原生的 Intervak Join 實現(xiàn)的是 Inner Join,而我們業(yè)務(wù)中所需要的是 Left Join,具體改造如下:
取消右側(cè)數(shù)據(jù)流的 join 標(biāo)志位;
左側(cè)數(shù)據(jù)流有 join 數(shù)據(jù)時不存 state。
2)關(guān)聯(lián)率動態(tài)監(jiān)控
在任務(wù)執(zhí)行中,往往會出現(xiàn)意想不到的情況,比如被關(guān)聯(lián)的數(shù)據(jù)日志出現(xiàn)缺失,或者日志格式錯誤引發(fā)的異常,造成關(guān)聯(lián)任務(wù)的關(guān)聯(lián)率下降嚴(yán)重。那么此時關(guān)聯(lián)任務(wù)雖然繼續(xù)在運行,但對于整體數(shù)據(jù)質(zhì)量的意義不大,甚至是反向作用。在任務(wù)進(jìn)行恢復(fù)的時,還需要清除異常區(qū)間內(nèi)的數(shù)據(jù),將 Kafka Offset 設(shè)置到異常前的位置再進(jìn)行處理。
故我們在關(guān)聯(lián)組件的優(yōu)化中,加入了動態(tài)監(jiān)控,下面示意圖:
關(guān)聯(lián)任務(wù)中定時探測指定時間范圍 Hbase 是否有最新數(shù)據(jù)寫入,如果沒有,說明寫 Hbase 任務(wù)出現(xiàn)問題,則終止關(guān)聯(lián)任務(wù);
當(dāng)寫 Hbase 任務(wù)出現(xiàn)堆積時,相應(yīng)的會導(dǎo)致關(guān)聯(lián)率下降,當(dāng)關(guān)聯(lián)率低于指定閾值時終止關(guān)聯(lián)任務(wù);
當(dāng)關(guān)聯(lián)任務(wù)終止時會發(fā)出告警,修復(fù)上游任務(wù)后可重新恢復(fù)關(guān)聯(lián)任務(wù),保證關(guān)聯(lián)數(shù)據(jù)不丟失。
為了快速進(jìn)行日志數(shù)據(jù)的指標(biāo)抽取,我們開發(fā)了基于 Flink 計算平臺的指標(biāo)抽取組件Logwash。封裝了基于 Freemaker 的模板引擎做為日志格式的解析模塊,對日志進(jìn)行提取,算術(shù)運算,條件判斷,替換,循環(huán)遍歷等操作。
下圖是 Logwash 組件的處理流程:
組件支持文本與 Json 兩種類型日志進(jìn)行解析提取,目前該清洗組件已支持微博廣告近百個實時清洗需求,提供給運維組等第三方非實時計算方向人員快速進(jìn)行提取日志的能力。
配置文件部分示例:
Flink 中 DataStream 的開發(fā),對于通用的邏輯及相同的代碼進(jìn)行了抽取,生成了我們的通用組件庫 FlinkStream。FlinkStream 包括了對 Topology 的抽象及默認(rèn)實現(xiàn)、對 Stream 的抽象及默認(rèn)實現(xiàn)、對 Source 的抽象和某些實現(xiàn)、對 Operator 的抽象及某些實現(xiàn)、Sink 的抽象及某些實現(xiàn)。任務(wù)提交統(tǒng)一使用可執(zhí)行 Jar 和配置文件,Jar 會讀取配置文件構(gòu)建對應(yīng)的拓?fù)鋱D。
對于 Source 進(jìn)行抽象,創(chuàng)建抽象類及對應(yīng)接口,對于 Flink Connector 中已有的實現(xiàn),例如 kafka,Elasticsearch 等,直接創(chuàng)建新 class 并繼承接口,實現(xiàn)對應(yīng)的方法即可。對于需要自己去實現(xiàn)的 connector,直接繼承抽象類及對應(yīng)接口,實現(xiàn)方法即可。目前只實現(xiàn)了 KafkaSource。
與 Source 抽象類似,我們實現(xiàn)了基于 Stream 到 Stream 級別的 Operator 抽象。創(chuàng)建抽象 Operate 類,抽象 Transform 方法。對于要實現(xiàn)的 Transform 操作,直接繼承抽象類,實現(xiàn)其抽象方法即可。目前實現(xiàn)的 Operator,直接按照文檔使用。如下:
針對 Sink,我們同樣創(chuàng)建了抽象類及接口。對 Flink Connector 中已有的 Sink 進(jìn)行封裝。目前可通過配置進(jìn)行數(shù)據(jù)輸出的 Sink。目前以實現(xiàn)和封裝的 Sink 組件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。
創(chuàng)建 Stream 抽象類及抽象方法 buildStream,用于構(gòu)建 StreamGraph。我們實現(xiàn)了默認(rèn)的 Stream,buildStream 方法讀取 Source 配置生成 DataStream,通過 Operator 配置列表按順序生成拓?fù)鋱D,通過 Sink 配置生成數(shù)據(jù)寫出組件。
對于單 Stream,要處理的邏輯可能比較簡單,主要讀取一個 Source 進(jìn)行數(shù)據(jù)的各種操作并輸出。對于復(fù)雜的多 Stream 業(yè)務(wù)需求,比如多流 Join,多流 Union、Split 流等,因此我們多流業(yè)務(wù)進(jìn)行了抽象,產(chǎn)生了 Topology。在 Topology 這一層可以對多流進(jìn)行配置化操作。對于通用的操作,我們實現(xiàn)了默認(rèn) Topology,直接通過配置文件就可以實現(xiàn)業(yè)務(wù)需求。對于比較復(fù)雜的業(yè)務(wù)場景,用戶可以自己實現(xiàn) Topology。
我們對抽象的組件都是可配置化的,直接通過編寫配置文件,構(gòu)造任務(wù)的運行拓?fù)浣Y(jié)構(gòu),啟動任務(wù)時指定配置文件。
正文文本框 Flink Environment 配置化,包括時間處理類型、重啟策略,checkpoint 等;
Topology 配置化,可配置不同 Stream 之間的處理邏輯與 Sink;
Stream 配置化,可配置 Source,Operator 列表,Sink。
配置示例如下:
run_env: timeCharacteristic: "ProcessingTime" #ProcessingTime\IngestionTime\EventTime restart: # 重啟策略配置 type: # noRestart, fixedDelayRestart, fallBackRestart, failureRateRestart checkpoint: # 開啟checkpoint type: "rocksdb" # streams: impStream: #粉絲經(jīng)濟(jì)曝光日志 type: "DefaultStream" config: source: type: "Kafka011" # 源是kafka011版本 config: parallelism: 20 operates: - type: "StringToMap" config: - type: "SplitElement" config: ... - type: "SelectElement" config: transforms: - type: "KeyBy" config: - type: "CountWindowWithTimeOut" #Window需要和KeyBy組合使用 config: - type: "SplitStream" config: - type: "SelectStream" config: sink: - type: Kafka config: - type: Kafka config:
在實時任務(wù)管理平臺,新建任務(wù),填寫任務(wù)名稱,選擇任務(wù)類型(Flink)及版本,上傳可執(zhí)行 Jar 文件,導(dǎo)入配置或者手動編寫配置,填寫 JobManager 及 TaskManager 內(nèi)存配置,填寫并行度配置,選擇是否重試,選擇是否從 checkpoint 恢復(fù)等選項,保存后即可在任務(wù)列表中啟動任務(wù),并觀察啟動日志用于排查啟動錯誤。
SQL 語言是一門聲明式的,簡單的,靈活的語言,F(xiàn)link 本身提供了對 SQL 的支持。Flink 1.6 版本和 1.8 版本對 SQL 語言的支持有限,不支持建表語句,不支持對外部數(shù)據(jù)的關(guān)聯(lián)操作。因此我們通過 Apache Calcite 對 Flink SQL API 進(jìn)行了擴(kuò)展,用戶只需要關(guān)心業(yè)務(wù)需求怎么用 SQL 語言來表達(dá)即可。
擴(kuò)展了支持創(chuàng)建源表 SQL,通過解析 SQL 語句,獲取數(shù)據(jù)源配置信息,創(chuàng)建對應(yīng)的 TableSource 實例,并將其注冊到 Flink environment。示例如下:
使用 Apache Calcite 對 SQL 進(jìn)行解析,通過維表關(guān)鍵字識別維表,使用 RichAsyncFunction 算子異步讀取維表數(shù)據(jù),并通過 flatMap 操作生成關(guān)聯(lián)后的 DataStream,然后轉(zhuǎn)換為 Table 注冊到 Flink Environment。示例如下:
使用 SQLQuery 方法,支持從上一層表或者視圖中創(chuàng)建視圖表,并將新的視圖表注冊到 Flink Environment。創(chuàng)建語句需要按照順序?qū)?,比?myView2 是從視圖 myView1 中創(chuàng)建的,則 myView1 創(chuàng)建語句要在myView2語句前面。如下:
支持創(chuàng)建結(jié)果表,通過解析 SQL 語句,獲取配置信息,創(chuàng)建對應(yīng)的 AppendStreamTableSink 或者 UpsertStreamTableSink 實例,并將其注冊到 Flink Environment。示例如下:
支持自定義 UDF 函數(shù),繼承 ScalarFunction 或者 TableFunction。在 resources 目錄下有相應(yīng)的 UDF 資源配置文件,默認(rèn)會注冊全部可執(zhí)行 Jar 包中配置的 UDF。直接按照使用方法使用即可。
部署方式同 Flink Stream 組件。
為了保證實時數(shù)據(jù)的統(tǒng)一對外出口以及保證數(shù)據(jù)指標(biāo)的統(tǒng)一口徑,我們根據(jù)業(yè)界離線數(shù)倉的經(jīng)驗來設(shè)計與構(gòu)架微博廣告實時數(shù)倉。
數(shù)據(jù)倉庫分為三層,自下而上為:數(shù)據(jù)引入層(ODS,Operation Data Store)、數(shù)據(jù)公共層(CDM,Common Data Model)和數(shù)據(jù)應(yīng)用層(ADS,Application Data Service)。
數(shù)據(jù)引入層(ODS,Operation Data Store):將原始數(shù)據(jù)幾乎無處理的存放在數(shù)據(jù)倉庫系統(tǒng),結(jié)構(gòu)上與源系統(tǒng)基本保持一致,是數(shù)據(jù)倉庫的數(shù)據(jù)準(zhǔn)。
數(shù)據(jù)公共層(CDM,Common Data Model,又稱通用數(shù)據(jù)模型層):包含 DIM 維度表、DWD 和 DWS,由 ODS 層數(shù)據(jù)加工而成。主要完成數(shù)據(jù)加工與整合,建立一致性的維度,構(gòu)建可復(fù)用的面向分析和統(tǒng)計的明細(xì)事實表,以及匯總公共粒度的指標(biāo)。
公共維度層(DIM):基于維度建模理念思想,建立整個企業(yè)的一致性維度。降低數(shù)據(jù)計算口徑和算法不統(tǒng)一風(fēng)險。
公共維度層的表通常也被稱為邏輯維度表,維度和維度邏輯表通常一一對應(yīng)。
公共匯總粒度事實層(DWS,Data Warehouse Service):以分析的主題對象作為建模驅(qū)動,基于上層的應(yīng)用和產(chǎn)品的指標(biāo)需求,構(gòu)建公共粒度的匯總指標(biāo)事實表,以寬表化手段物理化模型。構(gòu)建命名規(guī)范、口徑一致的統(tǒng)計指標(biāo),為上層提供公共指標(biāo),建立匯總寬表、明細(xì)事實表。
公共匯總粒度事實層的表通常也被稱為匯總邏輯表,用于存放派生指標(biāo)數(shù)據(jù)。
明細(xì)粒度事實層(DWD,Data Warehouse Detail):以業(yè)務(wù)過程作為建模驅(qū)動,基于每個具體的業(yè)務(wù)過程特點,構(gòu)建最細(xì)粒度的明細(xì)層事實表。可以結(jié)合企業(yè)的數(shù)據(jù)使用特點,將明細(xì)事實表的某些重要維度屬性字段做適當(dāng)冗余,也即寬表化處理。
明細(xì)粒度事實層的表通常也被稱為邏輯事實表。
數(shù)據(jù)應(yīng)用層(ADS,Application Data Service):存放數(shù)據(jù)產(chǎn)品個性化的統(tǒng)計指標(biāo)數(shù)據(jù)。根據(jù) CDM 與 ODS 層加工生成。
對于原始日志數(shù)據(jù),ODS 層幾乎是每條日志抽取字段后進(jìn)行保留,這樣便能對問題的回溯與追蹤。在 CDM 層對 ODS 的數(shù)據(jù)僅做時間粒度上的數(shù)據(jù)壓縮,也就是在指定時間切分窗口里,對所有維度下的指標(biāo)做聚合操作,而不涉及業(yè)務(wù)性的操作。在 ADS 層,我們會有配置化抽取微服務(wù),對底層數(shù)據(jù)做定制化計算和提取,輸出到用戶指定的存儲服務(wù)里。
到此,關(guān)于“如何理解微博基于Flink的實時計算平臺建設(shè)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章名稱:如何理解微博基于Flink的實時計算平臺建設(shè)
本文網(wǎng)址:http://weahome.cn/article/jchije.html