真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

DeltaLake在Soul的應(yīng)用實(shí)踐是怎么樣的

Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的,相信很多沒有經(jīng)驗(yàn)的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

成都創(chuàng)新互聯(lián)是專業(yè)的西盟網(wǎng)站建設(shè)公司,西盟接單;提供網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行西盟網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

一、背景介紹

(一)業(yè)務(wù)場景

傳統(tǒng)離線數(shù)倉模式下,日志入庫前首要階段便是ETL,Soul的埋點(diǎn)日志數(shù)據(jù)量龐大且需動(dòng)態(tài)分區(qū)入庫,在按day分區(qū)的基礎(chǔ)上,每天的動(dòng)態(tài)分區(qū)1200+,分區(qū)數(shù)據(jù)量大小不均,數(shù)萬條到數(shù)十億條不等。下圖為我們之前的ETL過程,埋點(diǎn)日志輸入Kafka,由Flume采集到HDFS,再經(jīng)由天級Spark ETL任務(wù),落表入Hive。任務(wù)凌晨開始運(yùn)行,數(shù)據(jù)處理階段約1h,Load階段1h+,整體執(zhí)行時(shí)間為2-3h。

Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的

(二)存在的問題

在上面的架構(gòu)下,我們面臨如下問題:

1.天級ETL任務(wù)耗時(shí)久,影響下游依賴的產(chǎn)出時(shí)間。
2.凌晨占用資源龐大,任務(wù)高峰期搶占大量集群資源。
3.ETL任務(wù)穩(wěn)定性不佳且出錯(cuò)需凌晨解決、影響范圍大。

二、為什么選擇Delta?

為了解決天級ETL逐漸尖銳的問題,減少資源成本、提前數(shù)據(jù)產(chǎn)出,我們決定將T+1級ETL任務(wù)轉(zhuǎn)換成T+0實(shí)時(shí)日志入庫,在保證數(shù)據(jù)一致的前提下,做到數(shù)據(jù)落地即可用。
之前我們也實(shí)現(xiàn)了Lambda架構(gòu)下離線、實(shí)時(shí)分別維護(hù)一份數(shù)據(jù),但在實(shí)際使用中仍存在一些棘手問題,比如:無法保證事務(wù)性,小文件過多帶來的集群壓力及查詢性能等問題,最終沒能達(dá)到理想化使用。

所以這次我們選擇了近來逐漸進(jìn)入大家視野的數(shù)據(jù)湖架構(gòu),數(shù)據(jù)湖的概念在此我就不過多贅述了,我理解它就是一種將元數(shù)據(jù)視為大數(shù)據(jù)的Table Format。目前主流的數(shù)據(jù)湖分別有Delta Lake(分為開源版和商業(yè)版)、Hudi、Iceberg,三者都支持了ACID語義、Upsert、Schema動(dòng)態(tài)變更、Time Travel等功能,其他方面我們做些簡單的總結(jié)對比:

開源版Delta

優(yōu)勢:

1.支持作為source流式讀
2.Spark3.0支持sql操作

劣勢:

1.引擎強(qiáng)綁定Spark
2.手動(dòng)Compaction
3.Join式Merge,成本高

Hudi

優(yōu)勢:

1.基于主鍵的快速Upsert/Delete
2.Copy on Write / Merge on Read 兩種merge方式,分別適配讀寫場景優(yōu)化
3.自動(dòng)Compaction

劣勢:

1.寫入綁定Spark/DeltaStreamer
2.API較為復(fù)雜

Iceberg

優(yōu)勢:

1.可插拔引擎

劣勢:

1.調(diào)研時(shí)還在發(fā)展階段,部分功能尚未完善
2.Join式Merge,成本高

調(diào)研時(shí)期,阿里云的同學(xué)提供了EMR版本的Delta,在開源版本的基礎(chǔ)上進(jìn)行了功能和性能上的優(yōu)化,諸如:SparkSQL/Spark Streaming SQL的集成,自動(dòng)同步Delta元數(shù)據(jù)信息到HiveMetaStore(MetaSync功能),自動(dòng)Compaction,適配Tez、Hive、Presto等更多查詢引擎,優(yōu)化查詢性能(Zorder/DataSkipping/Merge性能)等等

