真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

SparkStreaming算子開(kāi)發(fā)實(shí)例分析

本篇文章為大家展示了SparkStreaming算子開(kāi)發(fā)實(shí)例分析,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

創(chuàng)新互聯(lián)主要從事網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)南岸,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):13518219792

Spark Streaming算子開(kāi)發(fā)實(shí)例

transform算子開(kāi)發(fā)

transform操作應(yīng)用在DStream上時(shí),可以用于執(zhí)行任意的RDD到RDD的轉(zhuǎn)換操作,還可以用于實(shí)現(xiàn)DStream API中所沒(méi)有提供的操作,比如說(shuō),DStreamAPI中并沒(méi)有提供將一個(gè)DStream中的每個(gè)batch,與一個(gè)特定的RDD進(jìn)行join的操作,DStream中的join算子只能join其他DStream,但是我們自己就可以使用transform操作來(lái)實(shí)現(xiàn)該功能。

實(shí)例:黑名單用戶實(shí)時(shí)過(guò)濾

package StreamingDemoimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}

/** * 實(shí)時(shí)黑名單過(guò)濾 */object TransformDemo { def main(args: Array[String]): Unit = {  

//設(shè)置日志級(jí)別  Logger.getLogger("org")

.setLevel(Level.WARN)  val conf = new SparkConf()   

.setAppName(this.getClass.getSimpleName)   

.setMaster("local[2]")  val ssc = new StreamingContext(conf, Seconds(2))  

//創(chuàng)建一個(gè)黑名單的RDD  val blackRDD =   ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))  

//通過(guò)socket從nc中獲取數(shù)據(jù)  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)  

/**   * 過(guò)濾黑名單用戶發(fā)言   * zs sb sb sb sb   * lisi fuck fuck fuck   * jack hello   */  linesDStream   .map(x => {    val info = x.split(" ")    (info(0), info.toList.tail.mkString(" "))   })   .transform(rdd => { 

//transform是一個(gè)RDD->RDD的操作,所以返回值必須是RDD    

/**     * 經(jīng)過(guò)leftouterjoin操作之后,產(chǎn)生的結(jié)果如下:     * (zs,(sb sb sb sb),Some(true)))     * (lisi,(fuck fuck fuck),some(true)))     * (jack,(hello,None))     */    val joinRDD = rdd.leftOuterJoin(blackRDD)   

 //如果是Some(true)的,說(shuō)明就是黑名單用戶,如果是None的,說(shuō)明不在黑名單內(nèi),把非黑名單的用戶保留下來(lái)    val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)    filterRDD   })   

.map(x=>(x._1,x._2._1)).print()  ssc.start()  ssc.awaitTermination() }}

測(cè)試

啟動(dòng)nc,傳入用戶及其發(fā)言信息

可以看到程序?qū)崟r(shí)的過(guò)濾掉了在黑名單里的用戶發(fā)言

updateStateByKey算子開(kāi)發(fā)

updateStateByKey算子可以保持任意狀態(tài),同時(shí)不斷有新的信息進(jìn)行更新,這個(gè)算子可以為每個(gè)key維護(hù)一份state,并持續(xù)不斷的更新state。對(duì)于每個(gè)batch來(lái)說(shuō),Spark都會(huì)為每個(gè)之前已經(jīng)存在的key去應(yīng)用一次State更新函數(shù),無(wú)論這個(gè)key在batch中是否有新的值,如果State更新函數(shù)返回的值是none,那么這個(gè)key對(duì)應(yīng)的state就會(huì)被刪除;對(duì)于新出現(xiàn)的key也會(huì)執(zhí)行state更新函數(shù)。

要使用該算子,必須進(jìn)行兩個(gè)步驟

定義state——state可以是任意的數(shù)據(jù)類型  定義state更新函數(shù)——用一個(gè)函數(shù)指定如何使用之前的狀態(tài),以及從輸入流中獲取新值更新?tīng)顟B(tài)

注意:updateStateByKey操作,要求必須開(kāi)啟Checkpoint機(jī)制

實(shí)例:基于緩存的實(shí)時(shí)WordCount

package StreamingDemoimport org.apache.log4j.{

Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}

