這篇文章主要介紹“Executor容錯(cuò)安全性實(shí)例分析”,在日常操作中,相信很多人在Executor容錯(cuò)安全性實(shí)例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Executor容錯(cuò)安全性實(shí)例分析”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
公司主營業(yè)務(wù):成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出邢臺縣免費(fèi)做網(wǎng)站回饋大家。
sparkstreaming會不斷的接收數(shù)據(jù)、不斷的產(chǎn)生job、不斷的提交job。所以有一個(gè)至關(guān)重要的問題就是數(shù)據(jù)安全性。由于sparkstreaming是基于sparkcore的,如果我們可以確保數(shù)據(jù)安全可靠的話(sparkstreaming生產(chǎn)job的時(shí)候里面是基于RDD),即使運(yùn)行的時(shí)候出現(xiàn)錯(cuò)誤或者故障,也可以基于RDD的容錯(cuò)的能力自動(dòng)進(jìn)行恢復(fù)。所以要確保數(shù)據(jù)的安全性。
對于executor的安全容錯(cuò)主要是數(shù)據(jù)的安全容錯(cuò)。Executor計(jì)算時(shí)候的安全容錯(cuò)是借助spark core的RDD的,所以天然是安全的。
數(shù)據(jù)安全性的一種方式是存儲一份副本,另一種方式是不做副本,但是數(shù)據(jù)源支持重放(也就是可以反復(fù)的讀取數(shù)據(jù)源的數(shù)據(jù)),如果之前讀取的數(shù)據(jù)出現(xiàn)問題,可以重新讀取數(shù)據(jù)。
做副本的方式可以借助blockmanager做備份。Blockmanager存儲數(shù)據(jù)的時(shí)候有很多storagelevel,Receiver接收數(shù)據(jù)后,存儲的時(shí)候指定storagelevel為MEMORY_AND_DISK_SER_2的方式。Blockmanager早存儲的時(shí)候會先考慮memory,只有memory不夠的時(shí)候才會考慮disk,一般memory都是夠的。所以至少兩個(gè)executor上都會有數(shù)據(jù),假設(shè)一個(gè)executor掛掉,就會馬上切換到另一個(gè)executor。
ReceiverSupervisorImpl在存儲數(shù)據(jù)的時(shí)候會有兩種方式,一種是WAL的方式,究竟是不是WAL得方式是通過配置修改的。默認(rèn)是false。如果用WAL的方式必須有checkpoint的目錄,因?yàn)閃AL的數(shù)據(jù)是放在checkpoint的目錄之下的。
def enableReceiverLog(conf: SparkConf): Boolean = {
conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}
Storagelevel是在構(gòu)建inputDstream的時(shí)候傳入的,默認(rèn)就是MEMORY_AND_DISK_SER_2。
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
現(xiàn)在來看ReceiverSupervisorImpl在存儲數(shù)據(jù)的另一種方式(副本方式)。注釋中說的很清楚,根據(jù)指定的storagelevel把接收的blocks交給blockmanager。也就是通過blockmanager來存儲。
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
*/
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
Blockmanager存儲的時(shí)候會分為多種不同的數(shù)據(jù)類型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。
Blockmanager存儲數(shù)據(jù)前面已經(jīng)講過了。Receiver在接收到數(shù)據(jù)后除了在自己這個(gè)executor上面存儲,還會在另外一個(gè)executor上存儲。如果一個(gè)executor出現(xiàn)問題會瞬間切換到另一個(gè)executor。
WAL的方式原理:在具體的目錄下會做一份日志,假設(shè)后續(xù)處理的過程中出了問題,可以基于日志恢復(fù),日志是寫在checkpoint下。在生產(chǎn)環(huán)境下checkpoint是在HDFS上,這樣日志就會有三份副本。
下面就是用WAL存儲數(shù)據(jù)的類,先寫日志再交給blockmanager存儲。
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both, a write ahead log and a block manager.
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
如果采用WAL的方式,存儲數(shù)據(jù)的時(shí)候就不需要有兩份副本,這樣太浪費(fèi)內(nèi)存,如果storagelevel.replication大于1就會打印警告日志。
private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
s" write ahead log is enabled, change to serialization false")
}
if (storageLevel.replication > 1) {
logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
s"write ahead log is enabled, change to replication 1")
}
StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}
這里采用兩條線程的線程池,使得blockmanager存儲數(shù)據(jù)和write ahead log可以并發(fā)的執(zhí)行。
// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
這個(gè)是把日志寫入WAL中
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
負(fù)責(zé)讀寫WAL的是WriteAheadLog,這是一個(gè)抽象類,負(fù)責(zé)寫入、讀取、清除數(shù)據(jù)的功能。在寫入數(shù)據(jù)后會返回一個(gè)句柄,以供讀取數(shù)據(jù)使用。
看一下具體寫入數(shù)據(jù)的實(shí)現(xiàn)。如果失敗并且失敗次數(shù)小于最大的失敗次數(shù)就會重試。確實(shí)是返回了一個(gè)句柄。
/**
* Write a byte buffer to the log file. This method synchronously writes the data in the
* ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
* to HDFS, and will be available for readers to read.
*/
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
var fileSegment: FileBasedWriteAheadLogSegment = null
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
fileSegment = getLogWriter(time).write(byteBuffer)
if (closeFileAfterWrite) {
resetWriter()
}
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
logWarning("Failed to write to write ahead log")
resetWriter()
failures += 1
}
}
if (fileSegment == null) {
logError(s"Failed to write to write ahead log after $failures failures")
throw lastException
}
fileSegment
}
下面就是把數(shù)據(jù)寫入HDFS的代碼
/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
assertOpen()
data.rewind() // Rewind to ensure all data in the buffer is retrieved
val lengthToWrite = data.remaining()
val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
stream.writeInt(lengthToWrite)
if (data.hasArray) {
stream.write(data.array())
} else {
// If the buffer is not backed by an array, we transfer using temp array
// Note that despite the extra array copy, this should be faster than byte-by-byte copy
while (data.hasRemaining) {
val array = new Array[Byte](data.remaining)
data.get(array)
stream.write(array)
}
}
flush()
nextOffset = stream.getPos()
segment
}
不管是WAL還是直接交給blockmanager都是采用副本的方式。還有一種是數(shù)據(jù)源支持?jǐn)?shù)據(jù)存放,典型的就是kafka。Kafka已經(jīng)成為了數(shù)據(jù)存儲系統(tǒng),它天然具有容錯(cuò)和數(shù)據(jù)副本。
Kafka有receiver和direct的方式。Receiver的方式其實(shí)是交給zookeper來管理matadata的(偏移量offset),如果數(shù)據(jù)處理失敗后,kafka會基于offset重新讀取數(shù)據(jù)。為什么可以重新讀???如果程序崩潰或者數(shù)據(jù)沒處理完是不會給zookeper發(fā)ack。Zookeper就認(rèn)為這個(gè)數(shù)據(jù)沒有被消費(fèi)。實(shí)際生產(chǎn)環(huán)境下越來越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。這就可以保證有且只有一次的容錯(cuò)處理。DirectKafkaInputDstream,它會去看最新的offset,并把這個(gè)內(nèi)容放入batch中。
獲取最新的offset,通過最新的offset減去上一個(gè)offset就可以確定讀哪些數(shù)據(jù),也就是一個(gè)batch中的數(shù)據(jù)。
@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
容錯(cuò)的弊端就是消耗性能,占用時(shí)間。也不是所有情況都不能容忍數(shù)據(jù)丟失。有些情況下可以不進(jìn)行容錯(cuò)來提高性能。
假如一次處理1000個(gè)block,但是有1個(gè)block出錯(cuò),就需要把1000個(gè)block進(jìn)行重新讀取或者恢復(fù),這也有性能問題。
到此,關(guān)于“Executor容錯(cuò)安全性實(shí)例分析”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!