真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

今天就跟大家聊聊有關(guān)Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

專注于為中小企業(yè)提供做網(wǎng)站、成都做網(wǎng)站服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)鼓樓免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

一、常見(jiàn)的 CDC 分析方案

我們先看一下今天的 topic 需要設(shè)計(jì)的是什么?輸入是一個(gè) CDC 或者 upsert 的數(shù)據(jù),輸出是 Database 或者是用于大數(shù)據(jù) OLAP 分析的存儲(chǔ)。

我們常見(jiàn)的輸入主要有兩種數(shù)據(jù),第一種數(shù)據(jù)是數(shù)據(jù)庫(kù)的 CDC 數(shù)據(jù),不斷的產(chǎn)生 changeLog;另一種場(chǎng)景是流計(jì)算產(chǎn)生的 upsert 數(shù)據(jù),在最新的 Flink 1.12 版本已經(jīng)支持了 upsert 數(shù)據(jù)。

1.1 離線 HBase 集群分析 CDC 數(shù)據(jù)

我們通常想到的第一個(gè)方案,就是把 CDC upsert 的數(shù)據(jù)通過(guò) Flink 進(jìn)行一些處理之后,實(shí)時(shí)的寫(xiě)到 HBase 當(dāng)中。HBase 是一個(gè)在線的、能提供在線點(diǎn)查能力的一種數(shù)據(jù)庫(kù),具有非常高的實(shí)時(shí)性,對(duì)寫(xiě)入操作是非常友好的,也可以支持一些小范圍的查詢,而且集群可擴(kuò)展。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

這種方案其實(shí)跟普通的點(diǎn)查實(shí)時(shí)鏈路是同一套,那么用 HBase 來(lái)做大數(shù)據(jù)的 OLAP 的查詢分析有什么問(wèn)題呢?

首先,HBase 是一個(gè)面向點(diǎn)查設(shè)計(jì)的一種數(shù)據(jù)庫(kù),是一種在線服務(wù),它的行存的索引不適合分析任務(wù)。典型的數(shù)倉(cāng)設(shè)計(jì)肯定是要列存的,這樣壓縮效率和查詢效率才會(huì)高。第二,HBase 的集群維護(hù)成本比較高。最后,HBase 的數(shù)據(jù)是 HFile,不方便與大數(shù)據(jù)里數(shù)倉(cāng)當(dāng)中典型的 Parquet、Avro、Orc 等結(jié)合。

1.2 Apache Kudu 維護(hù) CDC 數(shù)據(jù)集

針對(duì) HBase 分析能力比較弱的情況,社區(qū)前幾年出現(xiàn)了一個(gè)新的項(xiàng)目,這就是 Apache Kudu 項(xiàng)目。Kudu 項(xiàng)目擁有 HBase 的點(diǎn)查能力的同時(shí),采用列存,這樣列存加速非常適合 OLAP 分析。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

這種方案會(huì)有什么問(wèn)題呢?

首先 Kudu 是比較小眾的、獨(dú)立的集群,維護(hù)成本也比較高,跟 HDFS、S3、OSS 比較割裂。其次由于 Kudu 在設(shè)計(jì)上保留了點(diǎn)查能力,所以它的批量掃描性能不如 parquet,另外 Kudu 對(duì)于 delete 的支持也比較弱,最后它也不支持增量拉取。

1.3 直接導(dǎo)入 CDC 到 Hive 分析

第三種方案,也是大家在數(shù)倉(cāng)中比較常用的方案,就是把 MySQL 的數(shù)據(jù)寫(xiě)到 Hive,流程是:維護(hù)一個(gè)全量的分區(qū),然后每天做一個(gè)增量的分區(qū),最后把增量分區(qū)寫(xiě)好之后進(jìn)行一次 Merge ,寫(xiě)入一個(gè)新的分區(qū),流程上這樣是走得通的。Hive 之前的全量分區(qū)是不受增量的影響的,只有當(dāng)增量 Merge 成功之后,分區(qū)才可查,才是一個(gè)全新的數(shù)據(jù)。這種純列存的 append 的數(shù)據(jù)對(duì)于分析是非常友好的。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

