真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何理解微博基于Flink的實時計算平臺建設(shè)

這篇文章主要介紹“如何理解微博基于Flink的實時計算平臺建設(shè)”,在日常操作中,相信很多人在如何理解微博基于Flink的實時計算平臺建設(shè)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何理解微博基于Flink的實時計算平臺建設(shè)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

大同網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、自適應(yīng)網(wǎng)站建設(shè)等網(wǎng)站項目制作,到程序開發(fā),運營維護(hù)。創(chuàng)新互聯(lián)成立于2013年到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)

一.技術(shù)選型

相比于 Spark,目前 Spark 的生態(tài)總體更為完善一些,且在機器學(xué)習(xí)的集成和應(yīng)用性暫時領(lǐng)先。但作為下一代大數(shù)據(jù)引擎的有力競爭者-Flink 在流式計算上有明顯優(yōu)勢,F(xiàn)link 在流式計算里屬于真正意義上的單條處理,每一條數(shù)據(jù)都觸發(fā)計算,而不是像 Spark 一樣的 Mini Batch 作為流式處理的妥協(xié)。Flink 的容錯機制較為輕量,對吞吐量影響較小,而且擁有圖和調(diào)度上的一些優(yōu)化,使得 Flink 可以達(dá)到很高的吞吐量。而 Strom 的容錯機制需要對每條數(shù)據(jù)進(jìn)行 ack,因此其吞吐量瓶頸也是備受詬病。

這里引用一張圖來對常用的實時計算框架做個對比。

如何理解微博基于Flink的實時計算平臺建設(shè)cdn.com/3da0ac542030556f0def525ccf6ec7ee9eec5b1f.jpeg">

Flink 特點

Flink 是一個開源的分布式實時計算框架。Flink 是有狀態(tài)的和容錯的,可以在維護(hù)一次應(yīng)用程序狀態(tài)的同時無縫地從故障中恢復(fù);它支持大規(guī)模計算能力,能夠在數(shù)千個節(jié)點上并發(fā)運行;它具有很好的吞吐量和延遲特性。同時,F(xiàn)link 提供了多種靈活的窗口函數(shù)。

1)狀態(tài)管理機制

Flink 檢查點機制能保持 exactly-once 語義的計算。狀態(tài)保持意味著應(yīng)用能夠保存已經(jīng)處理的數(shù)據(jù)集結(jié)果和狀態(tài)。

2)事件機制

Flink 支持流處理和窗口事件時間語義。事件時間可以很容易地通過事件到達(dá)的順序和事件可能的到達(dá)延遲流中計算出準(zhǔn)確的結(jié)果。

如何理解微博基于Flink的實時計算平臺建設(shè)

3)窗口機制

Flink 支持基于時間、數(shù)目以及會話的非常靈活的窗口機制(window)??梢远ㄖ?window 的觸發(fā)條件來支持更加復(fù)雜的流模式。

4)容錯機制

Flink 高效的容錯機制允許系統(tǒng)在高吞吐量的情況下支持 exactly-once 語義的計算。Flink 可以準(zhǔn)確、快速地做到從故障中以零數(shù)據(jù)丟失的效果進(jìn)行恢復(fù)。

如何理解微博基于Flink的實時計算平臺建設(shè)

5)高吞吐、低延遲

Flink 具有高吞吐量和低延遲(能快速處理大量數(shù)據(jù))特性。下圖展示了 Apache Flink 和 Apache Storm 完成分布式項目計數(shù)任務(wù)的性能對比。

二.架構(gòu)演變

初期架構(gòu)

初期架構(gòu)僅為計算與存儲兩層,新來的計算需求接入后需要新開發(fā)一個實時計算任務(wù)進(jìn)行上線。重復(fù)模塊的代碼復(fù)用率低,重復(fù)率高,計算任務(wù)間的區(qū)別主要是集中在任務(wù)的計算指標(biāo)口徑上。

在存儲層,各個需求方所需求的存儲路徑都不相同,計算指標(biāo)可能在不通的存儲引擎上有重復(fù),有計算資源以及存儲資源上的浪費情況。并且對于指標(biāo)的計算口徑也是僅局限于單個任務(wù)需求里的,不通需求任務(wù)對于相同的指標(biāo)的計算口徑?jīng)]有進(jìn)行統(tǒng)一的限制于保障。各個業(yè)務(wù)方也是在不同的存儲引擎上開發(fā)數(shù)據(jù)獲取服務(wù),對于那些專注于數(shù)據(jù)應(yīng)用本身的團(tuán)隊來說,無疑當(dāng)前模式存在一些弊端。

如何理解微博基于Flink的實時計算平臺建設(shè)

