真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink中Watermarks怎么用

這篇文章將為大家詳細(xì)講解有關(guān)Flink中Watermarks怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

成都創(chuàng)新互聯(lián)是專業(yè)的訥河網(wǎng)站建設(shè)公司,訥河接單;提供網(wǎng)站制作、做網(wǎng)站,網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行訥河網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊,希望更多企業(yè)前來合作!

Watermarks水?。簽檩斎氲臄?shù)據(jù)流的設(shè)置一個時間事件(時間戳),對窗口內(nèi)的數(shù)據(jù)輸入流無序與延遲提供解決方案

示例環(huán)境

java.version: 1.8.xflink.version: 1.11.1

TimestampsAndWatermarks.java

import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * @Description Watermarks水?。簽檩斎氲臄?shù)據(jù)流的設(shè)置一個時間事件(時間戳),對窗口內(nèi)的數(shù)據(jù)輸入流無序與延遲提供解決方案
 */
public class TimestampsAndWatermarks {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
     */

    /**
     * 遍歷集合,分別打印不同性別的信息,對于執(zhí)行超時,自動觸發(fā)定時器
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        TimeCharacteristic有三種時間類型:
            ProcessingTime:以operator處理的時間為準(zhǔn),它使用的是機(jī)器的系統(tǒng)時間來作為data stream的時間;
            IngestionTime:以數(shù)據(jù)進(jìn)入flink streaming data flow的時間為準(zhǔn);
            EventTime:以數(shù)據(jù)自帶的時間戳字段為準(zhǔn),應(yīng)用程序需要指定如何從record中抽取時間戳字段;需要實現(xiàn)assignTimestampsAndWatermarks方法,并設(shè)置時間水位線;
         */
        //使用event time,需要指定事件的時間戳
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //設(shè)置自動生成水印的時間周期,避免數(shù)據(jù)流量大的情況下,頻繁添加水印導(dǎo)致計算性能降低。
        env.getConfig().setAutoWatermarkInterval(1000L);
        List> tuple3List = DataSource.getTuple3ToList();

        DataStream> inStream = env.addSource(new MyRichSourceFunction());
        DataStream> dataStream = inStream
                //為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發(fā)此window的計算,Watermarks就是用來觸發(fā)window計算的。
                //Duration.ofSeconds(2),到數(shù)據(jù)流到達(dá)flink后,再水位線中設(shè)置延遲時間,也就是在所有數(shù)據(jù)流的最大的事件時間比window窗口結(jié)束時間大或相等時,再延遲多久觸發(fā)window窗口結(jié)束;
//                .assignTimestampsAndWatermarks(
//                        WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//                                .withTimestampAssigner((element, timestamp) -> {
//                                    long times = System.currentTimeMillis() ;
//                                    System.out.println(element.f1 + ","+ element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                                    return times;
//                                })
//                )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner>() {
                            @Override
                            public long extractTimestamp(Tuple3 element, long timestamp) {
                                long times = System.currentTimeMillis();
                                System.out.println(element.f1 + "," + element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                                return times;
                            }
                        }))
                //分區(qū)窗口
                .keyBy((KeySelector, String>) k -> k.f1)
                //觸發(fā)3s滾動窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                //執(zhí)行窗口數(shù)據(jù),對keyBy數(shù)據(jù)流批量處理
                .apply(new WindowFunction, Tuple2, String, TimeWindow>(){
                    @Override
                    public void apply(String s, TimeWindow window, Iterable> input, Collector> out) throws Exception {
                        long times = System.currentTimeMillis() ;
                        System.out.println();
                        System.out.println("窗口處理時間:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        Iterator> iterator = input.iterator();
                        int total = 0;
                        int size = 0;
                        String sex = "";
                        while (iterator.hasNext()){
                            Tuple3 tuple3 = iterator.next();
                            total += tuple3.f2;
                            size ++;
                            sex = tuple3.f1;
                        }
                        out.collect(new Tuple2<>(sex, total / size));
                    }
                });

        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 定期水印生成器
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy>{
        @Override
        public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator>() {
                //設(shè)置固定的延遲量3.5 seconds
                private final long maxOutOfOrderness = 3500;
                private long currentMaxTimestamp;

                /**
                 * 事件處理
                 * @param event             數(shù)據(jù)流對象
                 * @param eventTimestamp    事件水位線時間
                 * @param output            輸出
                 */
                @Override
                public void onEvent(Tuple3 event, long eventTimestamp, WatermarkOutput output) {
                    currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp);
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // 拿上一個水印時間 - 延遲量 = 等于給的窗口最終數(shù)據(jù)最后時間(如果在窗口到期內(nèi),未發(fā)生新的水印事件,則按window正常結(jié)束時間計算,當(dāng)在最后水印時間-延遲量的時間范圍內(nèi),有新的數(shù)據(jù)流進(jìn)入,則會重新觸發(fā)窗口內(nèi)對全部數(shù)據(jù)流計算)
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }

    /**
     * 模擬數(shù)據(jù)持續(xù)輸出
     */
    public static class MyRichSourceFunction extends RichSourceFunction> {
        @Override
        public void run(SourceContext> ctx) throws Exception {
            List> tuple3List = DataSource.getTuple3ToList();
            int j = 0;
            for (int i=0;i<100;i++){
                if (i%6 == 0){
                    j=0;
                }
                ctx.collect(tuple3List.get(j));
                //1秒鐘輸出一個
                Thread.sleep(1 * 1000);
                j ++;
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

打印結(jié)果

man,張三的水位線為:2020-12-27 10:28:20
girl,李四的水位線為:2020-12-27 10:28:21
man,王五的水位線為:2020-12-27 10:28:22
girl,劉六的水位線為:2020-12-27 10:28:23
girl,伍七的水位線為:2020-12-27 10:28:24

窗口處理時間:2020-12-27 10:28:25
(man,20)
man,吳八的水位線為:2020-12-27 10:28:25
man,張三的水位線為:2020-12-27 10:28:26
girl,李四的水位線為:2020-12-27 10:28:27

窗口處理時間:2020-12-27 10:28:28
(girl,28)

窗口處理時間:2020-12-27 10:28:28
(man,29)

關(guān)于“Flink中Watermarks怎么用”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。


當(dāng)前名稱:Flink中Watermarks怎么用
文章源于:http://weahome.cn/article/ggside.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部