Big Data Learning Path: jps on the Master and the SparkSubmit Process
SparkSubmit is the service process that appears after a job is launched; it is the process that submits the task. Whichever node you submit from is the node where the SparkSubmit process (the Driver side) starts.
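For example, running jps on the node you submitted from typically lists the SparkSubmit process alongside the standalone daemons; the PIDs below are purely illustrative:

2345 Master
2468 Worker
3721 SparkSubmit
3904 Jps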
Task submission flow
1. The Driver submits the job to the Master (this starts the SparkSubmit process).
2. The Master generates the task information and puts it into a queue.
3. The Master tells the Workers to start Executors (the Master filters out the alive Workers and assigns the task to the Workers with the most free resources).
4. The Executors on the Workers register with the Driver (only Executors actually take part in the computation) -> the Workers fetch task information from the Driver.
5. The Driver divides the job into stages, splits each stage into small tasks, and distributes them to the corresponding Executors for execution.
6. The Workers send the results of finished tasks back to the Driver.
A Range is essentially a subclass of the collection types:
scala> 1.to(10)
res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> 1 to 10
res1: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
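Range also supports an exclusive upper bound (until) and a custom step (by). A short sketch, with output formatted as in the Scala 2.10 REPL that ships with Spark 1.6:

scala> 1 until 10
res2: scala.collection.immutable.Range = Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> 1 to 10 by 2
res3: scala.collection.immutable.Range = Range(1, 3, 5, 7, 9)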
The entry class for submitting jobs to the cluster is SparkContext. When spark-shell starts, it creates one for you and prints:

Spark context available as sc
SQL context available as sqlContext

so Spark can be called directly through sc, e.g. for a word count as sketched below.
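A complete word count can be typed straight into the shell using the pre-created sc (the HDFS paths below are placeholders; the output directory must not already exist):

sc.textFile("hdfs://node01:9000/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, false)
  .saveAsTextFile("hdfs://node01:9000/shell-out")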
Building the template code:
SparkConf: the configuration class; settings made here take precedence over the cluster's configuration files.
setAppName: sets the application name; if omitted, a UUID-like name is generated automatically.
setMaster: sets the run mode: local simulates cluster execution with 1 thread;
local[2] simulates it with 2 threads; local[*] uses as many threads as are currently free.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Word count implemented with Spark.
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // Build the template code; setMaster is left commented out so the
    // master can be supplied via spark-submit's --master flag instead.
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkWordCount")
    //  .setMaster("local[2]")

    // Create the entry class (context object) for submitting the job to the cluster
    val sc: SparkContext = new SparkContext(conf)

    // Read the data from HDFS
    val lines: RDD[String] = sc.textFile(args(0))

    // Split the data into individual words
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // Turn each word into a (word, 1) tuple
    val tuples: RDD[(String, Int)] = words.map((_, 1))

    // Aggregate the counts per word
    // tuples.reduceByKey((x, y) => x + y) is equivalent
    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_ + _)

    // Sort by the number of occurrences, in descending order
    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2, false)

    // Print to the console:
    // println(sorted.collect.toBuffer)
    // sorted.foreach(x => println(x))
    // sorted.foreach(println)

    // Save the result to HDFS
    sorted.saveAsTextFile(args(1))

    // Release resources
    sc.stop()
  }
}
Package the jar and upload it to the Linux server.
1. First start ZooKeeper, HDFS and the Spark cluster.
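Start ZooKeeper on each ZooKeeper node (this assumes a standard ZooKeeper installation with zkServer.sh on the PATH; adjust to your install):
zkServer.sh start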
Start HDFS:
/usr/local/hadoop-2.6.1/sbin/start-dfs.sh
Start Spark:
/usr/local/spark-1.6.1-bin-hadoop2.6/sbin/start-all.sh
2. Submit the Spark application with the spark-submit command (note the argument order: the two trailing HDFS paths are passed to the program as args(0), the input, and args(1), the output):
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
--class com.qf.spark.SparkWordCount \
--master spark://node01:7077 \
--executor-memory 2G \
--total-executor-cores 4 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node01:9000/words.txt \
hdfs://node01:9000/out
3. Check the program's output:
hdfs dfs -cat hdfs://node01:9000/out/part-00000
JavaSparkWC: the same word count written with the Java API
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class JavaSparkWC {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaSparkWC").setMaster("local[1]");
        // Entry class for submitting the job
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read the data (the input path is the first program argument, as in the Scala version)
        JavaRDD<String> lines = jsc.textFile(args[0]);

        // Split the data into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                List<String> splited = Arrays.asList(line.split(" "));
                return splited;
            }
        });

        // Build tuples: one pair per word (input word -> (word, 1))
        JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Aggregate: combine the values of two identical keys
        JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // So far the key is a String, so we cannot sort by the count directly.
        // The Java API provides no sortBy operator, so swap key and value,
        // sort, then swap back afterwards.
        final JavaPairRDD<Integer, String> swaped = sumed.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
                // return new Tuple2<Integer, String>(tup._2, tup._1);
                return tup.swap(); // swap() exchanges the two elements
            }
        });

        // Sort in descending order (the count is now the key)
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        // Swap back again to (word, count)
        JavaPairRDD<String, Integer> res = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                return tup.swap();
            }
        });

        System.out.println(res.collect());

        jsc.stop(); // release resources
    }
}
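Note that, unlike the Scala version, this example hard-codes setMaster("local[1]"), so it can be run directly (for instance from an IDE) without spark-submit; to run it on the cluster, remove the setMaster call and submit the jar with spark-submit as shown above.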