后期架構(gòu)

隨著數(shù)據(jù)體量的增加以及業(yè)務(wù)線的擴(kuò)展,前期架構(gòu)模式的弊端逐步開始顯現(xiàn)。從當(dāng)初單需求單任務(wù)的模式逐步轉(zhuǎn)變?yōu)橥ㄓ玫臄?shù)據(jù)架構(gòu)模式。為此,我們開發(fā)了一些基于 Flink 框架的通用組件來支持?jǐn)?shù)據(jù)的快速接入,并保證代碼模式的統(tǒng)一性和維護(hù)性。在數(shù)據(jù)層,我們基于 Clickhouse 來作為我們數(shù)據(jù)倉庫的計算和存儲引擎,利用其支持多維 OLAP 計算的特性,來處理在多維多指標(biāo)大數(shù)據(jù)量下的快速查詢需求。在數(shù)據(jù)分層上,我們參考與借鑒離線數(shù)倉的經(jīng)驗與方法,構(gòu)建多層實時數(shù)倉服務(wù),并開發(fā)多種微服務(wù)來為數(shù)倉的數(shù)據(jù)聚合,指標(biāo)提取,數(shù)據(jù)出口,數(shù)據(jù)質(zhì)量,報警監(jiān)控等提供支持。

整體架構(gòu)分為五層:

1)接入層:接入原始數(shù)據(jù)進(jìn)行處理,如 Kafka、RabbitMQ、File 等。

2)計算層:選用 Flink 作為實時計算框架,對實時數(shù)據(jù)進(jìn)行清洗,關(guān)聯(lián)等操作。

3)存儲層:對清洗完成的數(shù)據(jù)進(jìn)行數(shù)據(jù)存儲,我們對此進(jìn)行了實時數(shù)倉的模型分層與構(gòu)建,將不同應(yīng)用場景的數(shù)據(jù)分別存儲在如 Clickhouse,Hbase,redis,MySQL 等存儲。服務(wù)中,并抽象公共數(shù)據(jù)層與維度層數(shù)據(jù),分層處理壓縮數(shù)據(jù)并統(tǒng)一數(shù)據(jù)口徑。

4)服務(wù)層:對外提供統(tǒng)一的數(shù)據(jù)查詢服務(wù),支持從底層明細(xì)數(shù)據(jù)到聚合層數(shù)據(jù) 5min/10min/1hour 的多維計算服務(wù)。同時最上層特征指標(biāo)類數(shù)據(jù),如計算層輸入到Redis、Mysql 等也從此數(shù)據(jù)接口進(jìn)行獲取。

5)應(yīng)用層:以統(tǒng)一查詢服務(wù)為支撐對各個業(yè)務(wù)線數(shù)據(jù)場景進(jìn)行支撐。

  • 監(jiān)控報警:對 Flink 任務(wù)的存活狀態(tài)進(jìn)行監(jiān)控,對異常的任務(wù)進(jìn)行郵件報警并根據(jù)設(shè)定的參數(shù)對任務(wù)進(jìn)行自動拉起與恢復(fù)。根據(jù)如 Kafka 消費的 offset 指標(biāo)對消費處理延遲的實時任務(wù)進(jìn)行報警提醒。

  • 數(shù)據(jù)質(zhì)量:監(jiān)控實時數(shù)據(jù)指標(biāo),對歷史的實時數(shù)據(jù)與離線 hive 計算的數(shù)據(jù)定時做對比,提供實時數(shù)據(jù)的數(shù)據(jù)質(zhì)量指標(biāo),對超過閾值的指標(biāo)數(shù)據(jù)進(jìn)行報警。

三.數(shù)據(jù)處理流程

1.整體流程

整體數(shù)據(jù)從原始數(shù)據(jù)接入后經(jīng)過 ETL 處理, 進(jìn)入實時數(shù)倉底層數(shù)據(jù)表,經(jīng)過配置化聚合微服務(wù)組件向上進(jìn)行分層數(shù)據(jù)的聚合。根據(jù)不同業(yè)務(wù)的指標(biāo)需求也可通過特征抽取微服務(wù)直接配置化從數(shù)倉中抽取到如 Redis、ES、Mysql 中進(jìn)行獲取。大部分的數(shù)據(jù)需求可通過統(tǒng)一數(shù)據(jù)服務(wù)接口進(jìn)行獲取。

如何理解微博基于Flink的實時計算平臺建設(shè)

2.問題與挑戰(zhàn)

