本篇文章給大家分享的是有關(guān)如何進(jìn)行spark python編程,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
創(chuàng)新互聯(lián)公司專注于蔡甸企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),商城網(wǎng)站制作。蔡甸網(wǎng)站建設(shè)公司,為蔡甸等地區(qū)提供建站服務(wù)。全流程按需定制,專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)
spark應(yīng)用程序結(jié)構(gòu)
Spark應(yīng)用程序可分兩部分:driver部分和executor部分初始化SparkContext和主體程序
A:driver部分
driver部分主要是對(duì)SparkContext進(jìn)行配置、初始化以及關(guān)閉。初始化SparkContext是為了構(gòu)建Spark應(yīng)用程序的運(yùn)行環(huán)境,在初始化SparkContext,要先導(dǎo)入一些Spark的類和隱式轉(zhuǎn)換;在executor部分運(yùn)行完畢后,需要將SparkContext關(guān)閉。
B:executor部分
Spark應(yīng)用程序的executor部分是對(duì)數(shù)據(jù)的處理,數(shù)據(jù)分三種:
原生數(shù)據(jù),包含輸入的數(shù)據(jù)和輸出的數(shù)據(jù)
生成Scala標(biāo)量數(shù)據(jù),如count(返回RDD中元素的個(gè)數(shù))、reduce、fold/aggregate;返回幾個(gè)標(biāo)量,如take(返回前幾個(gè)元素)。
生成Scala集合數(shù)據(jù)集,如collect(把RDD中的所有元素倒入 Scala集合類型)、lookup(查找對(duì)應(yīng)key的所有值)。
生成hadoop數(shù)據(jù)集,如saveAsTextFile、saveAsSequenceFile
scala集合數(shù)據(jù)集,如Array(1,2,3,4,5),Spark使用parallelize方法轉(zhuǎn)換成RDD。
hadoop數(shù)據(jù)集,Spark支持存儲(chǔ)在hadoop上的文件和hadoop支持的其他文件系統(tǒng),如本地文件、HBase、SequenceFile和Hadoop的輸入格式。例如Spark使用txtFile方法可以將本地文件或HDFS文件轉(zhuǎn)換成RDD。
對(duì)于輸入原生數(shù)據(jù),Spark目前提供了兩種:
對(duì)于輸出數(shù)據(jù),Spark除了支持以上兩種數(shù)據(jù),還支持scala標(biāo)量
RDD,Spark進(jìn)行并行運(yùn)算的基本單位,其細(xì)節(jié)參見RDD 細(xì)解。RDD提供了四種算子:
窄依賴算子
寬依賴算子,寬依賴會(huì)涉及shuffle類,在DAG圖解析時(shí)以此為邊界產(chǎn)生Stage,如圖所示。
輸入輸出一對(duì)一的算子,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變,主要是map、flatMap;
輸入輸出一對(duì)一,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生了變化,如union、coalesce;
從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample。
對(duì)單個(gè)RDD基于key進(jìn)行重組和reduce,如groupByKey、reduceByKey;
對(duì)兩個(gè)RDD基于key進(jìn)行join和重組,如join、cogroup。
輸入算子,將原生數(shù)據(jù)轉(zhuǎn)換成RDD,如parallelize、txtFile等
轉(zhuǎn)換算子,最主要的算子,是Spark生成DAG圖的對(duì)象,轉(zhuǎn)換算子并不立即執(zhí)行,在觸發(fā)行動(dòng)算子后再提交給driver處理,生成DAG圖 --> Stage --> Task --> Worker執(zhí)行。按轉(zhuǎn)化算子在DAG圖中作用,可以分成兩種:
緩存算子,對(duì)于要多次使用的RDD,可以緩沖加快運(yùn)行速度,對(duì)重要數(shù)據(jù)可以采用多備份緩存。
行動(dòng)算子,將運(yùn)算結(jié)果RDD轉(zhuǎn)換成原生數(shù)據(jù),如count、reduce、collect、saveAsTextFile等。
共享變量,在Spark運(yùn)行時(shí),一個(gè)函數(shù)傳遞給RDD內(nèi)的patition操作時(shí),該函數(shù)所用到的變量在每個(gè)運(yùn)算節(jié)點(diǎn)上都復(fù)制并維護(hù)了一份,并且各個(gè)節(jié)點(diǎn)之間不會(huì)相互影響。但是在Spark Application中,可能需要共享一些變量,提供Task或驅(qū)動(dòng)程序使用。Spark提供了兩種共享變量:
廣播變量,可以緩存到各個(gè)節(jié)點(diǎn)的共享變量,通常為只讀,使用方法:
>>> from pyspark.context import SparkContext >>> sc = SparkContext('local', 'test') >>> b = sc.broadcast([1, 2, 3, 4, 5]) >>> b.value[1, 2, 3, 4, 5] >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
累計(jì)器,只支持加法操作的變量,可以實(shí)現(xiàn)計(jì)數(shù)器和變量求和。用戶可以調(diào)用SparkContext.accumulator(v)創(chuàng)建一個(gè)初始值為v的累加器,而運(yùn)行在集群上的Task可以使用“+=”操作,但這些任務(wù)卻不能讀取;只有驅(qū)動(dòng)程序才能獲取累加器的值。使用方法:
python編程
實(shí)驗(yàn)項(xiàng)目
sogou日志數(shù)據(jù)分析
實(shí)驗(yàn)數(shù)據(jù)來源:sogou精簡(jiǎn)版數(shù)據(jù)下載地址
數(shù)據(jù)格式說明:
訪問時(shí)間\t用戶ID\t[查詢?cè)~]\t該URL在返回結(jié)果中的排名\t用戶點(diǎn)擊的順序號(hào)\t用戶點(diǎn)擊的URL
其中,用戶ID是根據(jù)用戶使用瀏覽器訪問搜索引擎時(shí)的Cookie信息自動(dòng)賦值,即同一次使用瀏覽器輸入的不同查詢對(duì)應(yīng)同一個(gè)用戶ID。
以上數(shù)據(jù)格式是官方說明,實(shí)際上該數(shù)據(jù)集中排名和順序號(hào)之間不是\t分割,而是空格分割。
一個(gè)session內(nèi)查詢次數(shù)最多的用戶的session與相應(yīng)的查詢次數(shù)
import sys from pyspark import SparkContext if __name__ == "__main__": if len(sys.argv) != 2: print >> sys.stderr, "Usage: SogouC" exit(-1) sc = SparkContext(appName="SogouC") sgRDD = sc.textFile(sys.argv[1]) print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10) sc.stop()
虛擬集群中任意節(jié)點(diǎn)運(yùn)行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt
運(yùn)行結(jié)果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]
以上就是如何進(jìn)行spark python編程,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。