在本文中, 我們借由深入剖析wordcount.py, 來(lái)揭開(kāi)Spark內(nèi)部各種概念的面紗。我們?cè)俅位仡檞ordcount.py代碼來(lái)回答如下問(wèn)題
10年積累的成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設(shè)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有平和免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
對(duì)于大多數(shù)語(yǔ)言的Hello Word示例,都有main()函數(shù), wordcount.py的main函數(shù),或者說(shuō)調(diào)用Spark的main() 在哪里
數(shù)據(jù)的讀入,各個(gè)RDD數(shù)據(jù)如何轉(zhuǎn)換
map與flatMap的工作機(jī)制,以及區(qū)別
reduceByKey的作用
WordCount.py 的代碼如下:
from __future__ import print_functionimport sysfrom operator import add# SparkSession:是一個(gè)對(duì)Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調(diào)用Dataset和DataFrame API# SparkSession可用于創(chuàng)建DataFrame,將DataFrame注冊(cè)為表,在表上執(zhí)行SQL,緩存表和讀取parquet文件。from pyspark.sql import SparkSessionif __name__ == "__main__": # Python 常用的簡(jiǎn)單參數(shù)傳入 if len(sys.argv) != 2: print("Usage: wordcount", file=sys.stderr) exit(-1) # appName 為 Spark 應(yīng)用設(shè)定一個(gè)應(yīng)用名,改名會(huì)顯示在 Spark Web UI 上 # 假如SparkSession 已經(jīng)存在就取得已存在的SparkSession,否則創(chuàng)建一個(gè)新的。 spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() # 讀取傳入的文件內(nèi)容,并寫(xiě)入一個(gè)新的RDD實(shí)例lines中,此條語(yǔ)句所做工作有些多,不適合初學(xué)者,可以截成兩條語(yǔ)句以便理解。 # map是一種轉(zhuǎn)換函數(shù),將原來(lái)RDD的每個(gè)數(shù)據(jù)項(xiàng)通過(guò)map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素。原始RDD中的數(shù)據(jù)項(xiàng)與新RDD中的數(shù)據(jù)項(xiàng)是一一對(duì)應(yīng)的關(guān)系。 lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) # flatMap與map類(lèi)似,但每個(gè)元素輸入項(xiàng)都可以被映射到0個(gè)或多個(gè)的輸出項(xiàng),最終將結(jié)果”扁平化“后輸出 counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) # collect() 在驅(qū)動(dòng)程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回。 這在返回足夠小的數(shù)據(jù)子集的過(guò)濾器或其他操作之后通常是有用的。由于collect 是將整個(gè)RDD匯聚到一臺(tái)機(jī)子上,所以通常需要預(yù)估返回?cái)?shù)據(jù)集的大小以免溢出。 output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) spark.stop()
Spark2.0中引入了SparkSession的概念,它為用戶提供了一個(gè)統(tǒng)一的切入點(diǎn)來(lái)使用Spark的各項(xiàng)功能,這邊不妨對(duì)照Http Session, 在此Spark就在充當(dāng)Web service的角色,程序調(diào)用Spark功能的時(shí)候需要先建立一個(gè)Session。因此看到getOrCreate()就很容易理解了, 表明可以視情況新建session或利用已有的session。
spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate()
既然將Spark 想象成一個(gè)Web server, 也就意味著可能用多個(gè)訪問(wèn)在進(jìn)行,為了便于監(jiān)控管理, 對(duì)應(yīng)用命名一個(gè)恰當(dāng)?shù)拿Q(chēng)是個(gè)好辦法。Web UI并不是本文的重點(diǎn),有興趣的同學(xué)可以參考 ?Spark Application’s Web Console
在建立SparkSession之后, 就是讀入數(shù)據(jù)并寫(xiě)入到Dateset中。
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
為了更好的分解執(zhí)行過(guò)程,是時(shí)候借助PySpark了, PySpark是python調(diào)用Spark的 API,它可以啟動(dòng)一個(gè)交互式Python Shell。為了方便腳本調(diào)試,暫時(shí)切換到Linux執(zhí)行
# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13) [GCC 4.8.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.1.0 /_/ Using Python version 2.7.6 (default, Jun 22 2015 17:58:13) SparkSession available as 'spark'.>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')>>> type(ds)>>> print ds DataFrame[value: string]>>> lines = ds.rdd
交互式Shell的好處是可以方便的查看變量?jī)?nèi)容和類(lèi)型。此刻文件a.txt已經(jīng)加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式數(shù)據(jù)集的實(shí)例。
RDD在內(nèi)存中的結(jié)構(gòu)可以參考論文, 理解RDD有兩點(diǎn)比較重要:
一是RDD一種只讀、只能由已存在的RDD變換而來(lái)的共享內(nèi)存,然后將所有數(shù)據(jù)都加載到內(nèi)存中,方便進(jìn)行多次重用。
二是RDD的數(shù)據(jù)默認(rèn)情況下存放在集群中不同節(jié)點(diǎn)的內(nèi)存中,本身提供了容錯(cuò)性,可以自動(dòng)從節(jié)點(diǎn)失敗中恢復(fù)過(guò)來(lái)。即如果某個(gè)節(jié)點(diǎn)上的RDD partition,因?yàn)楣?jié)點(diǎn)故障,導(dǎo)致數(shù)據(jù)丟了,那么RDD會(huì)自動(dòng)通過(guò)自己的數(shù)據(jù)來(lái)源重新計(jì)算該partition。
為了探究RDD內(nèi)部的數(shù)據(jù)內(nèi)容,可以利用collect()函數(shù), 它能夠以數(shù)組的形式,返回RDD數(shù)據(jù)集的所有元素。
>>> lines = ds.rdd>>> for i in lines.collect():... print i... Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')
lines存儲(chǔ)的是Row object類(lèi)型,而我們希望的是對(duì)String類(lèi)型進(jìn)行處理,所以需要利用map api進(jìn)一步轉(zhuǎn)換RDD
>>> lines_map = lines.map(lambda x: x[0])>>> for i in lines_map.collect():... print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.
為了統(tǒng)計(jì)每個(gè)單詞的出現(xiàn)頻率,需要對(duì)每個(gè)單詞分別統(tǒng)計(jì),那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來(lái),并為每個(gè)詞設(shè)置一個(gè)計(jì)數(shù)器。比如 These出現(xiàn)次數(shù)是1, 我們期望的數(shù)據(jù)結(jié)構(gòu)是['There', 1]。但是如何將包含字符串的RDD轉(zhuǎn)換成元素為類(lèi)似 ['There', 1] 的RDD呢?
>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))>>> rdd_map = flat_map.map(lambda x: [x, 1])>>> for i in rdd_map.collect():... print i... [u'These', 1] [u'examples', 1] [u'give', 1] [u'a', 1] [u'quick', 1]
下圖簡(jiǎn)要的講述了flatMap 和 map的轉(zhuǎn)換過(guò)程。
transfrom.png
不難看出,map api只是為所有出現(xiàn)的單詞初始化了計(jì)數(shù)器為1,并沒(méi)有統(tǒng)計(jì)相同詞,接下來(lái)這個(gè)任務(wù)由reduceByKey()來(lái)完成。在rdd_map 中,所有的詞被視為一個(gè)key,而key相同的value則執(zhí)行reduceByKey內(nèi)的算子操作,因?yàn)榻y(tǒng)計(jì)相同key是累加操作,所以可以直接add操作。
>>> from operator import add>>> add_map = rdd_map.reduceByKey(add)>>> for i in add_map.collect():... print i... (u'a', 1) (u'on', 1) (u'of', 2) (u'arbitrary', 1) (u'quick', 1) (u'the', 2) (u'or', 1)>>> print rdd_map.count()26>>> print add_map.count()23
根據(jù)a.txt 的內(nèi)容,可知只有 of 和 the 兩個(gè)單詞出現(xiàn)了兩次,符合預(yù)期。
以上的分解步驟,可以幫我們理解RDD的操作,需要提示的是,RDD將操作分為兩類(lèi):transformation與action。無(wú)論執(zhí)行了多少次transformation操作,RDD都不會(huì)真正執(zhí)行運(yùn)算,只有當(dāng)action操作被執(zhí)行時(shí),運(yùn)算才會(huì)觸發(fā)。也就是說(shuō),上面所有的RDD都是通過(guò)collect()觸發(fā)的, 那么如果將上述的transformation放入一條簡(jiǎn)練語(yǔ)句中, 則展現(xiàn)為原始wordcount.py的書(shū)寫(xiě)形式。
counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add)
而真正的action 則是由collect()完成。
output = counts.collect()
至此,已經(jīng)完成了對(duì)wordcount.py的深入剖析,但是有意的忽略了一些更底層的執(zhí)行過(guò)程,比如DAG, stage, 以及Driver程序。
作者:或然子
鏈接:https://www.jianshu.com/p/067907b23546
來(lái)源:簡(jiǎn)書(shū)
簡(jiǎn)書(shū)著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請(qǐng)聯(lián)系作者獲得授權(quán)并注明出處。