原始日志數(shù)據(jù)因為各業(yè)務(wù)日志的不同,所擁有的維度或指標(biāo)數(shù)據(jù)并不完整。所以需要進(jìn)行實時的日志的關(guān)聯(lián)才能獲取不同維度條件下的指標(biāo)數(shù)據(jù)查詢結(jié)果。并且關(guān)聯(lián)日志的回傳周期不同,有在 10min 之內(nèi)完成 95% 以上回傳的業(yè)務(wù)日志,也有類似于激活日志等依賴第三方回傳的有任務(wù)日志,延遲窗口可能大于1天。

并且最大日志關(guān)聯(lián)任務(wù)的日均數(shù)據(jù)量在 10 億級別以上,如何快速處理與構(gòu)建實時關(guān)聯(lián)任務(wù)的問題首先擺在我們面前。對此我們基于 Flink 框架開發(fā)了配置化關(guān)聯(lián)組件。對于不同關(guān)聯(lián)日志的指標(biāo)抽取,我們也開發(fā)了配置化指標(biāo)抽取組件用于快速提取復(fù)雜的日志格式。以上兩個自研組件會在后面的內(nèi)容里再做詳細(xì)介紹。

1)回傳周期超過關(guān)聯(lián)窗口的日志如何處理?

對于回傳晚的日志,我們在關(guān)聯(lián)窗口內(nèi)未取得關(guān)聯(lián)結(jié)果。我們采用實時+離線的方式進(jìn)行數(shù)據(jù)回刷補全。實時處理的日志我們會將未關(guān)聯(lián)的原始日志輸出到另外一個暫存地(Kafka),同時不斷消費處理這個未關(guān)聯(lián)的日志集合,設(shè)定超時重關(guān)聯(lián)次數(shù)與超時重關(guān)聯(lián)時間,超過所設(shè)定任意閾值后,便再進(jìn)行重關(guān)聯(lián)。離線部分,我們采用 Hive 計算昨日全天日志與 N 天內(nèi)的全量被關(guān)聯(lián)日志表進(jìn)行關(guān)聯(lián),將最終的結(jié)果回寫進(jìn)去,替換實時所計算的昨日關(guān)聯(lián)數(shù)據(jù)。

2)如何提高 Flink 任務(wù)性能?

① Operator Chain

為了更高效地分布式執(zhí)行,F(xiàn)link 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個 task 在一個線程中執(zhí)行。將 operators 鏈接成 task 是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時提高整體的吞吐量。

Flink 會在生成 JobGraph 階段,將代碼中可以優(yōu)化的算子優(yōu)化成一個算子鏈(Operator Chains)以放到一個 task(一個線程)中執(zhí)行,以減少線程之間的切換和緩沖的開銷,提高整體的吞吐量和延遲。下面以官網(wǎng)中的例子進(jìn)行說明。

如何理解微博基于Flink的實時計算平臺建設(shè)

圖中棕色的長條表示等待時間,可以發(fā)現(xiàn)網(wǎng)絡(luò)等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以并發(fā)地處理多個請求和回復(fù)。也就是說,你可以連續(xù)地向數(shù)據(jù)庫發(fā)送用戶 a、b、c 等的請求,與此同時,哪個請求的回復(fù)先返回了就處理哪個回復(fù),從而連續(xù)的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現(xiàn)原理。

③ Checkpoint 優(yōu)化

Flink 實現(xiàn)了一套強大的 checkpoint 機制,使它在獲取高吞吐量性能的同時,也能保證 Exactly Once 級別的快速恢復(fù)。

首先提升各節(jié)點 checkpoint 的性能考慮的就是存儲引擎的執(zhí)行效率。Flink 官方支持的三種 checkpoint state 存儲方案中,Memory 僅用于調(diào)試級別,無法做故障后的數(shù)據(jù)恢復(fù)。其次還有 Hdfs 與 Rocksdb,當(dāng)所做 Checkpoint 的數(shù)據(jù)大小較大時,可以考慮采用 Rocksdb 來作為 checkpoint 的存儲以提升效率。

其次的思路是資源設(shè)置,我們都知道 checkpoint 機制是在每個 task 上都會進(jìn)行,那么當(dāng)總的狀態(tài)數(shù)據(jù)大小不變的情況下,如何分配減少單個 task 所分的 checkpoint 數(shù)據(jù)變成了提升 checkpoint 執(zhí)行效率的關(guān)鍵。

最后,增量快照. 非增量快照下,每次 checkpoint 都包含了作業(yè)所有狀態(tài)數(shù)據(jù)。而大部分場景下,前后 checkpoint 里,數(shù)據(jù)發(fā)生變更的部分相對很少,所以設(shè)置增量 checkpoint,僅會對上次 checkpoint 和本次 checkpoint 之間狀態(tài)的差異進(jìn)行存儲計算,減少了 checkpoint 的耗時。

