真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark結(jié)構(gòu)化流有狀態(tài)的聚合操作-創(chuàng)新互聯(lián)

背景

spark的結(jié)構(gòu)化流的聚合操作主要有兩種,一種是不限時(shí)間維度的聚合操作,也就是全局的聚合操作,另一種是帶時(shí)間窗口的聚合操作,本文我們主要是來談?wù)勥@兩種操作的技術(shù)實(shí)現(xiàn)及應(yīng)用場(chǎng)景

乳源網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站,乳源網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為乳源千余家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的乳源做網(wǎng)站的公司定做!技術(shù)原理 不根據(jù)時(shí)間維度進(jìn)行聚合

我們以統(tǒng)計(jì)輸入單詞的個(gè)數(shù)作為例子,wordstream.groupby('word').count()這個(gè)代碼會(huì)統(tǒng)計(jì)至今為止出現(xiàn)的輸入的每個(gè)單詞的個(gè)數(shù)信息,這種聚合方式會(huì)把當(dāng)前的聚合結(jié)果作為分布式式狀態(tài)維護(hù)到spark中(spark檢查點(diǎn)目錄中),它支持兩種輸出模式:更新模式和完整模式,但不支持追加模式
1.使用更新模式輸出,每次聚合操作新增和更新的行都會(huì)輸出到輸出池中,這樣就可以使用kafka數(shù)據(jù)池作為輸出池,因?yàn)閗afka數(shù)據(jù)池支持更新模式,但是不能使用只支持追加模式的輸出池
2.使用完整模式輸出,每次聚合操作后當(dāng)前的完整的單詞對(duì)應(yīng)的個(gè)數(shù)的記錄數(shù)據(jù)都會(huì)輸出到輸出池中,每次微批觸發(fā)后都會(huì)輸出完整的記錄,至于輸出池可以選擇使用支持完整輸出模式的kafka輸出池
3.不支持追加模式,由于這種聚合方式會(huì)改變之前輸出結(jié)果,所以這種類型的聚合不支持追加模式的輸出方式.

根據(jù)事件時(shí)間窗口進(jìn)行聚合

假設(shè)我們想要統(tǒng)計(jì)每臺(tái)容器每10分鐘內(nèi)的錯(cuò)誤日志量,觸發(fā)間隔為5分鐘一次,那我們可以通過如下代碼實(shí)現(xiàn):

events.groupby('ip',window('eventtime','10 minutes','5 minutes')).count()

順便提一下:使用窗口后,觸發(fā)時(shí)間間隔配置.trgger(EventTime, '5 minutes')最好也是設(shè)置成和window函數(shù)中使用的值一樣即可

這里關(guān)鍵在于window函數(shù),他可以對(duì)數(shù)據(jù)進(jìn)行分組統(tǒng)計(jì),這種統(tǒng)計(jì)方式的優(yōu)點(diǎn)是他不會(huì)丟失任何遲到的或者亂序到來的數(shù)據(jù),而且會(huì)把他們正確的分組,因?yàn)樗麜?huì)保留歷史的所有狀態(tài)數(shù)據(jù)到檢查點(diǎn)目錄中,不過也是由于可能存在遲到了幾天的數(shù)據(jù)到達(dá),導(dǎo)致需要把這條遲到的數(shù)據(jù)歸到幾天前的對(duì)應(yīng)窗口去,所以這種方式不支持追加輸出模式,只支持更新和完整輸出模式,此外這種統(tǒng)計(jì)方式的缺點(diǎn)是狀態(tài)數(shù)據(jù)會(huì)一直累加,狀態(tài)大小無(wú)限增大,包含最新時(shí)間的新分組不停的創(chuàng)建,而舊分組數(shù)據(jù)需要一直保留,以防止可能到來的遲到事件,這就引申出另一個(gè)問題:怎么清除舊的分組來限制狀態(tài)大小的無(wú)限增長(zhǎng)呢?
答案是指定水印

