作者 | 一錘、明濟(jì)
10年積累的網(wǎng)站建設(shè)、做網(wǎng)站經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站制作后付款的網(wǎng)站建設(shè)流程,更有浪卡子免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。阿里云EMR自2020年推出Remote Shuffle Service(RSS)以來,幫助了諸多客戶解決Spark作業(yè)的性能、穩(wěn)定性問題,并使得存算分離架構(gòu)得以實(shí)施。為了更方便大家使用和擴(kuò)展,RSS在2022年初開源,歡迎各路開發(fā)者共建。RSS的整體架構(gòu)請(qǐng)參考[1],本文將介紹RSS最新的兩個(gè)重要功能:支持Adaptive Query Execution(AQE),以及流控。
自適應(yīng)執(zhí)行(Adaptive Query Execution, AQE)是Spark3的重要功能[2],通過收集運(yùn)行時(shí)Stats,來動(dòng)態(tài)調(diào)整后續(xù)的執(zhí)行計(jì)劃,從而解決由于Optimizer無法準(zhǔn)確預(yù)估Stats導(dǎo)致生成的執(zhí)行計(jì)劃不夠好的問題。AQE主要有三個(gè)優(yōu)化場(chǎng)景: Partition合并(Partition Coalescing), Join策略切換(Switch Join Strategy),以及傾斜Join優(yōu)化(Optimize Skew Join)。這三個(gè)場(chǎng)景都對(duì)Shuffle框架的能力提出了新的需求。
Partition合并的目的是盡量讓reducer處理的數(shù)據(jù)量適中且均勻,做法是首先Mapper按較多的Partition數(shù)目進(jìn)行Shuffle Write,AQE框架統(tǒng)計(jì)每個(gè)Partition的Size,若連續(xù)多個(gè)Partition的數(shù)據(jù)量都比較小,則將這些Partition合并成一個(gè),交由一個(gè)Reducer去處理。過程如下所示。
由上圖可知,優(yōu)化后的Reducer2需讀取原屬于Reducer2-4的數(shù)據(jù),對(duì)Shuffle框架的需求是ShuffleReader需要支持范圍Partition:
def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C]
Join策略切換的目的是修正由于Stats預(yù)估不準(zhǔn)導(dǎo)致Optimizer把本應(yīng)做的Broadcast Join錯(cuò)誤的選擇了SortMerge Join或ShuffleHash Join。具體而言,在Join的兩張表做完Shuffle Write之后,AQE框架統(tǒng)計(jì)了實(shí)際大小,若發(fā)現(xiàn)小表符合Broadcast Join的條件,則將小表Broadcast出去,跟大表的本地Shuffle數(shù)據(jù)做Join。流程如下:成都服務(wù)器托管
Join策略切換有兩個(gè)優(yōu)化:1. 改寫成Broadcast Join; 2. 大表的數(shù)據(jù)通過LocalShuffleReader直讀本地。其中第2點(diǎn)對(duì)Shuffle框架提的新需求是支持Local Read。
傾斜Join優(yōu)化的目的是讓傾斜的Partition由更多的Reducer去處理,從而避免長(zhǎng)尾。具體而言,在Shuffle Write結(jié)束之后,AQE框架統(tǒng)計(jì)每個(gè)Partition的Size,接著根據(jù)特定規(guī)則判斷是否存在傾斜,若存在,則把該P(yáng)artition分裂成多個(gè)Split,每個(gè)Split跟另外一張表的對(duì)應(yīng)Partition做Join。如下所示。
Partiton分裂的做法是按照MapId的順序累加他們Shuffle Output的Size,累加值超過閾值時(shí)觸發(fā)分裂。對(duì)Shuffle框架的新需求是ShuffleReader要能支持范圍MapId。綜合Partition合并優(yōu)化對(duì)范圍Partition的需求,ShuffleReader的接口演化為:
def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
RSS的核心設(shè)計(jì)是Push Shuffle + Partition數(shù)據(jù)聚合,即不同的Mapper把屬于同一個(gè)Partition的數(shù)據(jù)推給同一個(gè)Worker做聚合,Reducer直讀聚合后的文件。如下圖所示。
在核心設(shè)計(jì)之外,RSS還實(shí)現(xiàn)了多副本,全鏈路容錯(cuò),Master HA,磁盤容錯(cuò),自適應(yīng)Pusher,滾動(dòng)升級(jí)等特性,詳見[1]。
Partition合并對(duì)Shuffle框架的需求是支持范圍Partition,在RSS中每個(gè)Partition對(duì)應(yīng)著一個(gè)文件,因此天然支持,如下圖所示。
Join策略切換對(duì)Shuffle框架的需求是能夠支持LocalShuffleReader。由于RSS的Remote屬性,數(shù)據(jù)存放在RSS集群,僅當(dāng)RSS和計(jì)算集群混部的場(chǎng)景下才會(huì)存在在本地,因此暫不支持Local Read(將來會(huì)優(yōu)化混部場(chǎng)景并加以支持)。需要注意的是,盡管不支持Local Read,但并不影響Join的改寫,RSS支持Join改寫優(yōu)化如下圖所示。
在AQE的三個(gè)場(chǎng)景中,RSS支持Join傾斜優(yōu)化是最為困難的一點(diǎn)。RSS的核心設(shè)計(jì)是Partition數(shù)據(jù)聚合,目的是把Shuffle Read的隨機(jī)讀轉(zhuǎn)變?yōu)轫樞蜃x,從而提升性能和穩(wěn)定性。多個(gè)Mapper同時(shí)推送給RSS Worker,RSS在內(nèi)存聚合后刷盤,因此Partition文件中來自不同Mapper的數(shù)據(jù)是無序的,如下圖所示。
Join傾斜優(yōu)化需要讀取范圍Map,例如讀Map1-2的數(shù)據(jù),常規(guī)的做法有兩種:成都服務(wù)器托管
這兩種做法的問題顯而易見。方法1會(huì)導(dǎo)致大量冗余的磁盤讀;方法2本質(zhì)上回退成了隨機(jī)讀,喪失了RSS最核心的優(yōu)勢(shì),并且創(chuàng)建索引文件成為通用的Overhead,即使是針對(duì)非傾斜的數(shù)據(jù)(Shuffle Write過程中難以準(zhǔn)確預(yù)測(cè)是否存在傾斜)。
為了解決以上兩個(gè)問題,我們提出了新的設(shè)計(jì):主動(dòng)Split + Sort On Read。
傾斜的Partition大概率Size非常大,極端情況會(huì)直接打爆磁盤,即使在非傾斜場(chǎng)景出現(xiàn)大Partition的幾率依然不小。因此,從磁盤負(fù)載均衡的角度,監(jiān)控Partition文件的Size并做主動(dòng)Split(默認(rèn)閾值256m)是非常必要的。
Split發(fā)生時(shí),RSS會(huì)為當(dāng)前Partition重新分配一對(duì)Worker(主副本),后續(xù)數(shù)據(jù)將推給新的Worker。為了避免Split對(duì)正在運(yùn)行的Mapper產(chǎn)生影響,我們提出了Soft Split的方法,即當(dāng)觸發(fā)Split時(shí),RSS異步去準(zhǔn)備新的Worker,Ready之后去熱更新Mapper的PartitionLocation信息,因此不會(huì)對(duì)Mapper的PushData產(chǎn)生任何干擾。整體流程如下圖所示。
為了避免隨機(jī)讀的問題,RSS采用了Sort On Read的策略。具體而言,F(xiàn)ile Split的首次Range讀會(huì)觸發(fā)排序(非Range讀不會(huì)觸發(fā)),排好序的文件連同其位置索引寫回磁盤。后續(xù)的Range讀即可保證是順序讀取。如下圖所示。
為了避免多個(gè)Sub-Reducer等待同一個(gè)File Split的排序,我們打散了各個(gè)Sub-Reducer讀取Split的順序,如下圖所示。
Sort On Read可以有效避免冗余讀和隨機(jī)讀,但需要對(duì)Split File(256m)做排序,本節(jié)討論排序的實(shí)現(xiàn)及開銷。文件排序包括3個(gè)步驟:讀文件,對(duì)MapId做排序,寫文件。RSS的Block默認(rèn)256k,Block的數(shù)量大概是1000,因此排序的過程非常快,主要開銷在文件讀寫。整個(gè)排序過程大致有三種方案:成都服務(wù)器托管
從IO的視角,乍看之下,方案1通過使用足量?jī)?nèi)存,不存在順序讀寫;方案2存在隨機(jī)讀和隨機(jī)寫;方案3存在隨機(jī)寫;直觀上方案1性能更好。綿陽服務(wù)器托管然而,由于PageCache的存在,方案3在寫文件時(shí)原文件大概率緩存在PageCache中,因此實(shí)測(cè)下來方案3的性能更好,如下圖所示。
同時(shí)方案3無需占用進(jìn)程額外內(nèi)存,故RSS采用方案3的算法。我們同時(shí)還測(cè)試了Sort On Read跟上述的不排序、僅做索引的隨機(jī)讀方法的對(duì)比,如下圖所示。
RSS支持Join傾斜優(yōu)化的整體流程如下圖所示。
流控的主要目的是防止RSS Worker內(nèi)存被打爆。流控通常有兩種方式:成都服務(wù)器托管
由于PushData是非常高頻且性能關(guān)鍵的操作,若每次推送都額外進(jìn)行一次RPC交互,則開銷太大,因此我們采用了反壓的策略。以Worker的視角,流入數(shù)據(jù)有兩個(gè)源:成都服務(wù)器托管
如下圖所示,Worker2既接收來自Mapper推送的Partition3的數(shù)據(jù),也接收Worker1發(fā)送的Partition1的副本數(shù)據(jù),同時(shí)會(huì)把Partition3的數(shù)據(jù)發(fā)給對(duì)應(yīng)的從副本。
其中,來自Mapper推送的數(shù)據(jù),當(dāng)且僅當(dāng)同時(shí)滿足以下條件時(shí)才會(huì)釋放內(nèi)存:成都服務(wù)器托管
來自主副本推送的數(shù)據(jù),當(dāng)且僅當(dāng)滿足以下條件時(shí)才會(huì)釋放內(nèi)存:成都服務(wù)器托管
我們?cè)谠O(shè)計(jì)流控策略時(shí),不僅要考慮限流(降低流入的數(shù)據(jù)),更要考慮泄流(內(nèi)存能及時(shí)釋放)。具體而言,高水位我們定義了兩檔內(nèi)存閾值(分別對(duì)應(yīng)85%和95%內(nèi)存使用),低水位只有一檔(50%內(nèi)存使用)。達(dá)到高水位一檔閾值時(shí),觸發(fā)流控,暫停接收Mapper推送的數(shù)據(jù),同時(shí)強(qiáng)制刷盤,從而達(dá)到泄流的目標(biāo)。僅限制來自Mapper的流入并不能控制來自主副本的流量,因此我們定義了高水位第二檔,達(dá)到此閾值時(shí)將同時(shí)暫停接收主副本發(fā)送的數(shù)據(jù)。當(dāng)水位低于低水位后,恢復(fù)正常狀態(tài)。整體流程如下圖所示。
我們對(duì)比了RSS和原生的External Shufle Service(ESS)在Spark3.2.0開啟AQE的性能。RSS采用混部的方式,沒有額外占用任何機(jī)器資源。此外,RSS所使用的內(nèi)存為8g,僅占機(jī)器內(nèi)存的2.3%(機(jī)器內(nèi)存352g)。具體環(huán)境如下。
硬件:成都服務(wù)器托管
header 機(jī)器組 1x ecs.g5.4xlargeworker 機(jī)器組 8x ecs.d2c.24xlarge,96 CPU,352 GB,12x 3700GB HDD。
Spark AQE相關(guān)配置:
spark.sql.adaptive.enabled true spark.sql.adaptive.coalescePartitions.enabled true spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000 spark.sql.adaptive.skewJoin.enabled true spark.sql.adaptive.localShuffleReader.enabled false
RSS相關(guān)配置:
RSS_MASTER_MEMORY=2gRSS_WORKER_MEMORY=1gRSS_WORKER_OFFHEAP_MEMORY=7g
我們測(cè)試了10T的TPCDS,E2E來看,ESS耗時(shí)11734s,RSS單副本/兩副本分別耗時(shí)8971s/10110s,分別比ESS快了23.5%/13.8%,如下圖所示。我們觀察到RSS開啟兩副本時(shí)網(wǎng)絡(luò)帶寬達(dá)到上限,這也是兩副本比單副本低的主要因素。
具體每個(gè)Query的時(shí)間對(duì)比如下:
github地址:https://github.com/alibaba/RemoteShuffleService
[1]阿里云EMR Remote Shuffle Service在小米的實(shí)踐,以及開源. https://developer.aliyun.com/article/857757
[2]Adaptive Query Execution: Speeding Up Spark SQL at Runtime. https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
原文地址:https://mp.weixin.qq.com/s/NoVCmFQA4d1OJqNK47XgOg