This article walks through how ReceiverSupervisorImpl is instantiated in Spark Streaming.
First, recall what actually runs on the Executor:
Instantiate ReceiverSupervisorImpl.
Call start(), then block in awaitTermination().
// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
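Next, look at the constructor of ReceiverSupervisor, the parent class of ReceiverSupervisorImpl.
It assigns the member fields and attaches the current supervisor to the receiver (receiver.attachSupervisor(this)).
The class comment is clear: it is responsible for supervising a Receiver on the worker, and it provides all the interfaces needed to handle the data the receiver receives.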
// ReceiverSupervisor.scala line 31
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 */
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

  /** Enumeration to identify current state of the Receiver */
  object ReceiverState extends Enumeration {
    type CheckpointState = Value
    val Initialized, Started, Stopped = Value
  }
  import ReceiverState._

  // Attach the supervisor to the receiver
  receiver.attachSupervisor(this) // associates the receiver with this supervisor

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))

  /** Receiver id */
  protected val streamId = receiver.streamId

  /** Has the receiver been marked for stop. */
  private val stopLatch = new CountDownLatch(1)

  /** Time between a receiver is stopped and started again */
  private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)

  /** The current maximum rate limit for this receiver. */
  private[streaming] def getCurrentRateLimit: Long = Long.MaxValue

  /** Exception associated with the stopping of the receiver */
  @volatile protected var stoppingError: Throwable = null

  /** State of the receiver */
  @volatile private[streaming] var receiverState = Initialized

  // Some methods follow; they are in effect the data-handling interfaces
}
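To make the role of stopLatch, stoppingError and receiverState more concrete, here is a minimal sketch of how a latch and a three-state enumeration can combine so that one thread blocks in awaitTermination until another calls stop. It uses only the JDK and is not Spark's implementation: SketchState, SketchSupervisor, currentState and the timeout parameter are all hypothetical names introduced for illustration.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for the ReceiverState enumeration shown above.
object SketchState extends Enumeration {
  val Initialized, Started, Stopped = Value
}

// Hypothetical, simplified supervisor; NOT Spark's ReceiverSupervisor.
class SketchSupervisor {
  @volatile private var state: SketchState.Value = SketchState.Initialized
  @volatile private var stoppingError: Option[Throwable] = None

  // Released exactly once, when stop() is first called.
  private val stopLatch = new CountDownLatch(1)

  def currentState: SketchState.Value = state

  def start(): Unit = synchronized {
    if (state == SketchState.Initialized) state = SketchState.Started
  }

  def stop(error: Option[Throwable] = None): Unit = synchronized {
    if (state != SketchState.Stopped) {
      stoppingError = error
      state = SketchState.Stopped
      stopLatch.countDown()
    }
  }

  /** Waits up to timeoutMs for stop(); returns true if the supervisor was stopped. */
  def awaitTermination(timeoutMs: Long): Boolean =
    stopLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
}
```

The timeout exists only to keep the sketch easy to exercise; the call in startReceiverFunc takes no arguments and simply blocks.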
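Instantiating ReceiverSupervisorImpl
Instantiates a BlockManagerBasedBlockHandler, which sends data to the BlockManager (a WriteAheadLogBasedBlockHandler is created instead when the receiver write-ahead log is enabled)
Instantiates an RpcEndpoint
Instantiates a BlockGenerator
Instantiates a BlockGeneratorListener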
// ReceiverSupervisorImpl.scala line 43
/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

  private val host = SparkEnv.get.blockManager.blockManagerId.host
  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId

  private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { // disabled by default
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

  /** Remote RpcEndpointRef for the ReceiverTracker */
  private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

  /** Unique block ids if one wants to add blocks directly */
  private val newBlockId = new AtomicLong(System.currentTimeMillis())

  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] // a typical trait mixin (stackable-trait style)
    with mutable.SynchronizedBuffer[BlockGenerator]

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }

  private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

  // ... some methods

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }
}
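The choice of receivedBlockHandler above boils down to a small decision: use the write-ahead-log handler only if the log is enabled and a checkpoint directory exists, otherwise fall back to the BlockManager-based handler. The sketch below isolates that decision in plain Scala. SketchBlockHandler, InMemoryHandler, WalHandler and HandlerChooser are hypothetical stand-ins, not Spark classes, and the walEnabled flag stands in for whatever WriteAheadLogUtils.enableReceiverLog reads from the configuration.

```scala
// Hypothetical stand-ins for the two handler kinds; none of these are Spark classes.
sealed trait SketchBlockHandler
case object InMemoryHandler extends SketchBlockHandler
final case class WalHandler(checkpointDir: String) extends SketchBlockHandler

object HandlerChooser {
  /**
   * Mirrors the branching in the excerpt above:
   *  - WAL disabled: plain in-memory (BlockManager-style) handler
   *  - WAL enabled with a checkpoint dir: WAL-backed handler
   *  - WAL enabled without a checkpoint dir: fail fast
   */
  def choose(walEnabled: Boolean, checkpointDir: Option[String]): SketchBlockHandler =
    if (walEnabled) {
      checkpointDir match {
        case Some(dir) => WalHandler(dir)
        case None =>
          throw new IllegalStateException(
            "Cannot enable receiver write-ahead log without checkpoint directory set.")
      }
    } else {
      InMemoryHandler
    }
}
```

Failing at construction time, as the excerpt does, surfaces a misconfiguration before any data is received rather than on the first stored block.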
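Now look at BlockGenerator.
Its class comment is clear: there are two threads.
One periodically turns the previous batch of data into a block and starts a new batch; this is the RecurringTimer class, which wraps a Thread internally.
The other pushes the data to the BlockManager.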
//
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  /**
   * The BlockGenerator can be in 5 possible states, in the order as follows.
   *
   *  - Initialized: Nothing has been started
   *  - Active: start() has been called, and it is generating blocks on added data.
   *  - StoppedAddingData: stop() has been called, the adding of data has been stopped,
   *                       but blocks are still being generated and pushed.
   *  - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
   *                             they are still being pushed.
   *  - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
   */
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  }
  import GeneratorState._

  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") // the periodic thread
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } // the thread that pushes the data

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  //...
}
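The two-thread design described in the BlockGenerator comment can be sketched with nothing but the JDK: one side swaps out the current buffer and enqueues it as a block, the other drains a bounded queue and hands each block to a callback. This is a simplified illustration under stated assumptions, not Spark's code. SketchBlockGenerator and its onPush callback are hypothetical, and the periodic timer is replaced by an explicit call to updateCurrentBuffer so the behavior stays deterministic.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified sketch of the two-stage pipeline; NOT Spark's BlockGenerator.
class SketchBlockGenerator(queueSize: Int, onPush: Seq[Any] => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]
  // Bounded queue between the "cut a block" side and the "push a block" side.
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](queueSize)
  @volatile private var stopped = false

  // Second thread: keeps draining until stop is requested and the queue is empty.
  private val pushingThread = new Thread("sketch-block-pusher") {
    override def run(): Unit = {
      while (!stopped || !blocksForPushing.isEmpty) {
        val block = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
        if (block != null) onPush(block)
      }
    }
  }
  pushingThread.setDaemon(true)

  def start(): Unit = pushingThread.start()

  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  /** What a periodic timer would call: turn the current buffer into a block. */
  def updateCurrentBuffer(): Unit = {
    val previous = synchronized {
      val buf = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      buf
    }
    // put() blocks when the queue is full, which applies back-pressure.
    if (previous.nonEmpty) blocksForPushing.put(previous.toList)
  }

  /** Flush the last partial buffer, then wait for the pusher to finish. */
  def stop(): Unit = {
    updateCurrentBuffer()
    stopped = true
    pushingThread.join()
  }
}
```

In this sketch a caller would add records, trigger updateCurrentBuffer at each interval boundary, and call stop to flush the remainder; the bounded queue plays the role that blocksForPushing with spark.streaming.blockQueueSize plays in the excerpt.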
At this point, the instantiation of ReceiverSupervisorImpl is complete. Note, however, that the Receiver itself has not been started yet.