This article walks through the use of Flink transformations under Apache. Many people have questions about how these transformations behave in day-to-day use, so this post collects the common operations into simple, practical examples. Hopefully it clears up the topic; follow along with the examples below.
First, create a new Scala object:
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order   // used by the sortGroup example further down
import scala.collection.mutable.ListBuffer            // used by the later examples

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(environment)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  }
}
The data source here is a list of the integers 1 to 10. The idea behind map is: given a data set with N elements, apply a transformation to each one, for example:
data.map { x => x.toInt }
In other words, it works like y = f(x).
// apply +1 to every element in data
data.map((x: Int) => x + 1).print()
This adds 1 to every element.
A simpler way to write it: since the lambda has only one parameter, the explicit type annotation can be dropped:
data.map((x) => x + 1).print()
Even more concise, dropping the parentheses:
data.map(x => x + 1).print()
Or, most concisely, using the underscore placeholder:
data.map(_ + 1).print()
Output:
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception {
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    mapFunction(executionEnvironment);
}

public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i + "");
    }
    DataSource<String> data = executionEnvironment.fromCollection(list);
    data.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String input) {
            return Integer.parseInt(input) + 1;
        }
    }).print();
}
Because the List we defined is parameterized with String, the MapFunction generics are <String, Integer>: it takes a String in and returns an Integer.
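For comparison, the same String-to-Integer conversion can be sketched in Scala. This is not from the original code; the method name mapStringFunction is made up, and it simply mirrors the Java version above:

def mapStringFunction(env: ExecutionEnvironment): Unit = {
  // build the same list of "1" .. "10" as Strings
  val data = env.fromCollection((1 to 10).map(_.toString))
  // parse each String and add 1, mirroring the Java MapFunction<String, Integer>
  data.map(_.toInt + 1).print()
}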
Next, filter: add 1 to every element, then keep only the elements greater than 5.
def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  data.map(_ + 1).filter(_ > 5).print()
}
filter only returns the records that satisfy the predicate. The Java version:
public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer input) {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            return input > 5;
        }
    }).print();
}
What is the difference between the map function and the mapPartition function?
Requirement: the DataSource has 100 elements, and each result needs to be stored in a database.
With map, the implementation would look like this:
// the DataSource has 100 elements; store the results in a database
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
    students.append("Student" + i)
  }
  val data = env.fromCollection(students)
  data.map(x => {
    // every element needs a connection before it can be written to the database
    val connection = DBUtils.getConnection()
    println(connection + " ... ")
    // TODO: save the data to the DB
    DBUtils.returnConnection(connection)
    x
  }).print()
}
If you run this, you will see 100 DBUtils.getConnection() requests printed. As the data volume grows, acquiring a connection for every single element is clearly not realistic.
This is where mapPartition comes in: it transforms the data of a whole partition at once, i.e. the function is invoked once per partition.
So first set the number of partitions:
val data = env.fromCollection(students).setParallelism(4)
This sets 4 partitions (the parallelism), and then mapPartition does the processing:
data.mapPartition(x => {
  val connection = DBUtils.getConnection()
  println(connection + " ... ")
  // TODO: save the data to the DB
  DBUtils.returnConnection(connection)
  x
}).print()
Now there are only 4 connection requests, one per partition.
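The DBUtils helper used in these snippets is not part of Flink; it stands in for whatever connection pool you use. A minimal stand-in so the examples can run locally might look like this (a sketch; only the names getConnection and returnConnection come from the snippets above, everything else is made up):

object DBUtils {
  // hand out a fake "connection" string so the examples run without a real database
  def getConnection(): String = "connection-" + Thread.currentThread().getId
  // nothing to release for the fake connection; a real pool would return it here
  def returnConnection(connection: String): Unit = ()
}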
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        list.add("student:" + i);
    }
    DataSource<String> data = env.fromCollection(list);
    /*
    // map version: one connection per element
    data.map(new MapFunction<String, String>() {
        @Override
        public String map(String input) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            return input;
        }
    }).print();
    */
    // mapPartition version: one connection per partition
    data.mapPartition(new MapPartitionFunction<String, String>() {
        @Override
        public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            for (String value : values) {
                out.collect(value);
            }
        }
    }).print();
}
first returns the first n elements, groupBy groups by a key, and sortGroup sorts within each group.
def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int, String)]()
  info.append((1, "hadoop"))
  info.append((1, "spark"))
  info.append((1, "flink"))
  info.append((2, "java"))
  info.append((2, "springboot"))
  info.append((3, "linux"))
  info.append((4, "vue"))
  val data = env.fromCollection(info)

  // first three elements:
  // (1,hadoop)
  // (1,spark)
  // (1,flink)
  data.first(3).print()

  // group by the first field and take the first two elements of each group:
  // (3,linux)
  // (1,hadoop)
  // (1,spark)
  // (2,java)
  // (2,springboot)
  // (4,vue)
  data.groupBy(0).first(2).print()

  // group by the first field, sort each group by the second field ascending,
  // then take the first two elements of each group:
  // (3,linux)
  // (1,flink)
  // (1,hadoop)
  // (2,java)
  // (2,springboot)
  // (4,vue)
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
}
public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<>();
    info.add(new Tuple2<>(1, "hadoop"));
    info.add(new Tuple2<>(1, "spark"));
    info.add(new Tuple2<>(1, "flink"));
    info.add(new Tuple2<>(2, "java"));
    info.add(new Tuple2<>(2, "springboot"));
    info.add(new Tuple2<>(3, "linux"));
    info.add(new Tuple2<>(4, "vue"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print();
    data.groupBy(0).first(2).print();
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}
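A small variation on the example above: sortGroup also accepts Order.DESCENDING, so you can take the elements from the other end of each group. On the Scala data set above:

// take the first two elements of each group when sorted by the second field descending
data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()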
flatMap takes one element and produces zero, one, or more elements.
def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).print()
}
Output:
hadoop
spark
hadoop
flink
flink
flink
flatMap splits each element on the comma, turning one element into several.
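To see the difference from map: splitting with map would give you a data set of arrays (one array per input element), while flatMap flattens the pieces into individual elements. A quick sketch on the same data set:

// map keeps one output element per input element, so each result is a whole Array[String]
data.map(_.split(",")).print()      // prints the array's default toString, e.g. [Ljava.lang.String;@...

// flatMap emits every piece of the split as its own element
data.flatMap(_.split(",")).print()  // hadoop, spark, hadoop, flink, flink, flink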
A classic example:
data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()
Split each element on the comma, map each word to a (word, 1) tuple, group by the first field, then sum the second field.
The output is:
(hadoop,2)
(flink,3)
(spark,1)
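Broken into named steps (a sketch over the same data set), the pipeline looks like this:

val words = data.flatMap(_.split(","))   // hadoop, spark, hadoop, flink, flink, flink
val pairs = words.map((_, 1))            // (hadoop,1), (spark,1), (hadoop,1), (flink,1), (flink,1), (flink,1)
pairs.groupBy(0).sum(1).print()          // (hadoop,2), (flink,3), (spark,1)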
The Java version implements the same classic word count example:
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit each word downstream
                out.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return new Tuple2<>(value, 1);
        }
    }).groupBy(0).sum(1).print();
}
distinct removes duplicate elements.
def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).distinct().print()
}
This deduplicates the elements. The output is:
hadoop
flink
spark
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit each word downstream
                out.collect(split);
            }
        }
    }).distinct().print();
}
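distinct can also be restricted to particular tuple fields. Assuming a tuple DataSet such as the (Int, String) data used in the first/groupBy example above, a sketch:

// deduplicate by field 0 only; one tuple is kept per distinct key, which one is not specified
data.distinct(0).print()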
Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
result = input1.join(input2)
               .where(0)   // key of the first input (tuple field 0)
               .equalTo(1); // key of the second input (tuple field 1)
That is, field 0 of the first input (input1) is joined against field 1 of the second input (input2).
def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]()   // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))

  val info2 = ListBuffer[(Int, String)]()   // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "廣州"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}
The output is:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>();   // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<>();   // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "廣州"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    data1.join(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
             @Override
             public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                 return new Tuple3<>(first.f0, first.f1, second.f1);
             }
         }).print();
}
The three type parameters of the JoinFunction are the two input types (both Tuple2<Integer, String>) and the output type (Tuple3<Integer, String, String>).
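The documentation quoted earlier also mentions FlatJoinFunction, which can emit zero or more results per matched pair through a Collector. In the Scala API the same join can be expressed with a collector-based apply; the sketch below assumes the data1/data2 sets from the example above, and the "skip java" condition is just an illustration:

import org.apache.flink.util.Collector

data1.join(data2).where(0).equalTo(0).apply {
  (first: (Int, String), second: (Int, String), out: Collector[(Int, String, String)]) =>
    // emit only pairs whose name is not "java"; emitting nothing drops the pair entirely
    if (first._2 != "java") {
      out.collect((first._1, first._2, second._2))
    }
}.print()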
The join above is an inner join. OuterJoin is an outer join over two data sets and comes in left, right and full variants.
def outJoinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]()   // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))

  val info2 = ListBuffer[(Int, String)]()   // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "廣州"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  // left outer join: keep every record from the left side
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
    if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print()
}
In a left outer join, a record on the left may have no match on the right, so that case must be handled explicitly or you get a NullPointerException. The output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else {
    (first._1, first._2, second._2)
  }
}).print()
The right outer join outputs:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else if (second == null) {
    (first._1, first._2, "-")
  } else {
    (first._1, first._2, second._2)
  }
}).print()
(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)
(4,java,-)
The Java version, left outer join:
public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>();   // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<>();   // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "廣州"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    data1.leftOuterJoin(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
             @Override
             public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                 if (second == null) {
                     return new Tuple3<>(first.f0, first.f1, "-");
                 }
                 return new Tuple3<>(first.f0, first.f1, second.f1);
             }
         }).print();
}
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0)
     .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
         @Override
         public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
             if (first == null) {
                 return new Tuple3<>(second.f0, "-", second.f1);
             }
             return new Tuple3<>(first.f0, first.f1, second.f1);
         }
     }).print();
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0)
     .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
         @Override
         public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
             if (first == null) {
                 return new Tuple3<>(second.f0, "-", second.f1);
             } else if (second == null) {
                 return new Tuple3<>(first.f0, first.f1, "-");
             }
             return new Tuple3<>(first.f0, first.f1, second.f1);
         }
     }).print();
Cross produces the Cartesian product, pairing every element on the left with every element on the right.
def crossFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List("喬峰", "慕容復")
  val info2 = List(3, 1, 0)
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}
Output:
(喬峰,3)
(喬峰,1)
(喬峰,0)
(慕容復,3)
(慕容復,1)
(慕容復,0)
public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<>();
    info1.add("喬峰");
    info1.add("慕容復");
    List<String> info2 = new ArrayList<>();
    info2.add("3");
    info2.add("1");
    info2.add("0");
    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);
    data1.cross(data2).print();
}
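Cross is expensive because the result size is the product of the two input sizes. When one side is much smaller or larger than the other, the DataSet API also offers the hints crossWithTiny and crossWithHuge so the optimizer can choose a better strategy; a sketch on the data above:

// hint that data2 is the (much) smaller side
data1.crossWithTiny(data2).print()

// hint that data2 is the (much) larger side
data1.crossWithHuge(data2).print()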
This concludes the walkthrough of Flink transformations under Apache; hopefully it has cleared up the common questions. Pairing the theory with hands-on practice is the best way to learn, so try the examples yourself.