events.withWatermark('eventtime','60 minutes').groupby('ip',window('eventtime','10 minutes','5 minutes')).count()

順便提一下:使用窗口后,觸發(fā)時(shí)間間隔配置.trgger(EventTime, '5 minutes')最好也是設(shè)置成和window函數(shù)中使用的值一樣即可

清除舊分組的關(guān)鍵在于怎么定義這個(gè)分組不再接收延遲到達(dá)的數(shù)據(jù),假設(shè)我們認(rèn)為數(shù)據(jù)達(dá)到的時(shí)間不會(huì)遲到超過1小時(shí),那我們就可以如上所示編寫程序,注意這里水位線的更新間隔是5分鐘,也就是新的分組的水位線等于當(dāng)前窗口開始創(chuàng)建之前已經(jīng)收到的記錄的大事件時(shí)間減去60分鐘,這個(gè)值就作為新分組的水位線,我們可以知道根據(jù)水位線的定義,水位線是遞增的,如果到來的記錄的事件時(shí)間高于水位線就把它歸到對(duì)應(yīng)的某個(gè)舊時(shí)間窗口中,當(dāng)前也可以歸到當(dāng)前或者未來的某個(gè)時(shí)間窗口分組中,如果低于水位線那么這條記錄就被直接丟掉不處理(不過這里也不一定:水印提供的保證沒有說一定會(huì)丟掉這條遲到太久的記錄)。有了這個(gè)水印之后,spark就可以清理掉舊的分組數(shù)據(jù)了,因?yàn)榈陀谒痪€的那些舊分組已經(jīng)不會(huì)被新來的數(shù)據(jù)更新了,可以安全的清理掉.

基于水位線的時(shí)間窗口聚合嚴(yán)格來說只能支持兩種輸出模式:

1.追加模式:由于水印的存在,spark可以判斷出哪個(gè)時(shí)間窗口內(nèi)的統(tǒng)計(jì)數(shù)據(jù)不會(huì)在發(fā)生變化,然后他就可以把這個(gè)數(shù)據(jù)不會(huì)再變化的舊分組的數(shù)據(jù)輸出到輸出池中了,比如基于文件的輸出池,不過使用這種追加模式的缺點(diǎn)是數(shù)據(jù)輸出要推遲到水位線超過對(duì)應(yīng)分組的窗口時(shí)間,才能輸出這個(gè)分組的聚合數(shù)據(jù)
2.更新模式:這種輸出模式下,新增分組的統(tǒng)計(jì)值和舊的分組中有變化的統(tǒng)計(jì)值會(huì)輸出到輸出池中.

為什么基于水位線的是時(shí)間窗口聚合不支持完整輸出模式?

其實(shí)嚴(yán)格來說,基于水位線的時(shí)間窗口聚合確實(shí)可以使用完整模式輸出,但是由于這種輸出模式需要保留過去所有的歷史狀態(tài)數(shù)據(jù),即使指定了水印,spark也不會(huì)清理舊分組的狀態(tài)數(shù)據(jù),相當(dāng)于水印沒有指定一樣,所以這個(gè)和使用水印的初衷是相違背的,會(huì)造成狀態(tài)數(shù)據(jù)的無(wú)限增長(zhǎng),所以這里才說基于水位線的時(shí)間窗口聚合不支持完整輸出模式

彩蛋:
提一個(gè)問題:
如果強(qiáng)行比如基于水位線的時(shí)間窗口聚合使用基于文件的追加模式輸出結(jié)果會(huì)怎么樣?
答案是:未定義,聚合類型支持的輸出模式和輸出池的輸出模式不匹配時(shí),有可能發(fā)生以下情況,第一種情況是spark直接報(bào)錯(cuò),程序沒法執(zhí)行,第二種情況是輸出結(jié)果未定義.

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧


文章題目:spark結(jié)構(gòu)化流有狀態(tài)的聚合操作-創(chuàng)新互聯(lián)
轉(zhuǎn)載來于:http://weahome.cn/article/dssshs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部