In this issue:
Spark Streaming dynamic resource allocation
Spark Streaming dynamic rate control
Why do we need to be dynamic?
By default Spark schedules in a coarse-grained way: resources are allocated up front, and only then does computation run. A Spark Streaming application, however, alternates between peak and off-peak load, and the two need very different amounts of resources; if we provision for the peak, a large amount of resources sits wasted the rest of the time.
A Spark Streaming application also runs continuously, so ongoing resource consumption and management are factors we must consider as well.
Dynamically adjusting resources for Spark Streaming faces a challenge:
Spark Streaming runs batch by batch, at the granularity of the Batch Duration. One batch may need a lot of resources while the next batch needs far fewer, and a batch is typically much shorter than the time it takes to scale resources, so by the time an adjustment completes, the batch it was meant for has already passed. In that situation, adjust the batch interval itself.
Spark Streaming dynamic resource requests
1. Dynamic resource allocation is not enabled by default in SparkContext, but it can be turned on manually through SparkConf.
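As a point of reference, the same switch can also be flipped at submit time through Spark's standard configuration keys. The minimum/maximum values below are illustrative, not defaults, and the external shuffle service must be enabled so executors can be removed without losing shuffle data:

```
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=10 \
  your-app.jar
```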
```scala
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled &&
    // whether dynamic resource allocation was requested in the configuration
    _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
```
ExecutorAllocationManager: a timer periodically scans the state of the executors and the currently running stages, and decides whether executors should be added or removed.
The schedule method in ExecutorAllocationManager is triggered periodically to carry out the dynamic resource adjustment.
```scala
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
```
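The expiry pass above can be sketched in a self-contained form. This is a minimal illustration of the same idea, not Spark's actual class: `removeTimes` maps each idle executor's id to the wall-clock time after which it may be removed, and each pass drops (and reports) the entries whose time has passed.

```scala
import scala.collection.mutable

// Sketch of the expiry pass in schedule(): entries whose expire time has
// passed are removed from the map and returned. In Spark proper,
// removeExecutor(executorId) is called for each expired entry.
def expireIdleExecutors(removeTimes: mutable.Map[String, Long], now: Long): Seq[String] = {
  val expired = removeTimes.collect { case (id, expireTime) if now >= expireTime => id }.toSeq
  expired.foreach(removeTimes -= _)   // keep only executors still within their idle timeout
  expired
}
```

In Spark the expire time comes from the executor's idle timeout (spark.dynamicAllocation.executorIdleTimeout, 60s by default).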
Inside ExecutorAllocationManager, a timer running in a thread pool keeps invoking schedule at a fixed interval.
```scala
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // intervalMillis is the period at which the timer fires
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
```
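The timer pattern in start() can be reproduced with nothing but java.util.concurrent. The sketch below uses assumed names (runPeriodically is not a Spark API): a Runnable wraps the scheduling function in a try/catch so that one failing run cannot kill the timer, and is registered with scheduleAtFixedRate. Unlike Spark, the demo stops after a fixed number of runs so that it terminates.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Run `schedule` every intervalMillis on a single-threaded timer, `runs` times.
// Spark's version runs for the lifetime of the SparkContext instead of counting down.
def runPeriodically(schedule: () => Unit, intervalMillis: Long, runs: Int): Unit = {
  val executor = Executors.newSingleThreadScheduledExecutor()
  val done = new CountDownLatch(runs)
  val scheduleTask = new Runnable {
    override def run(): Unit = {
      try schedule()
      catch { case _: Throwable => () }  // Spark rethrows ControlThrowable and logs the rest
      finally done.countDown()
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  done.await()          // block until schedule() has run `runs` times, then stop the timer
  executor.shutdownNow()
}
```

In ExecutorAllocationManager the interval (intervalMillis) is 100 milliseconds, which is why schedule() itself must stay cheap.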
Dynamic rate control: Spark Streaming provides an elasticity mechanism that tracks the relationship between the rate at which data flows in and the rate at which it is processed, i.e., whether processing can keep up with ingestion. If it cannot, Spark Streaming automatically throttles the rate at which data is allowed in; this is controlled by the spark.streaming.backpressure.enabled parameter.
The principle behind dynamic rate control is described in the paper "Adaptive Stream Processing using Dynamic Batch Sizing".
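In configuration terms, backpressure and the related static rate ceilings look like the fragment below. The key names are Spark's own; the numeric values are illustrative only:

```
spark.streaming.backpressure.enabled=true
# optional static ceilings (records/second); backpressure adapts underneath them
spark.streaming.receiver.maxRate=10000
spark.streaming.kafka.maxRatePerPartition=1000
```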
Notes:
1. DT Big Data Dream Factory WeChat official account: DT_Spark
2. IMF 8 p.m. big-data hands-on YY live-stream channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains