本篇文章為大家展示了如何進(jìn)行JobScheduler內(nèi)幕實(shí)現(xiàn)和深度思考,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
創(chuàng)新互聯(lián)專(zhuān)業(yè)為企業(yè)提供五華網(wǎng)站建設(shè)、五華做網(wǎng)站、五華網(wǎng)站設(shè)計(jì)、五華網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、五華企業(yè)網(wǎng)站模板建站服務(wù),十多年五華做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。
DStream的foreachRDD方法,實(shí)例化ForEachDStream對(duì)象,并將用戶(hù)定義的函數(shù)foreachFunc傳入到該對(duì)象中。foreachRDD方法是輸出操作,foreachFunc方法會(huì)作用到這個(gè)DStream中的每個(gè)RDD。
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. * @param foreachFuncforeachRDD function * @param displayInnerRDDOpsWhether the detailed callsites and scopes of the RDDs generated * in the `foreachFunc` to be displayed in the UI. If `false`, then * only the scopes and callsites of `foreachRDD` will override those * of the RDDs on the display. */ private defforeachRDD( foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean): Unit = { newForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register() } |
ForEachDStream對(duì)象中重寫(xiě)了generateJob方法,調(diào)用父DStream的getOrCompute方法來(lái)生成RDD并封裝Job,傳入對(duì)該RDD的操作函數(shù)foreachFunc和time。dependencies方法定義為父DStream的集合。
/** * An internal DStream used to represent output operations like DStream.foreachRDD. * @param parent Parent DStream * @param foreachFunc Function to apply on each RDD generated by the parent DStream * @param displayInnerRDDOpsWhether the detailed callsites and scopes of the RDDs generated * by `foreachFunc` will be displayed in the UI; only the scope and * callsite of `DStream.foreachRDD` will be displayed. */ private[streaming] classForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean ) extendsDStream[Unit](parent.ssc) {
override defdependencies: List[DStream[_]] = List(parent)
override defslideDuration: Duration = parent.slideDuration
override defcompute(validTime: Time): Option[RDD[Unit]] = None
override defgenerateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match{ caseSome(rdd) => valjobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { foreachFunc(rdd, time) } Some(newJob(time, jobFunc)) caseNone => None } } } |
DStreamGraph的generateJobs方法中會(huì)調(diào)用outputStream的generateJob方法,就是調(diào)用ForEachDStream的generateJob方法。
defgenerateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time "+ time) valjobs = this.synchronized { outputStreams.flatMap { outputStream => valjobOption = outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption } } logDebug("Generated "+ jobs.length + " jobs for time "+ time) jobs } |
DStream的generateJob定義如下,其子類(lèi)中只有ForEachDStream重寫(xiě)了generateJob方法。
/** * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this * to generate their own jobs. */ private[streaming] defgenerateJob(time: Time): Option[Job] = { getOrCompute(time) match{ caseSome(rdd) => { valjobFunc = () => { valemptyFunc = { (iterator: Iterator[T]) => {} } context.sparkContext.runJob(rdd, emptyFunc) } Some(newJob(time, jobFunc)) } caseNone => None } } |
DStream的print方法內(nèi)部還是調(diào)用foreachRDD來(lái)實(shí)現(xiàn),傳入了內(nèi)部方法foreachFunc,來(lái)取出num+1個(gè)數(shù)后打印輸出。
/** * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ defprint(num: Int): Unit = ssc.withScope { defforeachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") println("Time: "+ time) println("-------------------------------------------") firstNum.take(num).foreach(println) if(firstNum.length > num) println("...") println() // scalastyle:on println } } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false) } |
總結(jié):JobScheduler是SparkStreaming 所有Job調(diào)度的中心,內(nèi)部有兩個(gè)重要的成員:
JobGenerator負(fù)責(zé)Job的生成,ReceiverTracker負(fù)責(zé)記錄輸入的數(shù)據(jù)源信息。
JobScheduler的啟動(dòng)會(huì)導(dǎo)致ReceiverTracker和JobGenerator的啟動(dòng)。ReceiverTracker的啟動(dòng)導(dǎo)致運(yùn)行在Executor端的Receiver啟動(dòng)并且接收數(shù)據(jù),ReceiverTracker會(huì)記錄Receiver接收到的數(shù)據(jù)meta信息。JobGenerator的啟動(dòng)導(dǎo)致每隔BatchDuration,就調(diào)用DStreamGraph生成RDD Graph,并生成Job。JobScheduler中的線程池來(lái)提交封裝的JobSet對(duì)象(時(shí)間值,Job,數(shù)據(jù)源的meta)。Job中封裝了業(yè)務(wù)邏輯,導(dǎo)致最后一個(gè)RDD的action被觸發(fā),被DAGScheduler真正調(diào)度在Spark集群上執(zhí)行該Job。
上述內(nèi)容就是如何進(jìn)行JobScheduler內(nèi)幕實(shí)現(xiàn)和深度思考,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
本文名稱(chēng):如何進(jìn)行JobScheduler內(nèi)幕實(shí)現(xiàn)和深度思考
網(wǎng)頁(yè)網(wǎng)址:
http://weahome.cn/article/ispgjg.html