[TOC]
創(chuàng)新互聯(lián)建站專(zhuān)注為客戶(hù)提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)、石城網(wǎng)絡(luò)推廣、微信平臺(tái)小程序開(kāi)發(fā)、石城網(wǎng)絡(luò)營(yíng)銷(xiāo)、石城企業(yè)策劃、石城品牌公關(guān)、搜索引擎seo、人物專(zhuān)訪(fǎng)、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)建站為所有大學(xué)生創(chuàng)業(yè)者提供石城建站搭建服務(wù),24小時(shí)服務(wù)熱線(xiàn):028-86922220,官方網(wǎng)址:www.cdcxhl.com
Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開(kāi)源,2013年6月成為Apache孵化項(xiàng)目,2014年2月成為Apache頂級(jí)項(xiàng)目。目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項(xiàng)目,Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架。Spark基于內(nèi)存計(jì)算,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶(hù)將Spark部署在大量廉價(jià)硬件之上,形成集群。Spark得到了眾多大數(shù)據(jù)公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優(yōu)酷土豆。當(dāng)前百度的Spark已應(yīng)用于鳳巢、大搜索、直達(dá)號(hào)、百度大數(shù)據(jù)等業(yè)務(wù);阿里利用GraphX構(gòu)建了大規(guī)模的圖計(jì)算和圖挖掘系統(tǒng),實(shí)現(xiàn)了很多生產(chǎn)系統(tǒng)的推薦算法;騰訊Spark集群達(dá)到8000臺(tái)的規(guī)模,是當(dāng)前已知的世界上最大的Spark集群。
? 在這里必須對(duì)比MapReduce,MapReduce最大的性能短板就在于shuffle過(guò)程中,會(huì)將中間結(jié)果輸出到磁盤(pán)上(也就是hdfs上),這個(gè)過(guò)程中至少會(huì)產(chǎn)生6次的IO。也正是這些頻繁的IO使得mr的性能不盡人意。
? 對(duì)于spark來(lái)說(shuō),中間結(jié)果是都在內(nèi)存中的(checkpoint另說(shuō)),就從這點(diǎn)來(lái)說(shuō),就少了很多IO導(dǎo)致的性能問(wèn)題。當(dāng)然這只是其中一點(diǎn),后面會(huì)細(xì)說(shuō)
與Hadoop的MapReduce相比,Spark基于內(nèi)存的運(yùn)算速度要快100倍以上,即使,Spark基于硬盤(pán)的運(yùn)算也要快10倍。Spark實(shí)現(xiàn)了高效的DAG執(zhí)行引擎,從而可以通過(guò)內(nèi)存來(lái)高效處理數(shù)據(jù)流。
Spark支持Java、Python和Scala的API,還支持超過(guò)80種高級(jí)算法,使用戶(hù)可以快速構(gòu)建不同的應(yīng)用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來(lái)驗(yàn)證解決問(wèn)題的方法。
Spark提供了統(tǒng)一的解決方案。Spark可以用于批處理、交互式查詢(xún)(Spark SQL)、實(shí)時(shí)流處理(Spark Streaming)、機(jī)器學(xué)習(xí)(Spark MLlib)和圖計(jì)算(GraphX)。這些不同類(lèi)型的處理都可以在同一個(gè)應(yīng)用中無(wú)縫使用。Spark統(tǒng)一的解決方案非常具有吸引力,畢竟任何公司都想用統(tǒng)一的平臺(tái)去處理遇到的問(wèn)題,減少開(kāi)發(fā)和維護(hù)的人力成本和部署平臺(tái)的物力成本。
另外Spark還可以很好的融入Hadoop的體系結(jié)構(gòu)中可以直接操作HDFS,并提供Hive on Spark、Pig on Spark的框架集成Hadoop。
Spark可以非常方便地與其他的開(kāi)源產(chǎn)品進(jìn)行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調(diào)度器,并且可以處理所有Hadoop支持的數(shù)據(jù),包括HDFS、HBase和Cassandra等。這對(duì)于已經(jīng)部署Hadoop集群的用戶(hù)特別重要,因?yàn)椴恍枰鋈魏螖?shù)據(jù)遷移就可以使用Spark的強(qiáng)大處理能力。Spark也可以不依賴(lài)于第三方的資源管理和調(diào)度器,它實(shí)現(xiàn)了Standalone作為其內(nèi)置的資源管理和調(diào)度框架,這樣進(jìn)一步降低了Spark的使用門(mén)檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。
Spark生態(tài)圈:
Spark Core :最重要,其中最重要的是 RDD (彈性分布式數(shù)據(jù)集)
Spark SQL :類(lèi)似于Hive 使用SQL語(yǔ)句操作RDD DataFrame(表)
Spark Streaming : 流式計(jì)算
前面三個(gè)用到比較多,后面這兩個(gè)看需求吧
Spark MLLib :Spark機(jī)器學(xué)習(xí)類(lèi)庫(kù)
Spark GraphX : 圖計(jì)算
圖2.1 spark架構(gòu)
spark大致有幾個(gè)大組件,分別為:driver、master(cluster manager)、worker。
圖2.2 spark工作任務(wù)圖
上面這圖說(shuō)明了每個(gè)組件的功能。
spark可以部署在以上幾種環(huán)境之上:
Standalone:spark內(nèi)置的資源管理器
YARN:hadoop的資源管理器
Mesos
Amazon EC2
使用scala版本為scala2.11.8,spark版本為spark-2.1.0-bin-hadoop2.7。
jdk版本1.8,hadoop版本2.8.4
解壓好spark程序之后,進(jìn)入解壓目錄下。修改配置文件:
cd conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
添加以下內(nèi)容:
export JAVA_HOME=/opt/modules/jdk1.8.0_144
# 指定master節(jié)點(diǎn)主機(jī)名以及端口
export SPARK_MASTER_HOST=bigdata121 這里自己按實(shí)際的ip改,為master節(jié)點(diǎn)
export SPARK_MASTER_PORT=7077
cp slaves.template slaves
vim slaves
# 配置從節(jié)點(diǎn)主機(jī)名,指定worker節(jié)點(diǎn)主機(jī)
bigdata121
配置完成后,啟動(dòng)集群:
cd sbin
./start-all.sh
jps 查看是否有master和worker進(jìn)程
20564 JobHistoryServer
127108 Jps
51927 Worker
41368 ResourceManager
11130 SecondaryNameNode
10875 NameNode
41467 NodeManager
51868 Master
10973 DataNode
基本和偽分布式是一樣的,也就是 conf/slaves文件中配置多幾個(gè)worker節(jié)點(diǎn)而已,然后照樣啟動(dòng)集群就OK了。
搭建完成了可以進(jìn)入 http://masterIP:8080 查看集群狀態(tài)
? 在spark中,master節(jié)點(diǎn)作為整個(gè)集群的管理者,是單點(diǎn)的,容易發(fā)生單點(diǎn)故障,所以為了保障master節(jié)點(diǎn)的可用性,需要給它實(shí)現(xiàn)HA
? 主要用于開(kāi)發(fā)或測(cè)試環(huán)境。spark提供目錄保存spark Application和worker的注冊(cè)信息,并將他們的恢復(fù)狀態(tài)信息寫(xiě)入該目錄中,這時(shí),一旦Master發(fā)生故障,就可以通過(guò)重新啟動(dòng)Master進(jìn)程(sbin/start-master.sh),恢復(fù)已運(yùn)行的spark Application和worker的注冊(cè)信息。
? 基于文件系統(tǒng)的單點(diǎn)恢復(fù),主要是在spark-env.sh里SPARK_DAEMON_JAVA_OPTS設(shè)置以下內(nèi)容:
指定兩個(gè)運(yùn)行參數(shù):
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
其中:
spark.deploy.recoveryMode=FILESYSTEM 設(shè)置為FILESYSTEM開(kāi)啟單點(diǎn)恢復(fù)功能,默認(rèn)值:NONE
spark.deploy.recoveryDirectory Spark 保存恢復(fù)狀態(tài)的目錄
要注意的是,這種方式本質(zhì)上還是只有一個(gè)master節(jié)點(diǎn),只不過(guò)是重啟master節(jié)點(diǎn)時(shí)可以自動(dòng)還原worker以及application信息,防止master掛了之后,所有任務(wù)都丟失執(zhí)行狀態(tài),然后master重啟之后需要重新從頭到尾執(zhí)行之前的任務(wù)。
? ZooKeeper提供了一個(gè)Leader Election機(jī)制,利用這個(gè)機(jī)制可以保證雖然集群存在多個(gè)Master,但是只有一個(gè)是Active的,其他的都是Standby。當(dāng)Active的Master出現(xiàn)故障時(shí),另外的一個(gè)Standby Master會(huì)被選舉出來(lái)。由于集群的信息,包括Worker, Driver和Application的信息都已經(jīng)持久化到ZooKeeper,因此在切換的過(guò)程中只會(huì)影響新Job的提交,對(duì)于正在進(jìn)行的Job沒(méi)有任何的影響。
? 這里分別用兩臺(tái)主機(jī)配置master節(jié)點(diǎn),而worker節(jié)點(diǎn)仍然是單節(jié)點(diǎn)(為了方便起見(jiàn)而已)。首先需保證zookeeper服務(wù)的正常運(yùn)行。這里不重復(fù)講,可以看之前zookeeper的文章。這里直接講spark 的配置。
修改spark-env.sh配置文件
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata121:2181,bigdata123:2181,bigdata122:2181 -Dspark.deploy.zookeeper.dir=/spark"
其中:
spark.deploy.recoveryMode=ZOOKEEPER 設(shè)置為ZOOKEEPER開(kāi)啟單點(diǎn)恢復(fù)功能,默認(rèn)值:NONE
spark.deploy.zookeeper.url ZooKeeper集群的地址
spark.deploy.zookeeper.dir Spark信息在ZK中的保存目錄,默認(rèn):/spark
另外:每個(gè)節(jié)點(diǎn)上,需要將以下兩行注釋掉
# export SPARK_MASTER_HOST=bigdata121
# export SPARK_MASTER_PORT=7077
會(huì)自動(dòng)選出哪個(gè)是active的master節(jié)點(diǎn)地址以及端口,不需要指定。
以上配置需要保證在整個(gè)spark集群的所有master和worker節(jié)點(diǎn)所在主機(jī)的配置一樣。
配置完成后,啟動(dòng)集群
隨便在一臺(tái)master節(jié)點(diǎn)上啟動(dòng)整個(gè)集群:
sbin/start.all.sh
接著再另外一個(gè)master節(jié)點(diǎn)單獨(dú)啟動(dòng)master:
sbin/start-master.sh
啟動(dòng)完成后,可以到兩個(gè)master的管理頁(yè)面上看對(duì)應(yīng)的狀態(tài):
http://masterip1:8080
http://masterip2:8080
如果正常工作的話(huà),一般是一個(gè)顯示active,一個(gè)顯示standby
接著我們看看zookeeper上存儲(chǔ)什么信息:
會(huì)在zk上創(chuàng)建 /spark節(jié)點(diǎn),有如下兩個(gè)目錄:
master_status 這個(gè)節(jié)點(diǎn)下有以worker名創(chuàng)建的子節(jié)點(diǎn),也就是worker信息
leader_election 主備master節(jié)點(diǎn)所在主機(jī)的心跳信息,都是臨時(shí)節(jié)點(diǎn)。如果失去心跳,那么就會(huì)對(duì)應(yīng)的節(jié)點(diǎn)消失
如:
這個(gè)看節(jié)點(diǎn)名字就知道了,是worker的信息節(jié)點(diǎn)
[zk: localhost:2181(CONNECTED) 0] ls /spark/master_status
[worker_worker-20190822120853-192.168.50.121-59531]
這個(gè)則是兩個(gè)master節(jié)點(diǎn)的的狀態(tài)節(jié)點(diǎn),如果沒(méi)有心跳就消失了
[zk: localhost:2181(CONNECTED) 1] ls /spark/leader_election
[_c_dcc9ec86-80f9-4212-a5db-d1ec259add80-latch-0000000003, _c_fa42411d-0aa0-4da8-bbe9-483c198ee1f9-latch-0000000004]
spark提供了一些實(shí)例程序,
[root@bigdata121 spark-2.1.0-bin-hadoop2.7]# ls examples/jars/
scopt_2.11-3.3.0.jar spark-examples_2.11-2.1.0.jar
spark提供了兩個(gè)工具用于提交執(zhí)行spark任務(wù),分別是spark-shell和spark-submit
一般用在生產(chǎn)環(huán)境中用于提交任務(wù)到集群中執(zhí)行
例子:蒙特卡羅求PI
spark-submit --master spark://bigdata121:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
--master 指定master地址
--class 運(yùn)行的類(lèi)的全類(lèi)名
Jar包位置
運(yùn)行的類(lèi)的main函數(shù)需要輸入的參數(shù)(其實(shí)就是main函數(shù)中的args參數(shù))
如果有多個(gè)額外的jar包,這樣寫(xiě):
submit --master xxx --jars jar1 jar2..... --class 全類(lèi)名 包含運(yùn)行類(lèi)的jar包 參數(shù)
--jar 用于指定除了運(yùn)行類(lèi)之外的jar包地址,比如依賴(lài)的各種jar包
如果需要指定一些driver,比如MySQL-connecter,需要加一個(gè)選項(xiàng)
--driver-class-path xxxx
或者方便一點(diǎn)的話(huà)就直接加到spark的jars目錄下
一般在生產(chǎn)環(huán)境中,在IDE中編寫(xiě)完spark程序后,會(huì)打包成jar包,然后上傳到集群中。通過(guò)上面的spark-submit命令,將任務(wù)提交至集群中執(zhí)行
spark-shell是Spark自帶的交互式Shell程序,方便用戶(hù)進(jìn)行交互式編程,用戶(hù)可以在該命令行下用scala編寫(xiě)spark程序。一般用于測(cè)試
有兩種運(yùn)行模式:
(1)本地模式:不需要連接到Spark集群,在本地直接運(yùn)行,用于測(cè)試
啟動(dòng):bin/spark-shell 后面不寫(xiě)任何參數(shù),代表本地模式
Spark context available as 'sc' (master = local[*], app id = local-1553936033811).
local代表本地模式
local[*] 表示cpu核數(shù)
(2)集群模式
命令:bin/spark-shell --master spark://.....
Spark context available as 'sc' (master = spark://node3:7077, app id = app-20190614091350-0000).
特殊說(shuō)明:
Spark session 保存為 spark: Spark2.0以后提供的,利用session可以訪(fǎng)問(wèn)所有spark組件(core sql..)
spark context 保存為 sc,是任務(wù)的上下文對(duì)象。
spark sc 兩個(gè)對(duì)象,可以直接使用
例子:在Spark Shell中編寫(xiě)WordCount程序
程序如下:
sc.textFile("hdfs://bigdata121:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://bigdata121:9000/output/spark/wc")
注意:hdfs://bigdata121:9000/data/data.txt上的文件需自行先上傳到你的hdfs集群上,而且需保證輸出目錄不存在
說(shuō)明:
sc是SparkContext對(duì)象,該對(duì)象時(shí)提交spark程序的入口
textFile("hdfs://bigdata121/data/data.txt")是hdfs中讀取數(shù)據(jù)
flatMap(_.split(" "))先map在壓平
map((_,1))將單詞和1構(gòu)成元組
reduceByKey(_+_)按照key進(jìn)行reduce,并將value累加
saveAsTextFile("hdfs://bigdata121:9000/output/spark/wc")將結(jié)果寫(xiě)入到hdfs中
首先需要idea配置好scala開(kāi)發(fā)環(huán)境。
到插件中心安裝scala插件。
創(chuàng)建maven工程,然后add framework support添加scala支持
到project structure添加scala源碼文件夾
最后右鍵就可以看到可以創(chuàng)建scala class 的選項(xiàng)了。
注意:本地得安裝scala以及jdk
配置好scala環(huán)境后,需要添加spark對(duì)應(yīng)的maven依賴(lài),添加依賴(lài)到pom.xml中:
4.0.0
king
sparkTest
1.0-SNAPSHOT
UTF-8
2.1.0
2.11.8
2.7.3
org.apache.spark
spark-core_2.11
2.1.0
org.apache.spark
spark-sql_2.11
2.1.0
org.apache.spark
spark-hive_2.11
2.1.0
provided
org.apache.spark
spark-streaming_2.11
2.1.0
provided
org.apache.spark
spark-mllib_2.11
2.1.0
runtime
org.apache.hadoop
hadoop-client
${hadoop.version}
org.apache.spark
spark-streaming-kafka_2.11
1.6.3
mysql
mysql-connector-java
8.0.12
junit
junit
4.12
org.apache.hive
hive-jdbc
1.2.1
org.scala-tools
maven-scala-plugin
2.15.2
compile
testCompile
maven-compiler-plugin
3.6.0
1.8
org.apache.maven.plugins
maven-surefire-plugin
2.19
true
記住上面的關(guān)于build的配置千萬(wàn)不要漏掉。這里說(shuō)說(shuō)我之前的遇到的小坑。
小坑:
我用maven打包jar之后,到Linux上運(yùn)行時(shí),發(fā)現(xiàn)報(bào)錯(cuò),說(shuō)在jar包里找不到指定的主類(lèi)。重新打包好幾次都不行。接著我就到idea中將打包的jar添加為工程依賴(lài),然后到j(luò)ar包里看看有啥東西,結(jié)果發(fā)現(xiàn)我寫(xiě)的代碼并沒(méi)有打包到里面去。但是java的可以打包進(jìn)去,我就猜測(cè)是maven直接忽略了scala代碼,到網(wǎng)上一搜,需要加上上面的build配置,配置好就可以打包了。
wordcount實(shí)例代碼:
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//創(chuàng)建spark配置文件對(duì)象.設(shè)置app名稱(chēng),master地址,local表示為本地模式。
//如果是提交到集群中,通常不指定。因?yàn)榭赡茉诙鄠€(gè)集群匯上跑,寫(xiě)死不方便
val conf = new SparkConf().setAppName("wordCount").setMaster("local")
//創(chuàng)建spark context對(duì)象
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile(args(1))
}
}