本期內(nèi)容:
創(chuàng)新互聯(lián)是一家以網(wǎng)絡(luò)技術(shù)公司,為中小企業(yè)提供網(wǎng)站維護(hù)、成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、網(wǎng)站備案、服務(wù)器租用、國(guó)際域名空間、軟件開(kāi)發(fā)、微信小程序開(kāi)發(fā)等企業(yè)互聯(lián)網(wǎng)相關(guān)業(yè)務(wù),是一家有著豐富的互聯(lián)網(wǎng)運(yùn)營(yíng)推廣經(jīng)驗(yàn)的科技公司,有著多年的網(wǎng)站建站經(jīng)驗(yàn),致力于幫助中小企業(yè)在互聯(lián)網(wǎng)讓打出自已的品牌和口碑,讓企業(yè)在互聯(lián)網(wǎng)上打開(kāi)一個(gè)面向全國(guó)乃至全球的業(yè)務(wù)窗口:建站歡迎聯(lián)系:028-86922220
1、Executor的WAL容錯(cuò)機(jī)制
2、消息重放
Executor的安全容錯(cuò)主要是數(shù)據(jù)的安全容錯(cuò),那為什么不考慮數(shù)據(jù)計(jì)算的安全容錯(cuò)呢?
原因是計(jì)算的時(shí)候Spark Streaming是借助于Spark Core上RDD的安全容錯(cuò)的,所以天然的安全可靠的。
Executor的安全容錯(cuò)主要有:
1、數(shù)據(jù)副本:
有兩種方式:a.借助底層的BlockManager,BlockManager做備份,通過(guò)傳入的StorageLevel進(jìn)行備份。
b. WAL方式進(jìn)行容錯(cuò)。
2、接受到數(shù)據(jù)之后,不做副本,但是數(shù)據(jù)源支持存放,所謂存放就是可以反復(fù)的讀取源數(shù)據(jù)。
容錯(cuò)的弊端:耗時(shí)間、耗空間。
簡(jiǎn)單的看下源代碼:
/** Store block and report it to driver */ def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { val blockId = blockIdOption.getOrElse(nextBlockId) val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") }
private val receivedBlockHandler: ReceivedBlockHandler = { if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { if (checkpointDirOption.isEmpty) { throw new SparkException( "Cannot enable receiver write-ahead log without checkpoint directory set. " + "Please use streamingContext.checkpoint() to set the checkpoint directory. " + "See documentation for more details.") } new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通過(guò)WAL容錯(cuò) } else { new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通過(guò)BlockManager進(jìn)行容錯(cuò) } }
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { var numRecords = None: Option[Long] val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) numRecords = countIterator.count putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } if (!putResult.map { _._1 }.contains(blockId)) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } BlockManagerBasedStoreResult(blockId, numRecords) }
簡(jiǎn)單流程圖:
參考博客:http://blog.csdn.net/hanburgud/article/details/51471089