真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Spark的Core深入(二)

Spark 的 Core 深入(二)

標(biāo)簽(空格分隔): Spark的部分

成都創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供達(dá)日網(wǎng)站建設(shè)、達(dá)日做網(wǎng)站、達(dá)日網(wǎng)站設(shè)計(jì)、達(dá)日網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、達(dá)日企業(yè)網(wǎng)站模板建站服務(wù),10多年達(dá)日做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。


  • 一: 日志清洗的優(yōu)化
  • 二:Spark RDD
  • 三:SparkContext三大功能
  • 四:Spark on YARN
  • 五: spark RDD 的 依賴

一、日志清洗的優(yōu)化:

1.1 日志清洗有臟數(shù)據(jù)問(wèn)題

hdfs dfs -mkdir /apachelog/
hdfs dfs -put access_log /apachelogs
hdfs dfs -ls /apachelogs

Spark 的Core深入(二)

 執(zhí)行結(jié)果報(bào)錯(cuò)。

Spark 的Core深入(二)

 LogAnalyzer.scala
package com.ibeifeng.bigdata.spark.app.core
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Created by zhangyy on 2016/7/16.
 */
object LogAnalyzer {
  def main(args: Array[String]) {
    // step 0: SparkContext
    val sparkConf = new SparkConf()
      .setAppName("LogAnalyzer Applicaiton") // name
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
    // Create SparkContext
    val sc = new SparkContext(sparkConf)
    /** ================================================================== */
    val logFile = "/apachelogs/access_log"
    // step 1: input data
    val accessLogs = sc.textFile(logFile)
       // filer logs data
       .filter(ApacheAccessLog.isValidateLogLine) // closures
        /**
         * parse log
         */
        .map(line => ApacheAccessLog.parseLogLine(line))
    /**
     * The average, min, and max content size of responses returned from the server.
     */
    val contentSizes = accessLogs.map(log => log.contentSize)
    // compute
    val avgContentSize = contentSizes.reduce(_ + _) / contentSizes.count()
    val minContentSize = contentSizes.min()
    val maxContentSize = contentSizes.max()
    // println
    printf("Content Size Avg: %s , Min : %s , Max: %s".format(
      avgContentSize, minContentSize, maxContentSize
    ))
    /**
     * A count of response code's returned
     */
    val responseCodeToCount = accessLogs
      .map(log => (log.responseCode, 1))
      .reduceByKey(_ + _)
      .take(3)
    println(
      s"""Response Code Count: ${responseCodeToCount.mkString(", ")}"""
    )
    /**
     * All IPAddresses that have accessed this server more than N times
     */
    val ipAddresses = accessLogs
        .map(log => (log.ipAddress, 1))
        .reduceByKey( _ + _)
    //    .filter( x => (x._2 > 10))
        .take(5)
    println(
      s"""IP Address : ${ipAddresses.mkString("< ", ", " ," >")}"""
    )
    /**
     * The top endpoints requested by count
     */
    val topEndpoints = accessLogs
      .map(log => (log.endPoint, 1))
      .reduceByKey(_ + _)

      .top(3)(OrderingUtils.SecondValueOrdering)

     // .map(tuple => (tuple._2, tuple._1))

     // .sortByKey(false)
      //.take(3)
      //.map(tuple => (tuple._2, tuple._1))
    println(
      s"""Top Endpoints : ${topEndpoints.mkString("[", ", ", " ]")}"""
    )
    /** ================================================================== */
    // Stop SparkContext
    sc.stop()
  }
}
ApacheAccessLog.scala
package com.ibeifeng.bigdata.spark.app.core
/**
 * Created by zhangyy on 2016/7/16.
 *
 * 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800]
 * "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1"
 * 200 1234
 */
case class ApacheAccessLog (
                             ipAddress: String,
                             clientIndentd: String,
                             userId: String,
                             dateTime:String,
                             method: String,
                             endPoint: String,
                             protocol: String,
                             responseCode: Int,
                             contentSize: Long)