3)如何保障任務(wù)的穩(wěn)定性?

在任務(wù)執(zhí)行過程中,會遇到各種各樣的問題,導(dǎo)致任務(wù)異常甚至失敗。所以如何做好異常情況下的恢復(fù)工作顯得異常重要。

① 設(shè)定重啟策略

Flink 支持不同的重啟策略,以在故障發(fā)生時控制作業(yè)如何重啟。集群在啟動時會伴隨一個默認(rèn)的重啟策略,在沒有定義具體重啟策略時會使用該默認(rèn)策略。如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認(rèn)策略。

默認(rèn)的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數(shù) restart-strategy 定義了哪個策略被使用。

常用的重啟策略:

  • 固定間隔(Fixed delay);

  • 失敗率(Failure rate);

  • 無重啟(No restart)。

② 設(shè)置 HA

Flink 在任務(wù)啟動時指定 HA 配置主要是為了利用 Zookeeper 在所有運行的 JobManager 實例之間進(jìn)行分布式協(xié)調(diào) .Zookeeper 通過 leader 選取和輕量級一致性的狀態(tài)存儲來提供高可用的分布式協(xié)調(diào)服務(wù)。

③ 任務(wù)監(jiān)控報警平臺

在實際環(huán)境中,我們遇見過因為集群狀態(tài)不穩(wěn)定而導(dǎo)致的任務(wù)失敗。在 Flink 1.6 版本中,甚至遇見過任務(wù)出現(xiàn)假死的情況,也就是 Yarn 上的 job 資源依然存在,而 Flink 任務(wù)實際已經(jīng)死亡。為了監(jiān)測與恢復(fù)這些異常的任務(wù),并且對實時任務(wù)做統(tǒng)一的提交、報警監(jiān)控、任務(wù)恢復(fù)等管理,我們開發(fā)了任務(wù)提交與管理平臺。通過 Shell 拉取 Yarn 上 Running 狀態(tài)與 Flink Job 狀態(tài)的列表進(jìn)行對比,心跳監(jiān)測平臺上的所有任務(wù),并進(jìn)行告警、自動恢復(fù)等操作。

④ 作業(yè)指標(biāo)監(jiān)控

Flink 任務(wù)在運行過程中,各 Operator 都會產(chǎn)生各自的指標(biāo)數(shù)據(jù),例如,Source 會產(chǎn)出 numRecordIn、numRecordsOut 等各項指標(biāo)信息,我們會將這些指標(biāo)信息進(jìn)行收集,并展示在我們的可視化平臺上。指標(biāo)平臺如下圖:

如何理解微博基于Flink的實時計算平臺建設(shè)

⑤ 任務(wù)運行節(jié)點監(jiān)控

我們的 Flink 任務(wù)都是運行在 Yarn 上,針對每一個運行的作業(yè),我們需要監(jiān)控其運行環(huán)境。會收集 JobManager 及 TaskManager 的各項指標(biāo)。收集的指標(biāo)有 jobmanager-fullgc-count、jobmanager-younggc-count、jobmanager-fullgc-time、jobmanager-younggc-time、taskmanager-fullgc-count、taskmanager-younggc-count、taskmanager-fullgc-time、taskmanager-younggc-time 等,用于判斷任務(wù)運行環(huán)境的健康度,及用于排查可能出現(xiàn)的問題。監(jiān)控界面如下:

四.數(shù)據(jù)關(guān)聯(lián)組件

1.如何選擇關(guān)聯(lián)方式?

1)Flink Table

從 Flink 的官方文檔,我們知道 Flink 的編程模型分為四層,sql 是最高層的 api, Table api 是中間層,DataSteam/DataSet Api 是核心,stateful Streaming process 層是底層實現(xiàn)。

如何理解微博基于Flink的實時計算平臺建設(shè)

剛開始我們直接使用 Flink Table 做為數(shù)據(jù)關(guān)聯(lián)的方式,直接將接入進(jìn)來的 DataStream 注冊為 Dynamic Table 后進(jìn)行兩表關(guān)聯(lián)查詢,如下圖:

但嘗試后發(fā)現(xiàn)在做那些日志數(shù)據(jù)量大的關(guān)聯(lián)查詢時往往只能在較小的時間窗口內(nèi)做查詢,否則會超過 datanode 節(jié)點單臺內(nèi)存限制,產(chǎn)生異常。但為了滿足不同業(yè)務(wù)日志延遲到達(dá)的情況,這種實現(xiàn)方式并不通用。

2)Rocksdb

