基本的 RDD 轉(zhuǎn)化操作
常山ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書(shū)未來(lái)市場(chǎng)廣闊!成為成都創(chuàng)新互聯(lián)的ssl證書(shū)銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書(shū)合作)期待與您的合作!
map()
? ? 語(yǔ)法:RDD.map(
? ? 轉(zhuǎn)化操作 map() 是所有轉(zhuǎn)化操作中最基本的。它將一個(gè)具名函數(shù)或匿名函數(shù)對(duì)數(shù)據(jù)集內(nèi)的所有元素進(jìn)行求值。map() 函數(shù)可以異步執(zhí)行,也不會(huì)嘗試與別的 map() 操作通信或同步。也就是說(shuō),這是無(wú)共享的操作。
? ? 參數(shù) preserversPatitioning 是可選的,為 Boolean 類型的參數(shù),用于定義了區(qū)分規(guī)則的 RDD,它們有定義好的鍵,并按照鍵的哈希值或范圍進(jìn)行了分組。如果這個(gè)參數(shù)被設(shè)為 True,這些分區(qū)會(huì)保存完整。這個(gè)參數(shù)可以被 Spark 調(diào)度器用于優(yōu)化后續(xù)操作,比如,基于分區(qū)的鍵進(jìn)行的連接操作。
? ? 轉(zhuǎn)化操作 map() 對(duì)輸入的每條記錄計(jì)算同一個(gè)函數(shù),并生成轉(zhuǎn)化后的輸出記錄。
# map()
map_rdd=sc.textFile('file:///usr/local/spark/README.md')
print(map_rdd.take(5))
map_rdd_new=map_rdd.map(lambda x:x.split(' '))
print(map_rdd_new.take(5))
# 輸出
['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']
[['#', 'Apache', 'Spark'], [''], ['Spark', 'is', 'a', 'fast', 'and', 'general', 'cluster', 'computing', 'system', 'for', 'Big', 'Data.', 'It', 'provides'], ['high-level', 'APIs', 'in', 'Scala,', 'Java,', 'Python,', 'and', 'R,', 'and', 'an', 'optimized', 'engine', 'that'], ['supports', 'general', 'computation', 'graphs', 'for', 'data', 'analysis.', 'It', 'also', 'supports', 'a']]
? ? 在這個(gè)例子中,split 函數(shù)接收一個(gè)字符串,生成一個(gè)列表,輸入數(shù)據(jù)中的每個(gè)字符串元素都被映射為輸出數(shù)據(jù)中的一個(gè)列表元素。產(chǎn)生的結(jié)果為一個(gè)列表的列表。
? ? 2.flatMap()
? ? 語(yǔ)法::RDD.flatMap(
? ? 轉(zhuǎn)化操作 flatMap() 和轉(zhuǎn)化操作 map() 類似,都將函數(shù)作用于輸入數(shù)據(jù)集的每條記錄。但是,flatMap() 還會(huì)“拍平”輸出數(shù)據(jù),這表示它會(huì)去掉一層嵌套。比如,給定一個(gè)包含字符串列表的列表,“拍平”操作會(huì)產(chǎn)生一個(gè)由字符串組成的列表,也就是“拍平”了所有嵌套的列表。
# flatMap()
flat_map_rdd=sc.textFile('file:///usr/local/spark/README.md')
print(flat_map_rdd.take(5))
map_rdd_new=flat_map_rdd.flatMap(lambda x:x.split(' '))
print(map_rdd_new.take(5))
# 輸出
['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']
['#', 'Apache', 'Spark', '', 'Spark']
? ? 在這個(gè)例子中,flatMap() 使用的匿名函數(shù)和 map() 操作所使用的相同。注意,每個(gè)字符串并沒(méi)有產(chǎn)生一個(gè)對(duì)應(yīng)的列表對(duì)象,所有的元素拍平到一個(gè)列表中。換句話說(shuō),這個(gè)例子里的 flatMap() 產(chǎn)生了一個(gè)組合的列表作為輸出,而不是 map() 中那個(gè)列表的列表。
? ? 3.filter()
? ? 語(yǔ)法:RDD.filter(
? ? 轉(zhuǎn)化操作 filter 講一個(gè) Boolean 類型的表達(dá)式對(duì)數(shù)據(jù)集里的每個(gè)元素進(jìn)行求值,這個(gè)表達(dá)式通常用匿名函數(shù)來(lái)表示。返回的布爾值決定了該記錄是否被包含在產(chǎn)生的輸出 RDD 里。這是一種常用的轉(zhuǎn)化操作,用于從 RDD 中移除不需要的記錄作為中間結(jié)果,或者移除不需要放在最終輸出里的記錄。
# filter()
licenses = sc.textFile('file:///usr/local/spark/README.md')
words = licenses.flatMap(lambda x:x.spilt(' '))
print(words.take(5))
lowercase = words.map(lambda x:x.lower())
print(lowercase.take(5))
longwords = lowercase.filter(lambda x:len(x) > 12)
print(longwords.take(5))
# 輸出
['#', 'Apache', 'Spark', '', 'Spark']
['#', 'apache', 'spark', '', 'spark']
['
? ? 4.distinct()
? ? 語(yǔ)法:RDD.distinct(numPartitions=None)
? ? 轉(zhuǎn)化操作 distinct() 返回一個(gè)新的 RDD,其中僅包含輸入 RDD 中去重后的元素。它可以用來(lái)去除重復(fù)的值。參數(shù) numPartitions 可以把數(shù)據(jù)重新分區(qū)為給定的分區(qū)數(shù)量。如果沒(méi)有提供這個(gè)參數(shù)或是使用了默認(rèn)值,那么轉(zhuǎn)化操作 distinct() 返回的分區(qū)數(shù)和輸入的 RDD 的 分區(qū)數(shù)保持一致。
# distinct()
licenses = sc.textFile('file:///usr/local/spark/README.md')
words = licenses.flatMap(lambda x : x.split(' '))
lowercase = words.map(lambda x : x.lower())
allwords = lowercase.count()
diswords = lowercase.distinct().count()
print ("Total words : {} ,Distinct words: {}".format(allwords,diswords))
# 輸出
Total words : 579 ,Distinct words: 276
? ? 5.groupBy()
? ? 語(yǔ)法:RDD.groupBy(
? ? 轉(zhuǎn)化操作 groupBy() 返回一個(gè)按指定函數(shù)對(duì)元素進(jìn)行分組的 RDD。參數(shù)
# groupBy()
licenses = sc.textFile('file:///usr/local/spark/README.md')
words = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x:len(x) > 0)
groupbyfirstletter = words.groupBy(lambda??x: x[0].lower)
print(groupbyfirstletter.take(1))
# 輸出
[(
? ? 6.sortBy()
? ? 語(yǔ)法:RDD.sortBy(
? ? 轉(zhuǎn)化操作 sortBy() 將 RDD 按照
# sortBy()
readme = sc.textFile('file:///usr/local/spark/README.md')
words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)
sortbyfirstletter = words.sortBy(lambda x:x[0].lower(),ascending=False)
print(sortbyfirstletter.take(5))
# 輸出
['You', 'you', 'You', 'you', 'you']
基本的 RDD 行動(dòng)操作
? ? Spark 中的行動(dòng)操作要么返回值,比如 count();要么返回?cái)?shù)據(jù),比如 collect();要么保存數(shù)據(jù)到外部,比如 saveAsTextFile()。在所有情況中,行動(dòng)操作都會(huì)對(duì) RDD 及其所有父 RDD 強(qiáng)制進(jìn)行計(jì)算。一些行動(dòng)操作返回計(jì)數(shù),或是數(shù)據(jù)的聚合值,或是 RDD 中全部或部分?jǐn)?shù)據(jù)。與這些不同的是,行動(dòng)操作 foreach() 會(huì)對(duì) RDD 中的每個(gè)元素執(zhí)行一個(gè)函數(shù)。
? ? 1.count()
? ? 語(yǔ)法:RDD.count()
? ? 行動(dòng)操作 count() 不接收參數(shù),返回一個(gè) long 類型的值,代表 RDD 中元素的個(gè)數(shù)。
# count()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.count())
# 輸出
22997
?? ?注意,對(duì)于不接收參數(shù)的行動(dòng)操作,需要在行動(dòng)操作名帶上空的括號(hào) ()。
? ? 2.collect()
? ? 語(yǔ)法:RDD.collect()
? ? 行動(dòng)操作 collect() 向 Spark 驅(qū)動(dòng)器進(jìn)程返回一個(gè)由 RDD 中所有元素組成的列表。collect() 沒(méi)有限制輸出,可能導(dǎo)致輸出量相當(dāng)大。一般只用在小規(guī)模 RDD 或開(kāi)發(fā)中。
# collect()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.collect())
# 輸出
['', '
? ? 3.take()
? ? 語(yǔ)法:RDD.take(n)
? ? 行動(dòng)操作 take() 返回 RDD 的前 n 個(gè)元素。選取的元素沒(méi)有特定的順序。事實(shí)上,行動(dòng)操作 take() 返回的元素是不確定的,這意味著再次運(yùn)行同一個(gè)行動(dòng)操作時(shí),返回的元素可能會(huì)不同,尤其是在完全分布式的環(huán)境中。
? ? 對(duì)于橫跨超過(guò)一個(gè)分區(qū)的 RDD,take() 會(huì)掃描一個(gè)分區(qū),并使用該分區(qū)的結(jié)果來(lái)預(yù)估還需掃描多少分區(qū)才能滿足獲取所要求數(shù)量的全部的值。
# take()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.take(5))
# 輸出
['', '
? ? 4.top()
? ? 語(yǔ)法:RDD.top(n,key=None)
? ? 行動(dòng)操作 top() 返回一個(gè) RDD 中的前 n 個(gè)元素,但是和 take() 不同的是,如果使用 top(),元素會(huì)排序并按照降序輸出。參數(shù) key 指定了按照什么對(duì)結(jié)果進(jìn)行排序以返回前 n 個(gè)元素。如果沒(méi)有提供,會(huì)使用根據(jù) RDD 的元素所推斷出來(lái)的鍵。
# top()
licenses = sc.textFile('file:///usr/local/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
print(words.distinct().top(5))
# 輸出
['·', '?', '}', 'your', 'you.']
? ? 5.first()
? ? 語(yǔ)法:RDD.first()
? ? 行動(dòng)操作 first() 返回 RDD 的第一個(gè)元素。first() 不考慮元素的順序,是一個(gè)非確定性的操作,尤其是在完全分布式的環(huán)境中。
# first()
readme = sc.textFile('file:///usr/local/spark/README.md')
words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)
print(words.distinct().first())
print(words.distinct().take(1))
# 輸出
#
['#']
? ? first() 和 take(1) 最主要的區(qū)別在于 first() 返回一個(gè)原子的數(shù)據(jù)元素,而 take() (即使 n=1)返回的是由數(shù)據(jù)元素組成的列表。
? ? 6.reduce() 和 fold()
? ? 行動(dòng)操作 reduce() 和 fold() 是執(zhí)行聚合的行動(dòng)操作,它們都執(zhí)行滿足交換律或結(jié)合律的操作,比如對(duì) RDD 里的一系列值求和。這里的交換律和結(jié)合律表示操作與執(zhí)行的順序無(wú)關(guān)。這是分布式處理所要求的,因?yàn)樵诜植际教幚碇?,順序無(wú)法保證。
? ? 語(yǔ)法:RDD.reduce(
? ? ? ? ? ? ? RDD.fold(zeroValue,
? ? 行動(dòng)操作 reduce() 使用指定的滿足交換律或結(jié)合律的運(yùn)算符來(lái)歸約 RDD 中的所有元素。參數(shù)
? ? 行動(dòng)操作 fold() 使用給定的 function 和 zeroValue 把 RDD 中每個(gè)分區(qū)的元素聚合,然后把每個(gè)分區(qū)的聚合結(jié)果再聚合。盡管 reduce() 和 fold() 的功能相似,但還是有區(qū)別的,fold() 不滿足交換律,因此需要給定第一個(gè)值和最后一個(gè)值(zeroValue)。
# reduce(),fold()
numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])
print(numbers.reduce(lambda x,y: x+y)) #輸出:45
print(numbers.fold(0,lambda x,y: x+y)) #輸出:45
empty = sc.parallelize([])
# print(empty.reduce(lambda x,y: x+y)) #輸出:ValueError: Can not reduce() empty RDD
print(empty.getNumPartitions()) #查看rdd分區(qū)數(shù),輸出為 2
print(empty.fold(1,lambda x,y: x+y)) #輸出:3
?? ?fold中zeroValue除了在每個(gè)分區(qū)計(jì)算中作為初始值使用之后,在最后的reduce操作仍然需要使用一次,所以fold()在zeroValue不為0是計(jì)算結(jié)果為reduce()+(分區(qū)數(shù)+1)*zeroValue(以加法為例),參考鏈接:Pyspark學(xué)習(xí)筆記,fold()官網(wǎng)源碼
? ??
?? ?7.foreach()
? ? 語(yǔ)法:RDD.foreach(
? ? 行動(dòng)操作 foreach() 把 <> 參數(shù)指定的具名或匿名函數(shù)應(yīng)用到 RDD 中的所有元素上。因?yàn)?foreach() 是行動(dòng)操作而不是轉(zhuǎn)化操作,可以使用在轉(zhuǎn)化操作中無(wú)法使用或不該使用的函數(shù)。
# foreach()
def printfunc(x):
????print(x)
licenses = sc.textFile('file:///usr/local/spark/licenses')
longwords = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x: len(x) > 12)
longwords.foreach(lambda x: printfunc(x))
# 輸出
...
href="#exhibit-a">Exhibit
id="section-1.10.1">1.10.1.
...
鍵值對(duì) RDD 的轉(zhuǎn)化操作
? ? 鍵值對(duì) RDD,也就是 PairRDD,它的記錄由鍵和值組成。鍵可以是整型或者字符串對(duì)象,也可以是元組這樣的復(fù)雜對(duì)象。而值可以是標(biāo)量值,也可以是列表、元組、字典或集合等數(shù)據(jù)結(jié)構(gòu)。這是在讀時(shí)系統(tǒng)和 NOSQL 系統(tǒng)上進(jìn)行各種結(jié)構(gòu)化數(shù)據(jù)分析時(shí)常用的數(shù)據(jù)表示形式。PairRDD 及其成員函數(shù)大致被分為如下四類:
字典函數(shù)
函數(shù)式轉(zhuǎn)化操作
分組操作、聚合操作與排序操作
連接操作
? ? 字典函數(shù)返回鍵值對(duì) RDD 的鍵的集合或值的集合,比如 keys() 和 values()。
? ? 1.keys()
? ? 語(yǔ)法:RDD.keys()
? ? keys() 函數(shù)返回鍵值對(duì) RDD 中所有鍵組成的 RDD,或者說(shuō)是由鍵值對(duì) RDD 中每個(gè)二元組的第一個(gè)元素組成的 RDD。
# keys()
kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])
print(kvpairs.keys().collect())
# 輸出
['city', 'state', 'zip', 'country']
? ? 2.values()
? ? 語(yǔ)法:RDD.values()
? ? values() 函數(shù)返回鍵值對(duì) RDD 中所有值組成的 RDD,或者說(shuō)是由鍵值對(duì) RDD 中每個(gè)二元組的第二個(gè)元素組成的 RDD。
# values()
kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])
print(kvpairs.values().collect())
# 輸出
['Beijing', 'SHIGEZHUANG', '000000', 'China']
? ? 3.keyBy()
? ? 語(yǔ)法:RDD.keyBy(
? ? 轉(zhuǎn)化操作 keyBy() 創(chuàng)建出由從 RDD 中的元素里提取的鍵與值組成的元組,其中
# keyBy()
locations = sc.parallelize([('city','Beijing',1),('state','SHIGEZHUANG',2),('zip','000000',3),('country','China',4)])
bylocno = locations.keyBy(lambda x: x[2])
print(bylocno.collect())
# 輸出
[(1, ('city', 'Beijing', 1)), (2, ('state', 'SHIGEZHUANG', 2)), (3, ('zip', '000000', 3)), (4, ('country', 'China', 4))]
? ? 4.mapValues()
? ? 語(yǔ)法:RDD.mapValues(
? ? 轉(zhuǎn)化操作 mapValues() 把鍵值對(duì) RDD 的每個(gè)值都傳給一個(gè)函數(shù)(通過(guò)
? ? 5.flatMapValues()
? ? 語(yǔ)法:RDD.flatMapValues(
? ? 轉(zhuǎn)化操作?flatMapValues() 把鍵值對(duì) RDD 的每個(gè)值都傳給一個(gè)函數(shù)處理,而鍵保持不變,并生成拍平的列表。對(duì)于每個(gè)輸入元素,返回 0 個(gè)乃至多個(gè)輸出元素。使用 flatMapValues() 是會(huì)保留原 RDD 的分區(qū)情況。
# mapValues(),flatMapValues()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
print('kvpairs: ',kvpairs.take(4))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
print('locwtemplist: ',locwtemplist.take(3))
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
print('locwtemps: ',locwtemps.take(4))
# 輸出
kvpairs:??[['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]
locwtemplist:??[('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]
locwtemps:??[('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72)]
? ? 6.groupByKey()
? ? 語(yǔ)法:RDD.groupByKey(numPartitions=None,partitionFunc=
? ? 轉(zhuǎn)化操作 groupByKey() 將鍵值對(duì)RDD 按各個(gè)鍵對(duì)值進(jìn)行分組,把同組的值整合成一個(gè)序列。參數(shù) numPartitions 指定要?jiǎng)?chuàng)建多少個(gè)分區(qū)(也就是多少個(gè)分組)。分區(qū)使用 partitionFunc 參數(shù)的值創(chuàng)建,默認(rèn)值為 Spark 內(nèi)置的哈希分區(qū)函數(shù)。如果 numPartitions 為默認(rèn)值 None,就使用系統(tǒng)默認(rèn)的分區(qū)數(shù)( spark.default.parallelism )。
# groupByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
print('kvpairs: ',kvpairs.collect())
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
print('locwtemplist: ',locwtemplist.collect())
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
print('locwtemps: ',locwtemps.collect())
grouped = locwtemps.groupByKey()
print('grouped: ',grouped.collect())
avgtemps = grouped.mapValues(lambda x: sum(x)/len(x))
print('avgtemps: ',avgtemps.collect())
# 輸出
kvpairs:??[['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]
locwtemplist:??[('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]
locwtemps:??[('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]
grouped:??[('Beijing',
avgtemps:??[('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]
? ? 注意 groupByKey() 返回的分組后的值是一個(gè)?resultiterable 對(duì)象。Python 中的 iterable 對(duì)象是可以循環(huán)遍歷的序列對(duì)象。Python 中的許多函數(shù)接受可迭代對(duì)象作為輸入,比如 sum() 和 len() 函數(shù)。
? ? 7.reduceByKey()
? ? 語(yǔ)法:RDD.reduceByKey(
? ? 轉(zhuǎn)化操作 reduceByKey() 使用滿足結(jié)合律的函數(shù)合并鍵對(duì)應(yīng)的值。調(diào)用鍵值對(duì)數(shù)據(jù)集的 reduceByKey() 方法,返回的是鍵值對(duì)的數(shù)據(jù)集,其數(shù)據(jù)按照鍵聚合了對(duì)應(yīng)的值。參數(shù) numPartitions 和? partitionFunc 與使用 groupByKey() 函數(shù)時(shí)的用法一模一樣。numPartitions 的值還影響 saveAsTextFile() 或是其他產(chǎn)生文件的行動(dòng)操作所產(chǎn)生的文件數(shù)量。例如,numPartitions = 2 會(huì)把 RDD 保存在硬盤(pán)時(shí)共生成兩個(gè)輸出文件。
# reduceByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
temptups = locwtemps.mapValues(lambda x: (x,1))
print('temptups: ',temptups.collect())
inputstoavg = temptups.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
print('inputstoavg: ',inputstoavg.collect())
averages = inputstoavg.map(lambda x:(x[0],x[1][0]/x[1][1]))
print('averages: ',averages.collect())
# 輸出
temptups:??[('Beijing', (71, 1)), ('Beijing', (72, 1)), ('Beijing', (73, 1)), ('Beijing', (72, 1)), ('Beijing', (70, 1)), ('Shanghai', (46, 1)), ('Shanghai', (42, 1)), ('Shanghai', (40, 1)), ('Shanghai', (37, 1)), ('Shanghai', (39, 1)), ('Tianjin', (50, 1)), ('Tianjin', (48, 1)), ('Tianjin', (51, 1)), ('Tianjin', (43, 1)), ('Tianjin', (44, 1))]
inputstoavg:??[('Beijing', (358, 5)), ('Shanghai', (204, 5)), ('Tianjin', (236, 5))]
averages:??[('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]
? ? 求平均值不是滿足結(jié)合律的操作,可以通過(guò)創(chuàng)建元組來(lái)繞過(guò)去,元組中包含每個(gè)鍵對(duì)應(yīng)的值和與每個(gè)鍵對(duì)應(yīng)的計(jì)數(shù),這兩個(gè)都滿足交換律和結(jié)合律,然后在最后一步計(jì)算平均值。
? ? 注意 reduceByKey() 比較高效,是因?yàn)樗诿總€(gè)執(zhí)行器本地對(duì)值進(jìn)行了先行組合,然后把組合后的列表發(fā)送到遠(yuǎn)程的執(zhí)行器來(lái)執(zhí)行最后的階段。這是一個(gè)會(huì)產(chǎn)生數(shù)據(jù)混洗的操作。
? ? 以求和函數(shù)為例,可以當(dāng)作是累加一個(gè)由和組成的列表,而不是對(duì)單個(gè)值組成的更大的列表進(jìn)行求和。因?yàn)樵跀?shù)據(jù)混洗時(shí)發(fā)送的數(shù)據(jù)更少,使用 reduceByKey() 進(jìn)行求和一般要比使用 groupByKey() 并指定 sum() 函數(shù)的性能更好。
? ? 8.foldByKey()
? ? 語(yǔ)法:RDD.foldByKey(zeroValue,
? ? 轉(zhuǎn)化操作 foldByKey() 在功能上和行動(dòng)操作 fold() 類似,但是 foldByKey() 是轉(zhuǎn)化操作,操作預(yù)先定義的鍵值對(duì)元素。foldByKey() 和 fold() 都提供了相同數(shù)據(jù)類型的 zeroValue 參數(shù)供 RDD 為空時(shí)使用。參數(shù) numPartitions 和? partitionFunc 與轉(zhuǎn)化操作 groupByKey() 和 reduceByKey() 中的作用一樣。
# foldByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
maxbycity = locwtemps.foldByKey(0,lambda x,y: x if x > y else y)
print('maxbycity: ',maxbycity.collect())
# 輸出
maxbycity:??[('Beijing', 73), ('Shanghai', 46), ('Tianjin', 51)]
?? ? 9.sortByKey()
? ? 語(yǔ)法:RDD.sortByKey(ascending=True,numPartitions=None,keyfunc=
? ? 轉(zhuǎn)化操作 sortByKey() 把鍵值對(duì) RDD 根據(jù)鍵進(jìn)行排序。排序依據(jù)取決于鍵對(duì)象的類型。該操作與 sort() 的區(qū)別之處在于 sort() 要求指定排序依據(jù)的鍵,而 sortByKey() 的鍵是鍵值對(duì) RDD 里定義的。
? ? 鍵按照 ascending 參數(shù)提供的順序進(jìn)行排序,該參數(shù)默認(rèn)值為 True,表示升序。參數(shù) numPartitions 指定了輸出多少分區(qū),分區(qū)函數(shù)為范圍分區(qū)函數(shù)。參數(shù) keyfunc 是一個(gè)可選參數(shù),可以通過(guò)對(duì)原鍵使用另一個(gè)函數(shù)而修改原鍵。
# sortByKey()
locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])
kvpairs = locwtemps.map(lambda x: x.split(','))
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))
sortedbykey = locwtemps.sortByKey()
print('sortedbykey: ',sortedbykey.collect())
sortedbyval = locwtemps.map(lambda x: (x[1],x[0])).sortByKey(ascending=False)
print('sortedbyval: ',sortedbyval.collect())
# 輸出
sortedbykey:??[('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]
sortedbyval:??[(73, 'Beijing'), (72, 'Beijing'), (72, 'Beijing'), (71, 'Beijing'), (70, 'Beijing'), (51, 'Tianjin'), (50, 'Tianjin'), (48, 'Tianjin'), (46, 'Shanghai'), (44, 'Tianjin'), (43, 'Tianjin'), (42, 'Shanghai'), (40, 'Shanghai'), (39, 'Shanghai'), (37, 'Shanghai')]
連接操作
? ? 1.join()
? ? 語(yǔ)法:RDD.join(
? ? 轉(zhuǎn)化操作 join() 是內(nèi)連接的一個(gè)實(shí)現(xiàn),根據(jù)鍵來(lái)匹配兩個(gè)鍵值對(duì) RDD??蛇x參數(shù) numPartitions 決定生成的數(shù)據(jù)集要?jiǎng)?chuàng)建多少分區(qū)。如果不指明這個(gè)參數(shù),缺省值為 spark.default.parallelism 配置參數(shù)對(duì)應(yīng)的值。返回的 RDD 是一個(gè)列表,其結(jié)構(gòu)包含匹配鍵,以及一個(gè)二元組。這個(gè)二元組包含來(lái)自兩個(gè) RDD 的一組匹配記錄。
# 連接操作
stores = sc.parallelize([(100,'Beijing'),(101,'Shanghai'),(102,'Tianjin'),(103,'Taiyuan')])
salespeople = sc.parallelize([(1,'Tom',100),(2,'Karen',100),(3,'Paul',101),(4,'Jimmy',102),(5,'Jack',None)])
# join()
print(salespeople.keyBy(lambda x: x[2]).join(stores).collect())
# 輸出
[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]
? ? 2.leftOuterJoin()
? ? 語(yǔ)法:RDD.leftOuterJoin(
? ? 轉(zhuǎn)化操作 leftOuterJoin() 返回第一個(gè) RDD 中包含的所有記錄或元素。如果第一個(gè) RDD(左 RDD)中的鍵在右 RDD 中存在,那么右 RDD? 中匹配的記錄會(huì)和左 RDD 的記錄一起返回。否則,右 RDD 的記錄為空。
# leftOuterJoin()
leftjoin = salespeople.keyBy(lambda x: x[2]).leftOuterJoin(stores)
print("leftjoin: ",leftjoin.collect())
print(leftjoin.filter(lambda x: x[1][1] is None).map(lambda x: "salesperson " + x[1][0][1] + " has no store").collect())
# 輸出
leftjoin:??[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]
['salesperson Jack has no store']
? ? 3.rightOuterJoin()
? ? 語(yǔ)法:RDD.rightOuterJoin(
? ? 轉(zhuǎn)化操作 rightOuterJoin() 返回第二個(gè) RDD 中包含的所有元素或記錄。如果第二個(gè) RDD(右 RDD)中的鍵在左 RDD 中存在,則左 RDD 中匹配的記錄會(huì)和右 RDD 中的記錄一起返回。否則,左 RDD 的記錄為 None(空)。
# rightOuterJoin()
print(
salespeople.keyBy(lambda x: x[2])\
????.rightOuterJoin(stores)\
????.filter(lambda x: x[1][0] is None)\
????.map(lambda x: x[1][1] + " store has no salespeople")\
????.collect()
)
# 輸出
['Taiyuan store has no salespeople']
? ? 4.fullOuterJoin()
? ? 語(yǔ)法:RDD.fullOuterJoin(
? ? fullOuterJoin() 無(wú)論是否有匹配的鍵,都會(huì)返回兩個(gè) RDD 中的所有元素。左數(shù)據(jù)集或右數(shù)據(jù)集中沒(méi)有匹配的元素都用 None(空)來(lái)表示。
# fullOuterJoin
print(
????salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).filter(lambda x: x[1][0] is None or x[1][1] is None).collect()
)
# 輸出
[(None, ((5, 'Jack', None), None)), (103, (None, 'Taiyuan'))]
print(
????salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).collect()
)
# 輸出
[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin')), (103, (None, 'Taiyuan'))]
? ? 5.cogroup()
? ? 語(yǔ)法:RDD.cogroup(
? ? 轉(zhuǎn)化操作 cogroup() 將多個(gè)鍵值對(duì)數(shù)據(jù)集按鍵進(jìn)行分組。在概念上和 fullOuterJoin() 有些類似,但在實(shí)現(xiàn)上有以下關(guān)鍵區(qū)別:
轉(zhuǎn)化操作 cogroup() 返回可迭代對(duì)象,類似 groupByKey() 函數(shù)。
轉(zhuǎn)化操作 cogroup() 將兩個(gè) RDD 中的多個(gè)元素進(jìn)行分組,而 fullOuterJoin() 則對(duì)同一個(gè)鍵創(chuàng)建出多個(gè)分開(kāi)的輸出元素。
轉(zhuǎn)化操作 cogroup() 可以通過(guò) Scala API 或者函數(shù)別名 groupWith() 對(duì)三個(gè)以上的 RDD 進(jìn)行分組。
? ? 對(duì) A、B 兩個(gè) RDD 按照鍵 K 進(jìn)行 cogroup() 操作生成的 RDD 輸出具有下面的結(jié)構(gòu):
[K, Iterable(K,VA,...), Iterable(K,VB,...) ]
? ? 如果一個(gè) RDD 中沒(méi)有另一個(gè) RDD 中包含的給定鍵的值,相應(yīng)的可迭代對(duì)象則為空。
# cogroup()
print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).take(1))
print('----------------')
print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).mapValues(lambda x: [item for sublist in x for item in sublist]).collect())
# 輸出
[(100, (
----------------
[(100, [(1, 'Tom', 100), (2, 'Karen', 100), 'Beijing']), (None, [(5, 'Jack', None)]), (101, [(3, 'Paul', 101), 'Shanghai']), (102, [(4, 'Jimmy', 102), 'Tianjin']), (103, ['Taiyuan'])]
? ? 6.cartesian()
? ? 語(yǔ)法:RDD.cartesian(
? ? 轉(zhuǎn)化操作?cartesian() 即笛卡爾集,有時(shí)也被稱為交叉連接,會(huì)根據(jù)兩個(gè) RDD 的記錄生成所有可能的組合。該操作生成的記錄條數(shù)等于第一個(gè) RDD 的記錄條數(shù)乘以第二個(gè) RDD 的記錄條數(shù)。
# cartesian()
print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).collect())
print('----------------')
print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).count())
# 輸出
[((100, (1, 'Tom', 100)), (100, 'Beijing')), ((100, (1, 'Tom', 100)), (101, 'Shanghai')), ((100, (2, 'Karen', 100)), (100, 'Beijing')), ((100, (2, 'Karen', 100)), (101, 'Shanghai')), ((100, (1, 'Tom', 100)), (102, 'Tianjin')), ((100, (1, 'Tom', 100)), (103, 'Taiyuan')), ((100, (2, 'Karen', 100)), (102, 'Tianjin')), ((100, (2, 'Karen', 100)), (103, 'Taiyuan')), ((101, (3, 'Paul', 101)), (100, 'Beijing')), ((101, (3, 'Paul', 101)), (101, 'Shanghai')), ((102, (4, 'Jimmy', 102)), (100, 'Beijing')), ((102, (4, 'Jimmy', 102)), (101, 'Shanghai')), ((None, (5, 'Jack', None)), (100, 'Beijing')), ((None, (5, 'Jack', None)), (101, 'Shanghai')), ((101, (3, 'Paul', 101)), (102, 'Tianjin')), ((101, (3, 'Paul', 101)), (103, 'Taiyuan')), ((102, (4, 'Jimmy', 102)), (102, 'Tianjin')), ((102, (4, 'Jimmy', 102)), (103, 'Taiyuan')), ((None, (5, 'Jack', None)), (102, 'Tianjin')), ((None, (5, 'Jack', None)), (103, 'Taiyuan'))]
----------------
20
集合操作
? ? 1.union()
? ? 語(yǔ)法:RDD.union(
? ? 轉(zhuǎn)化操作 union() 將另一個(gè) RDD 追加到 RDD 的后面,組合成一個(gè)輸出 RDD。兩個(gè) RDD 不一定要有相同的結(jié)構(gòu)。如果兩個(gè)輸入 RDD 有相同的記錄,轉(zhuǎn)化操作 union() 不會(huì)從輸出 RDD 中過(guò)濾這些重復(fù)的數(shù)據(jù)。
>>> fibonacci = sc.parallelize([0,1,2,3,5,8])
>>> odds = sc.parallelize([1,3,5,7,9])
>>> odds.union(fibonacci).collect()
[1, 3, 5, 7, 9, 0, 1, 2, 3, 5, 8]
? ? 2.intersection()
? ? 語(yǔ)法:RDD.intersection(
? ? 轉(zhuǎn)化操作?intersection() 返回兩個(gè) RDD 中共有的元素。也就是該操作會(huì)返回兩個(gè)集合中共有的元素。返回的元素或者記錄必須在兩個(gè)集合中是一模一樣的,需要記錄的數(shù)據(jù)結(jié)構(gòu)和每個(gè)字段都對(duì)的上。
>>> fibonacci = sc.parallelize([0,1,2,3,5,8])
>>> odds = sc.parallelize([1,3,5,7,9])
>>> odds.intersection(fibonacci).collect()
[1, 3, 5]
? ? 3.subtract()
? ? 語(yǔ)法:RDD.subtract(
? ? 轉(zhuǎn)化操作 subtract() 會(huì)返回第一個(gè) RDD 中所有沒(méi)有出現(xiàn)在第二個(gè) RDD 中的元素。這是數(shù)學(xué)上的集合減法的一個(gè)實(shí)現(xiàn)。
>>> fibonacci = sc.parallelize([0,1,2,3,5,8])
>>> odds = sc.parallelize([1,3,5,7,9])
>>> odds.subtract(fibonacci).collect()
[7, 9]
? ? 4.subtractByKey()
? ? 語(yǔ)法:RDD.subtractByKey(
? ? 轉(zhuǎn)化操作 subtractByKey() 是一個(gè)和 subtract 類似的集合操作。subtractByKey() 操作返回一個(gè)鍵值對(duì) RDD 中所有在另一個(gè)鍵值對(duì) RDD 中沒(méi)有對(duì)應(yīng)鍵的元素。參數(shù) numPartitions 可以指定生成的結(jié)果 RDD 包含多少個(gè)分區(qū),缺省值為配置項(xiàng) spark.default.parallelism 的值。
>>> cities1 = sc.parallelize([('Hayward',(37.668819,-122.080795)),
... ('Baumholder',(49.6489,7.3975)),
... ('Alexandria',(38.820450,-77.050552)),
... ('Melbourne',(37.663712,144.844788))])
>>> cities2 = sc.parallelize([('Boulder Creek',(64.0708333,-148.2236111)),
... ('Hayward',(37.668819,-122.080795)),
... ('Alexandria',(38.820450,-77.050552)),
... ('Arlington',(38.878337,-77.100703))])
>>> cities1.subtractByKey(cities2).collect()
[('Melbourne', (37.663712, 144.844788)), ('Baumholder', (49.6489, 7.3975))]
>>> cities2.subtractByKey(cities1).collect()
[('Boulder Creek', (64.0708333, -148.2236111)), ('Arlington', (38.878337, -77.100703))]
數(shù)值型 RDD 的操作
?? ?數(shù)值型 RDD 僅由數(shù)值組成,常用于統(tǒng)計(jì)分析。
>>> numbers = sc.parallelize([0,1,0,1,2,3,4,5,6,7,8,9])
>>> numbers.min()? #最小值
0
>>> numbers.max()? #最大值
9
>>> numbers.mean() #算術(shù)平均數(shù)
3.8333333333333335
>>> numbers.sum()? #求和
46
>>> numbers.stdev() #標(biāo)準(zhǔn)差
3.0230595245361753
>>> numbers.variance() #方差
9.138888888888888
>>> numbers.stats() #返回 StatCounter 對(duì)象,一次調(diào)用獲得一個(gè)包括 count()、mean()、stdev()、max()、min() 的結(jié)構(gòu)
(count: 12, mean: 3.83333333333, stdev: 3.02305952454, max: 9.0, min: 0.0)