How DataStream programming works in Apache Flink

This article walks through DataStream programming in Apache Flink, specifically how data sources are defined. Hopefully you will find it useful.

A data source is created with StreamExecutionEnvironment.addSource(sourceFunction). Flink also ships some built-in sources for convenience, such as readTextFile(path) and readFile(). You can also write a custom source in one of three ways: implement the SourceFunction interface (such a source cannot run in parallel), implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction. The last two can run in parallel.

Getting started

Start with a simple example: create a DataStreamSourceApp.

Scala

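import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
    // Nothing runs until execute() is called
    env.execute("DataStreamSourceApp")
  }

  // Reads lines of text from a socket and prints them
  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("192.168.152.45", 9999)
    data.print()
  }
}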

This method reads data from a socket, so a service has to be listening on 192.168.152.45:

nc -lk 9999

Then run DataStreamSourceApp and type some input on the server:

iie4bu@swarm-manager:~$ nc -lk 9999
apache
flink
spark

The same lines are printed on the console:

3> apache
4> flink
1> spark

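The leading 3, 4 and 1 are the (1-based) indices of the parallel print subtasks that emitted each line, so they reflect the parallelism of the print sink. To route everything through a single subtask, set the parallelism with setParallelism: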

data.print().setParallelism(1)

Java

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JavaDataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    // Reads lines of text from a socket and prints them through a single print subtask
    public static void socketFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);
        data.print().setParallelism(1);
    }
}

Adding a custom data source

Scala

Implementing the SourceFunction interface

A source written this way cannot run in parallel.

Create a custom data source:

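import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallelSourceFunction extends SourceFunction[Long] {

  var count = 1L
  // cancel() is called from a different thread than run(), hence @volatile
  @volatile var isRunning = true

  // Emits the current count once per second until cancelled
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}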

The class starts with count = 1L. Its run method emits count, increments it by one and sleeps for a second, and keeps looping until cancel is called. It is used as follows:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    socketFunction(env)
    nonParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data=env.addSource(new CustomNonParallelSourceFunction())
    data.print()
  }

As a result, the console keeps printing the value of count.
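The run/cancel handshake that this source relies on can be tried without a Flink cluster. The sketch below is plain Java and uses no Flink classes: CounterLoop, its collected list, runBriefly and the 10 ms pause are hypothetical stand-ins for the source function, ctx.collect and the one-second sleep. The flag is declared volatile, as in the example in Flink's SourceFunction Javadoc, because cancel() is called from a different thread than run().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a source function; not a Flink class.
class CounterLoop {
    // Written by cancel() on one thread, read by run() on another.
    private volatile boolean isRunning = true;
    private long count = 1L;
    // Stand-in for ctx.collect(...): records what was emitted.
    final List<Long> collected = Collections.synchronizedList(new ArrayList<>());

    void run() throws InterruptedException {
        while (isRunning) {
            collected.add(count);
            count += 1;
            Thread.sleep(10); // shortened from the article's 1000 ms
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the loop on a worker thread, cancels it, and returns what was emitted.
    static List<Long> runBriefly(long millis) {
        CounterLoop loop = new CounterLoop();
        Thread worker = new Thread(() -> {
            try {
                loop.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(millis);
            loop.cancel();
            worker.join(); // returns only because run() observed the flag
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(loop.collected);
    }
}
```

Calling CounterLoop.runBriefly(100) returns a list that starts at 1 and increases by one per element; how many elements it holds depends on timing.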

The parallelism of this source cannot be set to anything other than 1. For example:

val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)

The console then reports an error:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
	at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)
	at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)
	at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)
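
The exception is raised by a check in DataStreamSource.setParallelism, the first frame of the stack trace. The sketch below imitates that rule with stand-in types so it can be run without Flink. Source, ParallelSource and SourceHandle are hypothetical names, and the logic is an assumption modelled on the error message above, not a copy of Flink's implementation.

```java
// Hypothetical stand-ins; these are not Flink classes.
interface Source {}

// Marker for sources that may run in parallel.
interface ParallelSource extends Source {}

class SourceHandle {
    private final boolean isParallel;
    private int parallelism = 1;

    SourceHandle(Source source) {
        // Assumed rule: a source is parallel only if it carries the marker interface.
        this.isParallel = source instanceof ParallelSource;
    }

    SourceHandle setParallelism(int parallelism) {
        // A non-parallel source accepts a parallelism of 1 and nothing else.
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source is not a parallel source");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }
}
```

Under these assumptions, new SourceHandle(new Source() {}).setParallelism(3) throws an IllegalArgumentException, while the same call on a ParallelSource succeeds.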

Implementing the ParallelSourceFunction interface

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {

  // cancel() is called from a different thread than run(), hence @volatile
  @volatile var isRunning = true
  var count = 1L

  // Emits the current count once per second until cancelled
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

It does the same thing as the previous source. The main method is as follows:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    socketFunction(env)
//    nonParallelSourceFunction(env)
    parallelSourceFunction(env)


    env.execute("DataStreamSourceApp")
  }

  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data=env.addSource(new CustomParallelSourceFunction()).setParallelism(3)
    data.print()
  }

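This time the parallelism can be set to 3. Each parallel instance of the source keeps its own count, so every value shows up three times in the output: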

2> 1
1> 1
2> 1
2> 2
3> 2
3> 2
3> 3
4> 3
4> 3
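
The repeated values can be reproduced without Flink. In the plain-Java sketch below, CountingSource and emitRounds are hypothetical names, and the instances are stepped in lockstep instead of on separate threads so that the result is deterministic. It only models the fact that Flink runs one instance of the source per parallel subtask, each with its own fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one parallel instance of the custom source.
class CountingSource {
    private long count = 1L; // per-instance state, not shared between instances

    long next() {
        return count++;
    }

    // Steps `parallelism` independent instances for `rounds` rounds and
    // returns everything they emitted, in emission order.
    static List<Long> emitRounds(int parallelism, int rounds) {
        List<CountingSource> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new CountingSource());
        }
        List<Long> emitted = new ArrayList<>();
        for (int r = 0; r < rounds; r++) {
            for (CountingSource s : instances) {
                emitted.add(s.next());
            }
        }
        return emitted;
    }
}
```

CountingSource.emitRounds(3, 3) returns 1, 1, 1, 2, 2, 2, 3, 3, 3, the same values as in the console output above.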

Extending RichParallelSourceFunction

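import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  // cancel() is called from a different thread than run(), hence @volatile
  @volatile var isRunning = true
  var count = 1L

  // Emits the current count once per second until cancelled
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

The main method is as follows:

  def main(args: Array[String]): Unit = {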
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    socketFunction(env)
    //    nonParallelSourceFunction(env)
//    parallelSourceFunction(env)
    richParallelSourceFunction(env)

    env.execute("DataStreamSourceApp")
  }

  def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)
    data.print()
  }

Java

Implementing the SourceFunction interface

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {

    // cancel() is called from a different thread than run(), hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    // Emits the current count once per second until cancelled
    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

It is used as follows:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
        nonParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");

    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

When a parallelism other than 1 is set:

        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);

the following exception is thrown:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
	at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)
	at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)

Implementing the ParallelSourceFunction interface

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {

    // cancel() is called from a different thread than run(), hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    // Emits the current count once per second until cancelled
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

It is used as follows:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
//        nonParallelSourceFunction(environment);
        parallelSourceFunction(environment);

        environment.execute("JavaDataStreamSourceApp");
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

Here the parallelism can be set. The output is:

1
1
2
2
3
3
4
4
5
5

Extending the abstract class RichParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {

    // cancel() is called from a different thread than run(), hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    // Emits the current count once per second until cancelled
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

It is used as follows:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
//        nonParallelSourceFunction(environment);
//        parallelSourceFunction(environment);
        richParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void richParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

Output:

1
1
2
2
3
3
4
4
5
5
6
6

How SourceFunction, ParallelSourceFunction and RichParallelSourceFunction relate to each other
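
In Flink's API, ParallelSourceFunction is an interface that extends SourceFunction without adding methods, and RichParallelSourceFunction is an abstract class that implements ParallelSourceFunction and extends AbstractRichFunction, which contributes the open/close lifecycle methods and access to the runtime context. The sketch below mirrors that shape with simplified stand-in types so that it compiles without Flink. All Sketch-prefixed names are hypothetical, a Consumer stands in for SourceContext, and serialization and the runtime context are left out.

```java
import java.util.function.Consumer;

// Simplified stand-ins mirroring the shape of Flink's source types; not the real API.
interface SketchSourceFunction<T> {
    void run(Consumer<T> ctx) throws Exception;
    void cancel();
}

// Adds no methods: it only marks the source as safe to run in parallel.
interface SketchParallelSourceFunction<T> extends SketchSourceFunction<T> {}

// Stand-in for AbstractRichFunction: lifecycle hooks with empty defaults.
abstract class SketchRichFunction {
    public void open() throws Exception {}
    public void close() throws Exception {}
}

// Combines the parallel marker with the rich lifecycle.
abstract class SketchRichParallelSourceFunction<T> extends SketchRichFunction
        implements SketchParallelSourceFunction<T> {}

// A concrete rich parallel source that emits 1..limit and then stops.
class SketchCounter extends SketchRichParallelSourceFunction<Long> {
    private volatile boolean isRunning = true;
    private final long limit;

    SketchCounter(long limit) {
        this.limit = limit;
    }

    @Override
    public void run(Consumer<Long> ctx) {
        for (long i = 1; isRunning && i <= limit; i++) {
            ctx.accept(i);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

A SketchCounter is at once a SketchSourceFunction, a SketchParallelSourceFunction and a SketchRichFunction, which is the relationship the three Flink types have to one another.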