這種方案會(huì)有什么問(wèn)題呢?

增量數(shù)據(jù)和全量數(shù)據(jù)的 Merge 是有延時(shí)的,數(shù)據(jù)不是實(shí)時(shí)寫(xiě)入的,典型的是一天進(jìn)行一次 Merge,這就是 T+1 的數(shù)據(jù)了。所以,時(shí)效性很差,不支持實(shí)時(shí) upsert。每次 Merge 都需要把所有數(shù)據(jù)全部重讀重寫(xiě)一遍,效率比較差、比較浪費(fèi)資源。

1.4 Spark + Delta 分析 CDC 數(shù)據(jù)

針對(duì)這個(gè)問(wèn)題,Spark + Delta 在分析 CDC 數(shù)據(jù)的時(shí)候提供了 MERGE INTO 的語(yǔ)法。這并不僅僅是對(duì) Hive 數(shù)倉(cāng)的語(yǔ)法簡(jiǎn)化,Spark + Delta 作為新型數(shù)據(jù)湖的架構(gòu)(例如 Iceberg、Hudi),它對(duì)數(shù)據(jù)的管理不是分區(qū),而是文件,因此 Delta 優(yōu)化 MERGE INTO 語(yǔ)法,僅掃描和重寫(xiě)發(fā)生變化的文件即可,因此高效很多。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

我們?cè)u(píng)估一下這個(gè)方案,他的優(yōu)點(diǎn)是僅依賴 Spark + Delta 架構(gòu)簡(jiǎn)潔、沒(méi)有在線服務(wù)、列存,分析速度非??臁?yōu)化之后的 MERGE INTO 語(yǔ)法速度也夠快。

這個(gè)方案,業(yè)務(wù)上是一個(gè) Copy On Write 的一個(gè)方案,它只需要 copy 少量的文件,可以讓延遲做的相對(duì)低。理論上,在更新的數(shù)據(jù)跟現(xiàn)有的存量沒(méi)有很大重疊的話,可以把天級(jí)別的延遲做到小時(shí)級(jí)別的延遲,性能也是可以跟得上的。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

這個(gè)方案在 Hive 倉(cāng)庫(kù)處理 upsert 數(shù)據(jù)的路上已經(jīng)前進(jìn)了一小步了。但小時(shí)級(jí)別的延遲畢竟不如實(shí)時(shí)更有效,因此這個(gè)方案最大的缺點(diǎn)在 Copy On Write 的 Merge 有一定的開(kāi)銷,延遲不能做的太低。

第一部分大概現(xiàn)有的方案就是這么多,同時(shí)還需要再?gòu)?qiáng)調(diào)一下,upsert 之所以如此重要,是因?yàn)樵跀?shù)據(jù)湖的方案中,upsert 是實(shí)現(xiàn)數(shù)據(jù)庫(kù)準(zhǔn)實(shí)時(shí)、實(shí)時(shí)入湖的一個(gè)關(guān)鍵技術(shù)點(diǎn)。

二、為何選擇 Flink + Iceberg

2.1 Flink 對(duì) CDC 數(shù)據(jù)消費(fèi)的支持

第一,F(xiàn)link 原生支持 CDC 數(shù)據(jù)消費(fèi)。在前文 Spark + Delta 的方案中,MARGE INTO 的語(yǔ)法,用戶需要感知 CDC 的屬性概念,然后寫(xiě)到 merge 的語(yǔ)法上來(lái)。但是 Flink 是原生支持 CDC 數(shù)據(jù)的。用戶只要聲明一個(gè) Debezium 或者其他 CDC 的 format,F(xiàn)link 上面的 SQL 是不需要感知任何 CDC 或者 upsert 的屬性的。Flink 中內(nèi)置了 hidden column 來(lái)標(biāo)識(shí)它 CDC 的類型數(shù)據(jù),所以對(duì)用戶而言比較簡(jiǎn)潔。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

