這篇文章主要介紹Flink 1.11中流批一體Hive數(shù)倉的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
創(chuàng)新互聯(lián)公司服務項目包括墨玉網(wǎng)站建設、墨玉網(wǎng)站制作、墨玉網(wǎng)頁制作以及墨玉網(wǎng)絡營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,墨玉網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務的客戶以成都為中心已經(jīng)輻射到墨玉省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!
首先恭喜 Table/SQL 的 blink planner 成為默認 Planner,撒花、撒花。Flink 1.11 中流計算結(jié)合 Hive 批處理數(shù)倉,給離線數(shù)倉帶來 Flink 流處理實時且 Exactly-once 的能力。另外,F(xiàn)link 1.11 完善了 Flink 自身的 Filesystem connector,大大提高了 Flink 的易用性。
傳統(tǒng)的離線數(shù)倉是由 Hive 加上 HDFS 的方案,Hive 數(shù)倉有著成熟和穩(wěn)定的大數(shù)據(jù)分析能力,結(jié)合調(diào)度和上下游工具,構(gòu)建一個完整的數(shù)據(jù)處理分析平臺,流程如下:
Flume 把數(shù)據(jù)導入 Hive 數(shù)倉
調(diào)度工具,調(diào)度 ETL 作業(yè)進行數(shù)據(jù)處理
在 Hive 數(shù)倉的表上,可以進行靈活的 Ad-hoc 查詢
調(diào)度工具,調(diào)度聚合作業(yè)輸出到BI層的數(shù)據(jù)庫中
這個流程下的問題是:
針對離線數(shù)倉的特點,隨著實時計算的流行,越來越多的公司引入實時數(shù)倉,實時數(shù)倉基于 Kafka + Flink streaming,定義全流程的流計算作業(yè),有著秒級甚至毫秒的實時性。
但是,實時數(shù)倉的一個問題是歷史數(shù)據(jù)只有 3-15 天,無法在其上做 Ad-hoc 的查詢。如果搭建 Lambda 的離線+實時的架構(gòu),維護成本、計算存儲成本、一致性保證、重復的開發(fā)會帶來很大的負擔。
Flink 1.11 為解決離線數(shù)倉的問題,給 Hive 數(shù)倉帶來了實時化的能力,加強各環(huán)節(jié)的實時性的同時,又不會給架構(gòu)造成太大的負擔。
實時數(shù)據(jù)導入 Hive 數(shù)倉,你是怎么做的?Flume、Spark Streaming 還是 Flink Datastream?千呼萬喚,Table / SQL 層的 streaming file sink 來啦,Flink 1.11 支持 Filesystem connector[1] 和 Hive connector 的 streaming sink[2]。(注:圖中 StreamingFileSink 的 Bucket 概念就是 Table/SQL 中的 Partition)Table/SQL 層的 streaming sink 不僅:- 帶來 Flink streaming 的實時/準實時的能力
- 支持 Filesystem connector 的全部 formats(csv,json,avro,parquet,orc)
- 支持 Hive table 的所有 formats
- 繼承 Datastream StreamingFileSink 的所有特性:Exactly-once、支持HDFS, S3。
而且引入了新的機制:Partition commit。一個合理的數(shù)倉的數(shù)據(jù)導入,它不止包含數(shù)據(jù)文件的寫入,也包含了 Partition 的可見性提交。當某個 Partition 完成寫入時,需要通知 Hive metastore 或者在文件夾內(nèi)添加 SUCCESS 文件。Flink 1.11 的 Partition commit 機制可以讓你:- Trigger:控制Partition提交的時機,可以根據(jù)Watermark加上從Partition中提取的時間來判斷,也可以通過Processing time來判斷。你可以控制:是想先盡快看到?jīng)]寫完的Partition;還是保證寫完P(guān)artition之后,再讓下游看到它。
- Policy:提交策略,內(nèi)置支持SUCCESS文件和Metastore的提交,你也可以擴展提交的實現(xiàn),比如在提交階段觸發(fā)Hive的analysis來生成統(tǒng)計信息,或者進行小文件的合并等等。
-- 結(jié)合Hive dialect使用Hive DDL語法SET table.sql-dialect=hive;CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE) PARTITIONED BY ( dt STRING, hour STRING) STORED AS PARQUET TBLPROPERTIES ( -- 使用partition中抽取時間,加上watermark決定partiton commit的時機 'sink.partition-commit.trigger'='partition-time', -- 配置hour級別的partition時間抽取策略,這個例子中dt字段是yyyy-MM-dd格式的天,hour是0-23的小時,timestamp-pattern定義了如何從這兩個partition字段推出完整的timestamp 'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’, -- 配置dalay為小時級,當 watermark > partition時間 + 1小時,會commit這個partition 'sink.partition-commit.delay'='1 h', -- partitiion commit的策略是:先更新metastore(addPartition),再寫SUCCESS文件 'sink.partition-commit.policy.kind’='metastore,success-file') SET table.sql-dialect=default;CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) -- 可以結(jié)合Table Hints動態(tài)指定table properties [3]INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
Hive 數(shù)倉中存在大量的 ETL 任務,這些任務往往是通過調(diào)度工具來周期性的運行,這樣做主要有兩個問題:
針對這些離線的 ETL 作業(yè),Flink 1.11 為此開發(fā)了實時化的 Hive 流讀,支持:- Partition 表,監(jiān)控 Partition 的生成,增量讀取新的 Partition。
- 非 Partition 表,監(jiān)控文件夾內(nèi)新文件的生成,增量讀取新的文件。
你甚至可以使用10分鐘級別的分區(qū)策略,使用 Flink 的 Hive streaming source 和Hive streaming sink 可以大大提高 Hive 數(shù)倉的實時性到準實時分鐘級 [4][5],在實時化的同時,也支持針對 Table 全量的 Ad-hoc 查詢,提高靈活性。SELECT * FROM hive_table/*+ OPTIONS('streaming-source.enable'=’true’,'streaming-source.consume-start-offset'='2020-05-20') */;
實時數(shù)據(jù)關(guān)聯(lián) Hive 表
在 Flink 與 Hive 集成的功能發(fā)布以后,我們收到最多的用戶反饋之一就是希望能夠?qū)?Flink 的實時數(shù)據(jù)與離線的 Hive 表進行關(guān)聯(lián)。因此,在 Flink 1.11 中,我們支持將實時表與 Hive 表進行 temporal join[6]。沿用 Flink 官方文檔中的例子,假定 Orders 是實時表,而 LatestRates 是一張 Hive 表,用戶可以通過以下語句進行temporal join:SELECT o.amout, o.currency, r.rate, o.amount * r.rateFROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency
與 Hive 表進行 temporal join 目前只支持 processing time,我們會把 Hive 表的數(shù)據(jù)緩存到內(nèi)存中,并按照固定的時間間隔去更新緩存的數(shù)據(jù)。用戶可以通過參數(shù)“l(fā)ookup.join.cache.ttl” 來控制緩存更新的間隔,默認間隔為一個小時。“l(fā)ookup.join.cache.ttl” 需要配置到 Hive 表的 property 當中,因此每張表可以有不同的配置。另外,由于需要將整張 Hive 表加載到內(nèi)存中,因此目前只適用于 Hive 表較小的場景。Flink on Hive 用戶并不能很好的使用 DDL,主要是因為:- Flink 1.10 中進一步完善了 DDL,但由于 Flink 與 Hive 在元數(shù)據(jù)語義上的差異,通過 Flink DDL 來操作 Hive 元數(shù)據(jù)的可用性比較差,僅能覆蓋很少的應用場景。
- 使用 Flink 對接 Hive 的用戶經(jīng)常需要切換到 Hive CLI 來執(zhí)行 DDL。
針對上述兩個問題,我們提出了 FLIP-123 [7],通過 Hive Dialect 為用戶提供 Hive語法兼容。該功能的最終目標,是為用戶提供近似 Hive CLI/Beeline 的使用體驗,讓用戶無需在 Flink 和 Hive 的 CLI 之間進行切換,甚至可以直接遷移部分 Hive 腳本到 Flink 中執(zhí)行。 在 Flink 1.11中,Hive Dialect 可以支持大部分常用的 DDL,比如 CREATE/ALTER TABLE、CHANGE/REPLACE COLUMN、ADD/DROP PARTITION 等等。為此,我們?yōu)?Hive Dialect 實現(xiàn)了一個獨立的 parser,F(xiàn)link 會根據(jù)用戶指定的 Dialect 決定使用哪個 parser 來解析 SQL 語句。用戶可以通過配置項“ table.sql-dialect ” 來指定使用的 SQL Dialect。它的默認值為 “default”,即 Flink 原生的 Dialect,而將其設置為 “hive” 時就開啟了 Hive Dialect。對于 SQL 用戶,可以在 yaml 文件中設置“table.sql-dialect” 來指定 session 的初始 Dialect,也可以通過 set 命令來動態(tài)調(diào)整需要使用的 Dialect,而無需重啟 session。Hive Dialect 目前所支持的具體功能可以參考 FLIP-123 或 Flink 的官方文檔。另外,該功能的一些設計原則和使用注意事項如下: - Hive Dialect 只能用于操作 Hive 表,而不是 Flink 原生的表(如 Kafka、ES 的表),這也意味著 Hive Dialect 需要配合 HiveCatalog 使用。
- 使用 Hive Dialect 時,原有的 Flink 的一些語法可能會無法使用(例如 Flink 定義的類型別名),在需要使用 Flink 語法時可以動態(tài)切換到默認的 Dialect。
- Hive Dialect 的 DDL 語法定義基于 Hive 的官方文檔,而不同 Hive 版本之間語法可能會有輕微的差異,需要用戶進行一定的調(diào)整。
- Hive Dialect 的語法實現(xiàn)基于 Calcite,而 Calcite 與 Hive 有不同的保留關(guān)鍵字。因此,某些在 Hive 中可以直接作為標識符的關(guān)鍵字(如 “default” ),在Hive Dialect 中可能需要用“`”進行轉(zhuǎn)義。
Flink 1.10中,F(xiàn)link 已經(jīng)支持了 ORC (Hive 2+) 的向量化讀取支持,但是這很局限,為此,F(xiàn)link 1.11 增加了更多的向量化支持:- Parquet for Hive 1,2,3 [9]
也就是說已經(jīng)補全了所有版本的 Parquet 和 ORC 向量化支持,默認是開啟的,提供開關(guān)。Flink 1.10 中,F(xiàn)link 文檔中列出了所需的 Hive 相關(guān)依賴,推薦用戶自行下載。但是這仍然稍顯麻煩,所以在1.11 中,F(xiàn)link 提供了內(nèi)置的依賴支持[10]:- flink-sql-connector-hive-1.2.2_2.11-1.11.jar:Hive 1 的依賴版本。
- flink-sql-connector-hive-2.2.0_2.11-1.11.jar:Hive 2.0 - 2.2 的依賴版本。
- flink-sql-connector-hive-2.3.6_2.11-1.11.jar:Hive 2.3 的依賴版本。
- flink-sql-connector-hive-3.1.2_2.11-1.11.jar:Hive 3 的依賴版本。
現(xiàn)在,你只需要單獨下一個包,再搞定 HADOOP_CLASSPATH,即可運行 Flink on Hive。除了 Hive 相關(guān)的 features,F(xiàn)link 1.11 也完成了大量其它關(guān)于流批一體的增強。Flink Filesystem connector
Flink table 在長久以來只支持一個 csv 的 file system table,而且它還不支持Partition,行為上在某些方面也有些不符合大數(shù)據(jù)計算的直覺。在 Flink 1.11,重構(gòu)了整個 Filesystem connector 的實現(xiàn)[1]:- 結(jié)合 Partition,現(xiàn)在,F(xiàn)ilesystem connector 支持 SQL 中 Partition 的所有語義,支持 Partition 的 DDL,支持 Partition Pruning,支持靜態(tài)/動態(tài) Partition 的插入,支持 overwrite 的插入。
- 支持 Streaming sink,也支持上述 Hive 支持的 Partition commit,支持寫Success 文件。
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
hour STRING
) PARTITIONED BY (dt, hour) WITH (
’connector’=’filesystem’,
’path’=’...’,
’format’=’parquet’,
'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,
'sink.partition-commit.delay'='1 h',
‘sink.partition-commit.policy.kind’='success-file')
)
-- stream environment or batch environment
INSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
-- 通過 Partition 查詢
SELECT * FROM fs_table WHERE dt=’2020-05-20’ and hour=’12’;
Yarn perJob 或者 session 模式在 1.11 之前是無限擴張的,沒有辦法限制它的資源使用,只能用 Yarn queue 等方式來限制。但是傳統(tǒng)的批作業(yè)其實都是大并發(fā),運行在局限的資源上,一部分一部分階段性的運行,為此,F(xiàn)link 1.11 引入 Max Slot 的配置[11],限制 Yarn application 的資源使用。slotmanager.number-of-slots.max
定義 Flink 集群分配的最大 Slot 數(shù)。此配置選項用于限制批處理工作負載的資源消耗。不建議為流作業(yè)配置此選項,如果沒有足夠的 Slot,則流作業(yè)可能會失敗。以上是“Flink 1.11中流批一體Hive數(shù)倉的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
網(wǎng)頁題目:Flink1.11中流批一體Hive數(shù)倉的示例分析
標題來源:
http://weahome.cn/article/ipcdso.html