/** * 基于緩存的實(shí)時(shí)WordCount,在全局范圍內(nèi)統(tǒng)計(jì)單詞出現(xiàn)次數(shù) */object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = {  

//設(shè)置日志級(jí)別  Logger.getLogger("org").setLevel(Level.WARN)  

/**   * 如果沒(méi)有啟用安全認(rèn)證或者從Kerberos獲取的用戶為null,那么獲取HADOOP_USER_NAME環(huán)境變量,   * 并將它的值作為Hadoop執(zhí)行用戶設(shè)置hadoop username   * 這里實(shí)驗(yàn)了一下在沒(méi)有啟用安全認(rèn)證的情況下,就算不顯式添加,也會(huì)自動(dòng)獲取我的用戶名   */  //System.setProperty("HADOOP_USER_NAME","Setsuna")  val conf = new SparkConf()   .setAppName(this.getClass.getSimpleName)   .setMaster("local[2]")  val ssc = new StreamingContext(conf, Seconds(2))  

//設(shè)置Checkpoint存放的路徑  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")  

//創(chuàng)建輸入DStream  val lineDStream = ssc.socketTextStream("Hadoop01", 6666)  val wordDStream = lineDStream.flatMap(_.split(" "))  val pairsDStream = wordDStream.map((_, 1))  

/**   * state:代表之前的狀態(tài)值   * values:代表當(dāng)前batch中key對(duì)應(yīng)的values值   */  val resultDStream =   pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {    

//當(dāng)state為none,表示沒(méi)有對(duì)這個(gè)單詞做統(tǒng)計(jì),則返回0值給計(jì)數(shù)器count    var count = state.getOrElse(0)    

//遍歷values,累加新出現(xiàn)的單詞的value值    for (value <- values) {     count += value    }    

//返回key對(duì)應(yīng)的新state,即單詞的出現(xiàn)次數(shù)    Option(count)   })  

//在控制臺(tái)輸出  resultDStream.print()  ssc.start()  ssc.awaitTermination() }}

測(cè)試

開(kāi)啟nc,輸入單詞

控制臺(tái)實(shí)時(shí)輸出的結(jié)果

window滑動(dòng)窗口算子開(kāi)發(fā)

Spark Streaming提供了滑動(dòng)窗口操作的支持,可以對(duì)一個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù)執(zhí)行計(jì)算操作在滑動(dòng)窗口中,包含批處理間隔、窗口間隔、滑動(dòng)間隔

對(duì)于窗口操作而言,在其窗口內(nèi)部會(huì)有N個(gè)批處理數(shù)據(jù)  批處理數(shù)據(jù)的大小由窗口間隔決定,而窗口間隔指的就是窗口的持續(xù)時(shí)間,也就是窗口的長(zhǎng)度  滑動(dòng)時(shí)間間隔指的是經(jīng)過(guò)多長(zhǎng)時(shí)間窗口滑動(dòng)一次,形成新的窗口,滑動(dòng)間隔默認(rèn)情況下和批處理時(shí)間間隔的相同

注意:滑動(dòng)時(shí)間間隔和窗口時(shí)間間隔的大小一定得設(shè)置為批處理間隔的整數(shù)倍

用一個(gè)官方的圖來(lái)作為說(shuō)明

批處理間隔是1個(gè)時(shí)間單位,窗口間隔是3個(gè)時(shí)間單位,滑動(dòng)間隔是2個(gè)時(shí)間單位。對(duì)于初始的窗口time1-time3,只有窗口間隔滿足了才觸發(fā)數(shù)據(jù)的處理。所以滑動(dòng)窗口操作都必須指定兩個(gè)參數(shù),窗口長(zhǎng)度和滑動(dòng)時(shí)間間隔。在Spark Streaming中對(duì)滑動(dòng)窗口的支持是比Storm更加完善的。

Window滑動(dòng)算子操作

算子      描述              window()      對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行自定義的計(jì)算              countByWindow()      對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行count操作              reduceByWindow()      對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行reduce操作              reduceByKeyAndWindow()      對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行reduceByKey操作              countByValueAndWindow()      對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行countByValue操作

reduceByKeyAndWindow算子開(kāi)發(fā)

實(shí)例:在線熱點(diǎn)搜索詞實(shí)時(shí)滑動(dòng)統(tǒng)計(jì)

每隔2秒鐘,統(tǒng)計(jì)最近5秒鐘的搜索詞中排名最靠前的3個(gè)搜索詞以及出現(xiàn)次數(shù)

package StreamingDemoimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}

