這篇文章主要講解了“flink中的聚合算子是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“flink中的聚合算子是什么”吧!
創(chuàng)新互聯(lián)建站從2013年創(chuàng)立,先為察隅等服務(wù)建站,察隅等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為察隅企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
flink中的一個(gè)接口org.apache.flink.api.common.functions.AggregateFunction,這個(gè)類可以接在window流之后,做窗口內(nèi)的統(tǒng)計(jì)計(jì)算。
注意:除了這個(gè)接口AggregateFunction,flink中還有一個(gè)抽象類AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把這個(gè)弄混淆了,接口AggregateFunction我們可以理解為flink中的一個(gè)算子,和MapFunction、FlatMapFunction等是同級(jí)別的,而抽象類AggregateFunction是用于用戶自定義聚合函數(shù)的,和max、min之類的函數(shù)是同級(jí)的。
比如我們想實(shí)現(xiàn)一個(gè)類似sql的功能:
select TUMBLE_START(proctime,INTERVAL '2' SECOND) as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)
這個(gè)sql就是來統(tǒng)計(jì)一下每兩秒鐘的滑動(dòng)窗口內(nèi)每個(gè)人出現(xiàn)的次數(shù),今天我們就以這個(gè)簡單的sql的功能為例講解一下flink的aggregate算子,其實(shí)就是我們用程序來實(shí)現(xiàn)這個(gè)sql的功能。
首先看一下聚合函數(shù)的接口:
@PublicEvolving
public interface AggregateFunction extends Function, Serializable {
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
ACC merge(ACC a, ACC b);
OUT getResult(ACC accumulator);
}
這個(gè)接口AggregateFunction里面有4個(gè)方法,我們分別來講解一下。
首先我們自定義source生成用戶的信息
public static class MySource implements SourceFunction>{
private volatile boolean isRunning = true;
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};
@Override
public void run(SourceContext> ctx) throws Exception{
while (isRunning){
Thread.sleep(10);
String userid = userids[(int) (Math.random() * (userids.length - 1))];
ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
}
}
@Override
public void cancel(){
isRunning = false;
}
}
public static class CountAggregate
implements AggregateFunction,Integer,Integer>{
@Override
public Integer createAccumulator(){
return 0;
}
@Override
public Integer add(Tuple2 value, Integer accumulator){
return ++accumulator;
}
@Override
public Integer getResult(Integer accumulator){
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b){
return a + b;
}
}
/**
* 這個(gè)是為了將聚合結(jié)果輸出
*/
public static class WindowResult
implements WindowFunction,Tuple,TimeWindow>{
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable input,
Collector> out) throws Exception{
String k = ((Tuple1) key).f0;
long windowStart = window.getStart();
int result = input.iterator().next();
out.collect(Tuple3.of(k, new Date(windowStart), result));
}
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream> dataStream = env.addSource(new MySource());
dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.aggregate(new CountAggregate(), new WindowResult()
).print();
env.execute();
感謝各位的閱讀,以上就是“flink中的聚合算子是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)flink中的聚合算子是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!