之后,我們直接在 DataStream 上進(jìn)行處理,在 CountWindow 窗口內(nèi)進(jìn)行關(guān)聯(lián)操作,將被關(guān)聯(lián)的數(shù)據(jù) Hash 打散后存儲在各個 datanode 節(jié)點的 Rocksdb 中,利用 Flink State 原生支持 Rocksdb 做 Checkpoint 這一特性進(jìn)行算子內(nèi)數(shù)據(jù)的備份與恢復(fù)。這種方式是可行的,但受制于 Rocksdb 集群物理磁盤為非 SSD 的因素,這種方式在我們的實際線上場景中關(guān)聯(lián)耗時較高。

3)外部存儲關(guān)聯(lián)

如 Redis 類的 KV 存儲的確在查詢速度上提升不少,但類似廣告日志數(shù)據(jù)這樣單條日志大小較大的情況,會占用不少寶貴的機器內(nèi)存資源。經(jīng)過調(diào)研后,我們選取了 Hbase 作為我們?nèi)罩娟P(guān)聯(lián)組件的關(guān)聯(lián)數(shù)據(jù)存儲方案。

為了快速構(gòu)建關(guān)聯(lián)任務(wù),我們開發(fā)了基于 Flink 的配置化組件平臺,提交配置文件即可生成數(shù)據(jù)關(guān)聯(lián)任務(wù)并自動提交到集群。下圖是任務(wù)執(zhí)行的處理流程。

示意圖如下:

如何理解微博基于Flink的實時計算平臺建設(shè)

下圖是關(guān)聯(lián)組件內(nèi)的執(zhí)行流程圖:

2.問題與優(yōu)化

1)加入 Interval Join

隨著日志量的增加,某些需要進(jìn)行關(guān)聯(lián)的日志數(shù)量可能達(dá)到日均十幾億甚至幾十億的量級。前期關(guān)聯(lián)組件的配置化生成任務(wù)的方式的確解決了大部分線上業(yè)務(wù)需求,但隨著進(jìn)一步的關(guān)聯(lián)需求增加,Hbase 面臨著巨大的查詢壓力。在我們對 Hbase 表包括 rowkey 等一系列完成優(yōu)化之后,我們開始了對關(guān)聯(lián)組件的迭代與優(yōu)化。

第一步,減少 Hbase 的查詢。我們使用 Flink Interval Join 的方式,先將大部分關(guān)聯(lián)需求在程序內(nèi)部完成,只有少部分仍需查詢的日志會去查詢外部存儲(Hbase). 經(jīng)驗證,以請求日志與實驗日志關(guān)聯(lián)為例,對于設(shè)置 Interval Join 窗口在 10s 左右即可減少 80% 的 hbase 查詢請求。

① Interval Join 的語義示意圖

如何理解微博基于Flink的實時計算平臺建設(shè)

  • 數(shù)據(jù) JOIN 的區(qū)間 - 比如時間為 3 的 EXP 會在 IMP 時間為[2, 4]區(qū)間進(jìn)行JOIN;

  • WaterMark - 比如圖示 EXP 一條數(shù)據(jù)時間是 3,IMP 一條數(shù)據(jù)時間是 5,那么WaterMark是根據(jù)實際最小值減去 UpperBound 生成,即:Min(3,5)-1 = 2;

  • 過期數(shù)據(jù) - 出于性能和存儲的考慮,要將過期數(shù)據(jù)清除,如圖當(dāng) WaterMark 是 2 的時候時間為 2 以前的數(shù)據(jù)過期了,可以被清除。

② Interval Join 內(nèi)部實現(xiàn)邏輯

③ Interval Join 改造

因 Flink 原生的 Intervak Join 實現(xiàn)的是 Inner Join,而我們業(yè)務(wù)中所需要的是 Left Join,具體改造如下:

  • 取消右側(cè)數(shù)據(jù)流的 join 標(biāo)志位;

  • 左側(cè)數(shù)據(jù)流有 join 數(shù)據(jù)時不存 state。

2)關(guān)聯(lián)率動態(tài)監(jiān)控

在任務(wù)執(zhí)行中,往往會出現(xiàn)意想不到的情況,比如被關(guān)聯(lián)的數(shù)據(jù)日志出現(xiàn)缺失,或者日志格式錯誤引發(fā)的異常,造成關(guān)聯(lián)任務(wù)的關(guān)聯(lián)率下降嚴(yán)重。那么此時關(guān)聯(lián)任務(wù)雖然繼續(xù)在運行,但對于整體數(shù)據(jù)質(zhì)量的意義不大,甚至是反向作用。在任務(wù)進(jìn)行恢復(fù)的時,還需要清除異常區(qū)間內(nèi)的數(shù)據(jù),將 Kafka Offset 設(shè)置到異常前的位置再進(jìn)行處理。

