真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

第17課:SparkStreaming資源動(dòng)態(tài)申請(qǐng)和動(dòng)態(tài)控制消費(fèi)速率原理剖析

本期內(nèi)容:

發(fā)展壯大離不開(kāi)廣大客戶長(zhǎng)期以來(lái)的信賴與支持,我們將始終秉承“誠(chéng)信為本、服務(wù)至上”的服務(wù)理念,堅(jiān)持“二合一”的優(yōu)良服務(wù)模式,真誠(chéng)服務(wù)每家企業(yè),認(rèn)真做好每個(gè)細(xì)節(jié),不斷完善自我,成就企業(yè),實(shí)現(xiàn)共贏。行業(yè)涉及成都航空箱等,在重慶網(wǎng)站建設(shè)公司、全網(wǎng)營(yíng)銷推廣、WAP手機(jī)網(wǎng)站、VI設(shè)計(jì)、軟件開(kāi)發(fā)等項(xiàng)目上具有豐富的設(shè)計(jì)經(jīng)驗(yàn)。

  • Spark Streaming資源動(dòng)態(tài)分配

  • Spark Streaming動(dòng)態(tài)控制消費(fèi)速率

為什么需要?jiǎng)討B(tài)?

  • Spark默認(rèn)情況下粗粒度的,先分配好資源再計(jì)算。而Spark Streaming有高峰值和低峰值,但是他們需要的資源是不一樣的,如果按照高峰值的角度的話,就會(huì)有大量的資源浪費(fèi)。

  • Spark Streaming不斷的運(yùn)行,對(duì)資源消耗和管理也是我們要考慮的因素。

  • Spark Streaming資源動(dòng)態(tài)調(diào)整的時(shí)候會(huì)面臨挑戰(zhàn):

  • Spark Streaming是按照Batch Duration運(yùn)行的,Batch Duration需要很多資源,下一次Batch Duration就不需要那么多資源了,調(diào)整資源的時(shí)候還沒(méi)調(diào)整完Batch Duration運(yùn)行就已經(jīng)過(guò)期了。這個(gè)時(shí)候調(diào)整時(shí)間間隔。

Spark Streaming資源動(dòng)態(tài)申請(qǐng) 

1. 在SparkContext中默認(rèn)是不開(kāi)啟動(dòng)態(tài)資源分配的,但是可以通過(guò)手動(dòng)在SparkConf中配置。

// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && //參數(shù)配置是否開(kāi)啟資源動(dòng)態(tài)分配_conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {    None
  }
_executorAllocationManager.foreach(_.start())
  1. ExecutorAllocationManager: 有定時(shí)器會(huì)不斷的去掃描Executor的情況,正在運(yùn)行的Stage,要運(yùn)行在不同的Executor中,要么增加Executor或者減少。

  2. ExecutorAllocationManager中schedule方法會(huì)被周期性觸發(fā)進(jìn)行資源動(dòng)態(tài)調(diào)整。

/** * This is called at a fixed interval to regulate the number of pending executor requests * and number of executors running. * * First, adjust our requested executors based on the add time and our current needs. * Then, if the remove time for an existing executor has expired, kill the executor. * * This is factored out into its own method for testing. */private def schedule(): Unit = synchronized {  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }    !expired
  }
}
  1. 在ExecutorAllocationManager中會(huì)在線程池中定時(shí)器會(huì)不斷的運(yùn)行schedule.

/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = {
  listenerBus.addListener(listener)  val scheduleTask = new Runnable() {    override def run(): Unit = {      try {
        schedule()
      } catch {        case ct: ControlThrowable =>
          throw ct        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }// intervalMillis定時(shí)器觸發(fā)時(shí)間
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}

動(dòng)態(tài)控制消費(fèi)速率: Spark Streaming提供了一種彈性機(jī)制,流進(jìn)來(lái)的速度和處理速度的關(guān)系,是否來(lái)得及處理數(shù)據(jù)。如果不能來(lái)得及的話,他會(huì)自動(dòng)動(dòng)態(tài)控制數(shù)據(jù)流進(jìn)來(lái)的速度,spark.streaming.backpressure.enabled參數(shù)設(shè)置。

動(dòng)態(tài)控制消費(fèi)速率的原理可參考論文 Adaptive Stream Processing using Dynamic Batch Sizing

備注:

1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark 
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains


當(dāng)前題目:第17課:SparkStreaming資源動(dòng)態(tài)申請(qǐng)和動(dòng)態(tài)控制消費(fèi)速率原理剖析
本文地址:http://weahome.cn/article/pcpogj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部