這篇文章主要介紹“Flink數(shù)據(jù)流DataStream和DataSet怎么使用”,在日常操作中,相信很多人在Flink數(shù)據(jù)流DataStream和DataSet怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink數(shù)據(jù)流DataStream和DataSet怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比阿爾山網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式阿爾山網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋阿爾山地區(qū)。費(fèi)用合理售后完善,10多年實(shí)體公司更值得信賴。
Flink主要用來處理數(shù)據(jù)流,所以從抽象上來看就是對數(shù)據(jù)流的處理,正如前面大數(shù)據(jù)開發(fā)-Flink-體系結(jié)構(gòu) && 運(yùn)行架構(gòu)提到寫Flink程序?qū)嶋H上就是在寫DataSource、Transformation、Sink.
DataSource是程序的數(shù)據(jù)源輸入,可以通過StreamExecutionEnvironment.addSource(sourceFuntion)為程序 添加一個(gè)數(shù)據(jù)源
Transformation是具體的操作,它對一個(gè)或多個(gè)輸入數(shù)據(jù)源進(jìn)行計(jì)算處理,比如Map、FlatMap和Filter等操作
Sink是程序的輸出,它可以把Transformation處理之后的數(shù)據(jù)輸出到指定的存儲介質(zhì)中
Flink針對DataStream提供了兩種實(shí)現(xiàn)方式的數(shù)據(jù)源,可以歸納為以下四種:
基于文件
readTextFile(path)
讀取文本文件,文件遵循TextInputFormat逐行讀取規(guī)則并返回
基于Socket
socketTextStream
從Socket中讀取數(shù)據(jù),元素可以通過一個(gè)分隔符分開
基于集合
fromCollection(Collection)
通過Java的Collection集合創(chuàng)建一個(gè)數(shù)據(jù)流,集合中的所有元素必須是相同類型的,需要注意的是,如果集合里面的元素要識別為POJO,需要滿足下面的條件
總結(jié):上面的要求其實(shí)就是為了讓Flink可以方便地序列化和反序列化這些對象為數(shù)據(jù)流
該類有共有的無參構(gòu)造方法
該類是共有且獨(dú)立的(沒有非靜態(tài)內(nèi)部類)
類(及父類)中所有的不被static、transient修飾的屬性要么有公有的(且不被final修飾),要么是包含公有的getter和setter方法,這些方法遵循java bean命名規(guī)范
自定義Source
使用StreamExecutionEnvironment.addSource(sourceFunction)
將一個(gè)流式數(shù)據(jù)源加到程序中,具體這個(gè)sourceFunction
是為非并行源implements SourceFunction
,或者為并行源 implements ParallelSourceFunction
接口,或者extends RichParallelSourceFunction
,對于自定義Source,Sink, Flink內(nèi)置了下面幾種Connector
連接器 | 是否提供Source支持 | 是否提供Sink支持 |
---|---|---|
Apache Kafka | 是 | 是 |
ElasticSearch | 否 | 是 |
HDFS | 否 | 是 |
Twitter Streaming PI | 是 | 否 |
對于Source的使用,其實(shí)較簡單,這里給一個(gè)較常用的自定義Source的KafaSource的使用例子。更多相關(guān)源碼可以查看:
package com.hoult.stream; public class SourceFromKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String topic = "animalN"; Properties props = new Properties(); props.put("bootstrap.servers", "linux121:9092"); FlinkKafkaConsumerconsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props); DataStreamSource data = env.addSource(consumer); SingleOutputStreamOperator > maped = data.map(new MapFunction >() { @Override public Tuple2 map(String value) throws Exception { System.out.println(value); Tuple2 t = new Tuple2 (0l,0l); String[] split = value.split(","); try{ t = new Tuple2 (Long.valueOf(split[0]), Long.valueOf(split[1])); } catch (Exception e) { e.printStackTrace(); } return t; } }); KeyedStream , Long> keyed = maped.keyBy(value -> value.f0); //按照key分組策略,對流式數(shù)據(jù)調(diào)用狀態(tài)化處理 SingleOutputStreamOperator > flatMaped = keyed.flatMap(new RichFlatMapFunction , Tuple2 >() { ValueState > sumState; @Override public void open(Configuration parameters) throws Exception { //在open方法中做出State ValueStateDescriptor > descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint >() { }), Tuple2.of(0L, 0L) ); sumState = getRuntimeContext().getState(descriptor); // super.open(parameters); } @Override public void flatMap(Tuple2 value, Collector > out) throws Exception { //在flatMap方法中,更新State Tuple2 currentSum = sumState.value(); currentSum.f0 += 1; currentSum.f1 += value.f1; sumState.update(currentSum); out.collect(currentSum); /*if (currentSum.f0 == 2) { long avarage = currentSum.f1 / currentSum.f0; out.collect(new Tuple2<>(value.f0, avarage)); sumState.clear(); }*/ } }); flatMaped.print(); env.execute(); } }
對于Transformation ,F(xiàn)link提供了很多的算子,
map
DataStream → DataStream Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStreamdataStream = //... dataStream.map(new MapFunction () { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });
flatMap
DataStream → DataStream Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
filter
DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
keyBy
DataStream → KeyedStream Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.
Attention A type cannot be a key if:
fold
aggregation
window/windowAll/window.apply/window.reduce/window.fold/window.aggregation
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey" dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
更多算子操作可以查看官網(wǎng),官網(wǎng)寫的很好:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/
Flink針對DataStream提供了大量的已經(jīng)實(shí)現(xiàn)的數(shù)據(jù)目的地(Sink),具體如下所示
writeAsText():講元素以字符串形式逐行寫入,這些字符串通過調(diào)用每個(gè)元素的toString()方法來獲取
print()/printToErr():打印每個(gè)元素的toString()方法的值到標(biāo)準(zhǔn)輸出或者標(biāo)準(zhǔn)錯(cuò)誤輸出流中
自定義輸出:addSink可以實(shí)現(xiàn)把數(shù)據(jù)輸出到第三方存儲介質(zhì)中, Flink提供了一批內(nèi)置的Connector,其中有的Connector會提供對應(yīng)的Sink支持
這里舉一個(gè)常見的例子,下層到Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class StreamToKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcedata = env.socketTextStream("teacher2", 7777); String brokerList = "teacher2:9092"; String topic = "mytopic2"; FlinkKafkaProducer producer = new FlinkKafkaProducer(brokerList, topic, new SimpleStringSchema()); data.addSink(producer); env.execute(); } }
對DataSet批處理而言,較為頻繁的操作是讀取HDFS中的文件數(shù)據(jù),因?yàn)檫@里主要介紹兩個(gè)DataSource組件
基于集合 ,用來測試和DataStream類似
基于文件 readTextFile....
Flink針對DataStream提供了大量的已經(jīng)實(shí)現(xiàn)的數(shù)據(jù)目的地(Sink),具體如下所示
writeAsText():將元素以字符串形式逐行寫入,這些字符串通過調(diào)用每個(gè)元素的toString()方法來獲取
writeAsCsv():將元組以逗號分隔寫入文件中,行及字段之間的分隔是可配置的,每個(gè)字段的值來自對象的
toString()方法
print()/pringToErr():打印每個(gè)元素的toString()方法的值到標(biāo)準(zhǔn)輸出或者標(biāo)準(zhǔn)錯(cuò)誤輸出流中 Flink提供了一批內(nèi)置的Connector,其中有的Connector會提供對應(yīng)的Sink支持。
到此,關(guān)于“Flink數(shù)據(jù)流DataStream和DataSet怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!