如下圖示例,在 CDC 的處理當(dāng)中,F(xiàn)link 在只用聲明一個(gè) MySQL Binlog 的 DDL 語(yǔ)句,后面的 select 都不用感知 CDC 屬性。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

2.2 Flink 對(duì) Change Log Stream 的支持

下圖介紹的是 Flink 原生支持 Change Log Stream,F(xiàn)link 在接入一個(gè) Change Log Stream 之后,拓?fù)涫遣挥藐P(guān)心 Change Log flag 的 SQL。拓?fù)渫耆前凑兆约簶I(yè)務(wù)邏輯來(lái)定義,并且一直到最后寫(xiě)入 Iceberg,中間不用感知 Change Log 的 flag。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

2.3 Flink + Iceberg CDC 導(dǎo)入方案評(píng)估

最后,F(xiàn)link + Iceberg 的 CDC 導(dǎo)入方案的優(yōu)點(diǎn)是什么?

對(duì)比之前的方案,Copy On Write 跟 Merge On Read 都有適用的場(chǎng)景,側(cè)重點(diǎn)不同。Copy On Write 在更新部分文件的場(chǎng)景中,當(dāng)只需要重寫(xiě)其中的一部分文件時(shí)是很高效的,產(chǎn)生的數(shù)據(jù)是純 append 的全量數(shù)據(jù)集,在用于數(shù)據(jù)分析的時(shí)候也是最快的,這是 Copy On Write 的優(yōu)勢(shì)。

另外一個(gè)是 Merge On Read,即將數(shù)據(jù)連同 CDC flag 直接 append 到 Iceberg 當(dāng)中,在 merge 的時(shí)候,把這些增量的數(shù)據(jù)按照一定的組織格式、一定高效的計(jì)算方式與全量的上一次數(shù)據(jù)進(jìn)行一次 merge。這樣的好處是支持近實(shí)時(shí)的導(dǎo)入和實(shí)時(shí)數(shù)據(jù)讀?。贿@套計(jì)算方案的 Flink SQL 原生支持 CDC 的攝入,不需要額外的業(yè)務(wù)字段設(shè)計(jì)。

Iceberg 是統(tǒng)一的數(shù)據(jù)湖存儲(chǔ),支持多樣化的計(jì)算模型,也支持各種引擎(包括 Spark、Presto、hive)來(lái)進(jìn)行分析;產(chǎn)生的 file 都是純列存的,對(duì)于后面的分析是非??斓?;Iceberg 作為數(shù)據(jù)湖基于 snapshot 的設(shè)計(jì),支持增量讀?。籌ceberg 架構(gòu)足夠簡(jiǎn)潔,沒(méi)有在線服務(wù)節(jié)點(diǎn),純 table format 的,這給了上游平臺(tái)方足夠的能力來(lái)定制自己的邏輯和服務(wù)化。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

三、如何實(shí)時(shí)寫(xiě)入讀取

3.1 批量更新場(chǎng)景和 CDC 寫(xiě)入場(chǎng)景

首先我們來(lái)了解一下在整個(gè)數(shù)據(jù)湖里面批量更新的兩個(gè)場(chǎng)景。

  • 第一批量更新的這種場(chǎng)景,在這個(gè)場(chǎng)景中我們使用一個(gè) SQL 更新了成千上萬(wàn)行的數(shù)據(jù),比如歐洲的 GDPR 策略,當(dāng)一個(gè)用戶注銷掉自己的賬戶之后,后臺(tái)的系統(tǒng)是必須將這個(gè)用戶所有相關(guān)的數(shù)據(jù)全部物理刪除。

  • 第二個(gè)場(chǎng)景是我們需要將 date lake 中一些擁有共同特性的數(shù)據(jù)刪除掉,這個(gè)場(chǎng)景也是屬于批量更新的一個(gè)場(chǎng)景,在這個(gè)場(chǎng)景中刪除的條件可能是任意的條件,跟主鍵(Primary key)沒(méi)有任何關(guān)系,同時(shí)這個(gè)待更新的數(shù)據(jù)集是非常大,這種作業(yè)是一個(gè)長(zhǎng)耗時(shí)低頻次的作業(yè)。

