真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯網站制作重慶分公司

spark(四):shuffle

shuflle write

spark(四):shuffle

成都創(chuàng)新互聯公司專注于陽高網站建設服務及定制,我們擁有豐富的企業(yè)做網站經驗。 熱誠為您提供陽高營銷型網站建設,陽高網站制作、陽高網頁設計、陽高網站官網定制、小程序開發(fā)服務,打造陽高網絡公司原創(chuàng)品牌,更為您提供陽高網站排名全網營銷落地服務。

  1. 上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數為 2,可以同時運行兩個 task。
  2. 在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。

shuflle read

  1. 在什么時候 fetch 數據?當 parent stage 的所有 ShuffleMapTasks 結束后再 fetch。
  2. 邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。使用可以 aggregate 的數據結構,比如 HashMap,每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 record,直接將其放進 HashMap 里面。如果該 HashMap 已經存在相應的 Key,那么直接進行 aggregate 也就是 func(hashMap.get(Key), Value)
  3. fetch 來的數據存放到哪里?剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區(qū),經過處理后的數據放在內存 + 磁盤上。
  4. 怎么獲得要 fetch 的數據的存放位置?reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數據位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給MapOutputTrackerMaster。

Shuffle read 中的 HashMap

ashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數據結構。Spark 設計了兩種:一種是全內存的 AppendOnlyMap,另一種是內存+磁盤的 ExternalAppendOnlyMap。
spark(四):shuffle

  1. 類似 HashMap,但沒有remove(key)方法。其實現原理很簡單,開一個大 Object 數組,藍色部分存儲 Key,白色部分存儲 Value。
  2. 如果 Array 的利用率達到 70%,那么就擴張一倍,并對所有 key 進行 rehash 后,重新排列每個 key 的位置。
    spark(四):shuffle
  3. ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。
  4. 如果 AppendOnlyMap 快被裝滿時檢查一下內存剩余空間是否可以夠擴展,夠就直接在內存中擴展,不夠就 sort 一下 AppendOnlyMap,將其內部所有 records 都 spill 到磁盤上。
  5. 每次 spill 完在磁盤上生成一個 spilledMap 文件,然后重新 new 出來一個 AppendOnlyMap。
  6. 最后一個 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經被處理完,因為每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進行 aggregate,并不是與所有的 records 進行 aggregate(一些 records 已經被 spill 到磁盤上了)。因此當需要 aggregate 的最終結果時,需要對 AppendOnlyMap 和所有的 spilledMaps 進行全局 merge-aggregate。
  7. 全局 merge-aggregate 的流程:先將 AppendOnlyMap 中的 records 進行 sort,形成 sortedMap。
  8. 然后分別從 sortedMap 和各個 spilledMap 讀出一部分數據(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key)
  9. mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個個放入 mergeBuffers 中,放入的時候與已經存在于 mergeBuffers 中的 StreamBuffer 進行 merge-combine

在Sort Based Shuffle的Shuffle Write階段,map端的任務會按照Partition id以及key對記錄進行排序。同時將全部結果寫到一個數據文件中,同時生成一個索引文件,reduce端的Task可以通過該索引文件獲取相關的數據。


名稱欄目:spark(四):shuffle
文章路徑:http://weahome.cn/article/ijsehi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部