三、實(shí)踐過程

測試階段,我們反饋了多個(gè)EMR Delta的bug,比如:Delta表無法自動(dòng)創(chuàng)建Hive映射表,Tez引擎無法正常讀取Delta類型的Hive表,Presto和Tez讀取Delta表數(shù)據(jù)不一致,均得到了阿里云同學(xué)的快速支持并一一解決。

引入Delta后,我們實(shí)時(shí)日志入庫架構(gòu)如下所示:

Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的

數(shù)據(jù)由各端埋點(diǎn)上報(bào)至Kafka,通過Spark任務(wù)分鐘級以Delta的形式寫入HDFS,然后在Hive中自動(dòng)化創(chuàng)建Delta表的映射表,即可通過Hive MR、Tez、Presto等查詢引擎直接進(jìn)行數(shù)據(jù)查詢及分析。

我們基于Spark,封裝了通用化ETL工具,實(shí)現(xiàn)了配置化接入,用戶無需寫代碼即可實(shí)現(xiàn)源數(shù)據(jù)到Hive的整體流程接入。并且,為了更加適配業(yè)務(wù)場景,我們在封裝層實(shí)現(xiàn)了多種實(shí)用功能:

1. 實(shí)現(xiàn)了類似Iceberg的hidden partition功能,用戶可選擇某些列做適當(dāng)變化形成一個(gè)新的列,此列可作為分區(qū)列,也可作為新增列,使用SparkSql操作。如:有日期列date,那么可以通過 'substr(date,1,4) as year' 生成新列,并可以作為分區(qū)。
2. 為避免臟數(shù)據(jù)導(dǎo)致分區(qū)出錯(cuò),實(shí)現(xiàn)了對動(dòng)態(tài)分區(qū)的正則檢測功能,比如:Hive中不支持中文分區(qū),用戶可以對動(dòng)態(tài)分區(qū)加上'\w+'的正則檢測,分區(qū)字段不符合的臟數(shù)據(jù)則會(huì)被過濾。
3. 實(shí)現(xiàn)自定義事件時(shí)間字段功能,用戶可選數(shù)據(jù)中的任意時(shí)間字段作為事件時(shí)間落入對應(yīng)分區(qū),避免數(shù)據(jù)漂移問題。
4. 嵌套Json自定義層數(shù)解析,我們的日志數(shù)據(jù)大都為Json格式,其中難免有很多嵌套Json,此功能支持用戶選擇對嵌套Json的解析層數(shù),嵌套字段也會(huì)被以單列的形式落入表中。
5. 實(shí)現(xiàn)SQL化自定義配置動(dòng)態(tài)分區(qū)的功能,解決埋點(diǎn)數(shù)據(jù)傾斜導(dǎo)致的實(shí)時(shí)任務(wù)性能問題,優(yōu)化資源使用,此場景后面會(huì)詳細(xì)介紹。

平臺(tái)化建設(shè):我們已經(jīng)把日志接入Hive的整體流程嵌入了Soul的數(shù)據(jù)平臺(tái)中,用戶可通過此平臺(tái)申請日志接入,由審批人員審批后進(jìn)行相應(yīng)參數(shù)配置,即可將日志實(shí)時(shí)接入Hive表中,簡單易用,降低操作成本。

Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的

為了解決小文件過多的問題,EMR Delta實(shí)現(xiàn)了Optimize/Vacuum語法,可以定期對Delta表執(zhí)行Optimize語法進(jìn)行小文件的合并,執(zhí)行Vacuum語法對過期文件進(jìn)行清理,使HDFS上的文件保持合適的大小及數(shù)量。值得一提的是,EMR Delta目前也實(shí)現(xiàn)了一些auto-compaction的策略,可以通過配置來自動(dòng)觸發(fā)compaction,比如:小文件數(shù)量達(dá)到一定值時(shí),在流式作業(yè)階段啟動(dòng)minor compaction任務(wù),在對實(shí)時(shí)任務(wù)影響較小的情況下,達(dá)到合并小文件的目的。

四、問題 & 方案

接下來介紹一下我們在落地Delta的過程中遇到過的問題

