This article explains in detail how to use Watermarks in Flink. I found the topic quite practical, so I am sharing it here for reference, and I hope you get something out of it.
Watermarks: assign an event time (timestamp) to the records of an input data stream, and provide a way to handle out-of-order and late-arriving records within a window.
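To make the idea concrete, here is a minimal plain-Java sketch with no Flink dependency. It only illustrates the bounded-out-of-orderness rule (watermark = largest timestamp seen, minus an allowed delay, minus 1 ms) and the condition under which a window can fire. The class name `WatermarkSketch`, its methods, and the sample timestamps are hypothetical and are not part of Flink's API.

```java
// Plain-Java illustration of a bounded-out-of-orderness watermark.
// All names and values are hypothetical; this is not Flink code.
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp; an out-of-order event never moves the maximum backwards. */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Watermark = largest timestamp seen - allowed delay - 1 ms. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no event seen yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    /** A window [start, end) can fire once the watermark reaches end - 1. */
    boolean windowCanFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2_000L); // tolerate 2 s of disorder
        long[] timestamps = {1_000L, 3_000L, 2_000L, 5_000L}; // 2_000 arrives out of order
        for (long ts : timestamps) {
            wm.onEvent(ts);
            System.out.println("event=" + ts
                    + " watermark=" + wm.currentWatermark()
                    + " window[0,3000) can fire=" + wm.windowCanFire(3_000L));
        }
    }
}
```

In this run the event with timestamp 2000 arrives after the one with timestamp 3000, but the window [0, 3000) has not fired yet at that point, so the late record would still be counted in it. The window only becomes eligible to fire after the event with timestamp 5000 pushes the watermark to 2999.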
Example environment
java.version: 1.8.x
flink.version: 1.11.1
TimestampsAndWatermarks.java
import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * @Description Watermarks: assign an event time (timestamp) to the records of an input
 * data stream, and handle out-of-order and late-arriving records within a window.
 *
 * Note: the generic type parameters were lost in the published listing. The element type
 * Tuple3<String, String, Integer> (f0 = name, f1 = gender, f2 = a number that is averaged)
 * is inferred from how the fields are used below.
 */
public class TimestampsAndWatermarks {

    /**
     * Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
     */

    /**
     * Reads the sample records and prints an aggregated result per gender.
     * A window is evaluated automatically once the watermark passes its end time.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
         TimeCharacteristic offers three notions of time:
         ProcessingTime: the time at which an operator processes the record, taken from the
                         machine's system clock;
         IngestionTime:  the time at which the record enters the Flink streaming data flow;
         EventTime:      the timestamp carried by the record itself. The application must say
                         how to extract it, by calling assignTimestampsAndWatermarks and
                         configuring a watermark strategy.
         */
        // Use event time; every record then needs a timestamp.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // Interval at which periodic watermarks are generated. Emitting them only once per
        // interval avoids the overhead of producing a watermark for every record under heavy load.
        env.getConfig().setAutoWatermarkInterval(1000L);

        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple2<String, Integer>> dataStream = inStream
                // The watermark keeps advancing. Once it passes the end time of a window,
                // that window is evaluated: watermarks are what trigger window computation.
                // Built-in alternative (requires import java.time.Duration):
                // Duration.ofSeconds(2) is the delay by which the watermark lags behind the
                // largest event time seen, i.e. how long a window waits after its end time
                // has been reached before it fires.
//                .assignTimestampsAndWatermarks(
//                        WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//                                .withTimestampAssigner((element, timestamp) -> {
//                                    long times = System.currentTimeMillis();
//                                    System.out.println(element.f1 + "," + element.f0 + " timestamp: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                                    return times;
//                                })
//                )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
                                // The sample records carry no time field, so the current
                                // wall-clock time is used as the event timestamp.
                                long times = System.currentTimeMillis();
                                System.out.println(element.f1 + "," + element.f0 + " timestamp: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                                return times;
                            }
                        }))
                // Partition the stream by gender.
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                // 3-second tumbling event-time windows.
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                // Process all records of one key in one window as a batch.
                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        long times = System.currentTimeMillis();
                        System.out.println();
                        System.out.println("Window processing time: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        Iterator<Tuple3<String, String, Integer>> iterator = input.iterator();
                        int total = 0;
                        int size = 0;
                        String sex = "";
                        while (iterator.hasNext()) {
                            Tuple3<String, String, Integer> tuple3 = iterator.next();
                            total += tuple3.f2;
                            size++;
                            sex = tuple3.f1;
                        }
                        // Integer average of f2 for this key and window.
                        out.collect(new Tuple2<>(sex, total / size));
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * Periodic watermark generator.
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>> {
        @Override
        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
                // Fixed allowed delay: 3.5 seconds.
                private final long maxOutOfOrderness = 3500;
                private long currentMaxTimestamp;

                /**
                 * Called for every record.
                 * @param event          the record
                 * @param eventTimestamp the timestamp assigned to the record
                 * @param output         watermark output
                 */
                @Override
                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                    // Track the largest timestamp seen so far. (The published listing compared
                    // against System.currentTimeMillis(); because the timestamps here are
                    // wall-clock times, the result is practically the same.)
                    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // Watermark = largest timestamp seen - allowed delay - 1 ms.
                    // A window fires once the watermark reaches its end time minus 1 ms, so
                    // records that arrive up to maxOutOfOrderness late are still included.
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }

    /**
     * Simulates a source that keeps emitting records.
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        // Cleared by cancel() so that run() can stop.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            int j = 0;
            for (int i = 0; i < 100 && running; i++) {
                // Cycle through the first six sample records.
                if (i % 6 == 0) {
                    j = 0;
                }
                ctx.collect(tuple3List.get(j));
                // Emit one record per second.
                Thread.sleep(1000);
                j++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
Output
man,張三 timestamp: 2020-12-27 10:28:20
girl,李四 timestamp: 2020-12-27 10:28:21
man,王五 timestamp: 2020-12-27 10:28:22
girl,劉六 timestamp: 2020-12-27 10:28:23
girl,伍七 timestamp: 2020-12-27 10:28:24

Window processing time: 2020-12-27 10:28:25
(man,20)
man,吳八 timestamp: 2020-12-27 10:28:25
man,張三 timestamp: 2020-12-27 10:28:26
girl,李四 timestamp: 2020-12-27 10:28:27

Window processing time: 2020-12-27 10:28:28
(girl,28)

Window processing time: 2020-12-27 10:28:28
(man,29)
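The firing times in the output can be checked with a little arithmetic. The sketch below is plain Java with no Flink dependency, and its class and method names are hypothetical. It assumes that tumbling windows with zero offset are aligned to the epoch, so a window start is the timestamp rounded down to a multiple of the window size, and that a window fires once the watermark (largest timestamp - 3500 ms - 1 ms) reaches the window end minus 1 ms. For readability, timestamps are given in milliseconds past 10:28:00; this keeps the same 3-second alignment because whole minutes are multiples of 3 seconds.

```java
// Plain-Java arithmetic sketch; names are hypothetical and this is not Flink code.
final class WindowFiringSketch {

    /** Start of the epoch-aligned tumbling window containing the timestamp (zero offset assumed). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /**
     * Smallest "largest timestamp seen" that lets the window fire, given
     * watermark = maxTimestamp - delay - 1 and the firing rule watermark >= windowEnd - 1.
     */
    static long minMaxTimestampToFire(long windowEndMs, long delayMs) {
        return windowEndMs + delayMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 3_000L; // TumblingEventTimeWindows.of(Time.seconds(3))
        long delayMs = 3_500L;      // maxOutOfOrderness in the article's generator
        // Seconds 20..24 of the printed timestamps, as milliseconds past 10:28:00.
        long[] timestamps = {20_000L, 21_000L, 22_000L, 23_000L, 24_000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, windowSizeMs);
            long end = start + windowSizeMs;
            System.out.println("ts=" + ts
                    + " window=[" + start + "," + end + ")"
                    + " fires once max timestamp >= " + minMaxTimestampToFire(end, delayMs));
        }
    }
}
```

Under these assumptions the record stamped 10:28:20 falls into the window [10:28:18, 10:28:21), which can fire once a timestamp of at least 10:28:24.500 has been seen. That is consistent with the first result (man,20) being printed at 10:28:25. The records stamped 10:28:21 to 10:28:23 fall into [10:28:21, 10:28:24), which needs a timestamp of at least 10:28:27.500 and matches the two results printed at 10:28:28. The output does not show sub-second timing, so this is a plausibility check rather than an exact trace.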
That concludes this article on how to use Watermarks in Flink. I hope it has been helpful; if you found it useful, please share it so that more people can see it.