另外是 CDC 寫(xiě)入的場(chǎng)景,對(duì)于對(duì) Flink 來(lái)說(shuō),一般常用的有兩種場(chǎng)景,第一種場(chǎng)景是上游的 Binlog 能夠很快速的寫(xiě)到 data lake 中,然后供不同的分析引擎做分析使用; 第二種場(chǎng)景是使用 Flink 做一些聚合操作,輸出的流是 upsert 類型的數(shù)據(jù)流,也需要能夠?qū)崟r(shí)的寫(xiě)到數(shù)據(jù)湖或者是下游系統(tǒng)中去做分析。如下圖示例中 CDC 寫(xiě)入場(chǎng)景中的 SQL 語(yǔ)句,我們使用單條 SQL 更新一行數(shù)據(jù),這種計(jì)算模式是一種流式增量的導(dǎo)入,而且屬于高頻的更新。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

3.2 Apache Iceberg 設(shè)計(jì) CDC 寫(xiě)入方案需要考慮的問(wèn)題

接下來(lái)我們看下 iceberg 對(duì)于 CDC 寫(xiě)入這種場(chǎng)景在方案設(shè)計(jì)時(shí)需要考慮哪些問(wèn)題。

  • 第一是正確性,即需要保證語(yǔ)義及數(shù)據(jù)的正確性,如上游數(shù)據(jù) upsert 到 iceberg 中,當(dāng)上游 upsert 停止后, iceberg 中的數(shù)據(jù)需要和上游系統(tǒng)中的數(shù)據(jù)保持一致。

  • 第二是高效寫(xiě)入,由于 upsert 的寫(xiě)入頻率非常高,我們需要保持高吞吐、高并發(fā)的寫(xiě)入。

  • 第三是快速讀取,當(dāng)數(shù)據(jù)寫(xiě)入后我們需要對(duì)數(shù)據(jù)進(jìn)行分析,這其中涉及到兩個(gè)問(wèn)題,第一個(gè)問(wèn)題是需要支持細(xì)粒度的并發(fā),當(dāng)作業(yè)使用多個(gè) task 來(lái)讀取時(shí)可以保證為各個(gè) task 進(jìn)行均衡的分配以此來(lái)加速數(shù)據(jù)的計(jì)算;第二個(gè)問(wèn)題是我們要充分發(fā)揮列式存儲(chǔ)的優(yōu)勢(shì)來(lái)加速讀取。

  • 第四是支持增量讀,例如一些傳統(tǒng)數(shù)倉(cāng)中的 ETL,通過(guò)增量讀取來(lái)進(jìn)行進(jìn)一步數(shù)據(jù)轉(zhuǎn)換。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

3.3 Apache Iceberg Basic

在介紹具體的方案細(xì)節(jié)之前,我們先了解一下 Iceberg 在文件系統(tǒng)中的布局,總體來(lái)講 Iceberg 分為兩部分?jǐn)?shù)據(jù),第一部分是數(shù)據(jù)文件,如下圖中的 parquet 文件,每個(gè)數(shù)據(jù)文件對(duì)應(yīng)一個(gè)校驗(yàn)文件(.crc文件)。第二部分是表元數(shù)據(jù)文件(Metadata 文件),包含 Snapshot 文件(snap-.avro)、Manifest 文件(.avro)、TableMetadata 文件(*.json)等。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

下圖展示了在 iceberg 中 snapshot、manifest 及 partition 中的文件的對(duì)應(yīng)關(guān)系。下圖中包含了三個(gè) partition,第一個(gè) partition 中有兩個(gè)文件 f1、f3,第二個(gè) partition 有兩個(gè)文件f4、f5,第三個(gè) partition 有一個(gè)文件f2。對(duì)于每一次寫(xiě)入都會(huì)生成一個(gè) manifest 文件,該文件記錄本次寫(xiě)入的文件與 partition 的對(duì)應(yīng)關(guān)系。再向上層有 snapshot 的概念,snapshot 能夠幫助快速訪問(wèn)到整張表的全量數(shù)據(jù),snapshot 記錄多個(gè) manifest,如第二個(gè) snapshot 包含 manifest2 和 manifest3。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

