本篇內(nèi)容主要講解“Flink Aggregate怎么用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Flink Aggregate怎么用”吧!
站在用戶的角度思考問題,與客戶深入溝通,找到澄城網(wǎng)站設(shè)計與澄城網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都做網(wǎng)站、成都網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名與空間、虛擬空間、企業(yè)郵箱。業(yè)務(wù)覆蓋澄城地區(qū)。
Aggregate算子:提供基于事件窗口進(jìn)行增量計算的函數(shù)。(對輸入窗口每個數(shù)據(jù)流元素遞增聚合計算,并將窗口狀態(tài)與窗口內(nèi)元素保持在累加器中)
示例環(huán)境
java.version: 1.8.x flink.version: 1.11.1
Aggregate.java
import com.flink.examples.DataSource; import org.apache.flink.api.common.accumulators.AverageAccumulator; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.List; /** * @Description Aggregate算子:提供基于事件窗口進(jìn)行增量計算的函數(shù)。(對輸入窗口每個數(shù)據(jù)流元素遞增聚合計算,并將窗口狀態(tài)與窗口內(nèi)元素保持在累加器中) */ public class Aggregate { /** * 遍歷集合,分別打印不同性別的總?cè)藬?shù)與平均值 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Tuple3<姓名,性別(man男,girl女),年齡> List> tuple3List = DataSource.getTuple3ToList(); DataStream dataStream = env.fromCollection(tuple3List) .keyBy((KeySelector , String>) k -> k.f1) //按數(shù)量窗口滾動,每3個輸入窗口數(shù)據(jù)流,計算一次 .countWindow(3) //只能基于Windowed窗口Stream進(jìn)行調(diào)用 .aggregate(new AggregateFunction , MyAverageAccumulator, MyAverageAccumulator>() { /** * 創(chuàng)建新累加器,開始聚合計算 * @return */ @Override public MyAverageAccumulator createAccumulator() { return new MyAverageAccumulator(); } /** * 將窗口輸入的數(shù)據(jù)流值添加到窗口累加器,并返回新的累加器值 * @param tuple3 * @param accumulator * @return */ @Override public MyAverageAccumulator add(Tuple3 tuple3, MyAverageAccumulator accumulator) { System.out.println("tuple3:" + tuple3.toString()); accumulator.setGender(tuple3.f1); //此accumulator保含個數(shù)統(tǒng)計和值累計兩個屬性,add方法內(nèi)會計算窗口內(nèi)總數(shù)與求和 accumulator.add(tuple3.f2); return accumulator; } /** * 獲取累加器聚合結(jié)果 * @param accumulator * @return */ @Override public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) { return accumulator; } /** * 合并兩個累加器,返回合并后的累加器的狀態(tài) * @param a * @param b * @return */ @Override public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) { a.merge(b); return a; } }); dataStream.print(); env.execute("flink Filter job"); } /** * 添加性別屬性(此類用于顯示不同性別的平均值) */ public static class MyAverageAccumulator extends AverageAccumulator{ private String gender; public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } @Override public String toString() { //繼承父類的this.getLocalValue()方法用于計算并返回平均值 return super.toString() + ", gender to " + gender; } } }
打印結(jié)果
tuple3:(張三,man,20) tuple3:(李四,girl,24) tuple3:(劉六,girl,32) tuple3:(王五,man,29) tuple3:(伍七,girl,18) tuple3:(吳八,man,30) 4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl 2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man
到此,相信大家對“Flink Aggregate怎么用”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!