
How ReceiverTracker Processes Data

This article walks through how ReceiverTracker processes data, a topic that often causes confusion in practice. Read on for a step-by-step look at the relevant source code.


ReceiverTracker runs an algorithm on the Driver to decide which executor each Receiver should be launched on. The code that launches a Receiver is wrapped in a task, and that task is the only task in its job. In effect, ReceiverTracker launches each Receiver as its own job. The launch function creates a ReceiverSupervisorImpl, and calling its start method is what actually runs the Receiver on the worker node. The received data is then placed into blocks by BlockGenerator, the blocks are stored through ReceiverSupervisorImpl, and the metadata describing them is reported to ReceiverTracker.

The rest of this article covers what ReceiverTracker does once data has been received.

ReceiverSupervisorImpl stores blocks by writing them through receivedBlockHandler.

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    ...
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

There are two implementations: one writes through the WAL (write ahead log), the other stores blocks through BlockManager only.
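The choice between the two handlers can be illustrated with a minimal, self-contained sketch. Everything here (`Handler`, `InMemoryHandler`, `LoggingHandler`, `chooseHandler`) is a hypothetical stand-in invented for illustration and is not Spark's API; it only shows the idea of picking a storage strategy from a configuration flag.

```scala
import scala.collection.mutable

object BlockHandlerSketch {
  // Hypothetical stand-in for a block handler; not Spark's ReceivedBlockHandler.
  sealed trait Handler {
    def store(blockId: String, data: Seq[String]): String
  }

  // Keeps blocks in memory only (plays the role of the BlockManager path).
  final class InMemoryHandler extends Handler {
    val blocks = mutable.Map.empty[String, Seq[String]]
    def store(blockId: String, data: Seq[String]): String = {
      blocks(blockId) = data
      "memory"
    }
  }

  // Appends every block to a log before keeping it (plays the role of the WAL path).
  final class LoggingHandler extends Handler {
    val log = mutable.ArrayBuffer.empty[(String, Seq[String])]
    val blocks = mutable.Map.empty[String, Seq[String]]
    def store(blockId: String, data: Seq[String]): String = {
      log.append((blockId, data))
      blocks(blockId) = data
      "wal"
    }
  }

  // Assumption: a single boolean flag decides which handler is used.
  def chooseHandler(walEnabled: Boolean): Handler =
    if (walEnabled) new LoggingHandler else new InMemoryHandler
}
```

Because the decision is made once when the handler is created, the rest of the code only ever calls `store` and does not need to know which path is active.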

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

This stores the data and then reports to ReceiverTracker. Only the metadata is reported, not the data itself.

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {
  ...
}

The sealed keyword means that all direct subclasses must be defined in the same source file.
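The excerpt above does not show which type is declared sealed, so the following is only a generic sketch of the keyword using a hypothetical `Block` hierarchy. Because every direct subclass lives in the same file, the compiler can check that a pattern match covers all cases.

```scala
object SealedSketch {
  // Hypothetical hierarchy for illustration only.
  sealed trait Block
  final case class ArrayBlock(items: Array[String]) extends Block
  final case class IteratorBlock(items: Iterator[String]) extends Block

  // The match is exhaustive: removing a case makes the compiler warn,
  // because Block is sealed and all of its subclasses are known here.
  def describe(block: Block): String = block match {
    case ArrayBlock(items) => s"array of ${items.length}"
    case IteratorBlock(_)  => "iterator"
  }
}
```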

ReceiverTracker manages starting and stopping Receivers and receives the metadata they report. A ReceiverTracker may only be instantiated after all input streams have been added and StreamingContext.start() has been called, because it launches one Receiver for each input stream.

ReceiverTracker holds all of the input sources and their IDs.

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }

The states of ReceiverTracker:

/** Enumeration to identify current state of the ReceiverTracker */
object TrackerState extends Enumeration {
  type TrackerState = Value
  val Initialized, Started, Stopping, Stopped = Value
}

Next, look at how ReceiverTracker handles the AddBlock message sent by ReceiverSupervisorImpl.

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

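It first checks whether WAL batching is enabled. If it is, a thread from the thread pool runs addBlock and sends the reply, because writing to the WAL is expensive and should not block the message loop. Otherwise it replies with the result of addBlock directly.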
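The pattern of replying either inline or from a worker thread can be sketched with the standard library alone. `Endpoint`, `receive` and the trivial `addBlock` below are hypothetical names for illustration; a `Promise` plays the role of the RPC reply channel.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Future, Promise}
import scala.util.Try

object ReplySketch {
  final class Endpoint(batchingEnabled: Boolean) {
    private val pool = Executors.newFixedThreadPool(2)

    // Hypothetical stand-in for the potentially slow addBlock call.
    private def addBlock(info: String): Boolean = info.nonEmpty

    // Returns a future that completes when the reply has been sent.
    def receive(info: String): Future[Boolean] = {
      val reply = Promise[Boolean]()
      if (batchingEnabled) {
        // Slow path: do the work on a pool thread so the caller is not blocked.
        pool.execute(new Runnable {
          override def run(): Unit = reply.complete(Try(addBlock(info)))
        })
      } else {
        // Fast path: compute and reply on the calling thread.
        reply.complete(Try(addBlock(info)))
      }
      reply.future
    }

    def stop(): Unit = pool.shutdown()
  }
}
```

In the fast path the returned future is already completed when `receive` returns; in the slow path it completes later on a pool thread.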

The block info is then handed to receivedBlockTracker for processing.

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}

ReceivedBlockTracker manages blockInfo on the Driver side.

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

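The writeToLog code is simple. It first checks whether the WAL is enabled. If so, it writes the blockInfo to the log so that the data can be recovered later; otherwise it returns true immediately. When the write succeeds, the block info is added to streamIdToUnallocatedBlockQueues.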

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

This data structure is neat: the key is the stream ID and the value is a queue, so the block info received by each stream is stored separately. As a result, ReceivedBlockTracker holds the block info received by every stream.
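A minimal sketch of this map-of-queues structure, using only the Scala standard library, might look like the following. `BlockInfo`, `add` and `drain` are simplified, hypothetical names and not the Spark implementation.

```scala
import scala.collection.mutable

object UnallocatedQueuesSketch {
  // Hypothetical, simplified block metadata.
  final case class BlockInfo(streamId: Int, blockId: String)

  // One queue of not-yet-allocated blocks per stream ID.
  private val queues = new mutable.HashMap[Int, mutable.Queue[BlockInfo]]

  // Creates the queue for a stream on first use.
  private def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
    queues.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  def add(info: BlockInfo): Unit = synchronized {
    queueFor(info.streamId) += info
  }

  // Removes and returns everything currently queued for one stream.
  def drain(streamId: Int): List[BlockInfo] = synchronized {
    queueFor(streamId).dequeueAll(_ => true).toList
  }
}
```

Keeping a separate queue per stream means that draining the blocks of one stream never touches the blocks of another.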

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}

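Take a closer look at the doc comment on ReceivedBlockTracker. This class keeps track of all received blocks and allocates them to batches when required. All actions taken by the class can be written to a WAL if a checkpoint directory has been provided, so that after a Driver failure the state of the tracker (received blocks and block-to-batch allocations) can be recovered. When an instance is created with a checkpoint directory, it tries to read the previously saved events from the logs in that directory.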

/**
 * Class that keep track of all the received blocks, and allocate them to batches
 * when required. All actions taken by this class can be saved to a write ahead log
 * (if a checkpoint directory has been provided), so that the state of the tracker
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
 *
 * Note that when any instance of this class is created with a checkpoint directory,
 * it will try reading events from logs in the directory.
 */

private[streaming] class ReceivedBlockTracker(
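The recovery behavior described in the comment above can be sketched as an event log that is replayed on construction. This is a simplified illustration under stated assumptions: the log is an in-memory buffer rather than files in a checkpoint directory, and `Event`, `BlockAdded`, `BatchAllocated` and `Tracker` are hypothetical names.

```scala
import scala.collection.mutable

object RecoverySketch {
  sealed trait Event
  final case class BlockAdded(streamId: Int, blockId: String) extends Event
  final case class BatchAllocated(batchTime: Long, blockIds: List[String]) extends Event

  // The log outlives the tracker, so a new tracker can rebuild the old state.
  final class Tracker(log: mutable.ArrayBuffer[Event]) {
    private val unallocated = mutable.ArrayBuffer.empty[String]
    private val allocated = mutable.Map.empty[Long, List[String]]

    // Replay previously logged events (this does not append to the log).
    log.foreach(applyEvent)

    private def applyEvent(event: Event): Unit = event match {
      case BlockAdded(_, id) =>
        unallocated += id
      case BatchAllocated(time, ids) =>
        unallocated --= ids
        allocated(time) = ids
    }

    // Write the event to the log first, then update the in-memory state.
    private def record(event: Event): Unit = {
      log += event
      applyEvent(event)
    }

    def addBlock(streamId: Int, blockId: String): Unit =
      record(BlockAdded(streamId, blockId))

    def allocate(batchTime: Long): Unit =
      record(BatchAllocated(batchTime, unallocated.toList))

    def pending: List[String] = unallocated.toList
    def batch(batchTime: Long): List[String] = allocated.getOrElse(batchTime, Nil)
  }
}
```

A second `Tracker` built from the same log ends up with the same pending blocks and batch allocations as the first one, which is the property the doc comment describes.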

Next, look at how ReceiverTracker handles a CleanupOldBlocks message.

case c: CleanupOldBlocks =>
  receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))

When ReceiverTracker receives this message, it forwards it to every Receiver it manages. When ReceiverSupervisorImpl receives the message, it uses receivedBlockHandler to clean up the data.

private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
  logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
  receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
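The fan-out of a cleanup request can be sketched as follows. `Store` and `broadcastCleanup` are hypothetical names, and the rule that a block is dropped when its timestamp is strictly below the threshold is an assumption made for this example, not a statement about Spark's handlers.

```scala
import scala.collection.mutable

object CleanupSketch {
  // Hypothetical receiver-side store that remembers when each block arrived.
  final class Store {
    private val blocks = mutable.Map.empty[String, Long] // blockId -> arrival time (ms)

    def put(blockId: String, timeMs: Long): Unit = {
      blocks(blockId) = timeMs
    }

    // Assumption: blocks strictly older than the threshold are removed.
    def cleanupOldBlocks(threshMs: Long): Unit = {
      val old = blocks.collect { case (id, time) if time < threshMs => id }.toList
      old.foreach(id => blocks.remove(id))
    }

    def ids: Set[String] = blocks.keySet.toSet
  }

  // Tracker side: forward the same threshold to every registered store.
  def broadcastCleanup(stores: Seq[Store], threshMs: Long): Unit =
    stores.foreach(_.cleanupOldBlocks(threshMs))
}
```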

ReceiverTracker can also adjust the rate at which a given stream ID receives data at any time, by sending an UpdateRateLimit message to the corresponding ReceiverSupervisorImpl.

case UpdateReceiverRateLimit(streamUID, newRate) =>
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }

When ReceiverSupervisorImpl receives the message, it updates the rate of every registered BlockGenerator:

case UpdateRateLimit(eps) =>
  logInfo(s"Received a new rate limit: $eps.")
  registeredBlockGenerators.foreach { bg =>
    bg.updateRate(eps)
  }

/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
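The clamping rule in updateRate can be tried out in isolation with a small sketch. `Limiter` is a hypothetical class that stores the rate in a plain field instead of a real rate limiter; treating a non-positive `maxRateLimit` as "no maximum" mirrors the branch in the code above.

```scala
object RateLimitSketch {
  // maxRateLimit <= 0 is treated as "no configured maximum".
  final class Limiter(maxRateLimit: Long) {
    @volatile private var current: Long =
      if (maxRateLimit > 0) maxRateLimit else Long.MaxValue

    // Ignores zero or negative rates; never exceeds the configured maximum.
    def updateRate(newRate: Long): Unit =
      if (newRate > 0) {
        current = if (maxRateLimit > 0) newRate.min(maxRateLimit) else newRate
      }

    def rate: Long = current
  }
}
```

With a maximum of 100, a request for 500 is capped at 100, a request for 50 is accepted as is, and a request for 0 leaves the current rate unchanged.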

ReceiverTracker is an example of the facade design pattern: callers appear to be using ReceiverTracker's functionality, but the work is actually done by other classes.
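As a rough illustration of the facade idea, the sketch below uses hypothetical `BlockTracker`, `Supervisor` and `Tracker` classes, which are far simpler than the real ones. The facade exposes a small API and delegates every call to a collaborator.

```scala
object FacadeSketch {
  // Hypothetical collaborator that records block IDs.
  final class BlockTracker {
    private var blocks = List.empty[String]
    def addBlock(id: String): Boolean = {
      blocks = id :: blocks
      true
    }
    def count: Int = blocks.size
  }

  // Hypothetical collaborator that holds the current rate for one stream.
  final class Supervisor {
    private var currentRate = 0L
    def updateRate(newRate: Long): Unit = {
      currentRate = newRate
    }
    def rate: Long = currentRate
  }

  // The facade: it contains no logic of its own and only delegates.
  final class Tracker(blockTracker: BlockTracker, supervisors: Map[Int, Supervisor]) {
    def addBlock(id: String): Boolean = blockTracker.addBlock(id)

    // Unknown stream IDs are silently ignored.
    def updateRateLimit(streamId: Int, newRate: Long): Unit =
      supervisors.get(streamId).foreach(_.updateRate(newRate))
  }
}
```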