故我們在關(guān)聯(lián)組件的優(yōu)化中,加入了動態(tài)監(jiān)控,下面示意圖:

如何理解微博基于Flink的實時計算平臺建設(shè)

  • 關(guān)聯(lián)任務(wù)中定時探測指定時間范圍 Hbase 是否有最新數(shù)據(jù)寫入,如果沒有,說明寫 Hbase 任務(wù)出現(xiàn)問題,則終止關(guān)聯(lián)任務(wù);

  • 當(dāng)寫 Hbase 任務(wù)出現(xiàn)堆積時,相應(yīng)的會導(dǎo)致關(guān)聯(lián)率下降,當(dāng)關(guān)聯(lián)率低于指定閾值時終止關(guān)聯(lián)任務(wù);

  • 當(dāng)關(guān)聯(lián)任務(wù)終止時會發(fā)出告警,修復(fù)上游任務(wù)后可重新恢復(fù)關(guān)聯(lián)任務(wù),保證關(guān)聯(lián)數(shù)據(jù)不丟失。

五.數(shù)據(jù)清洗組件

為了快速進(jìn)行日志數(shù)據(jù)的指標(biāo)抽取,我們開發(fā)了基于 Flink 計算平臺的指標(biāo)抽取組件Logwash。封裝了基于 Freemaker 的模板引擎做為日志格式的解析模塊,對日志進(jìn)行提取,算術(shù)運算,條件判斷,替換,循環(huán)遍歷等操作。

下圖是 Logwash 組件的處理流程:

組件支持文本與 Json 兩種類型日志進(jìn)行解析提取,目前該清洗組件已支持微博廣告近百個實時清洗需求,提供給運維組等第三方非實時計算方向人員快速進(jìn)行提取日志的能力。

配置文件部分示例:

如何理解微博基于Flink的實時計算平臺建設(shè)

六.FlinkStream 組件庫

Flink 中 DataStream 的開發(fā),對于通用的邏輯及相同的代碼進(jìn)行了抽取,生成了我們的通用組件庫 FlinkStream。FlinkStream 包括了對 Topology 的抽象及默認(rèn)實現(xiàn)、對 Stream 的抽象及默認(rèn)實現(xiàn)、對 Source 的抽象和某些實現(xiàn)、對 Operator 的抽象及某些實現(xiàn)、Sink 的抽象及某些實現(xiàn)。任務(wù)提交統(tǒng)一使用可執(zhí)行 Jar 和配置文件,Jar 會讀取配置文件構(gòu)建對應(yīng)的拓?fù)鋱D。

1.Source 抽象

對于 Source 進(jìn)行抽象,創(chuàng)建抽象類及對應(yīng)接口,對于 Flink Connector 中已有的實現(xiàn),例如 kafka,Elasticsearch 等,直接創(chuàng)建新 class 并繼承接口,實現(xiàn)對應(yīng)的方法即可。對于需要自己去實現(xiàn)的 connector,直接繼承抽象類及對應(yīng)接口,實現(xiàn)方法即可。目前只實現(xiàn)了 KafkaSource。

2.Operator 抽象

與 Source 抽象類似,我們實現(xiàn)了基于 Stream 到 Stream 級別的 Operator 抽象。創(chuàng)建抽象 Operate 類,抽象 Transform 方法。對于要實現(xiàn)的 Transform 操作,直接繼承抽象類,實現(xiàn)其抽象方法即可。目前實現(xiàn)的 Operator,直接按照文檔使用。如下:

25

3.Sink 抽象

針對 Sink,我們同樣創(chuàng)建了抽象類及接口。對 Flink Connector 中已有的 Sink 進(jìn)行封裝。目前可通過配置進(jìn)行數(shù)據(jù)輸出的 Sink。目前以實現(xiàn)和封裝的 Sink 組件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。

4.Stream 抽象

創(chuàng)建 Stream 抽象類及抽象方法 buildStream,用于構(gòu)建 StreamGraph。我們實現(xiàn)了默認(rèn)的 Stream,buildStream 方法讀取 Source 配置生成 DataStream,通過 Operator 配置列表按順序生成拓?fù)鋱D,通過 Sink 配置生成數(shù)據(jù)寫出組件。

5.Topology 抽象