3.4 INSERT、UPDATE、DELETE 寫(xiě)入

在了解了基本的概念,下面介紹 iceberg 中 insert、update、delete 操作的設(shè)計(jì)。

下圖示例的 SQL 中展示的表包含兩個(gè)字段即 id、data,兩個(gè)字段都是 int 類型。在一個(gè) transaction 中我們進(jìn)行了圖示中的數(shù)據(jù)流操作,首先插入了(1,2)一條記錄,接下來(lái)將這條記錄更新為(1,3),在 iceberg 中 update 操作將會(huì)拆為 delete 和 insert 兩個(gè)操作。

這么做的原因是考慮到 iceberg 作為流批統(tǒng)一的存儲(chǔ)層,將 update 操作拆解為 delete 和 insert 操作可以保證流批場(chǎng)景做更新時(shí)讀取路徑的統(tǒng)一,如在批量刪除的場(chǎng)景下以 Hive 為例,Hive 會(huì)將待刪除的行的文件 offset 寫(xiě)入到 delta 文件中,然后做一次 merge on read,因?yàn)檫@樣會(huì)比較快,在 merge 時(shí)通過(guò) position 將原文件和 delta 進(jìn)行映射,將會(huì)很快得到所有未刪除的記錄。

接下來(lái)又插入記錄(3,5),刪除了記錄(1,3),插入記錄(2,5),最終查詢是我們得到記錄(3,5)(2,5)。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

上面操作看上去非常簡(jiǎn)單,但在實(shí)現(xiàn)中是存在一些語(yǔ)義上的問(wèn)題。如下圖中,在一個(gè) transaction 中首先執(zhí)行插入記錄(1,2)的操作,該操作會(huì)在 data file1 文件中寫(xiě)入 INSERT(1,2),然后執(zhí)行刪除記錄(1,2)操作,該操作會(huì)在 equalify delete file1 中寫(xiě)入 DELETE(1,2),接著又執(zhí)行插入記錄(1,2)操作,該操作會(huì)在 data file1 文件中再寫(xiě)入INSERT(1,2),然后執(zhí)行查詢操作。

在正常情況下查詢結(jié)果應(yīng)該返回記錄 INSERT(1,2),但在實(shí)現(xiàn)中,DELETE(1,2)操作無(wú)法得知?jiǎng)h除的是 data file1 文件中的哪一行,因此兩行 INSERT(1,2)記錄都將被刪除。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

那么如何來(lái)解決這個(gè)問(wèn)題呢,社區(qū)當(dāng)前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即通過(guò)指定一列或多列來(lái)進(jìn)行刪除操作,position-delete 是根據(jù)文件路徑和行號(hào)來(lái)進(jìn)行刪除操作,通過(guò)將這兩種方法結(jié)合起來(lái)以保證刪除操作的正確性。

如下圖我們?cè)诘谝粋€(gè) transaction 中插入了三行記錄,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然后執(zhí)行 commit 操作進(jìn)行提交。接下來(lái)我們開(kāi)啟一個(gè)新的 transaction 并執(zhí)行插入一行數(shù)據(jù)(1,5),由于是新的 transaction,因此新建了一個(gè) data file2 并寫(xiě)入 INSERT(1,5)記錄,接下來(lái)執(zhí)行刪除記錄(1,5),實(shí)際寫(xiě)入 delete 時(shí)是:

在 position delete file1 文件寫(xiě)入(file2, 0),表示刪除 data file2 中第 0 行的記錄,這是為了解決同一個(gè) transaction 內(nèi)同一行數(shù)據(jù)反復(fù)插入刪除的語(yǔ)義的問(wèn)題。
在 equality delete file1 文件中寫(xiě)入 DELETE (1,5),之所以寫(xiě)入這個(gè) delete 是為了確保本次 txn 之前寫(xiě)入的 (1,5) 能被正確刪除。