/** * 需求:每隔2秒鐘,統(tǒng)計(jì)最近5秒鐘的搜索詞中排名最靠前的3個(gè)搜索詞以及出現(xiàn)次數(shù) */object ReduceByKeyAndWindowDemo { def main(args: Array[String]): Unit = {  

//設(shè)置日志級(jí)別  Logger.getLogger("org").setLevel(Level.WARN)  

//基礎(chǔ)配置  val conf = new SparkConf()   .setAppName(this.getClass.getSimpleName)   .setMaster("local[2]")  

//批處理間隔設(shè)置為1s  val ssc = new StreamingContext(conf, Seconds(1))  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)  linesDStream   .flatMap(_.split(" ")) 

//根據(jù)空格來(lái)做分詞   .map((_, 1)) 

//返回(word,1)   .reduceByKeyAndWindow(    //定義窗口如何計(jì)算的函數(shù)    

//x代表的是聚合后的結(jié)果,y代表的是這個(gè)Key對(duì)應(yīng)的下一個(gè)需要聚合的值    (x: Int, y: Int) => x + y,    

//窗口長(zhǎng)度為5秒    Seconds(5),    

//窗口時(shí)間間隔為2秒    Seconds(2)   )   .transform(rdd => { 

//transform算子對(duì)rdd做處理,轉(zhuǎn)換為另一個(gè)rdd    

//根據(jù)Key的出現(xiàn)次數(shù)來(lái)進(jìn)行排序,然后降序排列,獲取最靠前的3個(gè)搜索詞    val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)    

//將Array轉(zhuǎn)換為resultRDD    val resultRDD = ssc.sparkContext.parallelize(info)    resultRDD   })   .map(x => s"${x._1}出現(xiàn)的次數(shù)是:${x._2}")   .print()  ssc.start()  ssc.awaitTermination() }}

測(cè)試結(jié)果

DStream Output操作概覽

Spark Streaming允許DStream的數(shù)據(jù)輸出到外部系統(tǒng),DSteram中的所有計(jì)算,都是由output操作觸發(fā)的,foreachRDD輸出操作,也必須在里面對(duì)RDD執(zhí)行action操作,才能觸發(fā)對(duì)每一個(gè)batch的計(jì)算邏輯。

轉(zhuǎn)換描述              

print()在Driver中打印出DStream中數(shù)據(jù)的前10個(gè)元素。主要用于測(cè)試,或者是不需要執(zhí)行什么output操作時(shí),用于簡(jiǎn)單觸發(fā)一下job。              

saveAsTextFiles(prefix, [suffix]將DStream中的內(nèi)容以文本的形式保存為文本文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。              

saveAsObjectFiles(prefix, [suffix])將DStream中的內(nèi)容按對(duì)象序列化并且以SequenceFile的格式保存。其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。              

saveAsHadoopFiles(prefix, [suffix])將DStream中的內(nèi)容以文本的形式保存為Hadoop文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。              

foreachRDD(func)最基本的輸出操作,將func函數(shù)應(yīng)用于DStream中的RDD上,這個(gè)操作會(huì)輸出數(shù)據(jù)到外部系統(tǒng),比如保存RDD到文件或者網(wǎng)絡(luò)數(shù)據(jù)庫(kù)等。需要注意的是func函數(shù)是在運(yùn)行該streaming      應(yīng)用的Driver進(jìn)程里執(zhí)行的。

foreachRDD算子開(kāi)發(fā)

foreachRDD是最常用的output操作,可以遍歷DStream中的每個(gè)產(chǎn)生的RDD并進(jìn)行處理,然后將每個(gè)RDD中的數(shù)據(jù)寫入外部存儲(chǔ),如文件、數(shù)據(jù)庫(kù)、緩存等,通常在其中針對(duì)RDD執(zhí)行action操作,比如foreach

使用foreachRDD操作數(shù)據(jù)庫(kù)

通常在foreachRDD中都會(huì)創(chuàng)建一個(gè)Connection,比如JDBC Connection,然后通過(guò)Connection將數(shù)據(jù)寫入外部存儲(chǔ)

誤區(qū)一:在RDD的foreach操作外部創(chuàng)建Connection

dstream.foreachRDD { rdd =>  val connection=createNewConnection()  rdd.foreach { record => connection.send(record)  }}

這種方式是錯(cuò)誤的,這樣的方式會(huì)導(dǎo)致Connection對(duì)象被序列化后被傳輸?shù)矫恳粋€(gè)task上,但是Connection對(duì)象是不支持序列化的,所以也就無(wú)法被傳輸

誤區(qū)二:在RDD的foreach操作內(nèi)部創(chuàng)建Connection

dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection()    connection.send(record)    connection.close()  }}

這種方式雖然是可以的,但是執(zhí)行效率會(huì)很低,因?yàn)樗鼤?huì)導(dǎo)致對(duì)RDD中的每一條數(shù)據(jù)都創(chuàng)建一個(gè)Connection對(duì)象,通常Connection對(duì)象的創(chuàng)建都是很消耗性能的

合理的方式

第一種:使用RDD的foreachPartition操作,并且在該操作內(nèi)部創(chuàng)建Connection對(duì)象,這樣就相當(dāng)于為RDD的每個(gè)partition創(chuàng)建一個(gè)Connection對(duì)象,節(jié)省了很多資源  第二種:自己手動(dòng)封裝一個(gè)靜態(tài)連接池,使用RDD的foreachPartition操作,并且在該操作內(nèi)部從靜態(tài)連接池中,通過(guò)靜態(tài)方法獲取到一個(gè)連接,連接使用完之后再放回連接池中。這樣的話,可以在多個(gè)RDD的partition之間復(fù)用連接了

