真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

FlinkAggregate怎么用

本篇內(nèi)容主要講解“Flink  Aggregate怎么用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Flink  Aggregate怎么用”吧!

站在用戶的角度思考問題,與客戶深入溝通,找到澄城網(wǎng)站設(shè)計與澄城網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都做網(wǎng)站、成都網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名與空間、虛擬空間、企業(yè)郵箱。業(yè)務(wù)覆蓋澄城地區(qū)。

Aggregate算子:提供基于事件窗口進(jìn)行增量計算的函數(shù)。(對輸入窗口每個數(shù)據(jù)流元素遞增聚合計算,并將窗口狀態(tài)與窗口內(nèi)元素保持在累加器中)

示例環(huán)境

java.version: 1.8.x
flink.version: 1.11.1

Aggregate.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

/**
 * @Description Aggregate算子:提供基于事件窗口進(jìn)行增量計算的函數(shù)。(對輸入窗口每個數(shù)據(jù)流元素遞增聚合計算,并將窗口狀態(tài)與窗口內(nèi)元素保持在累加器中)
 */
public class Aggregate {

    /**
     * 遍歷集合,分別打印不同性別的總?cè)藬?shù)與平均值
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Tuple3<姓名,性別(man男,girl女),年齡>
        List> tuple3List = DataSource.getTuple3ToList();
        DataStream dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector, String>) k -> k.f1)
                //按數(shù)量窗口滾動,每3個輸入窗口數(shù)據(jù)流,計算一次
                .countWindow(3)
                //只能基于Windowed窗口Stream進(jìn)行調(diào)用
                .aggregate(new AggregateFunction, MyAverageAccumulator, MyAverageAccumulator>() {
                    /**
                     * 創(chuàng)建新累加器,開始聚合計算
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator createAccumulator() {
                        return new MyAverageAccumulator();
                    }

                    /**
                     * 將窗口輸入的數(shù)據(jù)流值添加到窗口累加器,并返回新的累加器值
                     * @param tuple3
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator add(Tuple3 tuple3, MyAverageAccumulator accumulator) {
                        System.out.println("tuple3:" + tuple3.toString());
                        accumulator.setGender(tuple3.f1);
                        //此accumulator保含個數(shù)統(tǒng)計和值累計兩個屬性,add方法內(nèi)會計算窗口內(nèi)總數(shù)與求和
                        accumulator.add(tuple3.f2);
                        return accumulator;
                    }

                    /**
                     * 獲取累加器聚合結(jié)果
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) {
                        return accumulator;
                    }

                    /**
                     * 合并兩個累加器,返回合并后的累加器的狀態(tài)
                     * @param a
                     * @param b
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) {
                        a.merge(b);
                        return a;
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 添加性別屬性(此類用于顯示不同性別的平均值)
     */
    public static class MyAverageAccumulator extends AverageAccumulator{
        private String gender;
        public String getGender() {
            return gender;
        }
        public void setGender(String gender) {
            this.gender = gender;
        }
        @Override
        public String toString() {
            //繼承父類的this.getLocalValue()方法用于計算并返回平均值
            return super.toString() + ", gender to " + gender;
        }
    }

}

打印結(jié)果

tuple3:(張三,man,20)
tuple3:(李四,girl,24)
tuple3:(劉六,girl,32)
tuple3:(王五,man,29)
tuple3:(伍七,girl,18)
tuple3:(吳八,man,30)
4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl
2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man

到此,相信大家對“Flink  Aggregate怎么用”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


標(biāo)題名稱:FlinkAggregate怎么用
本文URL:http://weahome.cn/article/ghejsh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部