對于單 Stream,要處理的邏輯可能比較簡單,主要讀取一個 Source 進(jìn)行數(shù)據(jù)的各種操作并輸出。對于復(fù)雜的多 Stream 業(yè)務(wù)需求,比如多流 Join,多流 Union、Split 流等,因此我們多流業(yè)務(wù)進(jìn)行了抽象,產(chǎn)生了 Topology。在 Topology 這一層可以對多流進(jìn)行配置化操作。對于通用的操作,我們實現(xiàn)了默認(rèn) Topology,直接通過配置文件就可以實現(xiàn)業(yè)務(wù)需求。對于比較復(fù)雜的業(yè)務(wù)場景,用戶可以自己實現(xiàn) Topology。

6.配置化

我們對抽象的組件都是可配置化的,直接通過編寫配置文件,構(gòu)造任務(wù)的運行拓?fù)浣Y(jié)構(gòu),啟動任務(wù)時指定配置文件。

  • 正文文本框 Flink Environment 配置化,包括時間處理類型、重啟策略,checkpoint 等;

  • Topology 配置化,可配置不同 Stream 之間的處理邏輯與 Sink;

  • Stream 配置化,可配置 Source,Operator 列表,Sink。

配置示例如下:

run_env:

  timeCharacteristic: "ProcessingTime" #ProcessingTime\IngestionTime\EventTime
  restart: # 重啟策略配置
    type: # noRestart, fixedDelayRestart, fallBackRestart, failureRateRestart
  checkpoint: # 開啟checkpoint
    type: "rocksdb" # 


streams:
  impStream:  #粉絲經(jīng)濟(jì)曝光日志
    type: "DefaultStream"
    config:
      source:
        type: "Kafka011" # 源是kafka011版本
        config:
        parallelism: 20
      operates:
        -
          type: "StringToMap"
          config:
        -
          type: "SplitElement"
          config:
        ...
        -
          type: "SelectElement"
          config:


transforms:
  -
    type: "KeyBy"
    config:
  -
    type: "CountWindowWithTimeOut"  #Window需要和KeyBy組合使用
    config:
  -
    type: "SplitStream"
    config:
  -
    type: "SelectStream"
    config:
sink:
  -
    type: Kafka
    config:
  -
    type: Kafka
    config:

7.部署

在實時任務(wù)管理平臺,新建任務(wù),填寫任務(wù)名稱,選擇任務(wù)類型(Flink)及版本,上傳可執(zhí)行 Jar 文件,導(dǎo)入配置或者手動編寫配置,填寫 JobManager 及 TaskManager 內(nèi)存配置,填寫并行度配置,選擇是否重試,選擇是否從 checkpoint 恢復(fù)等選項,保存后即可在任務(wù)列表中啟動任務(wù),并觀察啟動日志用于排查啟動錯誤。

如何理解微博基于Flink的實時計算平臺建設(shè)

七.FlinkSQL 擴(kuò)展

SQL 語言是一門聲明式的,簡單的,靈活的語言,F(xiàn)link 本身提供了對 SQL 的支持。Flink 1.6 版本和 1.8 版本對 SQL 語言的支持有限,不支持建表語句,不支持對外部數(shù)據(jù)的關(guān)聯(lián)操作。因此我們通過 Apache Calcite 對 Flink SQL API 進(jìn)行了擴(kuò)展,用戶只需要關(guān)心業(yè)務(wù)需求怎么用 SQL 語言來表達(dá)即可。

1.支持創(chuàng)建源表

擴(kuò)展了支持創(chuàng)建源表 SQL,通過解析 SQL 語句,獲取數(shù)據(jù)源配置信息,創(chuàng)建對應(yīng)的 TableSource 實例,并將其注冊到 Flink environment。示例如下:

2.支持創(chuàng)建維表

使用 Apache Calcite 對 SQL 進(jìn)行解析,通過維表關(guān)鍵字識別維表,使用 RichAsyncFunction 算子異步讀取維表數(shù)據(jù),并通過 flatMap 操作生成關(guān)聯(lián)后的 DataStream,然后轉(zhuǎn)換為 Table 注冊到 Flink Environment。示例如下:

如何理解微博基于Flink的實時計算平臺建設(shè)

3.支持創(chuàng)建視圖

使用 SQLQuery 方法,支持從上一層表或者視圖中創(chuàng)建視圖表,并將新的視圖表注冊到 Flink Environment。創(chuàng)建語句需要按照順序?qū)?,比?myView2 是從視圖 myView1 中創(chuàng)建的,則 myView1 創(chuàng)建語句要在myView2語句前面。如下:

4.支持創(chuàng)建結(jié)果表

支持創(chuàng)建結(jié)果表,通過解析 SQL 語句,獲取配置信息,創(chuàng)建對應(yīng)的 AppendStreamTableSink 或者 UpsertStreamTableSink 實例,并將其注冊到 Flink Environment。示例如下:

