- 本文首發(fā)于 vivo互聯(lián)網(wǎng)技術(shù) 微信公眾號 https://mp.weixin.qq.com/s/lqMu6lfk-Ny1ZHYruEeBdA
- 作者簡介:鄭志彬,畢業(yè)于華南理工大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)(雙語班)。先后從事過電子商務(wù)、開放平臺、移動(dòng)瀏覽器、推薦廣告和大數(shù)據(jù)、人工智能等相關(guān)開發(fā)和架構(gòu)。目前在vivo智能平臺中心從事 AI中臺建設(shè)以及廣告推薦業(yè)務(wù)。擅長各種業(yè)務(wù)形態(tài)的業(yè)務(wù)架構(gòu)、平臺化以及各種業(yè)務(wù)解決方案。
本文從數(shù)據(jù)傾斜的危害、現(xiàn)象、原因等方面,由淺入深闡述Spark數(shù)據(jù)傾斜及其解決方案。
成都創(chuàng)新互聯(lián)公司科技有限公司專業(yè)互聯(lián)網(wǎng)基礎(chǔ)服務(wù)商,為您提供德陽機(jī)房托管,高防主機(jī),成都IDC機(jī)房托管,成都主機(jī)托管等互聯(lián)網(wǎng)服務(wù)。對 Spark/Hadoop 這樣的分布式大數(shù)據(jù)系統(tǒng)來講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
對于分布式系統(tǒng)而言,理想情況下,隨著系統(tǒng)規(guī)模(節(jié)點(diǎn)數(shù)量)的增加,應(yīng)用整體耗時(shí)線性下降。如果一臺機(jī)器處理一批大量數(shù)據(jù)需要120分鐘,當(dāng)機(jī)器數(shù)量增加到3臺時(shí),理想的耗時(shí)為120 / 3 = 40分鐘。但是,想做到分布式情況下每臺機(jī)器執(zhí)行時(shí)間是單機(jī)時(shí)的1 / N,就必須保證每臺機(jī)器的任務(wù)量相等。不幸的是,很多時(shí)候,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個(gè)別機(jī)器上,其它大部分機(jī)器所分配的任務(wù)量只占總得的小部分。比如一臺機(jī)器負(fù)責(zé)處理 80% 的任務(wù),另外兩臺機(jī)器各處理 10% 的任務(wù)。
『不患多而患不均』,這是分布式環(huán)境下大的問題。意味著計(jì)算能力不是線性擴(kuò)展的,而是存在短板效應(yīng): 一個(gè) Stage 所耗費(fèi)的時(shí)間,是由最慢的那個(gè) Task 決定。
由于同一個(gè) Stage 內(nèi)的所有 task 執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同 task 之間耗時(shí)的差異主要由該 task 所處理的數(shù)據(jù)量決定。所以,要想發(fā)揮分布式系統(tǒng)并行計(jì)算的優(yōu)勢,就必須解決數(shù)據(jù)傾斜問題。
當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí),小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過大,未能充分發(fā)揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢?! ?/p>
另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時(shí),部分任務(wù)處理的數(shù)據(jù)量過大,可能造成內(nèi)存不足使得任務(wù)失敗,并進(jìn)而引進(jìn)整個(gè)應(yīng)用失敗?! ?/p>
當(dāng)發(fā)現(xiàn)如下現(xiàn)象時(shí),十有八九是發(fā)生數(shù)據(jù)傾斜了:
絕大多數(shù) task 執(zhí)行得都非常快,但個(gè)別 task 執(zhí)行極慢,整體任務(wù)卡在某個(gè)階段不能結(jié)束。
TIPS
在 Spark streaming 程序中,數(shù)據(jù)傾斜更容易出現(xiàn),特別是在程序中包含一些類似 sql 的 join、group 這種操作的時(shí)候。因?yàn)?Spark Streaming 程序在運(yùn)行的時(shí)候,我們一般不會(huì)分配特別多的內(nèi)存,因此一旦在這個(gè)過程中出現(xiàn)一些數(shù)據(jù)傾斜,就十分容易造成 OOM。
在進(jìn)行 shuffle 的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的 key 拉取到某個(gè)節(jié)點(diǎn)上的一個(gè) task 來進(jìn)行處理,比如按照 key 進(jìn)行聚合或 join 等操作。此時(shí)如果某個(gè) key 對應(yīng)的數(shù)據(jù)量特別大的話,就會(huì)發(fā)生數(shù)據(jù)傾斜。比如大部分 key 對應(yīng)10條數(shù)據(jù),但是個(gè)別 key 卻對應(yīng)了100萬條數(shù)據(jù),那么大部分 task 可能就只會(huì)分配到10條數(shù)據(jù),然后1秒鐘就運(yùn)行完了;但是個(gè)別 task 可能分配到了100萬數(shù)據(jù),要運(yùn)行一兩個(gè)小時(shí)。
因此出現(xiàn)數(shù)據(jù)傾斜的時(shí)候,Spark 作業(yè)看起來會(huì)運(yùn)行得非常緩慢,甚至可能因?yàn)槟硞€(gè) task 處理的數(shù)據(jù)量過大導(dǎo)致內(nèi)存溢出。
通過 Spark Web UI 來查看當(dāng)前運(yùn)行的 stage 各個(gè) task 分配的數(shù)據(jù)量(Shuffle Read Size/Records),從而進(jìn)一步確定是不是 task 分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。
知道數(shù)據(jù)傾斜發(fā)生在哪一個(gè) stage 之后,接著我們就需要根據(jù) stage 劃分原理,推算出來發(fā)生傾斜的那個(gè) stage 對應(yīng)代碼中的哪一部分,這部分代碼中肯定會(huì)有一個(gè) shuffle 類算子??梢酝ㄟ^ countByKey 查看各個(gè) key 的分布。
TIPS
數(shù)據(jù)傾斜只會(huì)發(fā)生在 shuffle 過程中。這里給大家羅列一些常用的并且可能會(huì)觸發(fā) shuffle 操作的算子: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出現(xiàn)數(shù)據(jù)傾斜時(shí),可能就是你的代碼中使用了這些算子中的某一個(gè)所導(dǎo)致的。
也可以通過抽樣統(tǒng)計(jì) key 的出現(xiàn)次數(shù)驗(yàn)證。
由于數(shù)據(jù)量巨大,可以采用抽樣的方式,對數(shù)據(jù)進(jìn)行抽樣,統(tǒng)計(jì)出現(xiàn)的次數(shù),根據(jù)出現(xiàn)次數(shù)大小排序取出前幾個(gè):
df.select("key").sample(false, 0.1) // 數(shù)據(jù)采樣
.(k => (k, 1)).reduceBykey(_ + _) // 統(tǒng)計(jì) key 出現(xiàn)的次數(shù)
.map(k => (k._2, k._1)).sortByKey(false) // 根據(jù) key 出現(xiàn)次數(shù)進(jìn)行排序
.take(10) // 取前 10 個(gè)。
如果發(fā)現(xiàn)多數(shù)數(shù)據(jù)分布都較為平均,而個(gè)別數(shù)據(jù)比其他數(shù)據(jù)大上若干個(gè)數(shù)量級,則說明發(fā)生了數(shù)據(jù)傾斜。
業(yè)務(wù)邏輯: 我們從業(yè)務(wù)邏輯的層面上來優(yōu)化數(shù)據(jù)傾斜,比如要統(tǒng)計(jì)不同城市的訂單情況,那么我們單獨(dú)對這一線城市來做 count,最后和其它城市做整合。
程序?qū)崿F(xiàn): 比如說在 Hive 中,經(jīng)常遇到 count(distinct)操作,這樣會(huì)導(dǎo)致最終只有一個(gè) reduce,我們可以先 group 再在外面包一層 count,就可以了;在 Spark 中使用 reduceByKey 替代 groupByKey 等。
如果導(dǎo)致數(shù)據(jù)傾斜的 key 是異常數(shù)據(jù),那么簡單的過濾掉就可以了。
首先要對 key 進(jìn)行分析,判斷是哪些 key 造成數(shù)據(jù)傾斜。具體方法上面已經(jīng)介紹過了,這里不贅述。
然后對這些 key 對應(yīng)的記錄進(jìn)行分析:
空值或者異常值之類的,大多是這個(gè)原因引起
無效數(shù)據(jù),大量重復(fù)的測試數(shù)據(jù)或是對結(jié)果影響不大的有效數(shù)據(jù)
解決方案
對于第 1,2 種情況,直接對數(shù)據(jù)進(jìn)行過濾即可。
第3種情況則需要特殊的處理,具體我們下面詳細(xì)介紹。
Spark 在做 Shuffle 時(shí),默認(rèn)使用 HashPartitioner(非 Hash Shuffle)對數(shù)據(jù)進(jìn)行分區(qū)。如果并行度設(shè)置的不合適,可能造成大量不相同的 Key 對應(yīng)的數(shù)據(jù)被分配到了同一個(gè) Task 上,造成該 Task 所處理的數(shù)據(jù)遠(yuǎn)大于其它 Task,從而造成數(shù)據(jù)傾斜。
如果調(diào)整 Shuffle 時(shí)的并行度,使得原本被分配到同一 Task 的不同 Key 發(fā)配到不同 Task 上處理,則可降低原 Task 所需處理的數(shù)據(jù)量,從而緩解數(shù)據(jù)傾斜問題造成的短板效應(yīng)。
(1)操作流程
RDD 操作 可在需要 Shuffle 的操作算子上直接設(shè)置并行度或者使用 spark.default.parallelism 設(shè)置。如果是 Spark SQL,還可通過 SET spark.sql.shuffle.partitions=[num_tasks]?設(shè)置并行度。默認(rèn)參數(shù)由不同的 Cluster Manager 控制。
dataFrame 和 sparkSql 可以設(shè)置 spark.sql.shuffle.partitions=[num_tasks]?參數(shù)控制 shuffle 的并發(fā)度,默認(rèn)為200。
(2)適用場景
大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過大。
(3)解決方案
調(diào)整并行度。一般是增大并行度,但有時(shí)如減小并行度也可達(dá)到效果。
(4)優(yōu)勢
實(shí)現(xiàn)簡單,只需要參數(shù)調(diào)優(yōu)??捎米钚〉拇鷥r(jià)解決問題。一般如果出現(xiàn)數(shù)據(jù)傾斜,都可以通過這種方法先試驗(yàn)幾次,如果問題未解決,再嘗試其它方法。
(5)劣勢
適用場景少,只是讓每個(gè) task 執(zhí)行更少的不同的key。無法解決個(gè)別key特別大的情況造成的傾斜,如果某些 key 的大小非常大,即使一個(gè) task 單獨(dú)執(zhí)行它,也會(huì)受到數(shù)據(jù)傾斜的困擾。并且該方法一般只能緩解數(shù)據(jù)傾斜,沒有徹底消除問題。從實(shí)踐經(jīng)驗(yàn)來看,其效果一般。
TIPS 可以把數(shù)據(jù)傾斜類比為 hash 沖突。提高并行度就類似于 提高 hash 表的大小。
(1)原理
使用自定義的 Partitioner(默認(rèn)為 HashPartitioner),將原本被分配到同一個(gè) Task 的不同 Key 分配到不同 Task。
例如,我們在?groupByKey?算子上,使用自定義的 Partitioner:
.groupByKey(new?Partitioner()?{
@Override
public int numPartitions() {
return 12;
}
@Override
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
return (id - 9500000) / 12;
} else {
return id % 12;
}
}
})
TIPS 這個(gè)做法相當(dāng)于自定義 hash 表的 哈希函數(shù)。
(2)適用場景
大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過大。
(3)解決方案
使用自定義的 Partitioner 實(shí)現(xiàn)類代替默認(rèn)的 HashPartitioner,盡量將所有不同的 Key 均勻分配到不同的 Task 中。
(4)優(yōu)勢
不影響原有的并行度設(shè)計(jì)。如果改變并行度,后續(xù) Stage 的并行度也會(huì)默認(rèn)改變,可能會(huì)影響后續(xù) Stage。
(5)劣勢
適用場景有限,只能將不同 Key 分散開,對于同一 Key 對應(yīng)數(shù)據(jù)集非常大的場景不適用。效果與調(diào)整并行度類似,只能緩解數(shù)據(jù)傾斜而不能完全消除數(shù)據(jù)傾斜。而且需要根據(jù)數(shù)據(jù)特點(diǎn)自定義專用的 Partitioner,不夠靈活。
通過 Spark 的 Broadcast 機(jī)制,將 Reduce 端 Join 轉(zhuǎn)化為 Map 端 Join,這意味著 Spark 現(xiàn)在不需要跨節(jié)點(diǎn)做 shuffle 而是直接通過本地文件進(jìn)行 join,從而完全消除 Shuffle 帶來的數(shù)據(jù)傾斜。
from?pyspark.sql.functions?import?broadcast
result = broadcast(A).join(B, ["join_col"], "left")
其中 A 是比較小的 dataframe 并且能夠整個(gè)存放在 executor 內(nèi)存中。
(1)適用場景
參與Join的一邊數(shù)據(jù)集足夠小,可被加載進(jìn) Driver 并通過 Broadcast 方法廣播到各個(gè) Executor 中。
(2)解決方案
在 Java/Scala 代碼中將小數(shù)據(jù)集數(shù)據(jù)拉取到 Driver,然后通過 Broadcast 方案將小數(shù)據(jù)集的數(shù)據(jù)廣播到各 Executor。或者在使用 SQL 前,將 Broadcast 的閾值調(diào)整得足夠大,從而使 Broadcast 生效。進(jìn)而將 Reduce Join 替換為 Map Join。
(3)優(yōu)勢
避免了 Shuffle,徹底消除了數(shù)據(jù)傾斜產(chǎn)生的條件,可極大提升性能。
(4)劣勢
因?yàn)槭窍葘⑿?shù)據(jù)通過 Broadcase 發(fā)送到每個(gè) executor 上,所以需要參與 Join 的一方數(shù)據(jù)集足夠小,并且主要適用于 Join 的場景,不適合聚合的場景,適用條件有限。
NOTES
使用Spark SQL時(shí)需要通過 SET spark.sql.autoBroadcastJoinThreshold=104857600?將 Broadcast 的閾值設(shè)置得足夠大,才會(huì)生效。
思路很簡單,就是將一個(gè) join 拆分成 傾斜數(shù)據(jù)集 Join 和 非傾斜數(shù)據(jù)集 Join,最后進(jìn)行 union:
對包含少數(shù)幾個(gè)數(shù)據(jù)量過大的 key 的那個(gè) RDD (假設(shè)是 leftRDD),通過 sample 算子采樣出一份樣本來,然后統(tǒng)計(jì)一下每個(gè) key 的數(shù)量,計(jì)算出來數(shù)據(jù)量大的是哪幾個(gè) key。具體方法上面已經(jīng)介紹過了,這里不贅述。
然后將這 k 個(gè) key 對應(yīng)的數(shù)據(jù)從 leftRDD 中單獨(dú)過濾出來,并給每個(gè) key 都打上 1~n 以內(nèi)的隨機(jī)數(shù)作為前綴,形成一個(gè)單獨(dú)的 leftSkewRDD;而不會(huì)導(dǎo)致傾斜的大部分 key 形成另外一個(gè) leftUnSkewRDD。
接著將需要 join 的另一個(gè) rightRDD,也過濾出來那幾個(gè)傾斜 key 并通過 flatMap 操作將該數(shù)據(jù)集中每條數(shù)據(jù)均轉(zhuǎn)換為 n 條數(shù)據(jù)(這 n 條數(shù)據(jù)都按順序附加一個(gè) 0~n 的前綴),形成單獨(dú)的 rightSkewRDD;不會(huì)導(dǎo)致傾斜的大部分 key 也形成另外一個(gè) rightUnSkewRDD。
現(xiàn)在將 leftSkewRDD 與 膨脹 n 倍的 rightSkewRDD 進(jìn)行 join,且在 Join 過程中將隨機(jī)前綴去掉,得到傾斜數(shù)據(jù)集的 Join 結(jié)果 skewedJoinRDD。注意到此時(shí)我們已經(jīng)成功將原先相同的 key 打散成 n 份,分散到多個(gè) task 中去進(jìn)行 join 了。
對 leftUnSkewRDD 與 rightUnRDD 進(jìn)行Join,得到 Join 結(jié)果 unskewedJoinRDD。
TIPS
- rightRDD 與傾斜 Key 對應(yīng)的部分?jǐn)?shù)據(jù),需要與隨機(jī)前綴集 (1~n) 作笛卡爾乘積 (即將數(shù)據(jù)量擴(kuò)大 n 倍),從而保證無論數(shù)據(jù)傾斜側(cè)傾斜 Key 如何加前綴,都能與之正常 Join。
- skewRDD 的 join 并行度可以設(shè)置為 n * k (k 為 topSkewkey 的個(gè)數(shù))。
- 由于傾斜Key與非傾斜Key的操作完全獨(dú)立,可并行進(jìn)行。
(1)適用場景
兩張表都比較大,無法使用 Map 端 Join。其中一個(gè) RDD 有少數(shù)幾個(gè) Key 的數(shù)據(jù)量過大,另外一個(gè) RDD 的 Key 分布較為均勻。
(2)解決方案
將有數(shù)據(jù)傾斜的 RDD 中傾斜 Key 對應(yīng)的數(shù)據(jù)集單獨(dú)抽取出來加上隨機(jī)前綴,另外一個(gè) RDD 每條數(shù)據(jù)分別與隨機(jī)前綴結(jié)合形成新的RDD(相當(dāng)于將其數(shù)據(jù)增到到原來的N倍,N即為隨機(jī)前綴的總個(gè)數(shù)),然后將二者Join并去掉前綴。然后將不包含傾斜Key的剩余數(shù)據(jù)進(jìn)行Join。最后將兩次Join的結(jié)果集通過union合并,即可得到全部Join結(jié)果。
(3)優(yōu)勢
相對于 Map 則 Join,更能適應(yīng)大數(shù)據(jù)集的 Join。如果資源充足,傾斜部分?jǐn)?shù)據(jù)集與非傾斜部分?jǐn)?shù)據(jù)集可并行進(jìn)行,效率提升明顯。且只針對傾斜部分的數(shù)據(jù)做數(shù)據(jù)擴(kuò)展,增加的資源消耗有限。
(4)劣勢
如果傾斜 Key 非常多,則另一側(cè)數(shù)據(jù)膨脹非常大,此方案不適用。而且此時(shí)對傾斜 Key 與非傾斜 Key 分開處理,需要掃描數(shù)據(jù)集兩遍,增加了開銷。
如果出現(xiàn)數(shù)據(jù)傾斜的 Key 比較多,上一種方法將這些大量的傾斜 Key 分拆出來,意義不大。此時(shí)更適合直接對存在數(shù)據(jù)傾斜的數(shù)據(jù)集全部加上隨機(jī)前綴,然后對另外一個(gè)不存在嚴(yán)重?cái)?shù)據(jù)傾斜的數(shù)據(jù)集整體與隨機(jī)前綴集作笛卡爾乘積(即將數(shù)據(jù)量擴(kuò)大N倍)。
其實(shí)就是上一個(gè)方法的特例或者簡化。少了拆分,也就沒有 union。
(1)適用場景
一個(gè)數(shù)據(jù)集存在的傾斜 Key 比較多,另外一個(gè)數(shù)據(jù)集數(shù)據(jù)分布比較均勻。
(2)優(yōu)勢
對大部分場景都適用,效果不錯(cuò)。
(3)劣勢
需要將一個(gè)數(shù)據(jù)集整體擴(kuò)大 N 倍,會(huì)增加資源消耗。
在 map 端加個(gè) combiner 函數(shù)進(jìn)行局部聚合。加上 combiner 相當(dāng)于提前進(jìn)行 reduce ,就會(huì)把一個(gè) mapper 中的相同 key 進(jìn)行聚合,減少 shuffle 過程中數(shù)據(jù)量 以及 reduce 端的計(jì)算量。這種方法可以有效的緩解數(shù)據(jù)傾斜問題,但是如果導(dǎo)致數(shù)據(jù)傾斜的 key 大量分布在不同的 mapper 的時(shí)候,這種方法就不是很有效了。
TIPS 使用 reduceByKey 而不是 groupByKey。
這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合,先給每個(gè) key 都打上一個(gè) 1~n 的隨機(jī)數(shù),比如 3 以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的 key 就變成不一樣的了,比如?(hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1),就會(huì)變成?(1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)。接著對打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行 reduceByKey 等聚合操作,進(jìn)行局部聚合,那么局部聚合結(jié)果,就會(huì)變成了?(1_hello, 2) (2_hello, 2) (3_hello, 1)。然后將各個(gè) key 的前綴給去掉,就會(huì)變成?(hello, 2) (hello, 2) (hello, 1),再次進(jìn)行全局聚合操作,就可以得到最終結(jié)果了,比如?(hello, 5)。
def antiSkew(): RDD[(String, Int)] = {
val SPLIT = "-"
val prefix = new Random().nextInt(10)
pairs.map(t => ( prefix + SPLIT + t._1, 1))
.reduceByKey((v1, v2) => v1 + v2)
.map(t => (t._1.split(SPLIT)(1), t2._2))
.reduceByKey((v1, v2) => v1 + v2)
}
不過進(jìn)行兩次 mapreduce,性能稍微比一次的差些。
Hadoop 中直接貼近用戶使用的是 Mapreduce 程序和 Hive 程序,雖說 Hive 最后也是用 MR 來執(zhí)行(至少目前 Hive 內(nèi)存計(jì)算并不普及),但是畢竟寫的內(nèi)容邏輯區(qū)別很大,一個(gè)是程序,一個(gè)是Sql,因此這里稍作區(qū)分。
Hadoop 中的數(shù)據(jù)傾斜主要表現(xiàn)在 ruduce 階段卡在99.99%,一直99.99%不能結(jié)束。
這里如果詳細(xì)的看日志或者和監(jiān)控界面的話會(huì)發(fā)現(xiàn):
有一個(gè)多幾個(gè) reduce 卡住
各種 container報(bào)錯(cuò) OOM
讀寫的數(shù)據(jù)量極大,至少遠(yuǎn)遠(yuǎn)超過其它正常的 reduce
經(jīng)驗(yàn):?Hive的數(shù)據(jù)傾斜,一般都發(fā)生在 Sql 中 Group 和 On 上,而且和數(shù)據(jù)邏輯綁定比較深。
優(yōu)化方法
這里列出來一些方法和思路,具體的參數(shù)和用法在官網(wǎng)看就行了。
map join 方式
count distinct 的操作,先轉(zhuǎn)成 group,再 count
參數(shù)調(diào)優(yōu)
set hive.map.aggr=true
set hive.groupby.skewindata=true
left semi jion 的使用
說明
hive.map.aggr=true: 在map中會(huì)做部分聚集操作,效率更高但需要更多的內(nèi)存。
hive.groupby.skewindata=true: 數(shù)據(jù)傾斜時(shí)負(fù)載均衡,當(dāng)選項(xiàng)設(shè)定為true,生成的查詢計(jì)劃會(huì)有兩個(gè)MRJob。第一個(gè)MRJob 中,Map的輸出結(jié)果集合會(huì)隨機(jī)分布到Reduce中,每個(gè)Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的GroupBy Key有可能被分發(fā)到不同的Reduce中,從而達(dá)到負(fù)載均衡的目的;第二個(gè)MRJob再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照GroupBy Key分布到Reduce中(這個(gè)過程可以保證相同的GroupBy Key被分布到同一個(gè)Reduce中),最后完成最終的聚合操作。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。