然后執(zhí)行刪除(1,4)操作,由于(1,4)在當(dāng)前 transaction 中未曾插入過(guò),因此該操作會(huì)使用 equality-delete 操作,即在 equality delete file1 中寫(xiě)入(1,4)記錄。在上述流程中可以看出在當(dāng)前方案中存在 data file、position delete file、equality delete file 三類文件。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

在了解了寫(xiě)入流程后,如何來(lái)讀取呢。如下圖所示,對(duì)于 position delete file 中的記錄(file2, 0)只需和當(dāng)前 transaction 的 data file 進(jìn)行 join 操作,對(duì)于 equality delete file 記錄(1,4)和之前的 transaction 中的 data file 進(jìn)行 join 操作。最終得到記錄 INSERT(1,3)、INSERT(1,2)保證了流程的正確性。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

3.5 Manifest 文件的設(shè)計(jì)

上面介紹了 insert、update 及 delete,但在設(shè)計(jì) task 的執(zhí)行計(jì)劃時(shí)我們對(duì) manifest 進(jìn)行了一些設(shè)計(jì),目的是通過(guò) manifest 能夠快速到找到 data file,并按照數(shù)據(jù)大小進(jìn)行分割,保證每個(gè) task 處理的數(shù)據(jù)盡可能的均勻分布。

如下圖示例,包含四個(gè) transaction,前兩個(gè) transaction 是 INSERT 操作,對(duì)應(yīng) M1、M2,第三個(gè) transaction 是 DELETE 操作,對(duì)應(yīng) M3,第四個(gè) transaction 是 UPDATE 操作,包含兩個(gè) manifest 文件即 data manifest 和 delete manifest。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

對(duì)于為什么要對(duì) manifest 文件拆分為 data manifest 和 delete manifest 呢,本質(zhì)上是為了快速為每個(gè) data file 找到對(duì)應(yīng)的 delete file 列表??梢钥聪聢D示例,當(dāng)我們?cè)?partition-2 做讀取時(shí),需要將 deletefile-4 與datafile-2、datafile-3 做一個(gè) join 操作,同樣也需要將 deletefile-5 與 datafile-2、datafile-3 做一個(gè) join 操作。

以 datafile-3 為例,deletefile 列表包含 deletefile-4 和 deletefile-5 兩個(gè)文件,如何快速找到對(duì)應(yīng)的 deletefIle 列表呢,我們可以根據(jù)上層的 manifest 來(lái)進(jìn)行查詢,當(dāng)我們將 manifest 文件拆分為 data manifest 和 delete manifest 后,可以將 M2(data manifest)與 M3、M4(delete manifest)先進(jìn)行一次 join 操作,這樣便可以快速的得到 data file 所對(duì)應(yīng)的 delete file 列表。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

3.6 文件級(jí)別的并發(fā)

另一個(gè)問(wèn)題是我們需要保證足夠高的并發(fā)讀取,在 iceberg 中這點(diǎn)做得非常出色。在 iceberg 中可以做到文件級(jí)別的并發(fā)讀取,甚至文件中更細(xì)粒度的分段的并發(fā)讀取,比如文件有 256MB,可以分為兩個(gè) 128MB 進(jìn)行并發(fā)讀取。這里舉例說(shuō)明,假設(shè) insert 文件跟 delete 文件在兩個(gè) Bucket 中的布局方式如下圖所示。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

我們通過(guò) manifest 對(duì)比發(fā)現(xiàn),datafile-2 的 delete file 列表只有 deletefile-4,這樣可以將這兩個(gè)文件作為一個(gè)單獨(dú)的 task(圖示中Task-2)進(jìn)行執(zhí)行,其他的文件也是類似,這樣可以保證每個(gè) task 數(shù)據(jù)較為均衡的進(jìn)行 merge 操作。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

對(duì)于這個(gè)方案我們做了簡(jiǎn)單的總結(jié),如下圖所示。首先這個(gè)方案的優(yōu)點(diǎn)可以滿足正確性,并且可以實(shí)現(xiàn)高吞吐寫(xiě)入和并發(fā)高效的讀取,另外可以實(shí)現(xiàn) snapshot 級(jí)別的增量的拉取。

