真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

(版本定制)第15課:SparkStreaming源碼解讀之NoReceivers徹底思考

hu本期內(nèi)容:

華容ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書合作)期待與您的合作!

    1、Kafka解密

背景: 
目前No Receivers在企業(yè)中使用的越來(lái)越多,No Receivers具有更強(qiáng)的控制度,語(yǔ)義一致性。No Receivers是我們操作數(shù)據(jù)來(lái)源自然方式,操作數(shù)據(jù)來(lái)源使用一個(gè)封裝器,且是RDD類型的。

所以Spark Streaming就產(chǎn)生了自定義RDD –> KafkaRDD.

源碼分析:

1、KafkaRDD源碼

private[kafka]
class KafkaRDD[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag,
R: ClassTag] private[spark] (
    sc: SparkContext,
kafkaParams: Map[String, String],
val offsetRanges: Array[OffsetRange], //指定數(shù)據(jù)范圍
leaders: Map[TopicAndPartition, (String, Int)],
messageHandler: MessageAndMetadata[K, V] => R
) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

2、HasOffsetRanges


/**
 * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
 * offset ranges in RDDs generated by the direct Kafka DStream (see
 * [[KafkaUtils.createDirectStream()]]).
 * {{{
*   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
 *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 *      ...
 *   }
 * }}}
*/
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

3、KafkaRDD中的compute

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
new KafkaRDDIterator(part, context)
  }
}

SparkStreaming一般使用KafkaUtils的createDirectStream讀取數(shù)據(jù)

def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

4、通過(guò)getFromOffsets的方法獲取topic的fromOffset值

[kafka] (
    kc: KafkaClusterkafkaParams: []topics: []
  ): [TopicAndPartition] = {
reset = kafkaParams.get().map(_.toLowerCase)
result = {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- ((reset == ()) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } {
    leaderOffsets.map { (tplo) =>
        (tplo.offset)
    }
  }
  KafkaCluster.(result)
}

createDirectStream其實(shí)生成的是DirectKafkaInputDStream對(duì)象,通過(guò)compute方法會(huì)產(chǎn)生KafkaRDD

(validTime: Time): Option[KafkaRDD[]] = {
untilOffsets = clamp(latestLeaderOffsets())
rdd = [](
    context.sparkContextkafkaParamsuntilOffsetsmessageHandler)

offsetRanges = .map { (tpfo) =>
uo = untilOffsets(tp)
(tp.topictp.partitionfouo.offset)
  }
description = offsetRanges.filter { offsetRange =>
offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
{offsetRange.topic}{offsetRange.partition}+
{offsetRange.fromOffset}{offsetRange.untilOffset}}.mkString()
metadata = (
-> offsetRanges.toListStreamInputInfo.-> description)
inputInfo = (rdd.countmetadata)
  ssc...reportInfo(validTimeinputInfo)

= untilOffsets.map(kv => kv._1 -> kv._2.offset)
(rdd)
}

采用Direct的好處? 
1. Direct方式?jīng)]有數(shù)據(jù)緩存,因此不會(huì)出現(xiàn)內(nèi)存溢出,但是如果采用Receiver的話就需要緩存。 
2. 如果采用Receiver的方式,不方便做分布式,而Direct方式默認(rèn)數(shù)據(jù)就在多臺(tái)機(jī)器上。 
3. 在實(shí)際操作的時(shí)候如果采用Receiver的方式的弊端是假設(shè)數(shù)據(jù)來(lái)不及處理,但是Direct就不會(huì),因?yàn)槭侵苯幼x取數(shù)據(jù)。 
4. 語(yǔ)義一致性,Direct的方式數(shù)據(jù)一定會(huì)被執(zhí)行。


文章名稱:(版本定制)第15課:SparkStreaming源碼解讀之NoReceivers徹底思考
URL標(biāo)題:http://weahome.cn/article/goehei.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部