這篇文章主要介紹了Flink的函數(shù)有哪些,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供沙洋網(wǎng)站建設(shè)、沙洋做網(wǎng)站、沙洋網(wǎng)站設(shè)計(jì)、沙洋網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)與制作、沙洋企業(yè)網(wǎng)站模板建站服務(wù),10年沙洋做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。
1. Map: 將數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行一個(gè)轉(zhuǎn)化,形成一個(gè)新的數(shù)據(jù)流,消費(fèi)一個(gè)元素,并且產(chǎn)生一個(gè)元素
具體代碼實(shí)現(xiàn)
package com.wudl.core; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @version v1.0 * @ProjectName Flinklearning * @ClassName WordMap * @Description TODO map 算子實(shí)例 * @Date 2020/10/29 10:15 */ public class WordMap { /** * @param args * Map 函數(shù)的用法 * 映射:將數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行一個(gè)轉(zhuǎn)化,形成一個(gè)新的數(shù)據(jù)流,消費(fèi)一個(gè)元素,并且產(chǎn)生一個(gè)元素 *參數(shù): Lambda 表達(dá)式或者,new MapFunction實(shí)現(xiàn)類 * 返回值:DataStream */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(1); env.socketTextStream("10.204.125.140", 8899) .map(new MapFunction() { @Override public String map(String s) throws Exception { String[] split = s.split(","); return split[0] + "---" + split[1]; } }).print(); env.execute(); } }
2. FlatMap:
將數(shù)據(jù)流中的整體拆分成一個(gè) 一個(gè) 的個(gè)體使用, 消費(fèi)一個(gè)元素并產(chǎn)生零到多個(gè)元素
package com.wudl.core; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.List; /** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFlatMap * @Description TODO FlatMap * * FlatMap: 是一種扁平的映射,將數(shù)據(jù)流中的整體拆分成為一個(gè)個(gè)的個(gè)體使用, 消費(fèi)后的元素產(chǎn)生零到多個(gè)元素 * * * * @Author wudl * @Date 2020/10/29 10:46 * * * 函數(shù) FlatMap * 將數(shù)據(jù)流中的整體拆分成一個(gè) 一個(gè) 的個(gè)體使用, 消費(fèi)一個(gè)元素并產(chǎn)生零到多個(gè)元素 * 參數(shù): lambda 表達(dá)式或者是FlatFunction的實(shí)現(xiàn)類 * 返回值:DataStream * * * */ public class TransformFlatMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // DataStreamSource> listDs = env.fromCollection(Arrays.asList( // Arrays.asList(1, 2, 3), // Arrays.asList(3, 4, 5), // Arrays.asList(8,9,0) // )); // listDs.flatMap(new FlatMapFunction
, Integer>() { // @Override // public void flatMap(List
list, Collector collector) throws Exception { // // for (Integer number : list) { // collector.collect(number + 100); // } // // } // }).print(); DataStreamSource strDs = env.socketTextStream("10.204.125.140", 8899); strDs.flatMap(new FlatMapFunction () { @Override public void flatMap(String s, Collector collector) throws Exception { String[] split = s.split(","); collector.collect(split[0]+split[1]); } }).print(); env.execute(); } }
第三種:Filter 對數(shù)據(jù)流的過濾根據(jù)指定的規(guī)則將滿足條件的(true) 的數(shù)據(jù)保留, 不瞞住條件的(false) 將丟棄
package com.wudl.core; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFilter * @Description TODO 流的過濾 * @Date 2020/11/5 10:26 */ public class TransformFilter { /** * 函數(shù)中Filter 中過濾 * 過濾:根據(jù)指定的規(guī)則將滿足條件的(true) 的數(shù)據(jù)保留, 不瞞住條件的(false) 將丟棄 * 返回值:DataStream */ public static void main(String[] args) throws Exception { //1.獲取上下文的環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.設(shè)置并行度 env.setParallelism(1); //3.獲取數(shù)據(jù)流 DataStreamSourceSourceDs = env.socketTextStream("10.204.125.140", 8899); //4. 過濾數(shù)據(jù)流 DataStream filter = SourceDs.filter(new FilterFunction () { @Override public boolean filter(String value) throws Exception { String[] split = value.split(","); return split[1].length() > 3; } }); filter.print(); env.execute(); } }
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Flink的函數(shù)有哪些”這篇文章對大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!