當(dāng)前該方案還是比較粗糙,下面也有一些可以優(yōu)化的點(diǎn)。

  • 第一點(diǎn),如果同一個(gè) task 內(nèi)的 delete file 有重復(fù)可以做緩存處理,這樣可以提高 join 的效率。

  • 第二點(diǎn),當(dāng) delete file 比較大需要溢寫(xiě)到磁盤(pán)時(shí)可以使用 kv lib 來(lái)做優(yōu)化,但這不依賴外部服務(wù)或其他繁重的索引。

  • 第三點(diǎn),可以設(shè)計(jì) Bloom filter(布隆過(guò)濾器)來(lái)過(guò)濾無(wú)效的 IO,因?yàn)閷?duì)于 Flink 中常用的 upsert 操作會(huì)產(chǎn)生一個(gè) delete 操作和一個(gè) insert 操作,這會(huì)導(dǎo)致在 iceberg 中 data file 和 delete file 大小相差不大,這樣 join 的效率不會(huì)很高。如果采用 Bloom Filter,當(dāng) upsert 數(shù)據(jù)到來(lái)時(shí),拆分為 insert 和 delete 操作,如果通過(guò) bloom filter 過(guò)濾掉那些之前沒(méi)有 insert 過(guò)數(shù)據(jù)的 delete 操作(即如果這條數(shù)據(jù)之前沒(méi)有插入過(guò),則不需要將 delete 記錄寫(xiě)入到 delete file 中),這將極大的提高 upsert 的效率。

  • 第四點(diǎn),是需要一些后臺(tái)的 compaction 策略來(lái)控制 delete file 文件大小,當(dāng) delete file 越少,分析的效率越高,當(dāng)然這些策略并不會(huì)影響正常的讀寫(xiě)。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

3.7 增量文件集的 Transaction 提交

前面介紹了文件的寫(xiě)入,下圖我們介紹如何按照 iceberg 的語(yǔ)義進(jìn)行寫(xiě)入并且供用戶讀取。主要分為數(shù)據(jù)和 metastore 兩部分,首先會(huì)有 IcebergStreamWriter 進(jìn)行數(shù)據(jù)的寫(xiě)入,但此時(shí)寫(xiě)入數(shù)據(jù)的元數(shù)據(jù)信息并沒(méi)有寫(xiě)入到 metastore,因此對(duì)外不可見(jiàn)。第二個(gè)算子是 IcebergFileCommitter,該算子會(huì)將數(shù)據(jù)文件進(jìn)行收集, 最終通過(guò) commit transaction 來(lái)完成寫(xiě)入。

在 Iceberg 中并沒(méi)有其他任何其他第三方服務(wù)的依賴,而 Hudi 在某些方面做了一些 service 的抽象,如將 metastore 抽象為獨(dú)立的 Timeline,這可能會(huì)依賴一些獨(dú)立的索引甚至是其他的外部服務(wù)來(lái)完成。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

四、未來(lái)規(guī)劃

下面是我們未來(lái)的一些規(guī)劃,首先是 Iceberg 內(nèi)核的一些優(yōu)化,包括方案中涉及到的全鏈路穩(wěn)定性測(cè)試及性能的優(yōu)化, 并提供一些 CDC 增量拉取的相關(guān) Table API 接口。

在 Flink 集成上,會(huì)實(shí)現(xiàn) CDC 數(shù)據(jù)的自動(dòng)和手動(dòng)合并數(shù)據(jù)文件的能力,并提供 Flink 增量拉取 CDC 數(shù)據(jù)的能力。

在其他生態(tài)集成上,我們會(huì)對(duì) Spark、Presto 等引擎進(jìn)行集成,并借助 Alluxio 加速數(shù)據(jù)查詢。

Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)

看完上述內(nèi)容,你們對(duì)Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。


分享文章:Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)
網(wǎng)站URL:http://weahome.cn/article/jjhpsg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部