真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

第12課:SparkStreaming源碼解讀之Execu

Receiver接收到的數(shù)據(jù)交由ReceiverSupervisorImpl來管理。

創(chuàng)新互聯(lián)公司主要從事網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)西工,10余年網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575

ReceiverSupervisorImpl接收到數(shù)據(jù)后,會數(shù)據(jù)存儲并且將數(shù)據(jù)的元數(shù)據(jù)報告給ReceiverTracker 。

Executor的數(shù)據(jù)容錯可以有三種方式:

  1. WAL日志

  2. 數(shù)據(jù)副本

  3. 接收receiver的數(shù)據(jù)流回放

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

數(shù)據(jù)的存儲,是借助receiverBlockHandler,它的實現(xiàn)有兩種方式:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

WriteAheadLogBaseBlockHandler 一方面將數(shù)據(jù)交由BlockManager管理,另一方面會寫WAL日志。

一旦節(jié)點崩潰,可以由WAL日志恢復(fù)內(nèi)存中的數(shù)據(jù)。在WAL開始時,就不在建議數(shù)據(jù)存儲多個副本。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

而BlockManagerBaseBlockHandler直接將數(shù)據(jù)交由BlockManager管理。

如果不寫WAL,當(dāng)節(jié)點崩潰了一定會數(shù)據(jù)丟失嗎? 這個也不一定。因為在構(gòu)建WriteAheadLogBaseBlockHandler,和BlockManagerBaseBlockHandler的時候會將receiver的storageLevel傳入。storageLevel用來描述數(shù)據(jù)保存的地方(內(nèi)存、磁盤)以及副本個數(shù)。

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable

公有如下種類的StorageLevel:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)

默認(rèn)情況,數(shù)據(jù)采用MEMORY_AND_DISK_2,也就是說數(shù)據(jù)會產(chǎn)生兩個副本,并且內(nèi)存不足時會寫入磁盤。

數(shù)據(jù)的最終存儲是由BlockManager完成并管理的:

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]

  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
        tellMaster = true)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
        tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}

對于從kafka中直接讀取數(shù)據(jù),可以通過記錄數(shù)據(jù)offset的方法來進(jìn)行容錯。如果程序崩潰,下次啟動時,從上次未處理數(shù)據(jù)的offset再次讀取數(shù)據(jù)即可。

備注:

1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


網(wǎng)站題目:第12課:SparkStreaming源碼解讀之Execu
文章源于:http://weahome.cn/article/pseggd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部