這篇文章主要介紹“Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”,在日常操作中,相信很多人在Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、坪山網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、HTML5、成都商城網(wǎng)站開發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為坪山等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
在大數(shù)據(jù)處理領(lǐng)域,有一個(gè)非常常見但是很麻煩的問題,即hdfs小文件問題,我們也被這個(gè)問題困擾了很久。開始的時(shí)候我們是自己寫的一個(gè)小文件壓縮工具,定期的去合并,原理就是把待壓縮數(shù)據(jù)寫入一個(gè)新的臨時(shí)的文件夾,壓縮完,和原來的數(shù)據(jù)進(jìn)行檢驗(yàn),數(shù)據(jù)一致之后,用壓縮的數(shù)據(jù)覆蓋原來的數(shù)據(jù),但是由于無法保證事務(wù),所以出現(xiàn)了很多的問題,比如壓縮的同時(shí)又有數(shù)據(jù)寫入了,檢驗(yàn)就會(huì)失敗,導(dǎo)致合并小文件失敗,而且無法實(shí)時(shí)的合并,只能按照分區(qū)合并一天之前的?;蛘咭粋€(gè)小時(shí)之前的,最新的數(shù)據(jù)仍然有小文件的問題,導(dǎo)致查詢性能提高不了。
所以基于以上的一些問題,我調(diào)研了數(shù)據(jù)湖技術(shù),由于我們的流式數(shù)據(jù)主要是flink為主,查詢引擎是presto,而hudi強(qiáng)耦合了spark,對flink的支持還不太友好,所以綜合考慮了一下,決定引入iceberg。在對iceberg進(jìn)行功能測試和簡單代碼review之后,發(fā)現(xiàn)iceberg在flink這塊還有一些需要優(yōu)化和提升,不過我覺得應(yīng)該能hold的住,不完善的地方和需要優(yōu)化的地方我們自己來補(bǔ)全,所以最終引入了iceberg來解決小文件的問題。
除此之外,對于一些其他的問題,比如cdc數(shù)據(jù)的接入,以及根據(jù)查詢條件刪除數(shù)據(jù)等,后續(xù)也可以通過數(shù)據(jù)湖技術(shù)來解決。
我們的主要使用場景是使用flink將kafka的流式數(shù)據(jù)寫入到Iceberg,為了代碼的簡潔以及可維護(hù)性,我們盡量將程序使用sql來編寫,示例代碼如下:
// create catalog CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive'," + 'warehouse'='hdfs://localhost/user/hive/warehouse', 'uri'='thrift://localhost:9083')// create table CREATE TABLE iceberg.tmp.iceberg_table ( id BIGINT COMMENT 'unique id', data STRING, d int) PARTITIONED BY (d)WITH ('connector'='iceberg','write.format.default'='orc')// insert into insert into iceberg.tmp.iceberg_table select * from kafka_table
提示:記得開啟checkpoint
目前壓縮小文件是采用的一個(gè)額外批任務(wù)來進(jìn)行的,Iceberg提供了一個(gè)spark版本的action,我在做功能測試的時(shí)候發(fā)現(xiàn)了一些問題,此外我對spark也不是非常熟悉,擔(dān)心出了問題不好排查,所以參照spark版本的自己實(shí)現(xiàn)了一個(gè)flink版本,并修復(fù)了一些bug,進(jìn)行了一些功能的優(yōu)化。
由于我們的iceberg的元數(shù)據(jù)都是存儲(chǔ)在hive中的,所以壓縮程序的邏輯是我把hive中所有的iceberg表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區(qū)表還是非分區(qū)表,都進(jìn)行全表的壓縮。這樣做是為了處理某些使用eventtime的flink任務(wù),如果有延遲的數(shù)據(jù)的到來。就會(huì)把數(shù)據(jù)寫入以前的分區(qū),如果不是全表壓縮只壓縮當(dāng)天分區(qū)的話,新寫入的其他天的數(shù)據(jù)就不會(huì)被壓縮。
代碼示例參考:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles()//.maxParallelism(parallelism)//.filter(Expressions.equal("day", day))//.targetSizeInBytes(targetSizeInBytes).execute();
具體的壓縮小文件相關(guān)的信息可以參考這篇文章[Flink集成iceberg數(shù)據(jù)湖之合并小文件]。
我們的快照過期策略,我是和壓縮小文件的批處理任務(wù)寫在一起的,壓縮完小文件之后,進(jìn)行表的快照過期處理,目前保留的時(shí)間是一個(gè)小時(shí),這是因?yàn)閷τ谟幸恍┍容^大的表,分區(qū)比較多,而且checkpoint比較短,如果保留的快照過長的話,還是會(huì)保留過多小文件,我們暫時(shí)沒有查詢歷史快照的需求,所以我將快照的保留時(shí)間設(shè)置了一個(gè)小時(shí)。
long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); table.expireSnapshots() // .retainLast(20) .expireOlderThan(olderThanTimestamp) .commit();
寫入了數(shù)據(jù)之后,有時(shí)候我想查看一下相應(yīng)的快照下面有多少數(shù)據(jù)文件,直接查詢hdfs你不知道哪個(gè)是有用的,哪個(gè)是沒用的。所以需要有對應(yīng)的管理工具。目前flink這塊還不太成熟,我們可以使用spark3提供的工具來查看。
目前create table 這些操作我們是通過flink sql client來做的。其他相關(guān)的ddl的操作可以使用spark來做:
https://iceberg.apache.org/spark/#ddl-commands
一些相關(guān)的數(shù)據(jù)的操作,比如刪除數(shù)據(jù)等可以通過spark來實(shí)現(xiàn),presto目前只支持分區(qū)級別的刪除功能。
在使用iceberg的過程中,有時(shí)候會(huì)有這樣的情況,我提交了一個(gè)flink任務(wù),由于各種原因,我把它給停了,這個(gè)時(shí)候iceberg還沒提交相應(yīng)的快照。還有由于一些異常導(dǎo)致程序失敗,就會(huì)產(chǎn)生一些不在iceberg元數(shù)據(jù)里面的孤立的數(shù)據(jù)文件,這些文件對iceberg來說是不可達(dá)的,也是沒用的。所以我們需要像jvm的垃圾回收一樣來清理這些文件。
目前iceberg提供了一個(gè)spark版本的action來進(jìn)行處理這些沒用的文件,我們采取的策略和壓縮小文件一樣,獲取hive中的所有的iceberg表。每隔一個(gè)小時(shí)執(zhí)行一次定時(shí)任務(wù)來刪除這些沒用的文件。
SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
在程序運(yùn)行過程中出現(xiàn)了正常的數(shù)據(jù)文件被刪除的問題,經(jīng)過調(diào)研,由于我的快照保留設(shè)置是一小時(shí),這個(gè)清理程序清理時(shí)間也是設(shè)置一個(gè)小時(shí),通過日志發(fā)現(xiàn)是這個(gè)清理程序刪除了正常的數(shù)據(jù)。查了查代碼,覺得應(yīng)該是他們設(shè)置了一樣的時(shí)間,在清理孤立文件的時(shí)候,有其他程序正在讀寫表,由于這個(gè)清理程序是沒有事務(wù)的,導(dǎo)致刪除了正常的數(shù)據(jù)。最后把這個(gè)清理程序的清理時(shí)間改成默認(rèn)的三天,沒有再出現(xiàn)刪除數(shù)據(jù)文件的問題。當(dāng)然,為了保險(xiǎn)起見,我們可以覆蓋原來的刪除文件的方法,改成將文件到一個(gè)備份文件夾,檢查沒有問題之后,手工刪除。
目前我們使用的版本是prestosql 346,這個(gè)版本安裝的時(shí)候需要jdk11,presto查詢iceberg比較簡單。官方提供了相應(yīng)的conncter,我們配置一下就行,
//iceberg.propertiesconnector.name=iceberg hive.metastore.uri=thrift://localhost:9083
目前查詢iceberg的批處理任務(wù),使用的flink的客戶端,首先我們啟動(dòng)一個(gè)基于yarn session 的flink集群,然后通過sql客戶端提交任務(wù)到集群。
主要的配置就是我們需要根據(jù)數(shù)據(jù)的大小設(shè)置sql任務(wù)執(zhí)行的并行度,可以通過以下參數(shù)設(shè)置。
set table.exec.resource.default-parallelism = 100;
此外我在sql客戶端的配置文件里配置了hive和iceberg相應(yīng)的catalog,這樣每次客戶端啟動(dòng)的時(shí)候就不需要建catalog了。
catalogs: # empty list - name: iceberg type: iceberg warehouse: hdfs://localhost/user/hive2/warehouse uri: thrift://localhost:9083 catalog-type: hive cache-enabled: false - name: hive type: hive hive-conf-dir: /Users/user/work/hive/conf default-database: default
目前對于定時(shí)調(diào)度中的批處理任務(wù),flink的sql客戶端還沒hive那樣做的很完善,比如執(zhí)行hive -f來執(zhí)行一個(gè)文件。而且不同的任務(wù)需要不同的資源,并行度等。所以我自己封裝了一個(gè)flinK程序,通過調(diào)用這個(gè)程序來進(jìn)行處理,讀取一個(gè)指定文件里面的sql,來提交批任務(wù)。在命令行控制任務(wù)的資源和并行度等。
/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
批任務(wù)的查詢這塊,做了一些優(yōu)化,比如limit下推,filter下推,查詢并行度優(yōu)化等,可以大大提高查詢的速度,這些優(yōu)化都已經(jīng)推回給社區(qū)。
目前我們的所有數(shù)據(jù)都是存儲(chǔ)在hive表的,在驗(yàn)證完iceberg之后,我們決定將hive的數(shù)據(jù)遷移到iceberg,所以我寫了一個(gè)工具,可以使用hive的數(shù)據(jù),然后新建一個(gè)iceberg表,為其建立相應(yīng)的元數(shù)據(jù),但是測試的時(shí)候發(fā)現(xiàn),如果采用這種方式,就需要把寫入hive的程序停止,因?yàn)槿绻鹖ceberg和hive使用同一個(gè)數(shù)據(jù)文件,而壓縮程序會(huì)不斷地壓縮iceberg表的小文件,壓縮完之后,不會(huì)馬上刪除舊數(shù)據(jù),所以hive表就會(huì)查到雙份的數(shù)據(jù)。鑒于iceberg測試的時(shí)候還有一些不穩(wěn)定,所以我們采用雙寫的策略,原來寫入hive的程序不動(dòng),新啟動(dòng)一套程序?qū)懭雐ceberg,這樣能對iceberg表觀察一段時(shí)間。還能和原來hive中的數(shù)據(jù)進(jìn)行比對,來驗(yàn)證程序的正確性。
經(jīng)過一段時(shí)間觀察,每天將近20億數(shù)據(jù)的hive表和iceberg表,一條數(shù)據(jù)也不差。所以在最終對比數(shù)據(jù)沒有問題之后,把hive表停止寫入,使用新的iceberg表,然后把hive中的舊數(shù)據(jù)導(dǎo)入到iceberg。
到此,關(guān)于“Flink集成iceberg在生產(chǎn)環(huán)境中的使用方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!