本篇內容主要講解“Flink批處理怎么實現(xiàn)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Flink批處理怎么實現(xiàn)”吧!
銀川網站建設公司創(chuàng)新互聯(lián),銀川網站設計制作,有大型網站制作公司豐富經驗。已為銀川近1000家提供企業(yè)網站建設服務。企業(yè)網站搭建\成都外貿網站制作要多少錢,請找那個售后服務好的銀川做網站的公司定做!
Apache Flink是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進行有狀態(tài)計算。Flink被設計在所有常見的集群環(huán)境中運行,以內存執(zhí)行速度和任意規(guī)模來執(zhí)行計算,F(xiàn)link 是一個開源的流處理框架,它具有以下特點
批流一體:統(tǒng)一批處理、流處理
分布式:Flink程序可以運行在多臺機器上
高性能:處理性能比較高
高可用:Flink支持高可用性(HA)
準確:Flink可以保證數(shù)據(jù)處理的準確性
首先,類比Spark, 我們來看Flink的模塊劃分
可以啟動單個JVM,讓Flink以Local模式運行Flink也可以以Standalone 集群模式運行,同時也支持Flink ON YARN,F(xiàn)link應用直接提交到YARN上面運行Flink還可以運行在GCE(谷歌云服務)和EC2(亞馬遜云服務)
在Runtime之上提供了兩套核心的API,DataStream API(流處理)和DataSet API(批處理)
核心API之上又擴展了一些高階的庫和API
CEP流處理
Table API和SQL
Flink ML機器學習庫
Gelly圖計算
Flink作為大數(shù)據(jù)生態(tài)的一員,除了本身外,可以很好地與生態(tài)中的其他組件進行結合使用,大的概況方面來講,就有輸入方面和輸出方面,
其中中間的部分,上面已經介紹,主頁看看兩邊的,其中綠色背景是流處理方式的場景,藍色背景是批處理方式的場景
流處理方式:包含Kafka(消息隊列)、AWS kinesis(實時數(shù)據(jù)流服務)、RabbitMQ(消息隊列)、NIFI(數(shù) 據(jù)管道)、Twitter(API)
批處理方式:包含HDFS(分布式文件系統(tǒng))、HBase(分布式列式數(shù)據(jù)庫)、Amazon S3(文件系統(tǒng))、 MapR FS(文件系統(tǒng))、ALLuxio(基于內存分布式文件系統(tǒng))
流處理方式:包含Kafka(消息隊列)、AWS kinesis(實時數(shù)據(jù)流服務)、RabbitMQ(消息隊列)、NIFI(數(shù) 據(jù)管道)、Cassandra(NoSql數(shù)據(jù)庫)、ElasticSearch(全文檢索)、HDFS rolling file(滾動文件)
批處理方式:包含HBase(分布式列式數(shù)據(jù)庫)、HDFS(分布式文件系統(tǒng))
Spark中的流處理主要有兩種,一種是Spark Streamin是維批處理,如果對事件內的時間沒有要求,這種方式可以滿足很多需求,另外一種是Structed Streaming 是基于一張無界的大表,核心API就是Spark Sql的,而Flink是專注于無限流,把有界流看成是無限流的一種特殊情況,另外兩個框架都有狀態(tài)管理。
輸入的數(shù)據(jù)沒有盡頭,像水流一樣源源不斷,數(shù)據(jù)處理從當前或者過去的某一個時間 點開始,持續(xù)不停地進行。
從某一個時間點開始處理數(shù)據(jù),然后在另一個時間點結束輸入數(shù)據(jù)可能本身是有限的(即輸入數(shù)據(jù)集并不會隨著時間增長),也可能出于分析的目的被人為地設定為有限集(即只分析某一個時間段內的事件)Flink封裝了DataStream API進行流處理,封裝了DataSet API進行批處理。同時,F(xiàn)link也是一個批流一體的處理引擎,提供了Table API / SQL統(tǒng)一了批處理和流處理。
基于SubTask,每個SubTask處理時候,都會獲取狀態(tài)并更新狀態(tài),
以經典的WordCount為例,來看Flink的兩個批流處理案例,案例以nc -lp
來作為Source, 以控制臺輸出為Sink, 分為Java和Scala版本哦,
import org.apache.flink.api.scala._ object WordCountScalaBatch { def main(args: Array[String]): Unit = { val inputPath = "E:\\hadoop_res\\input\\a.txt" val outputPath = "E:\\hadoop_res\\output2" val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val text: DataSet[String] = environment.readTextFile(inputPath) text .flatMap(_.split("\\s+")) .map((_, 1)) .groupBy(0) .sum(1) .setParallelism(1) .writeAsCsv(outputPath, "\n", ",") //setParallelism(1)很多算子后面都可以調用 environment.execute("job name") } }
import org.apache.flink.streaming.api.scala._ object WordCountScalaStream { def main(args: Array[String]): Unit = { //處理流式數(shù)據(jù) val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val streamData: DataStream[String] = environment.socketTextStream("linux121", 7777) val out: DataStream[(String, Int)] = streamData .flatMap(_.split("\\s+")) .map((_, 1)) .keyBy(0) .sum(1) out.print() environment.execute("test stream") } }
package com.hoult.demo; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountJavaBatch { public static void main(String[] args) throws Exception { String inputPath = "E:\\hadoop_res\\input\\a.txt"; String outputPath = "E:\\hadoop_res\\output"; //獲取flink的運行環(huán)境 ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); DataSourcetext = executionEnvironment.readTextFile(inputPath); FlatMapOperator > wordsOne = text.flatMap(new SplitClz()); //hello,1 you,1 hi,1 him,1 UnsortedGrouping > groupWordAndOne = wordsOne.groupBy(0); AggregateOperator > wordCount = groupWordAndOne.sum(1); wordCount.writeAsCsv(outputPath, "\n", "\t").setParallelism(1); executionEnvironment.execute(); } static class SplitClz implements FlatMapFunction > { public void flatMap(String s, Collector > collector) throws Exception { String[] strs = s.split("\\s+"); for (String str : strs) { collector.collect(new Tuple2 (str, 1)); } } } }
package com.hoult.demo; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountJavaStream { public static void main(String[] args) throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcedataStream = executionEnvironment.socketTextStream("linux121", 7777); SingleOutputStreamOperator > sum = dataStream.flatMap(new FlatMapFunction >() { public void flatMap(String s, Collector > collector) throws Exception { for (String word : s.split(" ")) { collector.collect(new Tuple2 (word, 1)); } } }).keyBy(0).sum(1); sum.print(); executionEnvironment.execute(); } }
到此,相信大家對“Flink批處理怎么實現(xiàn)”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!