真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Executor容錯(cuò)安全性實(shí)例分析

這篇文章主要介紹“Executor容錯(cuò)安全性實(shí)例分析”,在日常操作中,相信很多人在Executor容錯(cuò)安全性實(shí)例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Executor容錯(cuò)安全性實(shí)例分析”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

公司主營業(yè)務(wù):成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出邢臺縣免費(fèi)做網(wǎng)站回饋大家。

sparkstreaming會不斷的接收數(shù)據(jù)、不斷的產(chǎn)生job、不斷的提交job。所以有一個(gè)至關(guān)重要的問題就是數(shù)據(jù)安全性。由于sparkstreaming是基于sparkcore的,如果我們可以確保數(shù)據(jù)安全可靠的話(sparkstreaming生產(chǎn)job的時(shí)候里面是基于RDD),即使運(yùn)行的時(shí)候出現(xiàn)錯(cuò)誤或者故障,也可以基于RDD的容錯(cuò)的能力自動(dòng)進(jìn)行恢復(fù)。所以要確保數(shù)據(jù)的安全性。

對于executor的安全容錯(cuò)主要是數(shù)據(jù)的安全容錯(cuò)。Executor計(jì)算時(shí)候的安全容錯(cuò)是借助spark core的RDD的,所以天然是安全的。

數(shù)據(jù)安全性的一種方式是存儲一份副本,另一種方式是不做副本,但是數(shù)據(jù)源支持重放(也就是可以反復(fù)的讀取數(shù)據(jù)源的數(shù)據(jù)),如果之前讀取的數(shù)據(jù)出現(xiàn)問題,可以重新讀取數(shù)據(jù)。

做副本的方式可以借助blockmanager做備份。Blockmanager存儲數(shù)據(jù)的時(shí)候有很多storagelevel,Receiver接收數(shù)據(jù)后,存儲的時(shí)候指定storagelevel為MEMORY_AND_DISK_SER_2的方式。Blockmanager早存儲的時(shí)候會先考慮memory,只有memory不夠的時(shí)候才會考慮disk,一般memory都是夠的。所以至少兩個(gè)executor上都會有數(shù)據(jù),假設(shè)一個(gè)executor掛掉,就會馬上切換到另一個(gè)executor。

ReceiverSupervisorImpl在存儲數(shù)據(jù)的時(shí)候會有兩種方式,一種是WAL的方式,究竟是不是WAL得方式是通過配置修改的。默認(rèn)是false。如果用WAL的方式必須有checkpoint的目錄,因?yàn)閃AL的數(shù)據(jù)是放在checkpoint的目錄之下的。

def enableReceiverLog(conf: SparkConf): Boolean = {
  conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}

Storagelevel是在構(gòu)建inputDstream的時(shí)候傳入的,默認(rèn)就是MEMORY_AND_DISK_SER_2。

* @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

現(xiàn)在來看ReceiverSupervisorImpl在存儲數(shù)據(jù)的另一種方式(副本方式)。注釋中說的很清楚,根據(jù)指定的storagelevel把接收的blocks交給blockmanager。也就是通過blockmanager來存儲。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */

private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)

Blockmanager存儲的時(shí)候會分為多種不同的數(shù)據(jù)類型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。

Blockmanager存儲數(shù)據(jù)前面已經(jīng)講過了。Receiver在接收到數(shù)據(jù)后除了在自己這個(gè)executor上面存儲,還會在另外一個(gè)executor上存儲。如果一個(gè)executor出現(xiàn)問題會瞬間切換到另一個(gè)executor。

WAL的方式原理:在具體的目錄下會做一份日志,假設(shè)后續(xù)處理的過程中出了問題,可以基于日志恢復(fù),日志是寫在checkpoint下。在生產(chǎn)環(huán)境下checkpoint是在HDFS上,這樣日志就會有三份副本。

下面就是用WAL存儲數(shù)據(jù)的類,先寫日志再交給blockmanager存儲。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */

