

Big Data Learning Path: the Master's jps

Run jps after starting a Spark job and you will see a SparkSubmit process.

SparkSubmit is the service process started by the submitting class; it is the process through which tasks are submitted to the cluster.

Whichever node launches the submission acts as the Driver side: the SparkSubmit process runs there.
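As a rough illustration (the exact list depends on which daemons run on a node), jps on a standalone Master node that has just launched a job might print something like:

Master
SparkSubmit
Jps

while a Worker node running part of the job would additionally show Worker and CoarseGrainedExecutorBackend processes.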


Task submission flow


1. The Driver side submits the task to the Master (this starts the SparkSubmit process).

2. The Master generates the task information and places it in a queue.

3. The Master tells the Workers to start Executors (the Master filters out the live Workers and assigns the task to the Workers with the most free resources).

4. The Executors on the Workers register with the Driver side (only the Executors actually take part in the computation) -> the Workers fetch the task information from the Driver side.

5. Once the Executors are up, the Driver side divides the job into stages, splits the stages into small tasks, and broadcasts them to the corresponding Workers for execution.

6. The Workers send the results of completed tasks back to the Driver.
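To make the flow concrete, here is a minimal sketch in Scala (the master URL spark://node01:7077 matches the cluster used later in this article; the class name and the numbers are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object SubmitFlowDemo {
  def main(args: Array[String]): Unit = {
    // Step 1: the Driver registers the application with the Master
    val conf = new SparkConf()
      .setAppName("SubmitFlowDemo")
      .setMaster("spark://node01:7077")
    val sc = new SparkContext(conf) // steps 2-4: the Master schedules Executors, which register back

    // Step 5: an action divides the job into tasks that are shipped to the Executors
    val sum = sc.parallelize(1 to 100).reduce(_ + _)

    // Step 6: the result comes back to the Driver
    println(sum)

    sc.stop()
  }
}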


Range is effectively a subtype of the Scala collections:

scala> 1.to(10)
res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


scala> 1 to 10
res1: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
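Two further REPL examples (the exact rendering of the result may differ across Scala versions) show the exclusive variant and a custom step:

scala> 1 until 10
res2: scala.collection.immutable.Range = Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> 1 to 10 by 2
res3: scala.collection.immutable.Range = Range(1, 3, 5, 7, 9)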


The entry objects for submitting tasks to the cluster, as announced when spark-shell starts:

Spark context available as sc
SQL context available as sqlContext

Both can be called directly in the shell:

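For instance, a quick word count can be sketched straight in the shell using sc (the HDFS path is the same words.txt used in the submit example below; adjust it to your cluster):

scala> sc.textFile("hdfs://node01:9000/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect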

Spark WordCount (Scala)

Building the template code:

SparkConf: the configuration class; settings made here take precedence over the cluster configuration files.

setAppName: sets the application name; if omitted, an automatically generated, UUID-like name is used.

setMaster: sets the run mode: local runs with 1 thread to simulate the cluster, local[2] simulates the cluster with 2 threads, and local[*] uses as many threads as are currently free.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Word count implemented with Spark
  */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    /**
      * Build the template code
      */
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkWordCount")
//      .setMaster("local[2]")

    // Create the entry class (context object) that submits tasks to the cluster
    val sc: SparkContext = new SparkContext(conf)

    // Read the data from HDFS
    val lines: RDD[String] = sc.textFile(args(0))

    // Split the data into individual words
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // Turn each word into a tuple of (word, 1)
    val tuples: RDD[(String, Int)] = words.map((_, 1))

    // Aggregate by key
//    tuples.reduceByKey((x, y) => x + y)
    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_ + _)

    // Sort by word count in descending order
    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2, false)

    // Print to the console
//    println(sorted.collect.toBuffer)
//    sorted.foreach(x => println(x))
//    sorted.foreach(println)

    // Save the result to HDFS
    sorted.saveAsTextFile(args(1))

    // Release resources
    sc.stop()
  }
}
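A note on the aggregation step: reduceByKey combines values locally inside each partition before any data is shuffled, which is why it is preferred here over grouping all the (word, 1) pairs first and summing afterwards.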

Package the application and upload the jar to the Linux server.

1. First start ZooKeeper, HDFS, and the Spark cluster.

Start HDFS:

/usr/local/hadoop-2.6.1/sbin/start-dfs.sh

Start Spark:

/usr/local/spark-1.6.1-bin-hadoop2.6/sbin/start-all.sh


2. Submit the Spark application with the spark-submit command (note the order of the arguments):

/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \

--class com.qf.spark.WordCount \

--master spark://node01:7077 \

--executor-memory 2G \

--total-executor-cores 4 \

/root/spark-mvn-1.0-SNAPSHOT.jar \

hdfs://node01:9000/words.txt \

hdfs://node01:9000/out
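Here --class names the main class inside the jar, --master points at the standalone Master, and --executor-memory and --total-executor-cores cap the resources the application may claim. Everything after the jar path is passed to the program itself, so the two HDFS paths arrive in main as args(0) (input) and args(1) (output).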


3. Check the program's result:

hdfs dfs -cat hdfs://node01:9000/out/part-00000
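Each line of the result file is the string form of one (word, count) tuple; purely for illustration, the output might look like:

(hello,5)
(spark,3)
(world,1)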


Java version: JavaSparkWC

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class JavaSparkWC {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaSparkWC").setMaster("local[1]");

        // Entry class for submitting tasks
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read the data
        JavaRDD<String> lines = jsc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt");

        // Split the data
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                List<String> splited = Arrays.asList(s.split(" ")); // build a list of words
                return splited;
            }
        });

        // Build tuples: for each input word, output (word, 1)
        JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        // Aggregate: the values of two entries with the same key are combined
        JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Up to this point the key is a String, so the pairs cannot be sorted by count directly.
        // The Java API used here provides no sortBy operator, so swap key and value,
        // sort, and then swap back afterwards.
        final JavaPairRDD<Integer, String> swaped = sumed.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
//                return new Tuple2<>(tup._2, tup._1);
                return tup.swap(); // swap() exchanges the two elements
            }
        });

        // Sort in descending order
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);

        // Swap back
        JavaPairRDD<String, Integer> res = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                return tup.swap();
            }
        });

        System.out.println(res.collect());

        jsc.stop(); // release resources
    }
}
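Two remarks on this version: with Spark 1.x the Java API is written with anonymous inner classes such as PairFunction, while on Java 8 the same operators also accept lambda expressions; and collect() pulls the whole result onto the driver, so for large outputs prefer saveAsTextFile as in the Scala version.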

