本篇內(nèi)容介紹了“Flink的窗口機(jī)制介紹”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
在特克斯等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)制作、做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站設(shè)計(jì),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),網(wǎng)絡(luò)營銷推廣,成都外貿(mào)網(wǎng)站建設(shè)公司,特克斯網(wǎng)站建設(shè)費(fèi)用合理。
讀懂window區(qū)別:
讀懂window內(nèi)部的源碼實(shí)現(xiàn)關(guān)系
Window Assigner:用來決定某個(gè)元素被分配到哪個(gè)/哪些窗口中去。
Trigger:觸發(fā)器。決定了一個(gè)窗口何時(shí)能夠被計(jì)算或清除,每個(gè)窗口都會擁有一個(gè)自己的Trigger。
Evictor:可以譯為“驅(qū)逐者”。在Trigger觸發(fā)之后,在窗口被處理之前,Evictor(如果有Evictor的話)會用來剔除窗口中不需要的元素,相當(dāng)于一個(gè)filter。
讀懂WindowAssignern內(nèi)部實(shí)現(xiàn)機(jī)制,它主要是實(shí)現(xiàn)數(shù)據(jù)的分發(fā),分發(fā)到不同的window中,我簡單舉例一個(gè),我設(shè)置window的開始和結(jié)束時(shí)間,然后觸發(fā)器發(fā)現(xiàn)我的window達(dá)到了結(jié)束時(shí)間,這個(gè)window就會觸發(fā)。
一張圖讀懂trigger,evictor,emit的執(zhí)行順序
假設(shè)有一個(gè)滑動計(jì)數(shù)窗口,每2個(gè)元素計(jì)算一次最近4個(gè)元素的總和,那么窗口工作示意圖如下所示:
測試驗(yàn)證代碼:
import java.utilimport org.apache.flink.api.common.ExecutionConfigimport org.apache.flink.streaming.api.{TimeCharacteristic, environment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.WindowAssignerimport org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}import org.apache.flink.streaming.api.windowing.windows.TimeWindowobject FlinkWindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val input = env.socketTextStream("localhost", 9001) val inputMap = input.flatMap(f => { f.split("\\W+") }).map(line =>(line ,1)) .keyBy(0).window(new WindowAssigner[Object,TimeWindow] { override def isEventTime = false override def getDefaultTrigger(env: environment.StreamExecutionEnvironment) = { ProcessingTimeTrigger.create() } override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext) = { val windows = new util.ArrayList[TimeWindow](7) //每隔1分鐘統(tǒng)計(jì)歷史5分鐘的數(shù)據(jù) val size =1000L * 60 * 5 val slide = 1000L * 60 val lastStart = timestamp - timestamp % slide var start = lastStart while ( { start > timestamp - size }) { start -= slide windows.add(new TimeWindow(start, start + size)) } //每隔1分鐘統(tǒng)計(jì)歷史1分鐘的數(shù)據(jù) val size1 =1000L * 60 val lastStart1 = timestamp - timestamp % slide println(timestamp % slide) var start1 = lastStart1 while ( { start1 > timestamp - size1 }) { windows.add(new TimeWindow(start1, start1 + size1)) start1 -= slide } windows } override def getWindowSerializer(executionConfig: ExecutionConfig) = new TimeWindow.Serializer }).sum(1) .print() env.execute() }}
WindowAssigner主要是把數(shù)據(jù)分發(fā)到不同的window窗口中去,然后每個(gè)window自己內(nèi)部設(shè)置觸發(fā)器,當(dāng)數(shù)據(jù)還沒有觸發(fā)之前整個(gè)數(shù)據(jù)是存儲在flink的state中,也就是狀態(tài)存儲。當(dāng)window觸發(fā)(Trigger的返回結(jié)果可以是)之后,Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。
“Flink的窗口機(jī)制介紹”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!