object ApacheAccessLog{
  // regex
  // 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
  val PARTTERN ="""^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
   *
   * @param log
   * @return
   */

  def isValidateLogLine(log: String): Boolean = {
    // parse log
    val res = PARTTERN.findFirstMatchIn(log)
    // invalidate
    if (res.isEmpty) {
      false
    }else{
      true
    }

  }

  /**
   *
   * @param log
   * @return
   */
  def parseLogLine(log: String): ApacheAccessLog ={
    // parse log
    val res = PARTTERN.findFirstMatchIn(log)
    // invalidate
    if(res.isEmpty){
      throw new RuntimeException("Cannot parse log line: " + log)
    }
    // get value
    val m = res.get
    // return
    ApacheAccessLog( //
      m.group(1), //
      m.group(2),
      m.group(3),
      m.group(4),
      m.group(5),
      m.group(6),
      m.group(7),
      m.group(8).toInt,
      m.group(9).toLong)
  }
}
OrderingUtils.scala
package com.ibeifeng.bigdata.spark.app.core

import scala.math.Ordering

/**
 * Created by zhangyy on 2016/7/16.
 */
object OrderingUtils {

  object SecondValueOrdering extends Ordering[(String, Int)]{
    /**
     *
     * @param x
     * @param y
     * @return
     */
    override def compare(x: (String, Int), y: (String, Int)): Int = {
      x._2.compare(y._2)
      // x._2 compare y._2  // 1 to 10 | 1.to(10)
    }
  }
}

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)


二、Spark RDD

2.1:RDD的含義:

RDD,全稱為Resilient Distributed Datasets,是一個(gè)容錯(cuò)的、并行的數(shù)據(jù)結(jié)構(gòu),可以讓用戶顯式地將數(shù)據(jù)存儲(chǔ)到磁盤和內(nèi)存中,并能控制數(shù)據(jù)的分區(qū)。同時(shí),RDD還提供了一組豐富的操作來(lái)操作這些數(shù)據(jù)。在這些操作中,諸如map、flatMap、filter等轉(zhuǎn)換操作實(shí)現(xiàn)了monad模式,很好地契合了Scala的集合操作。除此之外,RDD還提供了諸如join、groupBy、reduceByKey等更為方便的操作(注意,reduceByKey是action,而非transformation),以支持常見(jiàn)的數(shù)據(jù)運(yùn)算

2.2、RDD 在 hdfs的結(jié)構(gòu)

Spark 的Core深入(二)

Spark 的Core深入(二)

val rdd = sc.textFile("/spark/rdd")
rdd.partitions.length

rdd.cache
rdd.count 

一個(gè)分區(qū)默認(rèn)一個(gè)task 分區(qū)去處理
默認(rèn)是兩個(gè)分區(qū)去處理

Spark 的Core深入(二)

Spark 的Core深入(二)

2.3、RDD的五個(gè)特點(diǎn)對(duì)應(yīng)方法

1. A list of partitions : (protected def getPartitions: Array[Partition])

   一系列的的分片,比如說(shuō)64M一片,類似于hadoop中的split

2. A function ofr computing each split :( @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T])

 在每個(gè)分片上都有一個(gè)方式去迭代/執(zhí)行/計(jì)算

 3. A list of dependencies on other RDD  :(protected def getDependencies: Seq[Dependency[_]] = deps)
 一系列的依賴:RDDa 轉(zhuǎn)換為RDDb,轉(zhuǎn)換為 RDDc, 那么RDDc 就依賴于RDDb , RDDb 又依賴于RDDa

 ---
 wordcount 程序:

 ## val rdd = sc.textFile("xxxx")

 val wordRdd = rdd.flatMap(_.split(""))

 val kvRdd = wordRdd.map((_,1))

 val WordCountRdd = kvRdd.reduceByKey(_ + _)

 # wrodcountRdd.saveAsTextFile("yy")

 kvRdd <- wordRdd <- rdd

 rdd.toDebugString

 ---

 4. Optionlly,a Partitioner for kev-values RDDs (e,g,to say that the RDDis hash-partitioned) :(/** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None)

 5. optionlly,a list of preferred location(s) to compute each split on (e,g,block location for an HDFS file)
 :(protected def getPreferredLocations(split: Partition): Seq[String] = Nil)
 要運(yùn)行的計(jì)算/執(zhí)行最好在哪(幾)個(gè)機(jī)器上運(yùn)行,數(shù)據(jù)本地型

 為什么會(huì)有那幾個(gè)呢?

 比如: hadoop 默認(rèn)有三個(gè)位置,或者spark cache 到內(nèi)存是可能同過(guò)StroageLevel 設(shè)置了多個(gè)副本,所以一個(gè)partition 可能返回多個(gè)最佳位置。

Spark 的Core深入(二)

2.4、 如何創(chuàng)建RDD的兩種方式

方式一:
    并行化集合:
     并行化集合
    List\Seq\Array

    SparkContext:
    ----
        def parallelize[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] 
---
list 創(chuàng)建:
val list = List("11","22","33")
val listRdd = sc.parallelize(list)
listRdd.count
listRdd.frist
listRdd.take(10)
seq 創(chuàng)建:
val seq = Sep("aa","bb","cc")
val seqRdd = sc.parallelize(seq)

seqRdd.count
seqRdd.frist 
seqRdd.take(10)
Array創(chuàng)建:
val array = Array(1,2,3,4,5)

val arryRdd = sc.parallelize(array)

arryRdd.first
arryRdd.count
arryRdd.take(10)
方式二:從外部存儲(chǔ)創(chuàng)建:

val disFile = sc.textFile("/input")

2.5、RDD的轉(zhuǎn)換過(guò)程

Spark 的Core深入(二)

Spark 的Core深入(二)

transformation 轉(zhuǎn)換
actions 執(zhí)行出結(jié)果

persistence  基本都是cache過(guò)程
2.5.1: rdd transformation 應(yīng)用

Spark 的Core深入(二)

union()合并應(yīng)用

val rdd1 = sc.parallelize(Array(1,2,3,4,5))

val rdd2 = sc.parallelize(Array(6,7,8,9,10))

val rdd = rdd1.union(rdd2)

rdd.collect

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

對(duì)于分布式計(jì)算框架來(lái)說(shuō),性能瓶頸
    IO
        -1,磁盤IO
        -2,網(wǎng)絡(luò)IO

    rdd1 -> rdd2
        Shuffle

============================================
groupByKey() & reduceByKey()

在實(shí)際開(kāi)發(fā)中,如果可以使用reduceByKey實(shí)現(xiàn)的功能,就不要使用groupBykey
    使用reduceByKey有聚合功能,類似MapReduce中啟用了Combiner
===============
join()
    -1,等值鏈接

    -2,左連接

數(shù)據(jù)去重
    結(jié)果數(shù)據(jù)
        res-pre.txt  - rdd1
    新數(shù)據(jù)進(jìn)行處理
        web.tsv - 10GB    - rdd2
        解析里面的url,
        如果res-pre.txt中包含,就不放入,不包含就加入或者不包含url進(jìn)行特殊處理

rdd2.leftJoin(rdd1)
join()應(yīng)用
val list =List("aa","bb","cc","dd")

val rdd1 = sc.parallelize(list).map((_, 1))

rdd1.collect

val list2 = List("bb","cc","ee","hh")

val rdd2 = sc.parallelize(list2).map((_, 1))

rdd2.collect

val rdd = rdd2.leftOuterJoin(rdd1)

rdd.collect

rdd.filter(tuple => tuple._2._2.isEmpty).collect
repartition()應(yīng)用:
val rdd = sc.textFile("/spark/rdd")

rdd.repartition(2)

rdd.count 

Spark 的Core深入(二)

2.5.2: RDD Actions 操作

Spark 的Core深入(二)

val list = List(("aa",1),("bb",4),("aa",56),("cc",0),("aa",89),("cc",34))
val rdd = sc.parallelize(list)
rdd.countByKey

Spark 的Core深入(二)


wordcount 轉(zhuǎn)變

val rdd = sc.textFile("\input")
rdd.flatMap(_.split(" ")).map((_, 1)).countByKey

Spark 的Core深入(二)

foreach() 應(yīng)用
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)
rdd.foreach(line => println(line))
分組topkey
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
val rdd = sc.textFile("/topkeytest")

val topRdd = rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))

topRdd.collect

Spark 的Core深入(二)

三:SparkContext三大功能

3.1、沒(méi)有使用廣播變量

Spark 的Core深入(二)

SparkContext 的作用:

-1,向Master(主節(jié)點(diǎn),集群管理的主節(jié)點(diǎn))申請(qǐng)資源,運(yùn)行所有Executor
    -2,創(chuàng)建RDD的入口
        sc.textFile("") // 從外部存儲(chǔ)系統(tǒng)創(chuàng)建
        sc.parxx() // 并行化,從Driver 中的集合創(chuàng)建
    -3,調(diào)度管理JOB運(yùn)行
        DAGScheduler 、 TaskScheduler
        --3.1
            為每個(gè)Job構(gòu)建DAG圖
        --3.2
            DAG圖劃分為Stage
                按照RDD之間是否存在Shuffle
                倒推(Stack)
        --3.3
            每個(gè)Stage中TaskSet
                每個(gè)階段中Task代碼相同,僅僅處理數(shù)據(jù)不同

3.2 使用廣播變量

Spark 的Core深入(二)

val list = List(".", "?", "!", "#", "$")
      val braodCastList = sc.broadcast(list)
      val wordRdd = sc.textFile("")
        wordRdd.filter(word => {
            braodCastList.value.contains(word)
        })

3.4 spark 的 cluster mode

Spark 的Core深入(二)

3.4.1 spark的部署模式:
1.spark的默認(rèn)模式是local模式
  spark-submint Scala_Project.jar

Spark 的Core深入(二)

Spark 的Core深入(二)

2. spark job 運(yùn)行在客戶端集群模式:

spark-submit --master spark://192.168.3.1:7077 --deploy-mode cluster Scala_Project.jar

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

3.5 spark 增加外部依賴jar包的方法

方式一:
    --jars JARS                 
      Comma-separated list of local jars to include on the driver and executor classpaths.
      jar包的位置一定要寫決定路徑。

方式二:
    --driver-class-path
      Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.

方式三:
    SPARK_CLASSPATH
      配置此環(huán)境變量
3.5.1 企業(yè)中Spark Application提交,shell 腳本
spark-app-submit.sh:

#!/bin/sh

## SPARK_HOME
SPARK_HOME=/opt/cdh6.3.6/spark-1.6.1-bin-2.5.0-cdh6.3.6

## SPARK CLASSPATH
SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar

${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar

四:Spark on YARN

4.1 啟動(dòng)hadoop的YARN上面的服務(wù)

cd /soft/hadoop/sbin

啟動(dòng)rescouremanager: 
./yarn-daemon.sh start resourcemanager

啟動(dòng)nodemanger:
./yarn-daemon.sh start nodemanager

Spark 的Core深入(二)

4.2 yarn 的架構(gòu)

Spark 的Core深入(二)

YARN
    -1,分布式資源管理
        主節(jié)點(diǎn):ResouceManager
        從節(jié)點(diǎn):NodeManager -> 負(fù)責(zé)管理每臺(tái)機(jī)器上的資源(內(nèi)存和CPU Core)
    -2,資源調(diào)度
        --1,容器Container
            AM/Task
        --2,對(duì)于運(yùn)行在YARN上的每個(gè)應(yīng)用,一個(gè)應(yīng)用的管理者ApplicaitonMaster   資源申請(qǐng)和任務(wù)調(diào)度

4.2 Spark Application

Spark Application
    -1,Driver Program
        資源申請(qǐng)和任務(wù)調(diào)度
    -2,Executors
        每一個(gè)Executor其實(shí)就是一個(gè)JVM,就是一個(gè)進(jìn)程

以spark deploy mode : client
    AM
                        -- 全部都允許在Container中
    Executor s
        運(yùn)行在Container中,類似于MapReduce任務(wù)中Map Task和Reduce Task一樣

Driver -> AM -> RM 

Spark 的Core深入(二)

Spark 的Core深入(二)

4.3 spark on yarn 的運(yùn)行

spark-shell --master yarn

Spark 的Core深入(二)
Spark 的Core深入(二)
Spark 的Core深入(二)

4.4 spark job on yarn

cd jars/

spark-submit --master yarn --deploy-mode cluster Scala_Project.jar

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

Spark 的Core深入(二)

五: spark RDD 的 依賴

5.1 RDD Rependencies

Spark 的Core深入(二)

spark的wordcount

## 
val rdd = sc.textFile("/input")
##
val wordRdd = rdd.flatMap(_.split(" "))
val kvRdd = wordRdd.map((_, 1))
val wordcountRdd = kvRdd.reduceByKey(_ + _)
##
wordcountRdd.collect

-----------------

    input -> rdd  -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> SMT

-> 

    wordcountRdd -> output            :Stage-02 -> ResultStage -> ResultTask
1. 窄依賴(narrow dependencies)
    1.1:子RDD的每個(gè)分區(qū)依賴于常數(shù)個(gè)父分區(qū)(即與數(shù)據(jù)規(guī)模無(wú)關(guān))
    1.2: 輸入輸出一對(duì)一的算子,且結(jié)過(guò)RDD 的分區(qū)結(jié)構(gòu)不變,主要是map,flatMap
    1.3:輸出一對(duì)一,單結(jié)果RDD 的分區(qū)結(jié)構(gòu)發(fā)生變化,如:union,coalesce
    1.4: 從輸入中選擇部分元素的算子,如filer,distinct,subtract,sample

2. 寬依賴(wide dependencies)
   2.1: 子RDD的每個(gè)分區(qū)依賴于所有父RDD 分區(qū)
   2.2:對(duì)單個(gè)RDD 基于key進(jìn)行重組和reduce,如groupByKey,reduceByKey

   2.3:對(duì)兩個(gè)RDD 基于key 進(jìn)行join和重組,如:join
如何判斷RDD之間是窄依賴還是寬依賴:
    父RDD的每個(gè)分區(qū)數(shù)據(jù) 給 子RDD的每個(gè)分區(qū)數(shù)據(jù)

        1    ->     1

        1    ->     N    :  MapReduce 中 Shuffle

5.2 spark 的shuffle

5.2.1 spark shuffle 的內(nèi)在原理
在MapReduce框架中,shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經(jīng)過(guò)shuffle這個(gè)環(huán)節(jié),shuffle的性能高低直接影響了整個(gè)程序的性能和吞吐量。Spark作為MapReduce框架的一種實(shí)現(xiàn),自然也實(shí)現(xiàn)了shuffle的邏輯。
5.2.2 shuffle
Shuffle是MapReduce框架中的一個(gè)特定的phase,介于Map phase和Reduce phase之間,當(dāng)Map的輸出結(jié)果要被Reduce使用時(shí),輸出結(jié)果需要按key哈希,并且分發(fā)到每一個(gè)Reducer上去,這個(gè)過(guò)程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個(gè)程序的運(yùn)行效率。

下面這幅圖清晰地描述了MapReduce算法的整個(gè)流程,其中shuffle phase是介于Map phase和Reduce phase之間。

Spark 的Core深入(二)

概念上shuffle就是一個(gè)溝通數(shù)據(jù)連接的橋梁,那么實(shí)際上shuffle(partition)這一部分是如何實(shí)現(xiàn)的的呢,下面我們就以Spark為例講一下shuffle在Spark中的實(shí)現(xiàn)。
5.2.3 spark的shuffle

Spark 的Core深入(二)

 1.首先每一個(gè)Mapper會(huì)根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket,bucket的數(shù)量是M×RM×R,其中MM是Map的個(gè)數(shù),RR是Reduce的個(gè)數(shù)。

2.其次Mapper產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partition算法填充到每個(gè)bucket中去。這里的partition算法是可以自定義的,當(dāng)然默認(rèn)的算法是根據(jù)key哈希到不同的bucket中去。
當(dāng)Reducer啟動(dòng)時(shí),它會(huì)根據(jù)自己task的id和所依賴的Mapper的id從遠(yuǎn)端或是本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進(jìn)行處理。
這里的bucket是一個(gè)抽象概念,在實(shí)現(xiàn)中每個(gè)bucket可以對(duì)應(yīng)一個(gè)文件,可以對(duì)應(yīng)文件的一部分或是其他等。

3. Apache Spark 的 Shuffle 過(guò)程與 Apache Hadoop 的 Shuffle 過(guò)程有著諸多類似,一些概念可直接套用,例如,Shuffle 過(guò)程中,提供數(shù)據(jù)的一端,被稱作 Map 端,Map 端每個(gè)生成數(shù)據(jù)的任務(wù)稱為 Mapper,對(duì)應(yīng)的,接收數(shù)據(jù)的一端,被稱作 Reduce 端,Reduce 端每個(gè)拉取數(shù)據(jù)的任務(wù)稱為 Reducer,Shuffle 過(guò)程本質(zhì)上都是將 Map 端獲得的數(shù)據(jù)使用分區(qū)器進(jìn)行劃分,并將數(shù)據(jù)發(fā)送給對(duì)應(yīng)的 Reducer 的過(guò)程。
那些操作會(huì)引起shuffle

1. 具有重新調(diào)整分區(qū)操作,
eg: repartition,coalese

2. *ByKey   eg: groupByKey,reduceByKey

3. 關(guān)聯(lián)操作 eg:join,cogroup

本文題目:Spark的Core深入(二)
URL標(biāo)題:http://weahome.cn/article/jsoceh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部