真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink的窗口機(jī)制介紹

本篇內(nèi)容介紹了“Flink的窗口機(jī)制介紹”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

在特克斯等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)制作、做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站設(shè)計(jì),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),網(wǎng)絡(luò)營銷推廣,成都外貿(mào)網(wǎng)站建設(shè)公司,特克斯網(wǎng)站建設(shè)費(fèi)用合理。

讀懂window區(qū)別:

讀懂window內(nèi)部的源碼實(shí)現(xiàn)關(guān)系

  • Window Assigner:用來決定某個(gè)元素被分配到哪個(gè)/哪些窗口中去。

    Flink的窗口機(jī)制介紹

  • Trigger:觸發(fā)器。決定了一個(gè)窗口何時(shí)能夠被計(jì)算或清除,每個(gè)窗口都會擁有一個(gè)自己的Trigger。

    Flink的窗口機(jī)制介紹

  • Evictor:可以譯為“驅(qū)逐者”。在Trigger觸發(fā)之后,在窗口被處理之前,Evictor(如果有Evictor的話)會用來剔除窗口中不需要的元素,相當(dāng)于一個(gè)filter。

    Flink的窗口機(jī)制介紹

讀懂WindowAssignern內(nèi)部實(shí)現(xiàn)機(jī)制,它主要是實(shí)現(xiàn)數(shù)據(jù)的分發(fā),分發(fā)到不同的window中,我簡單舉例一個(gè),我設(shè)置window的開始和結(jié)束時(shí)間,然后觸發(fā)器發(fā)現(xiàn)我的window達(dá)到了結(jié)束時(shí)間,這個(gè)window就會觸發(fā)。

Flink的窗口機(jī)制介紹

一張圖讀懂trigger,evictor,emit的執(zhí)行順序

假設(shè)有一個(gè)滑動計(jì)數(shù)窗口,每2個(gè)元素計(jì)算一次最近4個(gè)元素的總和,那么窗口工作示意圖如下所示:

Flink的窗口機(jī)制介紹

測試驗(yàn)證代碼:

import java.utilimport org.apache.flink.api.common.ExecutionConfigimport org.apache.flink.streaming.api.{TimeCharacteristic, environment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.WindowAssignerimport org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}import org.apache.flink.streaming.api.windowing.windows.TimeWindowobject FlinkWindowTest {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)    val input = env.socketTextStream("localhost", 9001)    val inputMap = input.flatMap(f => {      f.split("\\W+")    }).map(line =>(line ,1))      .keyBy(0).window(new WindowAssigner[Object,TimeWindow] {      override def isEventTime = false      override def getDefaultTrigger(env: environment.StreamExecutionEnvironment) = {        ProcessingTimeTrigger.create()      }      override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext) = {        val windows = new util.ArrayList[TimeWindow](7)        //每隔1分鐘統(tǒng)計(jì)歷史5分鐘的數(shù)據(jù)        val size =1000L * 60 * 5        val slide = 1000L * 60        val lastStart = timestamp - timestamp % slide        var start = lastStart        while ( {          start > timestamp - size        })        {          start -= slide          windows.add(new TimeWindow(start, start + size))        }        //每隔1分鐘統(tǒng)計(jì)歷史1分鐘的數(shù)據(jù)        val size1 =1000L * 60        val lastStart1 = timestamp - timestamp % slide        println(timestamp % slide)        var start1 = lastStart1        while ( {          start1 > timestamp - size1        })        {          windows.add(new TimeWindow(start1, start1 + size1))          start1 -= slide        }        windows      }      override def getWindowSerializer(executionConfig: ExecutionConfig) = new TimeWindow.Serializer    }).sum(1)    .print()    env.execute()  }}

總結(jié)

WindowAssigner主要是把數(shù)據(jù)分發(fā)到不同的window窗口中去,然后每個(gè)window自己內(nèi)部設(shè)置觸發(fā)器,當(dāng)數(shù)據(jù)還沒有觸發(fā)之前整個(gè)數(shù)據(jù)是存儲在flink的state中,也就是狀態(tài)存儲。當(dāng)window觸發(fā)(Trigger的返回結(jié)果可以是)之后,Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。

“Flink的窗口機(jī)制介紹”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!


網(wǎng)頁題目:Flink的窗口機(jī)制介紹
URL分享:http://weahome.cn/article/ghppjs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部