本篇文章為大家展示了spark 流式去重的示例分析,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
在紅寺堡等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè) 網(wǎng)站設(shè)計(jì)制作按需定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),成都全網(wǎng)營(yíng)銷推廣,外貿(mào)營(yíng)銷網(wǎng)站建設(shè),紅寺堡網(wǎng)站建設(shè)費(fèi)用合理。
大數(shù)據(jù)去重本身很蛋疼,針對(duì)個(gè)別數(shù)據(jù)去重更是不可理喻但是spark的Structured Streaming就很容易能實(shí)現(xiàn)這個(gè)功能。
數(shù)據(jù)從采集到最終處理結(jié)束是會(huì)存在一條數(shù)據(jù)在某一個(gè)點(diǎn)被重復(fù)接收處理的情況。如 kafka支持的是至少一次寫語(yǔ)義,也即是當(dāng)寫數(shù)據(jù)到kafka的時(shí)候,有些記錄可能重復(fù),例如如果消息已經(jīng)被broker接收并寫入文件但是并沒(méi)有應(yīng)答,這時(shí)生產(chǎn)者向kafka重發(fā)一個(gè)消息,就可能重復(fù)。由于kafka的至少一次的寫語(yǔ)義,structured streaming不能避免這種類型數(shù)據(jù)重復(fù)。所以一旦寫入成功,可以假設(shè)structured Streaming的查詢輸出是以至少一次語(yǔ)義寫入kafka的。一個(gè)可行去除重復(fù)記錄的解決方案是數(shù)據(jù)中引入一個(gè)primary(unique)key,這樣就可以在讀取數(shù)據(jù)的時(shí)候?qū)嵭腥ブ亍?/p>
structured streaming是可以使用事件中的唯一標(biāo)識(shí)符對(duì)數(shù)據(jù)流中的記錄進(jìn)行重復(fù)數(shù)據(jù)刪除。這與使用唯一標(biāo)識(shí)符列的靜態(tài)重復(fù)數(shù)據(jù)刪除完全相同。該查詢將存儲(chǔ)來(lái)自先前記錄的一定量的數(shù)據(jù),以便可以過(guò)濾重復(fù)的記錄。與聚合類似,可以使用帶有或不帶有watermark 的重復(fù)數(shù)據(jù)刪除功能。
A),帶watermark:如果重復(fù)記錄可能到達(dá)的時(shí)間有上限,則可以在事件時(shí)間列上定義watermark,并使用guid和事件時(shí)間列進(jìn)行重復(fù)數(shù)據(jù)刪除。
B),不帶watermark:由于重復(fù)記錄可能到達(dá)時(shí)間沒(méi)有界限,所以查詢將來(lái)自所有過(guò)去記錄的數(shù)據(jù)存儲(chǔ)為狀態(tài)。
源代碼,已測(cè)試通過(guò)~
package bigdata.spark.StructuredStreaming.KafkaSourceOperator
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object KafkaDropDuplicate {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/opt/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"
,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.config(sparkConf)
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","mt-mdh.local:9093")
.option("subscribe", "jsontest")
.load()
val words = df.selectExpr("CAST(value AS STRING)")
val fruit = words.select(
get_json_object($"value", "$.time").alias("timestamp").cast("long")
, get_json_object($"value", "$.fruit").alias("fruit"))
val fruitCast = fruit
.select(fruit("timestamp")
.cast("timestamp"),fruit("fruit"))
.withWatermark("timestamp", "10 Seconds")
.dropDuplicates("fruit")
.groupBy("fruit").count()
fruitCast.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.trigger(Trigger.ProcessingTime(5000))
.option("truncate","false")
.start()
.awaitTermination()
}
}
上述內(nèi)容就是spark 流式去重的示例分析,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。