如何理解微博基于Flink的實時計算平臺建設(shè)

5.支持自定義UDF

支持自定義 UDF 函數(shù),繼承 ScalarFunction 或者 TableFunction。在 resources 目錄下有相應(yīng)的 UDF 資源配置文件,默認(rèn)會注冊全部可執(zhí)行 Jar 包中配置的 UDF。直接按照使用方法使用即可。

6.部署

部署方式同 Flink Stream 組件。

八.實時數(shù)據(jù)倉庫的構(gòu)建

為了保證實時數(shù)據(jù)的統(tǒng)一對外出口以及保證數(shù)據(jù)指標(biāo)的統(tǒng)一口徑,我們根據(jù)業(yè)界離線數(shù)倉的經(jīng)驗來設(shè)計與構(gòu)架微博廣告實時數(shù)倉。

1.分層概覽

數(shù)據(jù)倉庫分為三層,自下而上為:數(shù)據(jù)引入層(ODS,Operation Data Store)、數(shù)據(jù)公共層(CDM,Common Data Model)和數(shù)據(jù)應(yīng)用層(ADS,Application Data Service)。

  • 數(shù)據(jù)引入層(ODS,Operation Data Store):將原始數(shù)據(jù)幾乎無處理的存放在數(shù)據(jù)倉庫系統(tǒng),結(jié)構(gòu)上與源系統(tǒng)基本保持一致,是數(shù)據(jù)倉庫的數(shù)據(jù)準(zhǔn)。

  • 數(shù)據(jù)公共層(CDM,Common Data Model,又稱通用數(shù)據(jù)模型層):包含 DIM 維度表、DWD 和 DWS,由 ODS 層數(shù)據(jù)加工而成。主要完成數(shù)據(jù)加工與整合,建立一致性的維度,構(gòu)建可復(fù)用的面向分析和統(tǒng)計的明細(xì)事實表,以及匯總公共粒度的指標(biāo)。

公共維度層(DIM):基于維度建模理念思想,建立整個企業(yè)的一致性維度。降低數(shù)據(jù)計算口徑和算法不統(tǒng)一風(fēng)險。

公共維度層的表通常也被稱為邏輯維度表,維度和維度邏輯表通常一一對應(yīng)。

公共匯總粒度事實層(DWS,Data Warehouse Service):以分析的主題對象作為建模驅(qū)動,基于上層的應(yīng)用和產(chǎn)品的指標(biāo)需求,構(gòu)建公共粒度的匯總指標(biāo)事實表,以寬表化手段物理化模型。構(gòu)建命名規(guī)范、口徑一致的統(tǒng)計指標(biāo),為上層提供公共指標(biāo),建立匯總寬表、明細(xì)事實表。

公共匯總粒度事實層的表通常也被稱為匯總邏輯表,用于存放派生指標(biāo)數(shù)據(jù)。

明細(xì)粒度事實層(DWD,Data Warehouse Detail):以業(yè)務(wù)過程作為建模驅(qū)動,基于每個具體的業(yè)務(wù)過程特點,構(gòu)建最細(xì)粒度的明細(xì)層事實表。可以結(jié)合企業(yè)的數(shù)據(jù)使用特點,將明細(xì)事實表的某些重要維度屬性字段做適當(dāng)冗余,也即寬表化處理。

明細(xì)粒度事實層的表通常也被稱為邏輯事實表。

  • 數(shù)據(jù)應(yīng)用層(ADS,Application Data Service):存放數(shù)據(jù)產(chǎn)品個性化的統(tǒng)計指標(biāo)數(shù)據(jù)。根據(jù) CDM 與 ODS 層加工生成。

2.詳細(xì)分層模型

如何理解微博基于Flink的實時計算平臺建設(shè)

對于原始日志數(shù)據(jù),ODS 層幾乎是每條日志抽取字段后進(jìn)行保留,這樣便能對問題的回溯與追蹤。在 CDM 層對 ODS 的數(shù)據(jù)僅做時間粒度上的數(shù)據(jù)壓縮,也就是在指定時間切分窗口里,對所有維度下的指標(biāo)做聚合操作,而不涉及業(yè)務(wù)性的操作。在 ADS 層,我們會有配置化抽取微服務(wù),對底層數(shù)據(jù)做定制化計算和提取,輸出到用戶指定的存儲服務(wù)里。

到此,關(guān)于“如何理解微博基于Flink的實時計算平臺建設(shè)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章名稱:如何理解微博基于Flink的實時計算平臺建設(shè)
本文網(wǎng)址:http://weahome.cn/article/jchije.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部