這篇文章主要講解了“Flink中keyBy有哪些方式指定key”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink中keyBy有哪些方式指定key”吧!
創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括清河門(mén)網(wǎng)站建設(shè)、清河門(mén)網(wǎng)站制作、清河門(mén)網(wǎng)頁(yè)制作以及清河門(mén)網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃等。多年來(lái),我們專(zhuān)注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,清河門(mén)網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶(hù)以成都為中心已經(jīng)輻射到清河門(mén)省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶(hù)的支持與信任!
不管是stream還是batch處理,都有一個(gè)keyBy(stream)和groupBy(batch)操作。那么該如何指定key?
Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.
一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個(gè)key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數(shù)據(jù)按照key進(jìn)行分組。
DataSet
DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
類(lèi)似于MySQL中的join操作:select a.* , b.* from a join b on a.id=b.id
這里的keyBy就是a.id=b.id
DataStream> input = // [...] KeyedStream ,Tuple> keyed = input.keyBy(0)
可以傳字段的位置
DataStream> input = // [...] KeyedStream ,Tuple> keyed = input.keyBy(0,1)
可以傳字段位置的組合
這對(duì)于簡(jiǎn)單的使用時(shí)沒(méi)問(wèn)題的。但是對(duì)于內(nèi)嵌的Tuple,如下所示:
DataStream,String,Long>> ds;
如果使用keyBy(0),那么他就會(huì)使用整個(gè)Tuple2
我們可以使用基于字符串字段表達(dá)式來(lái)引用內(nèi)嵌字段去定義key。
之前我們的算子寫(xiě)法是這樣的:
text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
其中的new FlatMapFunction
public static class WC { private String word; private int count; public WC() { } public WC(String word, int count) { this.word = word; this.count = count; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } }
修改算子的寫(xiě)法:
text.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { String[] tokens = value.toLowerCase().split(","); for (String token : tokens) { if (token.length() > 0) { out.collect(new WC(token, 1)); } } } }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);
將原來(lái)的輸出Tuple2
因此,在這個(gè)例子中我們有一個(gè)POJO類(lèi),有兩個(gè)字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。
語(yǔ)法:
字段名一定要與POJO類(lèi)中的字段名一致。一定要提供默認(rèn)的構(gòu)造函數(shù),和get與set方法。
使用Tuple時(shí),0表示第一個(gè)字段
可以使用嵌套方式,舉例如下:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3word; public IntWritable hadoopCitizen; }
"count",指向的是WC中的字段count
"complex",指向的是復(fù)雜數(shù)據(jù)類(lèi)型,會(huì)遞歸選擇所有ComplexNestedClass的字段
"complex.word.f2",指向的是Tuple3中的最后一個(gè)字段。
"complex.hadoopCitizen",指向的是Hadoop IntWritable
type
scala寫(xiě)法:
object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隱式轉(zhuǎn)換 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map(x => WC(x,1)) .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } case class WC(word: String, count: Int) }
.keyBy(new KeySelector() { @Override public Object getKey(WC value) throws Exception { return value.word; } })
感謝各位的閱讀,以上就是“Flink中keyBy有哪些方式指定key”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink中keyBy有哪些方式指定key這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!