怎么實現(xiàn)SparkStreaming轉(zhuǎn)化操作,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
成都創(chuàng)新互聯(lián)長期為1000+客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為南平企業(yè)提供專業(yè)的網(wǎng)站制作、網(wǎng)站建設(shè),南平網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。
DStream的轉(zhuǎn)化操作分為無狀態(tài)和有狀態(tài)兩種
在無狀態(tài)轉(zhuǎn)化操作中,每個批次的處理不依賴于之前批次的數(shù)據(jù)。
有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或者中間結(jié)果來計算當(dāng)前批次的數(shù)據(jù),有狀態(tài)轉(zhuǎn)化操作包括基于滑動窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)換操作。
無狀態(tài)轉(zhuǎn)化操作的實質(zhì)就說把簡單的RDD轉(zhuǎn)化操作應(yīng)用到每個批次上,也就是轉(zhuǎn)化DStream的每一個RDD
Transform 允許 DStream 上執(zhí)行任意的 RDD-to-RDD 函數(shù)。即使這些函數(shù)并沒有在 DStream 的 API 中暴露出來,通過該函數(shù)可以方便的擴展 Spark API。該函數(shù)每一批次調(diào)度一次。其實也 就是對 DStream 中的 RDD 應(yīng)用轉(zhuǎn)換。
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]") val sc: StreamingContext = new StreamingContext(conf, Seconds(3)) val lines = sc.socketTextStream("localhost", 9999) // transform方法可以將底層RDD獲取到后進(jìn)行操作 // 1. DStream功能不完善 // 2. 需要代碼周期性的執(zhí)行 // Code : Driver端 val newDS: DStream[String] = lines.transform( rdd => { // Code : Driver端,(周期性執(zhí)行) rdd.map( str => { // Code : Executor端 str } ) } ) // Code : Driver端 val newDS1: DStream[String] = lines.map( data => { // Code : Executor端 data } ) sc.start() sc.awaitTermination() }
兩個流之間的 join 需要兩個流的批次大小一致,這樣才能做到同時觸發(fā)計算。計算過程就是對當(dāng)前批次的兩個流中各自的 RDD 進(jìn)行 join,與兩個 RDD 的 join 效果相同。
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(5)) val data9999 = ssc.socketTextStream("localhost", 9999) val data8888 = ssc.socketTextStream("localhost", 8888) val map9999: DStream[(String, Int)] = data9999.map((_,9)) val map8888: DStream[(String, Int)] = data8888.map((_,8)) // 所謂的DStream的Join操作,其實就是兩個RDD的join val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888) joinDS.print() ssc.start() ssc.awaitTermination() }
有狀態(tài)轉(zhuǎn)化操作是跨時間區(qū)間跟蹤數(shù)據(jù)的操作,也就是說,一些先前批次的數(shù)據(jù)也被用來在新的批次中用于計算結(jié)果。有狀態(tài)轉(zhuǎn)換的主要的兩種類型:
滑動窗口:以一個時間階段為滑動窗口進(jìn)行操作
updateStateByKey():通過key值來跟蹤數(shù)據(jù)的狀態(tài)變化
有狀態(tài)轉(zhuǎn)化操作需要在StreamingContext中打開檢查點機制來提高容錯
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey") val sc: StreamingContext = new StreamingContext(conf, Seconds(4)) sc.checkpoint("cp") val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999) val value: DStream[(String, Int)] = ds.map(((_: String), 1)) // updateStateByKey:根據(jù)key對數(shù)據(jù)的狀態(tài)進(jìn)行更新 // 傳遞的參數(shù)中含有兩個值 // 第一個值表示相同的key的value數(shù)據(jù)的集合 // 第二個值表示緩存區(qū)key對應(yīng)的計算值 val state: DStream[(String, Int)] = value.updateStateByKey((seq: Seq[Int], option: Option[Int]) => { val newCount: Int = option.getOrElse(0) + seq.sum Option(newCount) }) state.print() sc.start() sc.awaitTermination() }
所有基于窗口的函數(shù)都需要兩個參數(shù),分別對應(yīng)窗口時長和滑動步長,并且兩者都必須是SparkStreaming的批次間隔的整數(shù)倍。
窗口時長控制的是每次用來計算的批次的個數(shù)
滑動步長用于控制對新的DStream進(jìn)行計算的間隔
基于window進(jìn)行窗口內(nèi)元素計數(shù)操作
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) val lines = ssc.socketTextStream("localhost", 9999) val wordToOne = lines.map((_,1)) val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6)) val wordToCount = windowDS.reduceByKey(_+_) wordToCount.print() ssc.start() ssc.awaitTermination() }
有逆操作規(guī)約是一種更高效的規(guī)約操作,通過只考慮新進(jìn)入窗口的元素和離開窗口的元素,讓spark增量計算歸約的結(jié)果,其在代碼上的體現(xiàn)就是reduceFunc和 invReduceFunc
普通歸約操作
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("cp") val lines = ssc.socketTextStream("localhost", 9999) lines.reduceByWindow( (x: String, y: String) => { x + "-" + y }, Seconds(9), Seconds(3) ).print() ssc.start() ssc.awaitTermination() }
有逆歸約操作
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("cp") val lines = ssc.socketTextStream("localhost", 9999) val wordToOne = lines.map((_,1)) /** * 基于窗口進(jìn)行有逆歸約:通過控制窗口流出和進(jìn)入的元素來提高性能 */ val windowDS: DStream[(String, Int)] = wordToOne.reduceByKeyAndWindow( (x:Int, y:Int) => { x + y}, (x:Int, y:Int) => {x - y}, Seconds(9), Seconds(3)) windowDS.print() ssc.start() ssc.awaitTermination() }
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("cp") val lines = ssc.socketTextStream("localhost", 9999) /** * 統(tǒng)計窗口中輸入數(shù)據(jù)的個數(shù) * 比如 3s內(nèi)輸入了10條數(shù)據(jù),則打印10 */ val countByWindow: DStream[Long] = lines.countByWindow( Seconds(9), Seconds(3) ) countByWindow.print() /** * 統(tǒng)計窗口中每個值的個數(shù) * 比如 3s內(nèi)輸入了1個3 2個4 3個5,則打印(3,1)(2,4)(3,5) */ val countByValueAndWindow: DStream[(String, Long)] = lines.countByValueAndWindow( Seconds(9), Seconds(3) ) countByValueAndWindow.print() ssc.start() ssc.awaitTermination() }
看完上述內(nèi)容,你們掌握怎么實現(xiàn)SparkStreaming轉(zhuǎn)化操作的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!