shuflle write
樂安網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司,樂安網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為樂安上千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\
外貿(mào)營銷網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的
樂安做網(wǎng)站的公司定做!
- 上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數(shù)為 2,可以同時運行兩個 task。
- 在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。
shuflle read
- 在什么時候 fetch 數(shù)據(jù)?當(dāng) parent stage 的所有 ShuffleMapTasks 結(jié)束后再 fetch。
- 邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。使用可以 aggregate 的數(shù)據(jù)結(jié)構(gòu),比如 HashMap,每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 record,直接將其放進(jìn) HashMap 里面。如果該 HashMap 已經(jīng)存在相應(yīng)的 Key,那么直接進(jìn)行 aggregate 也就是 func(hashMap.get(Key), Value)
- fetch 來的數(shù)據(jù)存放到哪里?剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。
- 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數(shù)據(jù)位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給MapOutputTrackerMaster。
Shuffle read 中的 HashMap
ashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數(shù)據(jù)結(jié)構(gòu)。Spark 設(shè)計了兩種:一種是全內(nèi)存的 AppendOnlyMap,另一種是內(nèi)存+磁盤的 ExternalAppendOnlyMap。
- 類似 HashMap,但沒有remove(key)方法。其實現(xiàn)原理很簡單,開一個大 Object 數(shù)組,藍(lán)色部分存儲 Key,白色部分存儲 Value。
- 如果 Array 的利用率達(dá)到 70%,那么就擴(kuò)張一倍,并對所有 key 進(jìn)行 rehash 后,重新排列每個 key 的位置。
- ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。
- 如果 AppendOnlyMap 快被裝滿時檢查一下內(nèi)存剩余空間是否可以夠擴(kuò)展,夠就直接在內(nèi)存中擴(kuò)展,不夠就 sort 一下 AppendOnlyMap,將其內(nèi)部所有 records 都 spill 到磁盤上。
- 每次 spill 完在磁盤上生成一個 spilledMap 文件,然后重新 new 出來一個 AppendOnlyMap。
- 最后一個 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經(jīng)被處理完,因為每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進(jìn)行 aggregate,并不是與所有的 records 進(jìn)行 aggregate(一些 records 已經(jīng)被 spill 到磁盤上了)。因此當(dāng)需要 aggregate 的最終結(jié)果時,需要對 AppendOnlyMap 和所有的 spilledMaps 進(jìn)行全局 merge-aggregate。
- 全局 merge-aggregate 的流程:先將 AppendOnlyMap 中的 records 進(jìn)行 sort,形成 sortedMap。
- 然后分別從 sortedMap 和各個 spilledMap 讀出一部分?jǐn)?shù)據(jù)(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key)
- mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個個放入 mergeBuffers 中,放入的時候與已經(jīng)存在于 mergeBuffers 中的 StreamBuffer 進(jìn)行 merge-combine
在Sort Based Shuffle的Shuffle Write階段,map端的任務(wù)會按照Partition id以及key對記錄進(jìn)行排序。同時將全部結(jié)果寫到一個數(shù)據(jù)文件中,同時生成一個索引文件,reduce端的Task可以通過該索引文件獲取相關(guān)的數(shù)據(jù)。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
名稱欄目:spark(四):shuffle-創(chuàng)新互聯(lián)
網(wǎng)站URL:
http://weahome.cn/article/dsshjs.html