真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink中keyBy有哪些方式指定key

這篇文章主要講解了“Flink中keyBy有哪些方式指定key”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink中keyBy有哪些方式指定key”吧!

創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括清河門(mén)網(wǎng)站建設(shè)、清河門(mén)網(wǎng)站制作、清河門(mén)網(wǎng)頁(yè)制作以及清河門(mén)網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃等。多年來(lái),我們專(zhuān)注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,清河門(mén)網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶(hù)以成都為中心已經(jīng)輻射到清河門(mén)省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶(hù)的支持與信任!

keyBy 如何指定key

不管是stream還是batch處理,都有一個(gè)keyBy(stream)和groupBy(batch)操作。那么該如何指定key?

Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.

 一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個(gè)key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數(shù)據(jù)按照key進(jìn)行分組。

DataSet

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

DataStream

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

類(lèi)似于MySQL中的join操作:select a.* , b.* from a join b on a.id=b.id

這里的keyBy就是a.id=b.id

有哪幾種方式定義Key?

方式一:Tuple

DataStream> input = // [...]
KeyedStream,Tuple> keyed = input.keyBy(0)

可以傳字段的位置

DataStream> input = // [...]
KeyedStream,Tuple> keyed = input.keyBy(0,1)

可以傳字段位置的組合

這對(duì)于簡(jiǎn)單的使用時(shí)沒(méi)問(wèn)題的。但是對(duì)于內(nèi)嵌的Tuple,如下所示:

DataStream,String,Long>> ds;

如果使用keyBy(0),那么他就會(huì)使用整個(gè)Tuple2作為key,(因?yàn)門(mén)uple2是Tuple3,String,Long>的0號(hào)位置)。如果想要指定key到Tuple2內(nèi)部中,可以使用下面的方式。

方式二:字段表達(dá)式

我們可以使用基于字符串字段表達(dá)式來(lái)引用內(nèi)嵌字段去定義key。

之前我們的算子寫(xiě)法是這樣的:

text.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

其中的new FlatMapFunction>表示輸入是一個(gè)String,輸出是一個(gè)Tuple2。這里我們重新定義一個(gè)內(nèi)部類(lèi):

public static class WC {
        private String word;
        private int count;

        public WC() {
        }

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }

修改算子的寫(xiě)法:

        text.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new WC(token, 1));
                    }
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);

將原來(lái)的輸出Tuple2,修改為輸出WC類(lèi)型;將原來(lái)的keyBy(0)修改為keyBy("word");將原來(lái)的sum(1)修改為sum("count")

因此,在這個(gè)例子中我們有一個(gè)POJO類(lèi),有兩個(gè)字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。

語(yǔ)法:

  • 字段名一定要與POJO類(lèi)中的字段名一致。一定要提供默認(rèn)的構(gòu)造函數(shù),和get與set方法。

  • 使用Tuple時(shí),0表示第一個(gè)字段

  • 可以使用嵌套方式,舉例如下:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3 word;
  public IntWritable hadoopCitizen;
}
  • "count",指向的是WC中的字段count

  • "complex",指向的是復(fù)雜數(shù)據(jù)類(lèi)型,會(huì)遞歸選擇所有ComplexNestedClass的字段

  • "complex.word.f2",指向的是Tuple3中的最后一個(gè)字段。

  • "complex.hadoopCitizen",指向的是Hadoop IntWritable type

scala寫(xiě)法:

object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 引入隱式轉(zhuǎn)換
    import org.apache.flink.api.scala._

    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map(x => WC(x,1))
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .sum("count")
        .print()
        .setParallelism(1)

    env.execute("StreamingWCScalaApp");
  }
  case class WC(word: String, count: Int)
}

 方式三:key選擇器函數(shù)

.keyBy(new KeySelector() {
            @Override
            public Object getKey(WC value) throws Exception {
                return value.word;
            }
        })

感謝各位的閱讀,以上就是“Flink中keyBy有哪些方式指定key”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink中keyBy有哪些方式指定key這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!


分享標(biāo)題:Flink中keyBy有哪些方式指定key
標(biāo)題路徑:http://weahome.cn/article/psddjg.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部