In this issue:
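1. Kafka demystified
Background:
The No Receivers approach (also called the Direct approach) is seeing more and more use in enterprises. It offers finer control and semantic consistency. It is also the natural way to work with a data source: the source is accessed through a wrapper, and that wrapper is an RDD.
That is why Spark Streaming introduced a custom RDD: KafkaRDD.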
Source code analysis:
1. KafkaRDD source
private[kafka] class KafkaRDD[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange], // specifies the range of data to read
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }
  // remainder of the class omitted
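To make the partitioning rule concrete, here is a minimal, dependency-free sketch of what getPartitions does: one RDD partition per offset range, each tagged with the leader broker for that Kafka partition. The case classes below are simplified stand-ins written for this illustration, not the real Spark or Kafka classes, and the object name GetPartitionsSketch is hypothetical.

```scala
// Simplified stand-ins for illustration only; NOT the real Spark/Kafka types.
object GetPartitionsSketch {
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  final case class KafkaPartition(
      index: Int, topic: String, partition: Int,
      fromOffset: Long, untilOffset: Long, host: String, port: Int)

  // One RDD partition per offset range. The leader lookup supplies the
  // broker (host, port) that the partition's data will be read from.
  def partitionsFor(
      offsetRanges: Array[OffsetRange],
      leaders: Map[TopicAndPartition, (String, Int)]): Array[KafkaPartition] =
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      KafkaPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }
}
```

Because the mapping is one to one, the parallelism of a KafkaRDD follows the number of Kafka partitions being read.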
2. HasOffsetRanges
/**
 * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
 * offset ranges in RDDs generated by the direct Kafka DStream (see
 * [[KafkaUtils.createDirectStream()]]).
 * {{{
 *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
 *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 *      ...
 *   }
 * }}}
 */
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
3. The compute method in KafkaRDD
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
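The branching in compute can be sketched without Spark. The version below is an illustration under stated assumptions: PartitionRange is a simplified stand-in for KafkaRDDPartition, and the fetch parameter is a hypothetical callback that stands in for the real Kafka fetch performed by KafkaRDDIterator. The range is treated as half-open, with fromOffset inclusive and untilOffset exclusive.

```scala
// Illustration only: a stand-in for KafkaRDD.compute with no Spark/Kafka dependency.
object ComputeSketch {
  final case class PartitionRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // `fetch` is a hypothetical callback that returns the record stored at an offset.
  def compute[R](part: PartitionRange, fetch: Long => R): Iterator[R] = {
    // Reject a range whose start lies after its end.
    require(part.fromOffset <= part.untilOffset,
      s"Beginning offset ${part.fromOffset} is after ending offset ${part.untilOffset}")
    if (part.fromOffset == part.untilOffset) {
      Iterator.empty // nothing to read for this partition in this batch
    } else {
      // Lazily walk the half-open range [fromOffset, untilOffset).
      (part.fromOffset until part.untilOffset).iterator.map(fetch)
    }
  }
}
```

An empty range costs nothing beyond the check, which is why partitions with no new data are skipped rather than fetched.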
Spark Streaming applications usually read the data through KafkaUtils.createDirectStream:
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
4. getFromOffsets obtains the fromOffset value for each topic partition
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) =>
      (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}
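The selection logic in getFromOffsets can be tried out in isolation. The sketch below assumes Scala 2.12 or later (right-biased Either) and replaces KafkaCluster with a hypothetical in-memory FakeCluster; the method names earliestOffsets and latestOffsets are invented for this illustration. Unlike the original, which passes its result through KafkaCluster.checkErrors, the sketch simply returns the Either.

```scala
// Illustration only: FakeCluster is a hypothetical stand-in for KafkaCluster.
object FromOffsetsSketch {
  type Err = List[String]
  final case class TopicAndPartition(topic: String, partition: Int)

  final class FakeCluster(
      earliest: Map[TopicAndPartition, Long],
      latest: Map[TopicAndPartition, Long]) {

    // Left when any requested topic is unknown, otherwise the matching partitions.
    def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
      val found = latest.keySet.filter(tp => topics.contains(tp.topic))
      val missing = topics -- found.map(_.topic)
      if (missing.isEmpty) Right(found)
      else Left(missing.toList.sorted.map(t => s"unknown topic: $t"))
    }

    def earliestOffsets(tps: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
      Right(earliest.filter { case (tp, _) => tps.contains(tp) })

    def latestOffsets(tps: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
      Right(latest.filter { case (tp, _) => tps.contains(tp) })
  }

  // "smallest" starts from the earliest offsets; anything else (or no setting)
  // starts from the latest offsets.
  def fromOffsets(
      kc: FakeCluster,
      kafkaParams: Map[String, String],
      topics: Set[String]): Either[Err, Map[TopicAndPartition, Long]] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    for {
      tps <- kc.getPartitions(topics)
      offsets <- (if (reset.contains("smallest")) kc.earliestOffsets(tps)
                  else kc.latestOffsets(tps))
    } yield offsets
  }
}
```

The for comprehension short-circuits on the first Left, so a failed partition lookup never reaches the offset query.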
createDirectStream actually creates a DirectKafkaInputDStream object, and that object's compute method produces a KafkaRDD for each batch:
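override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record count and metadata of this batch to the input info tracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  // The next batch starts where this one ends.
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}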
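The bookkeeping in this compute method comes down to two steps: cap how far each partition may advance in one batch, then move the current offsets forward to the end of the batch. The sketch below shows that idea only. It assumes a single optional per-partition limit (the hypothetical maxPerPartition parameter) and plain Long offsets instead of LeaderOffset objects, so it is a simplification rather than the actual DirectKafkaInputDStream logic.

```scala
// Illustration only: simplified batch bookkeeping with stand-in types.
object BatchSketch {
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Cap each partition's end offset at current + max when a limit is set.
  def clamp(
      current: Map[TopicAndPartition, Long],
      latest: Map[TopicAndPartition, Long],
      maxPerPartition: Option[Long]): Map[TopicAndPartition, Long] =
    maxPerPartition match {
      case Some(max) =>
        latest.map { case (tp, until) => tp -> math.min(current(tp) + max, until) }
      case None => latest
    }

  // Returns the offset ranges for this batch and the offsets the next batch starts from.
  def nextBatch(
      current: Map[TopicAndPartition, Long],
      latest: Map[TopicAndPartition, Long],
      maxPerPartition: Option[Long]): (List[OffsetRange], Map[TopicAndPartition, Long]) = {
    val until = clamp(current, latest, maxPerPartition)
    val ranges = current.toList.map { case (tp, from) =>
      OffsetRange(tp.topic, tp.partition, from, until(tp))
    }
    (ranges, until)
  }
}
```

Feeding the returned offsets back in as the next call's current offsets yields consecutive ranges with no gap and no overlap between batches.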
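What are the benefits of the Direct approach?
1. The Direct approach does not buffer received data, so buffered input cannot exhaust memory. The Receiver approach has to buffer.
2. The Receiver approach is awkward to distribute, whereas with the Direct approach the data is spread across multiple machines by default.
3. In practice, the weakness of the Receiver approach shows when data arrives faster than it can be processed, because received data piles up. The Direct approach does not have this problem, because each batch reads its data directly from Kafka.
4. Semantic consistency: with the Direct approach, the data in each batch is guaranteed to be processed.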