Data received by the Receiver is handed to ReceiverSupervisorImpl for management.
After ReceiverSupervisorImpl receives the data, it stores the data as blocks and reports the block metadata to the ReceiverTracker.
Fault tolerance for the data on the Executor can be achieved in three ways:
write-ahead log (WAL)
data replication
replaying the input stream received by the receiver
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
The actual storage is delegated to receivedBlockHandler, which has two implementations:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
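As the check above shows, the WAL-based handler is only chosen when the receiver write-ahead log is enabled and a checkpoint directory has been set. A minimal sketch of such a setup (application name, host, port, and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverWALDemo")
  // turn on the receiver write-ahead log
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// a checkpoint directory is mandatory once the receiver WAL is enabled (path is a placeholder)
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
lines.count().print()

ssc.start()
ssc.awaitTermination()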
WriteAheadLogBasedBlockHandler hands the data to the BlockManager and, at the same time, writes it to the write-ahead log.
If a node crashes, the in-memory data can then be recovered from the WAL. Once the WAL is enabled, keeping multiple replicas of the data is no longer recommended:
private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }
  StorageLevel(storageLevel.useDisk, storageLevel.useMemory,
    storageLevel.useOffHeap, false, 1)
}
BlockManagerBasedBlockHandler, by contrast, hands the data directly to the BlockManager.
Without the WAL, is data guaranteed to be lost when a node crashes? Not necessarily. When WriteAheadLogBasedBlockHandler or BlockManagerBasedBlockHandler is constructed, the receiver's storageLevel is passed in. The storageLevel describes where the data is kept (memory and/or disk) and how many replicas are maintained.
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable
The following StorageLevel constants are provided:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
By default, the built-in receiver input streams (for example socketTextStream) use MEMORY_AND_DISK_SER_2: two replicas of the serialized data are kept, and blocks spill to disk when memory is insufficient.
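The storage level can also be set explicitly when creating a receiver-based input stream. A minimal sketch, reusing the StreamingContext ssc from the earlier setup (host and port are placeholders):

import org.apache.spark.storage.StorageLevel

// keep a single serialized copy, e.g. when the receiver WAL already provides durability
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)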
The final storage of the data is carried out and managed by the BlockManager:
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
  var numRecords = None: Option[Long]
  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
        tellMaster = true)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
        tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}
When data is read directly from Kafka, fault tolerance can instead be achieved by recording the data offsets: if the program crashes, on the next start it simply re-reads the data from the last unprocessed offsets, as sketched below.
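A minimal sketch of this direct approach, assuming the Spark 1.x spark-streaming-kafka (Kafka 0.8) integration and reusing the StreamingContext ssc from above; the broker address, topic name, and the external store for offsets are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))

stream.foreachRDD { rdd =>
  // offset range of every Kafka partition in this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // persist offsetRanges to an external store (ZooKeeper, a database, ...)
  // so the next start can resume from the last unprocessed offsets
}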