小編給大家分享一下Flink State管理的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
在成都做網(wǎng)站、網(wǎng)站建設(shè)中從網(wǎng)站色彩、結(jié)構(gòu)布局、欄目設(shè)置、關(guān)鍵詞群組等細(xì)微處著手,突出企業(yè)的產(chǎn)品/服務(wù)/品牌,幫助企業(yè)鎖定精準(zhǔn)用戶,提高在線咨詢和轉(zhuǎn)化,使成都網(wǎng)站營銷成為有效果、有回報(bào)的無錫營銷推廣。創(chuàng)新互聯(lián)公司專業(yè)成都網(wǎng)站建設(shè)10多年了,客戶滿意度97.8%,歡迎成都創(chuàng)新互聯(lián)客戶聯(lián)系。
去重
比如上游的系統(tǒng)數(shù)據(jù)可能會有重復(fù),落到下游系統(tǒng)時(shí)希望把重復(fù)的數(shù)據(jù)都去掉。去重需要先了解哪些數(shù)據(jù)來過,哪些數(shù)據(jù)還沒有來,也就是把所有的主鍵都記錄下來,當(dāng)一條數(shù)據(jù)到來后,能夠看到在主鍵當(dāng)中是否存在
窗口計(jì)算
比如統(tǒng)計(jì)每分鐘 Nginx 日志 API 被訪問了多少次。窗口是一分鐘計(jì)算一次,在窗口觸發(fā)前,如 08:00 ~ 08:01 這個(gè)窗口,前59秒的數(shù)據(jù)來了需要先放入內(nèi)存,即需要把這個(gè)窗口之內(nèi)的數(shù)據(jù)先保留下來,等到 8:01 時(shí)一分鐘后,再將整個(gè)窗口內(nèi)觸發(fā)的數(shù)據(jù)輸出。未觸發(fā)的窗口數(shù)據(jù)也是一種狀態(tài)。
機(jī)器學(xué)習(xí)/深度學(xué)習(xí)
如訓(xùn)練的模型以及當(dāng)前模型的參數(shù)也是一種狀態(tài),機(jī)器學(xué)習(xí)可能每次都用有一個(gè)數(shù)據(jù)集,需要在數(shù)據(jù)集上進(jìn)行學(xué)習(xí),對模型進(jìn)行一個(gè)反饋。
訪問歷史數(shù)據(jù)
比如與昨天的數(shù)據(jù)進(jìn)行對比,需要訪問一些歷史數(shù)據(jù)。如果每次從外部去讀,對資源的消耗可能比較大,所以也希望把這些歷史數(shù)據(jù)也放入狀態(tài)中做對比。
易用
Flink 提供了豐富的數(shù)據(jù)結(jié)構(gòu)、多樣的狀態(tài)組織形式以及簡潔的擴(kuò)展接口,讓狀態(tài)管理更加易用
高效
實(shí)時(shí)作業(yè)一般需要更低的延遲,一旦出現(xiàn)故障,恢復(fù)速度也需要更快;當(dāng)處理能力不夠時(shí),可以橫向擴(kuò)展,同時(shí)在處理備份時(shí),不影響作業(yè)本身處理性能;
可靠
Flink 提供了狀態(tài)持久化,包括不丟不重的語義以及具備自動(dòng)的容錯(cuò)能力,比如 HA,當(dāng)節(jié)點(diǎn)掛掉后會自動(dòng)拉起,不需要人工介入。
從狀態(tài)管理方式的方式來說,Managed State 由 Flink Runtime 管理,自動(dòng)存儲,自動(dòng)恢復(fù),在內(nèi)存管理上有優(yōu)化;而 Raw State 需要用戶自己管理,需要自己序列化,F(xiàn)link 不知道 State中存入的數(shù)據(jù)是什么結(jié)構(gòu),只有用戶自己知道,需要最終序列化為可存儲的數(shù)據(jù)結(jié)構(gòu)。
從狀態(tài)數(shù)據(jù)結(jié)構(gòu)來說,Managed State 支持已知的數(shù)據(jù)結(jié)構(gòu),如 Value、List、Map 等。而 Raw State只支持字節(jié)數(shù)組,所有狀態(tài)都要轉(zhuǎn)換為二進(jìn)制字節(jié)數(shù)組才可以。
從推薦使用場景來說,Managed State 大多數(shù)情況下均可使用,而 Raw State 是當(dāng) Managed State 不夠用時(shí),比如需要自定義Operator時(shí),推薦使用 Raw State。
Keyed State 只能用在 KeyedStream 的算子中,即在整個(gè)程序中沒有 keyBy 的過程就沒有辦法使用 KeyedStream。
Operator State 可以用于所有算子,常用于 Source.由于 Operator State 沒有 Key,并發(fā)改變時(shí)需要選擇狀態(tài)如何重新分配。其中內(nèi)置了 2 種分配方式:一種是均勻分配,另外一種是將所有 State 合并為全量 State 再分發(fā)給每個(gè)實(shí)例
Keyed State 通過 RuntimeContext 訪問,這需要 Operator 是一個(gè)Rich Function。Operator State 需要自己實(shí)現(xiàn) CheckpointedFunction 或 ListCheckpointed 接口。在數(shù)據(jù)結(jié)構(gòu)上,Keyed State 支持的數(shù)據(jù)結(jié)構(gòu),比如 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的數(shù)據(jù)結(jié)構(gòu)相對較少,如 ListState。
ValueState 存儲單個(gè)值,比如 Wordcount,用 Word 當(dāng) Key,State 就是它的 Count。這里面的單個(gè)值可能是數(shù)值或者字符串,作為單個(gè)值,訪問接口可能有兩種,get 和 set。在 State 上體現(xiàn)的是 update(T) / T value()。
MapState 的狀態(tài)數(shù)據(jù)類型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一個(gè)。
ListState 狀態(tài)數(shù)據(jù)類型是 List,訪問接口如 add、update 等
ReducingState 和 AggregatingState 與 ListState 都是同一個(gè)父類,但狀態(tài)數(shù)據(jù)類型上是單個(gè)值,原因在于其中的 add 方法不是把當(dāng)前的元素追加到列表中,而是把當(dāng)前元素直接更新進(jìn)了 Reducing 的結(jié)果中。
AggregatingState 的區(qū)別是在訪問接口,ReducingState 中 add(T)和 T get() 進(jìn)去和出來的元素都是同一個(gè)類型,但在 AggregatingState 輸入的 IN,輸出的是 OUT。
Flink 狀態(tài)保存主要依靠 Checkpoint 機(jī)制,Checkpoint 會定時(shí)制作分布式快照,對程序中的狀態(tài)進(jìn)行備份。
MemoryStateBackend
Checkpoint 的存儲,第一種是內(nèi)存存儲,即 MemoryStateBackend,構(gòu)造方法是設(shè)置最大的StateSize,選擇是否做異步快照,這種存儲狀態(tài)本身存儲在 TaskManager 節(jié)點(diǎn)也就是執(zhí)行節(jié)點(diǎn)內(nèi)存中的,因?yàn)閮?nèi)存有容量限制,所以單個(gè) State maxStateSize 默認(rèn) 5 M,且需要注意 maxStateSize <= akka.framesize 默認(rèn) 10 M。Checkpoint 存儲在 JobManager 內(nèi)存中,因此總大小不超過 JobManager 的內(nèi)存。推薦使用的場景為:本地測試、幾乎無狀態(tài)的作業(yè),比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。不推薦在生產(chǎn)場景使用。
FsStateBackend
另一種就是在文件系統(tǒng)上的 FsStateBackend ,構(gòu)建方法是需要傳一個(gè)文件路徑和是否異步快照。State 依然在 TaskManager 內(nèi)存中,但不會像 MemoryStateBackend 有 5 M 的設(shè)置上限,Checkpoint 存儲在外部文件系統(tǒng)(本地或 HDFS),打破了總大小 Jobmanager 內(nèi)存的限制。容量限制上,單 TaskManager 上 State 總量不超過它的內(nèi)存,總大小不超過配置的文件系統(tǒng)容量。推薦使用的場景、常規(guī)使用狀態(tài)的作業(yè)、例如分鐘級窗口聚合或 join、需要開啟HA的作業(yè)。
RocksDBStateBackend
還有一種存儲為 RocksDBStateBackend ,RocksDB 是一個(gè) key/value 的內(nèi)存存儲系統(tǒng),和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中,如果內(nèi)存快滿時(shí),則寫入到磁盤中,但需要注意 RocksDB 不支持同步的 Checkpoint,構(gòu)造方法中沒有同步快照這個(gè)選項(xiàng)。不過 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲在外部文件系統(tǒng)(本地或HDFS),其容量限制只要單個(gè) TaskManager 上 State 總量不超過它的內(nèi)存+磁盤,單 Key最大 2G,總大小不超過配置的文件系統(tǒng)容量即可。推薦使用的場景為:超大狀態(tài)的作業(yè),例如天級窗口聚合、需要開啟 HA 的作業(yè)、最好是對狀態(tài)讀寫性能要求不高的作業(yè)。
看完了這篇文章,相信你對“Flink State管理的示例分析”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!