(一)埋點(diǎn)數(shù)據(jù)動(dòng)態(tài)分區(qū)數(shù)據(jù)量分布不均導(dǎo)致的數(shù)據(jù)傾斜問題

Soul的埋點(diǎn)數(shù)據(jù)是落入分區(qū)寬表中的,按埋點(diǎn)類型分區(qū),不同類型的埋點(diǎn)數(shù)據(jù)量分布不均,例如:通過Spark寫入Delta的過程中,5min為一個(gè)Batch,大部分類型的埋點(diǎn),5min的數(shù)據(jù)量很?。?0M以下),但少量埋點(diǎn)數(shù)據(jù)量卻在5min能達(dá)到1G或更多。數(shù)據(jù)落地時(shí),我們假設(shè)DataFrame有M個(gè)partition,表有N個(gè)動(dòng)態(tài)分區(qū),每個(gè)partition中的數(shù)據(jù)都是均勻且混亂的,那么每個(gè)partition中都會(huì)生成N個(gè)文件分別對應(yīng)N個(gè)動(dòng)態(tài)分區(qū),那么每個(gè)Batch就會(huì)生成M*N個(gè)小文件。

Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的

為了解決上述問題,數(shù)據(jù)落地前對DataFrame按動(dòng)態(tài)分區(qū)字段repartition,這樣就能保證每個(gè)partition中分別有不同分區(qū)的數(shù)據(jù),這樣每個(gè)Batch就只會(huì)生成N個(gè)文件,即每個(gè)動(dòng)態(tài)分區(qū)一個(gè)文件,這樣解決了小文件膨脹的問題。但與此同時(shí),有幾個(gè)數(shù)據(jù)量過大的分區(qū)的數(shù)據(jù)也會(huì)只分布在一個(gè)partition中,就導(dǎo)致了某幾個(gè)partition數(shù)據(jù)傾斜,且這些分區(qū)每個(gè)Batch產(chǎn)生的文件過大等問題。

解決方案:如下圖,我們實(shí)現(xiàn)了用戶通過SQL自定義配置repartition列的功能,簡單來說,用戶可以使用SQL,把數(shù)據(jù)量過大的幾個(gè)埋點(diǎn),通過加鹽方式打散到多個(gè)partition,對于數(shù)據(jù)量正常的埋點(diǎn)則無需操作。通過此方案,我們把Spark任務(wù)中每個(gè)Batch執(zhí)行最慢的partition的執(zhí)行時(shí)間從3min提升到了40s,解決了文件過小或過大的問題,以及數(shù)據(jù)傾斜導(dǎo)致的性能問題。

Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的

(二)應(yīng)用層基于元數(shù)據(jù)的動(dòng)態(tài)schema變更

數(shù)據(jù)湖支持了動(dòng)態(tài)schema變更,但在Spark寫入之前,構(gòu)造DataFrame時(shí),是需要獲取數(shù)據(jù)schema的,如果此時(shí)無法動(dòng)態(tài)變更,那么便無法把新字段寫入Delta表,Delta的動(dòng)態(tài)schena便也成了擺設(shè)。埋點(diǎn)數(shù)據(jù)由于類型不同,每條埋點(diǎn)數(shù)據(jù)的字段并不完全相同,那么在落表時(shí),必須取所有數(shù)據(jù)的字段并集,作為Delta表的schema,這就需要我們在構(gòu)建DataFrame時(shí)便能感知是否有新增字段。

解決方案:我們額外設(shè)計(jì)了一套元數(shù)據(jù),在Spark構(gòu)建DataFrame時(shí),首先根據(jù)此元數(shù)據(jù)判斷是否有新增字段,如有,就把新增字段更新至元數(shù)據(jù),以此元數(shù)據(jù)為schema構(gòu)建DataFrame,就能保證我們在應(yīng)用層動(dòng)態(tài)感知schema變更,配合Delta的動(dòng)態(tài)schema變更,新字段自動(dòng)寫入Delta表,并把變化同步到對應(yīng)的Hive表中。

(三)Spark Kafka偏移量提交機(jī)制導(dǎo)致的數(shù)據(jù)重復(fù)

