導讀:當今生活節(jié)奏日益加快,企業(yè)面對不斷增加的海量信息,其信息篩選和處理效率低下的困擾與日俱增。由于用戶營銷不夠細化,企業(yè) App 中許多不合時宜或不合偏好的消息推送很大程度上影響了用戶體驗,甚至引發(fā)了用戶流失。在此背景下,友信金服公司推行全域的數(shù)據(jù)體系戰(zhàn)略,通過打通和整合集團各個業(yè)務線數(shù)據(jù),利用大數(shù)據(jù)、人工智能等技術(shù)構(gòu)建統(tǒng)一的數(shù)據(jù)資產(chǎn),如 ID-Mapping、用戶標簽等。友信金服用戶畫像項目正是以此為背景成立,旨在實現(xiàn)“數(shù)據(jù)驅(qū)動業(yè)務與運營”的集團戰(zhàn)略。目前該系統(tǒng)支持日處理數(shù)據(jù)量超 10 億,接入上百種合規(guī)數(shù)據(jù)源。
傳統(tǒng)基于 Hadoop 生態(tài)的離線數(shù)據(jù)存儲計算方案已在業(yè)界大規(guī)模應用,但受制于離線計算的高時延性,越來越多的數(shù)據(jù)應用場景已從離線轉(zhuǎn)為實時。這里引用一張表格對目前主流的實時計算框架做個對比。
成都創(chuàng)新互聯(lián)公司服務項目包括青云譜網(wǎng)站建設(shè)、青云譜網(wǎng)站制作、青云譜網(wǎng)頁制作以及青云譜網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,青云譜網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務的客戶以成都為中心已經(jīng)輻射到青云譜省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!Apache Storm 的容錯機制需要對每條數(shù)據(jù)進行應答(ACK),因此其吞吐量備受影響,在數(shù)據(jù)大吞吐量的場景下會有問題,因此不適用此項目的需求。
Apache Spark 總體生態(tài)更為完善,且在機器學習的集成和應用性暫時領(lǐng)先,但 Spark 底層還是采用微批(Micro Batching)處理的形式。
Apache Flink 在流式計算上有明顯優(yōu)勢:首先其流式計算屬于真正意義上的單條處理,即每一條數(shù)據(jù)都會觸發(fā)計算。在這一點上明顯與 Spark 的微批流式處理方式不同。其次,F(xiàn)link 的容錯機制較為輕量,對吞吐量影響較小,使得 Flink 可以達到很高的吞吐量。最后 Flink 還擁有易用性高,部署簡單等優(yōu)勢。相比之下我們最終決定采用基于 Flink 的架構(gòu)方案。
用戶畫像系統(tǒng)目前為集團線上業(yè)務提供用戶實時標簽數(shù)據(jù)服務。為此我們的服務需要打通多種數(shù)據(jù)源,對海量的數(shù)字信息進行實時不間斷的數(shù)據(jù)清洗、聚類、分析,從而將它們抽象成標簽,并最終為應用方提供高質(zhì)量的標簽服務。在此背景下,我們設(shè)計用戶畫像系統(tǒng)的整體架構(gòu)如下圖所示:
整體架構(gòu)分為五層:
在整體架構(gòu)設(shè)計方案設(shè)計完成之后,我們針對數(shù)據(jù)也設(shè)計了詳盡的處理方案。在數(shù)據(jù)處理階段,鑒于 Kafka 高吞吐量、高穩(wěn)定性的特點,我們的用戶畫像系統(tǒng)統(tǒng)一采用 Kafka 作為分布式發(fā)布訂閱消息系統(tǒng)。數(shù)據(jù)清洗階段利用 Flink 來實現(xiàn)用戶唯一性識別、行為數(shù)據(jù)的清洗等,去除冗余數(shù)據(jù)。這一過程支持交互計算和多種復雜算法,并支持數(shù)據(jù)實時 / 離線計算。目前我們數(shù)據(jù)處理流程迭代了兩版,具體方案如下:
整體數(shù)據(jù)來源包含兩種:
根據(jù)不同業(yè)務的指標需求我們直接從集團數(shù)據(jù)倉庫抽取數(shù)據(jù)并落入 Kafka,或者直接從業(yè)務端以 CDC(Capture Data Change)的方式寫入 Kafka。在計算層,數(shù)據(jù)被導入到 Flink 中,通過 DataStream 生成 ID-Mapping、用戶標簽碎片等數(shù)據(jù),然后將生成數(shù)據(jù)存入 JanusGraph(JanusGraph 是以 HBase 作為后端存儲的圖數(shù)據(jù)庫介質(zhì))與 Kafka,并由 Flink 消費落入 Kafka 的用戶標簽碎片數(shù)據(jù),進行聚合生成最新的用戶標簽碎片(用戶標簽碎片是由用戶畫像系統(tǒng)獲取來自多種渠道的碎片化數(shù)據(jù)塊處理后生成的)。
數(shù)據(jù)服務層處理流程
服務層將存儲層存儲的用戶標簽碎片數(shù)據(jù),通過 JanusGraph Spark On Yarn 模式,執(zhí)行 TinkerPop OLAP 計算生成全量用戶 Yids 列表文件。Yid 是用戶畫像系統(tǒng)中定義的集團級用戶 ID 標識。結(jié)合 Yids 列表文件,在 Flink 中批量讀取 HBase 聚合成完整用戶畫像數(shù)據(jù),生成 HDFS 文件,再通過 Flink 批量操作新生成的數(shù)據(jù)生成用戶評分預測標簽,將用戶評分預測標簽落入 Phoenix,之后數(shù)據(jù)便可通過統(tǒng)一數(shù)據(jù)服務接口進行獲取。下圖完整地展示了這一流程。
ID-Mapping 數(shù)據(jù)結(jié)構(gòu)
為了實現(xiàn)用戶標簽的整合,用戶 ID 之間的強打通,我們將用戶 ID 標識看成圖的頂點、ID pair 關(guān)系看作圖的邊,比如已經(jīng)識別瀏覽器 Cookie 的用戶使用手機號登陸了公司網(wǎng)站就形成了對應關(guān)系。這樣所有用戶 ID 標識就構(gòu)成了一張大圖,其中每個小的連通子圖 / 連通分支就是一個用戶的全部標識 ID 信息。
ID-Mapping 數(shù)據(jù)由圖結(jié)構(gòu)模型構(gòu)建,圖節(jié)點包含 UserKey、Device、IdCard、Phone 等類型,分別表示用戶的業(yè)務 ID、設(shè)備 ID、身份證以及電話等信息。節(jié)點之間邊的生成規(guī)則是通過解析數(shù)據(jù)流中包含的節(jié)點信息,以一定的優(yōu)先級順序進行節(jié)點之間的連接,從而生成節(jié)點之間的邊。比如,識別了用戶手機系統(tǒng)的 Android_ID,之后用戶使用郵箱登陸了公司 App,在系統(tǒng)中找到了業(yè)務線 UID 就形成了和關(guān)系的 ID pair,然后系統(tǒng)根據(jù)節(jié)點類型進行優(yōu)先級排序,生成 Android_ID、mail、UID 的關(guān)系圖。數(shù)據(jù)圖結(jié)構(gòu)模型如下圖所示:
Gephi
1.0 版本數(shù)據(jù)處理流程在系統(tǒng)初期較好地滿足了我們的日常需求,但隨著數(shù)據(jù)量的增長,該方案遇到了一些性能瓶頸:
Gephi
鑒于這些問題,我們提出了 2.0 版本的解決方案。在 2.0 版本中,我們通過利用 HBase 列式存儲、修改圖數(shù)據(jù)結(jié)構(gòu)等優(yōu)化方案嘗試解決以上三個問題。
如下圖所示,2.0 版本數(shù)據(jù)處理流程大部分承襲了 1.0 版本。新版本數(shù)據(jù)處理流程在以下幾個方面做了優(yōu)化:
2.0 版本數(shù)據(jù)處理流程
Gephi
目前,線上部署的用戶畫像系統(tǒng)中的數(shù)據(jù)絕大部分是來自于 Kafka 的實時數(shù)據(jù)。隨著數(shù)據(jù)量越來越多,系統(tǒng)的壓力也越來越大,以至于出現(xiàn)了 Flink 背壓與 Checkpoint 超時等問題,導致 Flink 提交 Kafka 位移失敗,從而影響了數(shù)據(jù)一致性。這些線上出現(xiàn)的問題讓我們開始關(guān)注 Flink 的可靠性、穩(wěn)定性以及性能。針對這些問題,我們進行了詳細的分析并結(jié)合自身的業(yè)務特點,探索并實踐出了一些相應的解決方案。
下圖展示了 Flink 中 checkpointing 執(zhí)行流程圖:
Flink 中 checkpointing 執(zhí)行流程
通過以上流程分析,我們通過三種方式來提高 Checkpointing 性能。這些方案分別是:
CheckPoint 存儲方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文檔可知,不同 StateBackend 之間的性能以及安全性是有很大差異的。通常情況下,MemoryStateBackend 適合應用于測試環(huán)境,線上環(huán)境則最好選擇 RocksDBStateBackend。
這有兩個原因:首先,RocksDBStateBackend 是外部存儲,其他兩種 Checkpoint 存儲方式都是 JVM 堆存儲。受限于 JVM 堆內(nèi)存的大小,Checkpoint 狀態(tài)大小以及安全性可能會受到一定的制約;其次,RocksDBStateBackend 支持增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀態(tài)。與完整檢查點相比,增量檢查點可以顯著縮短 checkpointing 時間,但代價是需要更長的恢復時間。
Checkpointing 需要對每個 Task 進行數(shù)據(jù)狀態(tài)采集。單個 Task 狀態(tài)數(shù)據(jù)越多則 Checkpointing 越慢。所以我們可以通過增加 Task 并行度,減少單個 Task 狀態(tài)數(shù)據(jù)的數(shù)量來達到縮短 CheckPointing 時間的效果。
Flink 算子鏈(Operator Chains)越長,Task 也會越多,相應的狀態(tài)數(shù)據(jù)也就更多,Checkpointing 也會越慢。通過縮短算子鏈長度,可以減少 Task 數(shù)量,從而減少系統(tǒng)中的狀態(tài)數(shù)據(jù)總量,間接的達到優(yōu)化 Checkpointing 的目的。下面展示了 Flink 算子鏈的合并規(guī)則:
基于以上這些規(guī)則,我們在代碼層面上合并了相關(guān)度較大的一些 Task,使得平均的操作算子鏈長度至少縮短了 60%~70%。
在 Flink 運行過程中,每一個操作算子都會消費一個中間 / 過渡狀態(tài)的流,并對它們進行轉(zhuǎn)換,然后生產(chǎn)一個新的流。這種機制可以類比為:Flink 使用阻塞隊列作為有界的緩沖區(qū)。跟 Java 里阻塞隊列一樣,一旦隊列達到容量上限,處理速度較慢的消費者會阻塞生產(chǎn)者向隊列發(fā)送新的消息或事件。下圖展示了 Flink 中兩個操作算子之間的數(shù)據(jù)傳輸以及如何感知到背壓的:
首先,Source 中的事件進入 Flink 并被操作算子 1 處理且被序列化到 Buffer 中,然后操作算子 2 從這個 Buffer 中讀出該事件。當操作算子 2 處理能力不足的時候,操作算子 1 中的數(shù)據(jù)便無法放入 Buffer,從而形成背壓。背壓出現(xiàn)的原因可能有以下兩點:
實踐中我們通過以下方式解決背壓問題。首先,縮短算子鏈會合理的合并算子,節(jié)省出資源。其次縮短算子鏈也會減少 Task(線程)之間的切換、消息的序列化 / 反序列化以及數(shù)據(jù)在緩沖區(qū)的交換次數(shù),進而提高系統(tǒng)的整體吞吐量。最后,根據(jù)數(shù)據(jù)特性將不需要或者暫不需要的數(shù)據(jù)進行過濾,然后根據(jù)業(yè)務需求將數(shù)據(jù)分別處理,比如有些數(shù)據(jù)源需要實時的處理,有些數(shù)據(jù)是可以延遲的,最后通過使用 keyBy 關(guān)鍵字,控制 Flink 時間窗口大小,在上游算子處理邏輯中盡量合并更多數(shù)據(jù)來達到降低下游算子的處理壓力。
經(jīng)過以上優(yōu)化,在每天億級數(shù)據(jù)量下,用戶畫像可以做到實時信息實時處理并無持續(xù)背壓,Checkpointing 平均時長穩(wěn)定在 1 秒以內(nèi)。
目前用戶畫像部分數(shù)據(jù)都是從 Hive 數(shù)據(jù)倉庫拿到的,數(shù)據(jù)倉庫本身是 T+1 模式,數(shù)據(jù)延時性較大,所以為了提高數(shù)據(jù)實時性,端到端的實時流處理很有必要。
端到端是指一端采集原始數(shù)據(jù),另一端以報表 / 標簽 / 接口的方式對這些對數(shù)進行呈現(xiàn)與應用,連接兩端的是中間實時流。在后續(xù)的工作中,我們計劃將現(xiàn)有的非實時數(shù)據(jù)源全部切換到實時數(shù)據(jù)源,統(tǒng)一經(jīng)過 Kafka 和 Flink 處理后再導入到 Phoenix/JanusGraph/HBase。強制所有數(shù)據(jù)源數(shù)據(jù)進入 Kafka 的一個好處在于它能夠提高整體流程的穩(wěn)定性與可用性:首先 Kafka 作為下游系統(tǒng)的緩沖,可以避免下游系統(tǒng)的異常影響實時流的計算,起到“削峰填谷”的作用;其次,F(xiàn)link 自 1.4 版本開始正式支持與 Kafka 的端到端精確一次處理語義,在一致性方面上更有保證。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。