private[streaming] class WriteAheadLogBasedBlockHandler(

如果采用WAL的方式,存儲數(shù)據(jù)的時(shí)候就不需要有兩份副本,這樣太浪費(fèi)內(nèi)存,如果storagelevel.replication大于1就會打印警告日志。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

這里采用兩條線程的線程池,使得blockmanager存儲數(shù)據(jù)和write ahead log可以并發(fā)的執(zhí)行。

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

這個(gè)是把日志寫入WAL中

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
  writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

負(fù)責(zé)讀寫WAL的是WriteAheadLog,這是一個(gè)抽象類,負(fù)責(zé)寫入、讀取、清除數(shù)據(jù)的功能。在寫入數(shù)據(jù)后會返回一個(gè)句柄,以供讀取數(shù)據(jù)使用。

看一下具體寫入數(shù)據(jù)的實(shí)現(xiàn)。如果失敗并且失敗次數(shù)小于最大的失敗次數(shù)就會重試。確實(shí)是返回了一個(gè)句柄。

/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */

def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
  var fileSegment: FileBasedWriteAheadLogSegment = null
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      fileSegment = getLogWriter(time).write(byteBuffer)
      if (closeFileAfterWrite) {
        resetWriter()
      }
      succeeded = true
    catch {
      case ex: Exception =>
        lastException = ex
        logWarning("Failed to write to write ahead log")
        resetWriter()
        failures += 1
    }
  }
  if (fileSegment == null) {
    logError(s"Failed to write to write ahead log after $failures failures")
    throw lastException
  }
  fileSegment
}

下面就是把數(shù)據(jù)寫入HDFS的代碼

/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
  assertOpen()
  data.rewind() // Rewind to ensure all data in the buffer is retrieved
  val lengthToWrite = data.remaining()
  val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
  stream.writeInt(lengthToWrite)
  if (data.hasArray) {
    stream.write(data.array())
  } else {
    // If the buffer is not backed by an array, we transfer using temp array
    // Note that despite the extra array copy, this should be faster than byte-by-byte copy
    while (data.hasRemaining) {
      val array = new Array[Byte](data.remaining)
      data.get(array)
      stream.write(array)
    }
  }
  flush()
  nextOffset stream.getPos()
  segment
}

不管是WAL還是直接交給blockmanager都是采用副本的方式。還有一種是數(shù)據(jù)源支持?jǐn)?shù)據(jù)存放,典型的就是kafka。Kafka已經(jīng)成為了數(shù)據(jù)存儲系統(tǒng),它天然具有容錯(cuò)和數(shù)據(jù)副本。

Kafka有receiver和direct的方式。Receiver的方式其實(shí)是交給zookeper來管理matadata的(偏移量offset),如果數(shù)據(jù)處理失敗后,kafka會基于offset重新讀取數(shù)據(jù)。為什么可以重新讀???如果程序崩潰或者數(shù)據(jù)沒處理完是不會給zookeper發(fā)ack。Zookeper就認(rèn)為這個(gè)數(shù)據(jù)沒有被消費(fèi)。實(shí)際生產(chǎn)環(huán)境下越來越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。這就可以保證有且只有一次的容錯(cuò)處理。DirectKafkaInputDstream,它會去看最新的offset,并把這個(gè)內(nèi)容放入batch中。

獲取最新的offset,通過最新的offset減去上一個(gè)offset就可以確定讀哪些數(shù)據(jù),也就是一個(gè)batch中的數(shù)據(jù)。

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}

容錯(cuò)的弊端就是消耗性能,占用時(shí)間。也不是所有情況都不能容忍數(shù)據(jù)丟失。有些情況下可以不進(jìn)行容錯(cuò)來提高性能。

假如一次處理1000個(gè)block,但是有1個(gè)block出錯(cuò),就需要把1000個(gè)block進(jìn)行重新讀取或者恢復(fù),這也有性能問題。

到此,關(guān)于“Executor容錯(cuò)安全性實(shí)例分析”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!


標(biāo)題名稱:Executor容錯(cuò)安全性實(shí)例分析
分享網(wǎng)址:http://weahome.cn/article/jgjcdp.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部