實(shí)例:實(shí)時(shí)全局統(tǒng)計(jì)WordCount,并將結(jié)果保存到MySQL數(shù)據(jù)庫(kù)中

MySQL數(shù)據(jù)庫(kù)建表語(yǔ)句如下

CREATE TABLE wordcount (  word varchar(100) CHARACTER SET utf8 NOT NULL,  count int(10) NOT NULL,  PRIMARY KEY (word)) ENGINE=InnoDB DEFAULT CHARSET=latin1;

在IDEA中添加mysql-connector-java-5.1.40-bin.jar

代碼如下

連接池的代碼,其實(shí)一開(kāi)始有想過(guò)用靜態(tài)塊來(lái)寫個(gè)池子直接獲取,但是如果考慮到池子寬度不夠用的問(wèn)題,這樣的方式其實(shí)更好,一開(kāi)始,實(shí)例化一個(gè)連接池出來(lái),被調(diào)用獲取連接,當(dāng)連接全部都被獲取了的時(shí)候,池子空了,就再實(shí)例化一個(gè)池子出來(lái)

package StreamingDemoimport java.sql.{Connection, DriverManager, SQLException}import java.utilobject JDBCManager { var connectionQue: java.util.LinkedList[Connection] = null 

/**  * 從數(shù)據(jù)庫(kù)連接池中獲取連接對(duì)象  * @return  */ def getConnection(): Connection = {  synchronized({   try {    

//如果連接池是空的,那么就實(shí)例化一個(gè)Connection類型的鏈表    

if (connectionQue == null) {    

 connectionQue = new util.LinkedList[Connection]()     

for (i <- 0 until (10)) {      

//生成10個(gè)連接,并配置相關(guān)信息      val connection = DriverManager.getConnection(       "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",       "root",       "root")      

//將連接push進(jìn)連接池      connectionQue.push(connection)     }    }   } catch {    

//捕獲異常并輸出    case e: SQLException => e.printStackTrace()   }   

//如果連接池不為空,則返回表頭元素,并將它在鏈表里刪除   return connectionQue.poll()  }) } 

/**  * 當(dāng)連接對(duì)象用完后,需要調(diào)用這個(gè)方法歸還連接  * @param connection  */ def returnConnection(connection: Connection) = {  

//插入元素  connectionQue.push(connection) } def main(args: Array[String]): Unit = {  

//main方法測(cè)試  getConnection()  println(connectionQue.size()) }}

wordcount代碼

package StreamingDemoimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, streaming}import org.apache.spark.streaming.{Seconds, StreamingContext}object ForeachRDDDemo { def main(args: Array[String]): Unit = {  

//設(shè)置日志級(jí)別,避免INFO信息過(guò)多  Logger.getLogger("org").setLevel(Level.WARN)  

//設(shè)置Hadoop的用戶,不加也可以  System.setProperty("HADOOP_USER_NAME", "Setsuna")  

//Spark基本配置  val conf = new SparkConf()   .setAppName(this.getClass.getSimpleName)   .setMaster("local[2]")  val ssc = new StreamingContext(conf, streaming.Seconds(2))  

//因?yàn)橐褂胾pdateStateByKey,所以需要使用checkpoint  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")  

//設(shè)置socket,跟nc配置的一樣  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)  val wordCountDStream = linesDStream   .flatMap(_.split(" "))   

//根據(jù)空格做分詞   .map((_, 1)) 

//生成(word,1)   .updateStateByKey((values: Seq[Int], state: Option[Int]) => {    

//實(shí)時(shí)更新?tīng)顟B(tài)信息    var count = state.getOrElse(0)    for (value <- values) {     count += value    }    Option(count)   })  wordCountDStream.foreachRDD(rdd => {   

if (!rdd.isEmpty()) {    rdd.foreachPartition(part => {     

//從連接池中獲取連接     val connection = JDBCManager.getConnection()     part.foreach(data => {      val sql = //往wordcount表中插入wordcount信息,on duplicate key update子句是有則更新無(wú)則插入       s"insert into wordcount (word,count) " +        s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}"     

 //使用prepareStatement來(lái)使用sql語(yǔ)句      val pstmt = connection.prepareStatement(sql)      pstmt.executeUpdate()     })     

//在連接處提交完數(shù)據(jù)后,歸還連接到連接池     JDBCManager.returnConnection(connection)    })   }  })  ssc.start()  ssc.awaitTermination() }}

打開(kāi)nc,輸入數(shù)據(jù)

在另一個(gè)終端對(duì)wordcount的結(jié)果進(jìn)行查詢,可以發(fā)現(xiàn)是實(shí)時(shí)發(fā)生變化的

上述內(nèi)容就是SparkStreaming算子開(kāi)發(fā)實(shí)例分析,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


文章名稱:SparkStreaming算子開(kāi)發(fā)實(shí)例分析
新聞來(lái)源:http://weahome.cn/article/psihij.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部