我們在使用Spark Streaming時(shí),會(huì)在數(shù)據(jù)處理完成后將消費(fèi)者偏移量提交至Kafka,調(diào)用的是
spark-streaming-kafka-0-10中的commitAsync API。我一直處于一個(gè)誤區(qū),以為數(shù)據(jù)在處理完成后便會(huì)提交當(dāng)前Batch消費(fèi)偏移量。但后來遇到Delta表有數(shù)據(jù)重復(fù)現(xiàn)象,排查發(fā)現(xiàn)偏移量提交時(shí)機(jī)為下一個(gè)Batch開始時(shí),并不是當(dāng)前Batch數(shù)據(jù)處理完成后就提交。那么問題來了:假如一個(gè)批次5min,在3min時(shí)數(shù)據(jù)處理完成,此時(shí)成功將數(shù)據(jù)寫入Delta表,但偏移量卻在5min后(第二個(gè)批次開始時(shí))才成功提交,如果在3min-5min這個(gè)時(shí)間段中,重啟任務(wù),那么就會(huì)重復(fù)消費(fèi)當(dāng)前批次的數(shù)據(jù),造成數(shù)據(jù)重復(fù)。

解決方案:

1.StructStreaming支持了對Delta的exactly-once,可以使用StructStreaming適配解決。
2.可以通過其他方式維護(hù)消費(fèi)偏移量解決。

(四)查詢時(shí)解析元數(shù)據(jù)耗時(shí)較多

因?yàn)镈elta單獨(dú)維護(hù)了自己的元數(shù)據(jù),在使用外部查詢引擎查詢時(shí),需要先解析元數(shù)據(jù)以獲取數(shù)據(jù)文件信息。隨著Delta表的數(shù)據(jù)增長,元數(shù)據(jù)也逐漸增大,此操作耗時(shí)也逐漸變長。
解決方案:阿里云同學(xué)也在不斷優(yōu)化查詢方案,通過緩存等方式盡量減少對元數(shù)據(jù)的解析成本。

(五)關(guān)于CDC場景

目前我們基于Delta實(shí)現(xiàn)的是日志的Append場景,還有另外一種經(jīng)典業(yè)務(wù)場景CDC場景。Delta本身是支持Update/Delete的,是可以應(yīng)用在CDC場景中的。但是基于我們的業(yè)務(wù)考量,暫時(shí)沒有將Delta使用在CDC場景下,原因是Delta表的Update/Delete方式是Join式的Merge方式,我們的業(yè)務(wù)表數(shù)據(jù)量比較大,更新頻繁,并且更新數(shù)據(jù)涉及的分區(qū)較廣泛,在Merge上可能存在性能問題。
阿里云的同學(xué)也在持續(xù)在做Merge的性能優(yōu)化,比如Join的分區(qū)裁剪、Bloomfilter等,能有效減少Join時(shí)的文件數(shù)量,尤其對于分區(qū)集中的數(shù)據(jù)更新,性能更有大幅提升,后續(xù)我們也會(huì)嘗試將Delta應(yīng)用在CDC場景。

五、后續(xù)計(jì)劃

1.基于Delta Lake,進(jìn)一步打造優(yōu)化實(shí)時(shí)數(shù)倉結(jié)構(gòu),提升部分業(yè)務(wù)指標(biāo)實(shí)時(shí)性,滿足更多更實(shí)時(shí)的業(yè)務(wù)需求。
2.打通我們內(nèi)部的元數(shù)據(jù)平臺(tái),實(shí)現(xiàn)日志接入->實(shí)時(shí)入庫->元數(shù)據(jù)+血緣關(guān)系一體化、規(guī)范化管理。
3.持續(xù)觀察優(yōu)化Delta表查詢計(jì)算性能,嘗試使用Delta的更多功能,比如Z-Ordering,提升在即席查詢及數(shù)據(jù)分析場景下的性能。

看完上述內(nèi)容,你們掌握Delta Lake在Soul的應(yīng)用實(shí)踐是怎么樣的的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!


本文名稱:DeltaLake在Soul的應(yīng)用實(shí)踐是怎么樣的
網(wǎng)站地址:http://weahome.cn/article/pjgpds.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部