今天就跟大家聊聊有關(guān)Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
專注于為中小企業(yè)提供做網(wǎng)站、成都做網(wǎng)站服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)鼓樓免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
我們先看一下今天的 topic 需要設(shè)計(jì)的是什么?輸入是一個(gè) CDC 或者 upsert 的數(shù)據(jù),輸出是 Database 或者是用于大數(shù)據(jù) OLAP 分析的存儲(chǔ)。
我們常見(jiàn)的輸入主要有兩種數(shù)據(jù),第一種數(shù)據(jù)是數(shù)據(jù)庫(kù)的 CDC 數(shù)據(jù),不斷的產(chǎn)生 changeLog;另一種場(chǎng)景是流計(jì)算產(chǎn)生的 upsert 數(shù)據(jù),在最新的 Flink 1.12 版本已經(jīng)支持了 upsert 數(shù)據(jù)。
1.1 離線 HBase 集群分析 CDC 數(shù)據(jù)
我們通常想到的第一個(gè)方案,就是把 CDC upsert 的數(shù)據(jù)通過(guò) Flink 進(jìn)行一些處理之后,實(shí)時(shí)的寫(xiě)到 HBase 當(dāng)中。HBase 是一個(gè)在線的、能提供在線點(diǎn)查能力的一種數(shù)據(jù)庫(kù),具有非常高的實(shí)時(shí)性,對(duì)寫(xiě)入操作是非常友好的,也可以支持一些小范圍的查詢,而且集群可擴(kuò)展。
這種方案其實(shí)跟普通的點(diǎn)查實(shí)時(shí)鏈路是同一套,那么用 HBase 來(lái)做大數(shù)據(jù)的 OLAP 的查詢分析有什么問(wèn)題呢?
首先,HBase 是一個(gè)面向點(diǎn)查設(shè)計(jì)的一種數(shù)據(jù)庫(kù),是一種在線服務(wù),它的行存的索引不適合分析任務(wù)。典型的數(shù)倉(cāng)設(shè)計(jì)肯定是要列存的,這樣壓縮效率和查詢效率才會(huì)高。第二,HBase 的集群維護(hù)成本比較高。最后,HBase 的數(shù)據(jù)是 HFile,不方便與大數(shù)據(jù)里數(shù)倉(cāng)當(dāng)中典型的 Parquet、Avro、Orc 等結(jié)合。
1.2 Apache Kudu 維護(hù) CDC 數(shù)據(jù)集
針對(duì) HBase 分析能力比較弱的情況,社區(qū)前幾年出現(xiàn)了一個(gè)新的項(xiàng)目,這就是 Apache Kudu 項(xiàng)目。Kudu 項(xiàng)目擁有 HBase 的點(diǎn)查能力的同時(shí),采用列存,這樣列存加速非常適合 OLAP 分析。
這種方案會(huì)有什么問(wèn)題呢?
首先 Kudu 是比較小眾的、獨(dú)立的集群,維護(hù)成本也比較高,跟 HDFS、S3、OSS 比較割裂。其次由于 Kudu 在設(shè)計(jì)上保留了點(diǎn)查能力,所以它的批量掃描性能不如 parquet,另外 Kudu 對(duì)于 delete 的支持也比較弱,最后它也不支持增量拉取。
1.3 直接導(dǎo)入 CDC 到 Hive 分析
第三種方案,也是大家在數(shù)倉(cāng)中比較常用的方案,就是把 MySQL 的數(shù)據(jù)寫(xiě)到 Hive,流程是:維護(hù)一個(gè)全量的分區(qū),然后每天做一個(gè)增量的分區(qū),最后把增量分區(qū)寫(xiě)好之后進(jìn)行一次 Merge ,寫(xiě)入一個(gè)新的分區(qū),流程上這樣是走得通的。Hive 之前的全量分區(qū)是不受增量的影響的,只有當(dāng)增量 Merge 成功之后,分區(qū)才可查,才是一個(gè)全新的數(shù)據(jù)。這種純列存的 append 的數(shù)據(jù)對(duì)于分析是非常友好的。
這種方案會(huì)有什么問(wèn)題呢?
增量數(shù)據(jù)和全量數(shù)據(jù)的 Merge 是有延時(shí)的,數(shù)據(jù)不是實(shí)時(shí)寫(xiě)入的,典型的是一天進(jìn)行一次 Merge,這就是 T+1 的數(shù)據(jù)了。所以,時(shí)效性很差,不支持實(shí)時(shí) upsert。每次 Merge 都需要把所有數(shù)據(jù)全部重讀重寫(xiě)一遍,效率比較差、比較浪費(fèi)資源。
1.4 Spark + Delta 分析 CDC 數(shù)據(jù)
針對(duì)這個(gè)問(wèn)題,Spark + Delta 在分析 CDC 數(shù)據(jù)的時(shí)候提供了 MERGE INTO 的語(yǔ)法。這并不僅僅是對(duì) Hive 數(shù)倉(cāng)的語(yǔ)法簡(jiǎn)化,Spark + Delta 作為新型數(shù)據(jù)湖的架構(gòu)(例如 Iceberg、Hudi),它對(duì)數(shù)據(jù)的管理不是分區(qū),而是文件,因此 Delta 優(yōu)化 MERGE INTO 語(yǔ)法,僅掃描和重寫(xiě)發(fā)生變化的文件即可,因此高效很多。
我們?cè)u(píng)估一下這個(gè)方案,他的優(yōu)點(diǎn)是僅依賴 Spark + Delta 架構(gòu)簡(jiǎn)潔、沒(méi)有在線服務(wù)、列存,分析速度非??臁?yōu)化之后的 MERGE INTO 語(yǔ)法速度也夠快。
這個(gè)方案,業(yè)務(wù)上是一個(gè) Copy On Write 的一個(gè)方案,它只需要 copy 少量的文件,可以讓延遲做的相對(duì)低。理論上,在更新的數(shù)據(jù)跟現(xiàn)有的存量沒(méi)有很大重疊的話,可以把天級(jí)別的延遲做到小時(shí)級(jí)別的延遲,性能也是可以跟得上的。
這個(gè)方案在 Hive 倉(cāng)庫(kù)處理 upsert 數(shù)據(jù)的路上已經(jīng)前進(jìn)了一小步了。但小時(shí)級(jí)別的延遲畢竟不如實(shí)時(shí)更有效,因此這個(gè)方案最大的缺點(diǎn)在 Copy On Write 的 Merge 有一定的開(kāi)銷,延遲不能做的太低。
第一部分大概現(xiàn)有的方案就是這么多,同時(shí)還需要再?gòu)?qiáng)調(diào)一下,upsert 之所以如此重要,是因?yàn)樵跀?shù)據(jù)湖的方案中,upsert 是實(shí)現(xiàn)數(shù)據(jù)庫(kù)準(zhǔn)實(shí)時(shí)、實(shí)時(shí)入湖的一個(gè)關(guān)鍵技術(shù)點(diǎn)。
2.1 Flink 對(duì) CDC 數(shù)據(jù)消費(fèi)的支持
第一,F(xiàn)link 原生支持 CDC 數(shù)據(jù)消費(fèi)。在前文 Spark + Delta 的方案中,MARGE INTO 的語(yǔ)法,用戶需要感知 CDC 的屬性概念,然后寫(xiě)到 merge 的語(yǔ)法上來(lái)。但是 Flink 是原生支持 CDC 數(shù)據(jù)的。用戶只要聲明一個(gè) Debezium 或者其他 CDC 的 format,F(xiàn)link 上面的 SQL 是不需要感知任何 CDC 或者 upsert 的屬性的。Flink 中內(nèi)置了 hidden column 來(lái)標(biāo)識(shí)它 CDC 的類型數(shù)據(jù),所以對(duì)用戶而言比較簡(jiǎn)潔。
如下圖示例,在 CDC 的處理當(dāng)中,F(xiàn)link 在只用聲明一個(gè) MySQL Binlog 的 DDL 語(yǔ)句,后面的 select 都不用感知 CDC 屬性。
2.2 Flink 對(duì) Change Log Stream 的支持
下圖介紹的是 Flink 原生支持 Change Log Stream,F(xiàn)link 在接入一個(gè) Change Log Stream 之后,拓?fù)涫遣挥藐P(guān)心 Change Log flag 的 SQL。拓?fù)渫耆前凑兆约簶I(yè)務(wù)邏輯來(lái)定義,并且一直到最后寫(xiě)入 Iceberg,中間不用感知 Change Log 的 flag。
2.3 Flink + Iceberg CDC 導(dǎo)入方案評(píng)估
最后,F(xiàn)link + Iceberg 的 CDC 導(dǎo)入方案的優(yōu)點(diǎn)是什么?
對(duì)比之前的方案,Copy On Write 跟 Merge On Read 都有適用的場(chǎng)景,側(cè)重點(diǎn)不同。Copy On Write 在更新部分文件的場(chǎng)景中,當(dāng)只需要重寫(xiě)其中的一部分文件時(shí)是很高效的,產(chǎn)生的數(shù)據(jù)是純 append 的全量數(shù)據(jù)集,在用于數(shù)據(jù)分析的時(shí)候也是最快的,這是 Copy On Write 的優(yōu)勢(shì)。
另外一個(gè)是 Merge On Read,即將數(shù)據(jù)連同 CDC flag 直接 append 到 Iceberg 當(dāng)中,在 merge 的時(shí)候,把這些增量的數(shù)據(jù)按照一定的組織格式、一定高效的計(jì)算方式與全量的上一次數(shù)據(jù)進(jìn)行一次 merge。這樣的好處是支持近實(shí)時(shí)的導(dǎo)入和實(shí)時(shí)數(shù)據(jù)讀?。贿@套計(jì)算方案的 Flink SQL 原生支持 CDC 的攝入,不需要額外的業(yè)務(wù)字段設(shè)計(jì)。
Iceberg 是統(tǒng)一的數(shù)據(jù)湖存儲(chǔ),支持多樣化的計(jì)算模型,也支持各種引擎(包括 Spark、Presto、hive)來(lái)進(jìn)行分析;產(chǎn)生的 file 都是純列存的,對(duì)于后面的分析是非??斓?;Iceberg 作為數(shù)據(jù)湖基于 snapshot 的設(shè)計(jì),支持增量讀?。籌ceberg 架構(gòu)足夠簡(jiǎn)潔,沒(méi)有在線服務(wù)節(jié)點(diǎn),純 table format 的,這給了上游平臺(tái)方足夠的能力來(lái)定制自己的邏輯和服務(wù)化。
3.1 批量更新場(chǎng)景和 CDC 寫(xiě)入場(chǎng)景
首先我們來(lái)了解一下在整個(gè)數(shù)據(jù)湖里面批量更新的兩個(gè)場(chǎng)景。
第一批量更新的這種場(chǎng)景,在這個(gè)場(chǎng)景中我們使用一個(gè) SQL 更新了成千上萬(wàn)行的數(shù)據(jù),比如歐洲的 GDPR 策略,當(dāng)一個(gè)用戶注銷掉自己的賬戶之后,后臺(tái)的系統(tǒng)是必須將這個(gè)用戶所有相關(guān)的數(shù)據(jù)全部物理刪除。
第二個(gè)場(chǎng)景是我們需要將 date lake 中一些擁有共同特性的數(shù)據(jù)刪除掉,這個(gè)場(chǎng)景也是屬于批量更新的一個(gè)場(chǎng)景,在這個(gè)場(chǎng)景中刪除的條件可能是任意的條件,跟主鍵(Primary key)沒(méi)有任何關(guān)系,同時(shí)這個(gè)待更新的數(shù)據(jù)集是非常大,這種作業(yè)是一個(gè)長(zhǎng)耗時(shí)低頻次的作業(yè)。
另外是 CDC 寫(xiě)入的場(chǎng)景,對(duì)于對(duì) Flink 來(lái)說(shuō),一般常用的有兩種場(chǎng)景,第一種場(chǎng)景是上游的 Binlog 能夠很快速的寫(xiě)到 data lake 中,然后供不同的分析引擎做分析使用; 第二種場(chǎng)景是使用 Flink 做一些聚合操作,輸出的流是 upsert 類型的數(shù)據(jù)流,也需要能夠?qū)崟r(shí)的寫(xiě)到數(shù)據(jù)湖或者是下游系統(tǒng)中去做分析。如下圖示例中 CDC 寫(xiě)入場(chǎng)景中的 SQL 語(yǔ)句,我們使用單條 SQL 更新一行數(shù)據(jù),這種計(jì)算模式是一種流式增量的導(dǎo)入,而且屬于高頻的更新。
3.2 Apache Iceberg 設(shè)計(jì) CDC 寫(xiě)入方案需要考慮的問(wèn)題
接下來(lái)我們看下 iceberg 對(duì)于 CDC 寫(xiě)入這種場(chǎng)景在方案設(shè)計(jì)時(shí)需要考慮哪些問(wèn)題。
第一是正確性,即需要保證語(yǔ)義及數(shù)據(jù)的正確性,如上游數(shù)據(jù) upsert 到 iceberg 中,當(dāng)上游 upsert 停止后, iceberg 中的數(shù)據(jù)需要和上游系統(tǒng)中的數(shù)據(jù)保持一致。
第二是高效寫(xiě)入,由于 upsert 的寫(xiě)入頻率非常高,我們需要保持高吞吐、高并發(fā)的寫(xiě)入。
第三是快速讀取,當(dāng)數(shù)據(jù)寫(xiě)入后我們需要對(duì)數(shù)據(jù)進(jìn)行分析,這其中涉及到兩個(gè)問(wèn)題,第一個(gè)問(wèn)題是需要支持細(xì)粒度的并發(fā),當(dāng)作業(yè)使用多個(gè) task 來(lái)讀取時(shí)可以保證為各個(gè) task 進(jìn)行均衡的分配以此來(lái)加速數(shù)據(jù)的計(jì)算;第二個(gè)問(wèn)題是我們要充分發(fā)揮列式存儲(chǔ)的優(yōu)勢(shì)來(lái)加速讀取。
第四是支持增量讀,例如一些傳統(tǒng)數(shù)倉(cāng)中的 ETL,通過(guò)增量讀取來(lái)進(jìn)行進(jìn)一步數(shù)據(jù)轉(zhuǎn)換。
3.3 Apache Iceberg Basic
在介紹具體的方案細(xì)節(jié)之前,我們先了解一下 Iceberg 在文件系統(tǒng)中的布局,總體來(lái)講 Iceberg 分為兩部分?jǐn)?shù)據(jù),第一部分是數(shù)據(jù)文件,如下圖中的 parquet 文件,每個(gè)數(shù)據(jù)文件對(duì)應(yīng)一個(gè)校驗(yàn)文件(.crc文件)。第二部分是表元數(shù)據(jù)文件(Metadata 文件),包含 Snapshot 文件(snap-.avro)、Manifest 文件(.avro)、TableMetadata 文件(*.json)等。
下圖展示了在 iceberg 中 snapshot、manifest 及 partition 中的文件的對(duì)應(yīng)關(guān)系。下圖中包含了三個(gè) partition,第一個(gè) partition 中有兩個(gè)文件 f1、f3,第二個(gè) partition 有兩個(gè)文件f4、f5,第三個(gè) partition 有一個(gè)文件f2。對(duì)于每一次寫(xiě)入都會(huì)生成一個(gè) manifest 文件,該文件記錄本次寫(xiě)入的文件與 partition 的對(duì)應(yīng)關(guān)系。再向上層有 snapshot 的概念,snapshot 能夠幫助快速訪問(wèn)到整張表的全量數(shù)據(jù),snapshot 記錄多個(gè) manifest,如第二個(gè) snapshot 包含 manifest2 和 manifest3。
3.4 INSERT、UPDATE、DELETE 寫(xiě)入
在了解了基本的概念,下面介紹 iceberg 中 insert、update、delete 操作的設(shè)計(jì)。
下圖示例的 SQL 中展示的表包含兩個(gè)字段即 id、data,兩個(gè)字段都是 int 類型。在一個(gè) transaction 中我們進(jìn)行了圖示中的數(shù)據(jù)流操作,首先插入了(1,2)一條記錄,接下來(lái)將這條記錄更新為(1,3),在 iceberg 中 update 操作將會(huì)拆為 delete 和 insert 兩個(gè)操作。
這么做的原因是考慮到 iceberg 作為流批統(tǒng)一的存儲(chǔ)層,將 update 操作拆解為 delete 和 insert 操作可以保證流批場(chǎng)景做更新時(shí)讀取路徑的統(tǒng)一,如在批量刪除的場(chǎng)景下以 Hive 為例,Hive 會(huì)將待刪除的行的文件 offset 寫(xiě)入到 delta 文件中,然后做一次 merge on read,因?yàn)檫@樣會(huì)比較快,在 merge 時(shí)通過(guò) position 將原文件和 delta 進(jìn)行映射,將會(huì)很快得到所有未刪除的記錄。
接下來(lái)又插入記錄(3,5),刪除了記錄(1,3),插入記錄(2,5),最終查詢是我們得到記錄(3,5)(2,5)。
上面操作看上去非常簡(jiǎn)單,但在實(shí)現(xiàn)中是存在一些語(yǔ)義上的問(wèn)題。如下圖中,在一個(gè) transaction 中首先執(zhí)行插入記錄(1,2)的操作,該操作會(huì)在 data file1 文件中寫(xiě)入 INSERT(1,2),然后執(zhí)行刪除記錄(1,2)操作,該操作會(huì)在 equalify delete file1 中寫(xiě)入 DELETE(1,2),接著又執(zhí)行插入記錄(1,2)操作,該操作會(huì)在 data file1 文件中再寫(xiě)入INSERT(1,2),然后執(zhí)行查詢操作。
在正常情況下查詢結(jié)果應(yīng)該返回記錄 INSERT(1,2),但在實(shí)現(xiàn)中,DELETE(1,2)操作無(wú)法得知?jiǎng)h除的是 data file1 文件中的哪一行,因此兩行 INSERT(1,2)記錄都將被刪除。
那么如何來(lái)解決這個(gè)問(wèn)題呢,社區(qū)當(dāng)前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即通過(guò)指定一列或多列來(lái)進(jìn)行刪除操作,position-delete 是根據(jù)文件路徑和行號(hào)來(lái)進(jìn)行刪除操作,通過(guò)將這兩種方法結(jié)合起來(lái)以保證刪除操作的正確性。
如下圖我們?cè)诘谝粋€(gè) transaction 中插入了三行記錄,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然后執(zhí)行 commit 操作進(jìn)行提交。接下來(lái)我們開(kāi)啟一個(gè)新的 transaction 并執(zhí)行插入一行數(shù)據(jù)(1,5),由于是新的 transaction,因此新建了一個(gè) data file2 并寫(xiě)入 INSERT(1,5)記錄,接下來(lái)執(zhí)行刪除記錄(1,5),實(shí)際寫(xiě)入 delete 時(shí)是:
在 position delete file1 文件寫(xiě)入(file2, 0),表示刪除 data file2 中第 0 行的記錄,這是為了解決同一個(gè) transaction 內(nèi)同一行數(shù)據(jù)反復(fù)插入刪除的語(yǔ)義的問(wèn)題。
在 equality delete file1 文件中寫(xiě)入 DELETE (1,5),之所以寫(xiě)入這個(gè) delete 是為了確保本次 txn 之前寫(xiě)入的 (1,5) 能被正確刪除。
然后執(zhí)行刪除(1,4)操作,由于(1,4)在當(dāng)前 transaction 中未曾插入過(guò),因此該操作會(huì)使用 equality-delete 操作,即在 equality delete file1 中寫(xiě)入(1,4)記錄。在上述流程中可以看出在當(dāng)前方案中存在 data file、position delete file、equality delete file 三類文件。
在了解了寫(xiě)入流程后,如何來(lái)讀取呢。如下圖所示,對(duì)于 position delete file 中的記錄(file2, 0)只需和當(dāng)前 transaction 的 data file 進(jìn)行 join 操作,對(duì)于 equality delete file 記錄(1,4)和之前的 transaction 中的 data file 進(jìn)行 join 操作。最終得到記錄 INSERT(1,3)、INSERT(1,2)保證了流程的正確性。
3.5 Manifest 文件的設(shè)計(jì)
上面介紹了 insert、update 及 delete,但在設(shè)計(jì) task 的執(zhí)行計(jì)劃時(shí)我們對(duì) manifest 進(jìn)行了一些設(shè)計(jì),目的是通過(guò) manifest 能夠快速到找到 data file,并按照數(shù)據(jù)大小進(jìn)行分割,保證每個(gè) task 處理的數(shù)據(jù)盡可能的均勻分布。
如下圖示例,包含四個(gè) transaction,前兩個(gè) transaction 是 INSERT 操作,對(duì)應(yīng) M1、M2,第三個(gè) transaction 是 DELETE 操作,對(duì)應(yīng) M3,第四個(gè) transaction 是 UPDATE 操作,包含兩個(gè) manifest 文件即 data manifest 和 delete manifest。
對(duì)于為什么要對(duì) manifest 文件拆分為 data manifest 和 delete manifest 呢,本質(zhì)上是為了快速為每個(gè) data file 找到對(duì)應(yīng)的 delete file 列表??梢钥聪聢D示例,當(dāng)我們?cè)?partition-2 做讀取時(shí),需要將 deletefile-4 與datafile-2、datafile-3 做一個(gè) join 操作,同樣也需要將 deletefile-5 與 datafile-2、datafile-3 做一個(gè) join 操作。
以 datafile-3 為例,deletefile 列表包含 deletefile-4 和 deletefile-5 兩個(gè)文件,如何快速找到對(duì)應(yīng)的 deletefIle 列表呢,我們可以根據(jù)上層的 manifest 來(lái)進(jìn)行查詢,當(dāng)我們將 manifest 文件拆分為 data manifest 和 delete manifest 后,可以將 M2(data manifest)與 M3、M4(delete manifest)先進(jìn)行一次 join 操作,這樣便可以快速的得到 data file 所對(duì)應(yīng)的 delete file 列表。
3.6 文件級(jí)別的并發(fā)
另一個(gè)問(wèn)題是我們需要保證足夠高的并發(fā)讀取,在 iceberg 中這點(diǎn)做得非常出色。在 iceberg 中可以做到文件級(jí)別的并發(fā)讀取,甚至文件中更細(xì)粒度的分段的并發(fā)讀取,比如文件有 256MB,可以分為兩個(gè) 128MB 進(jìn)行并發(fā)讀取。這里舉例說(shuō)明,假設(shè) insert 文件跟 delete 文件在兩個(gè) Bucket 中的布局方式如下圖所示。
我們通過(guò) manifest 對(duì)比發(fā)現(xiàn),datafile-2 的 delete file 列表只有 deletefile-4,這樣可以將這兩個(gè)文件作為一個(gè)單獨(dú)的 task(圖示中Task-2)進(jìn)行執(zhí)行,其他的文件也是類似,這樣可以保證每個(gè) task 數(shù)據(jù)較為均衡的進(jìn)行 merge 操作。
對(duì)于這個(gè)方案我們做了簡(jiǎn)單的總結(jié),如下圖所示。首先這個(gè)方案的優(yōu)點(diǎn)可以滿足正確性,并且可以實(shí)現(xiàn)高吞吐寫(xiě)入和并發(fā)高效的讀取,另外可以實(shí)現(xiàn) snapshot 級(jí)別的增量的拉取。
當(dāng)前該方案還是比較粗糙,下面也有一些可以優(yōu)化的點(diǎn)。
第一點(diǎn),如果同一個(gè) task 內(nèi)的 delete file 有重復(fù)可以做緩存處理,這樣可以提高 join 的效率。
第二點(diǎn),當(dāng) delete file 比較大需要溢寫(xiě)到磁盤(pán)時(shí)可以使用 kv lib 來(lái)做優(yōu)化,但這不依賴外部服務(wù)或其他繁重的索引。
第三點(diǎn),可以設(shè)計(jì) Bloom filter(布隆過(guò)濾器)來(lái)過(guò)濾無(wú)效的 IO,因?yàn)閷?duì)于 Flink 中常用的 upsert 操作會(huì)產(chǎn)生一個(gè) delete 操作和一個(gè) insert 操作,這會(huì)導(dǎo)致在 iceberg 中 data file 和 delete file 大小相差不大,這樣 join 的效率不會(huì)很高。如果采用 Bloom Filter,當(dāng) upsert 數(shù)據(jù)到來(lái)時(shí),拆分為 insert 和 delete 操作,如果通過(guò) bloom filter 過(guò)濾掉那些之前沒(méi)有 insert 過(guò)數(shù)據(jù)的 delete 操作(即如果這條數(shù)據(jù)之前沒(méi)有插入過(guò),則不需要將 delete 記錄寫(xiě)入到 delete file 中),這將極大的提高 upsert 的效率。
第四點(diǎn),是需要一些后臺(tái)的 compaction 策略來(lái)控制 delete file 文件大小,當(dāng) delete file 越少,分析的效率越高,當(dāng)然這些策略并不會(huì)影響正常的讀寫(xiě)。
3.7 增量文件集的 Transaction 提交
前面介紹了文件的寫(xiě)入,下圖我們介紹如何按照 iceberg 的語(yǔ)義進(jìn)行寫(xiě)入并且供用戶讀取。主要分為數(shù)據(jù)和 metastore 兩部分,首先會(huì)有 IcebergStreamWriter 進(jìn)行數(shù)據(jù)的寫(xiě)入,但此時(shí)寫(xiě)入數(shù)據(jù)的元數(shù)據(jù)信息并沒(méi)有寫(xiě)入到 metastore,因此對(duì)外不可見(jiàn)。第二個(gè)算子是 IcebergFileCommitter,該算子會(huì)將數(shù)據(jù)文件進(jìn)行收集, 最終通過(guò) commit transaction 來(lái)完成寫(xiě)入。
在 Iceberg 中并沒(méi)有其他任何其他第三方服務(wù)的依賴,而 Hudi 在某些方面做了一些 service 的抽象,如將 metastore 抽象為獨(dú)立的 Timeline,這可能會(huì)依賴一些獨(dú)立的索引甚至是其他的外部服務(wù)來(lái)完成。
下面是我們未來(lái)的一些規(guī)劃,首先是 Iceberg 內(nèi)核的一些優(yōu)化,包括方案中涉及到的全鏈路穩(wěn)定性測(cè)試及性能的優(yōu)化, 并提供一些 CDC 增量拉取的相關(guān) Table API 接口。
在 Flink 集成上,會(huì)實(shí)現(xiàn) CDC 數(shù)據(jù)的自動(dòng)和手動(dòng)合并數(shù)據(jù)文件的能力,并提供 Flink 增量拉取 CDC 數(shù)據(jù)的能力。
在其他生態(tài)集成上,我們會(huì)對(duì) Spark、Presto 等引擎進(jìn)行集成,并借助 Alluxio 加速數(shù)據(jù)查詢。
看完上述內(nèi)容,你們對(duì)Flink如何實(shí)時(shí)分析Iceberg數(shù)據(jù)湖的CDC數(shù)據(jù)有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。