真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

第15課:SparkStreaming源碼解讀之NoReceivers徹底思考

本期內(nèi)容:

成都創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比天津網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式天津網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋天津地區(qū)。費用合理售后完善,十多年實體公司更值得信賴。

  • Direct Access

  • Kafka

前面有幾期我們講了帶Receiver的Spark Streaming 應(yīng)用的相關(guān)源碼解讀。但是現(xiàn)在開發(fā)Spark Streaming的應(yīng)用越來越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的優(yōu)勢: 

1. 更強的控制自由度 

2. 語義一致性 

其實No Receivers的方式更符合我們讀取數(shù)據(jù),操作數(shù)據(jù)的思路的。因為Spark 本身是一個計算框架,他底層會有數(shù)據(jù)來源,如果沒有Receivers,我們直接操作數(shù)據(jù)來源,這其實是一種更自然的方式。 如果要操作數(shù)據(jù)來源,肯定要有一個封裝器,這個封裝器一定是RDD類型。 以直接訪問Kafka中的數(shù)據(jù)為例:

object DirectKafkaWordCount {  def main(args: Array[String]) {    val Array(brokers, topics) = args    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(2))    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark Streaming會封裝一個KafkaRDD:

/** * A batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka  R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }
...  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")      Iterator.empty
    } else {      new KafkaRDDIterator(part, context)
    }
  }

RDD中重要的方法 getPartitions 和 compute 其中compute中返回了一個 KafkaRDDIterator:

private class KafkaRDDIterator(      part: KafkaRDDPartition,      context: TaskContext) extends NextIterator[R] {    val kc = new KafkaCluster(kafkaParams)

...    private def fetchBatch: Iterator[MessageAndOffset] = {      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()      val resp = consumer.fetch(req)
      handleFetchErr(resp)      // kafka may return a batch that starts before the requested offset
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }    override def close(): Unit = {      if (consumer != null) {
        consumer.close()
      }
    }    override def getNext(): R = {      if (iter == null || !iter.hasNext) {
        iter = fetchBatch
      }      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {        val item = iter.next()        if (item.offset >= part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
  }

其中會調(diào)用KafkaCluster的connect方法:

org/apache/spark/streaming/kafka/KafkaCluster.scala  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

KafkaCluster的connect方法返回了一個 SimpleConsumer,如果想自定義控制kafka消息的消費,則可自定義Kafka的consumer。

我們再回過頭看看:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

實際生成了什么:

 def createDirectStream[    K: ClassTag,    V: ClassTag,    KD <: Decoder[K]: ClassTag,    VD <: Decoder[V]: ClassTag,    R: ClassTag] (      ssc: StreamingContext,      kafkaParams: Map[String, String],      fromOffsets: Map[TopicAndPartition, Long],      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {    val cleanedHandler = ssc.sc.clean(messageHandler)    new DirectKafkaInputDStream[K, V, KD, VD, R]
      ssc, kafkaParams, fromOffsets, cleanedHandler)
  }

生成了一個DirectKafkaInputDStream:

org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(      "offsets" -> offsetRanges.toList,      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)
  }

這里面即產(chǎn)生了KafkaRDD實例。

我們再重新思考有Receiver和No Receiver的Spark Streaming應(yīng)用 Direct訪問的好處:

 1. 不需要緩存,不會出現(xiàn)OOM等問題(數(shù)據(jù)緩存在Kafka中) 

 2. 如果采用Receiver的方式,Receiver和Worker上Executor綁定了,不方便做分布式(配置一下也可以做)。如果采用Direct的方式,直接是RDD操作,數(shù)據(jù)默認分布在多個Executor上,天然就是分布式的。 

 3. 數(shù)據(jù)消費的問題,在實際操作的時候,如果采用Receiver的方式,如果數(shù)據(jù)操作來不及消費,Delay多次之后,Spark Streaming程序有可能崩潰。如果是Direct的方式,就不會。

 4. 完全的語義一致性,不會重復(fù)消費,且只被消費一次。

備注:

1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


當前文章:第15課:SparkStreaming源碼解讀之NoReceivers徹底思考
瀏覽地址:http://weahome.cn/article/jdched.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部