真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark流式去重的示例分析

本篇文章為大家展示了spark 流式去重的示例分析,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

在紅寺堡等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè) 網(wǎng)站設(shè)計(jì)制作按需定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),成都全網(wǎng)營(yíng)銷推廣,外貿(mào)營(yíng)銷網(wǎng)站建設(shè),紅寺堡網(wǎng)站建設(shè)費(fèi)用合理。

大數(shù)據(jù)去重本身很蛋疼,針對(duì)個(gè)別數(shù)據(jù)去重更是不可理喻但是spark的Structured Streaming就很容易能實(shí)現(xiàn)這個(gè)功能。

數(shù)據(jù)從采集到最終處理結(jié)束是會(huì)存在一條數(shù)據(jù)在某一個(gè)點(diǎn)被重復(fù)接收處理的情況。如 kafka支持的是至少一次寫語(yǔ)義,也即是當(dāng)寫數(shù)據(jù)到kafka的時(shí)候,有些記錄可能重復(fù),例如如果消息已經(jīng)被broker接收并寫入文件但是并沒(méi)有應(yīng)答,這時(shí)生產(chǎn)者向kafka重發(fā)一個(gè)消息,就可能重復(fù)。由于kafka的至少一次的寫語(yǔ)義,structured streaming不能避免這種類型數(shù)據(jù)重復(fù)。所以一旦寫入成功,可以假設(shè)structured Streaming的查詢輸出是以至少一次語(yǔ)義寫入kafka的。一個(gè)可行去除重復(fù)記錄的解決方案是數(shù)據(jù)中引入一個(gè)primary(unique)key,這樣就可以在讀取數(shù)據(jù)的時(shí)候?qū)嵭腥ブ亍?/p>

structured streaming是可以使用事件中的唯一標(biāo)識(shí)符對(duì)數(shù)據(jù)流中的記錄進(jìn)行重復(fù)數(shù)據(jù)刪除。這與使用唯一標(biāo)識(shí)符列的靜態(tài)重復(fù)數(shù)據(jù)刪除完全相同。該查詢將存儲(chǔ)來(lái)自先前記錄的一定量的數(shù)據(jù),以便可以過(guò)濾重復(fù)的記錄。與聚合類似,可以使用帶有或不帶有watermark 的重復(fù)數(shù)據(jù)刪除功能。

A),帶watermark:如果重復(fù)記錄可能到達(dá)的時(shí)間有上限,則可以在事件時(shí)間列上定義watermark,并使用guid和事件時(shí)間列進(jìn)行重復(fù)數(shù)據(jù)刪除。

B),不帶watermark:由于重復(fù)記錄可能到達(dá)時(shí)間沒(méi)有界限,所以查詢將來(lái)自所有過(guò)去記錄的數(shù)據(jù)存儲(chǔ)為狀態(tài)。

源代碼,已測(cè)試通過(guò)~

package bigdata.spark.StructuredStreaming.KafkaSourceOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object KafkaDropDuplicate {
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
     .set("yarn.resourcemanager.hostname", "mt-mdh.local")
     .set("spark.executor.instances","2")
     .set("spark.default.parallelism","4")
     .set("spark.sql.shuffle.partitions","4")
     .setJars(List("/opt/sparkjar/bigdata.jar"
       ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
       ,"/opt/jars/kafka-clients-0.10.2.2.jar"
       ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
       ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))

   val spark = SparkSession
     .builder
     .appName("StructuredKafkaWordCount")
     .config(sparkConf)
     .getOrCreate()
   import spark.implicits._

   val df = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers","mt-mdh.local:9093")
     .option("subscribe", "jsontest")
     .load()
   val words = df.selectExpr("CAST(value AS STRING)")

   val fruit = words.select(
     get_json_object($"value", "$.time").alias("timestamp").cast("long")
     , get_json_object($"value", "$.fruit").alias("fruit"))

   val fruitCast = fruit
     .select(fruit("timestamp")
       .cast("timestamp"),fruit("fruit"))
     .withWatermark("timestamp", "10 Seconds")
     .dropDuplicates("fruit")
     .groupBy("fruit").count()

   fruitCast.writeStream
     .outputMode(OutputMode.Complete())
     .format("console")
     .trigger(Trigger.ProcessingTime(5000))
     .option("truncate","false")
     .start()
     .awaitTermination()
 }
}

上述內(nèi)容就是spark 流式去重的示例分析,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


網(wǎng)頁(yè)名稱:spark流式去重的示例分析
文章位置:http://weahome.cn/article/ijpgjj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部