transform
DStream 和 rdd之間數(shù)據(jù)進行交互的算子
流處理 數(shù)據(jù)源:
一個數(shù)據(jù)來自于 mysql數(shù)據(jù)/hdfs上文本數(shù)據(jù) 【量小】 從表/維表
一個數(shù)據(jù) 來自于 kafka sss 讀取形成 DStream數(shù)據(jù) 【量大】 主業(yè)務(wù) =》 主表
案例需求:彈幕 過濾的功能 /黑名單的功能
離線:
彈幕: 主表
不好看
垃圾
男主真帥
女主真好看
666
過濾的彈幕:維表
熱巴真丑
雞兒真美
王鶴棣退出娛樂圈
實時:
sparkstreaming + kafka 整合 :kafka =》 sparkstreaming
版本選擇:spark 2.x : kafka版本: 0.8 0.10.0 or higher ok
spark 3.x =>kafka : 1.kafka版本: 0.10.0 or higher ok
spark 去kafka讀取數(shù)據(jù)的方式:
1.kafka 0.8 reciver方式讀取kafka數(shù)據(jù) 【效率低 、代碼開發(fā)復(fù)雜】
2.kafka 0.10.0版本之后 direct stream的方式加載kafka數(shù)據(jù) 【效率高、代碼開發(fā)簡單】
kafka:
版本也有要求: 0.11.0 版本之后
交付語義: consumer producer
producer 默認就是精準一次
consumer 交付語義取決于 consumer 框架本身
交付語義: consumer
? 至多一次 數(shù)據(jù)丟失問題
? 至少一次 數(shù)據(jù)不會丟失,數(shù)據(jù)重復(fù)消費
? 精準一次 數(shù)據(jù)不會丟失 數(shù)據(jù)也不會重復(fù)消費
spark 整合kafka 版本 0.10.0版本之后:
1.kafka 0.11.0之后 2.2.1 =>direct stream
2.sparkstreaming 默認消費kafka數(shù)據(jù) 交付語義:
至少一次
? 1.simple API =》 過時不用了
!?。〔恍枰?kafka-clients 依賴
查看kafka topic命令:kafka-topics.sh --list
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
kafka-topics.sh --create
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
–topic spark-kafka01 --partitions 3 --replication-factor 1
producer:
kafka-console-producer.sh
–broker-list bigdata33:9092,bigdata34:9092
–topic spark-kafka01
consumer:
kafka-console-consumer.sh
–bootstrap-server bigdata33:9092,bigdata34:9092
–topic spark-kafka
–from-beginning
val kafkaParams = Map[String, Object](
“bootstrap.servers” ->“bigdata33:9092,bigdata34:9092”,
“key.deserializer” ->classOf[StringDeserializer],
“value.deserializer” ->classOf[StringDeserializer],
“group.id” ->“dl2262_01”,
“auto.offset.reset” ->“l(fā)atest”,
“enable.auto.commit” ->(false: java.lang.Boolean)
)
需求:
消費kafka數(shù)據(jù) wc 將 結(jié)果寫到 mysql里面
input
todo
output
kafka =>spark =>mysql 鏈路打通了
模擬:spark作業(yè)掛掉 =》 重啟
“消費完kafka的數(shù)據(jù) 程序重啟之后接著從上次消費的位置接著消費 ”
目前: code不能滿足
1.目前代碼 這兩個參數(shù) 不能動
“auto.offset.reset” ->“earliest”
“enable.auto.commit” ->(false: java.lang.Boolean)
2.主要原因 : spark作業(yè) 消費kafka數(shù)據(jù):
1.獲取kafka offset =》 處理kafka數(shù)據(jù) =》 “提交offset的操作” 沒有
解決:
1.獲取kafka offset // todo
2. 處理kafka數(shù)據(jù)
3.提交offset的操作 // todo
1.獲取kafka offset // todo
1. kafka offset 信息
2.spark rdd分區(qū)數(shù) 和 kafka topic 的分區(qū)數(shù) 是不是 一對一
報錯:
org.apache.spark.rdd.ShuffledRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
ShuffledRDD =》 HasOffsetRanges 說明 代碼有問題
sparkstreaming里面: 開發(fā)模式:***? 1.獲取kafka 流數(shù)據(jù)
? 2. 流 Dstream =》 調(diào)用foreachRDD算子 進行輸出:
? 0.獲取offset 信息
? 1.做業(yè)務(wù)邏輯
? 2.結(jié)果數(shù)據(jù)輸出
? 3.提交offset信息
offset解釋:
01 batch:
rdd的分區(qū)數(shù):3
topic partition fromOffset untilOffset
spark-kafka01 0 0 1
spark-kafka01 1 0 1
spark-kafka01 2 0 0
02 batch:
rdd的分區(qū)數(shù):3
topic partition fromOffset untilOffset
spark-kafka01 0 1 1
spark-kafka01 1 1 1
spark-kafka01 2 0 0
此時 kafka 里面數(shù)據(jù)已經(jīng)消費完了 fromOffset=untilOffset
3.提交offset信息2.存儲offset信息
spark流式處理 默認消費語義 : 至少一次
精準一次:
1.output + offset 同時完成
1.生產(chǎn)上Checkpoints不能用
2.Kafka itself =》至少一次
推薦使用 =》 簡單 高效
90% 都可以解決 10% 精準一次
3.Your own data store: =》 開發(fā)大量代碼 =》
mysql、redis、hbase、
至少一次
精準一次
mysql:
獲取offset
todo
output
提交offset
spark作業(yè)掛了 =》 啟動spark作業(yè) :
1.從mysql里面獲取offset
todo
output
提交offset
? 1.至多一次 【丟數(shù)據(jù)】
? 2.至少一次 【不會丟數(shù)據(jù) 可能會重復(fù)消費數(shù)據(jù)】
? 3.精準一次 【不丟、不重復(fù)消費】
offset信息提交 :
1.spark todo :
至少一次:
1 2 3 4
offset get
業(yè)務(wù)邏輯 output db
提交offset
精準一次:output + 提交offset 一起發(fā)生 =》 事務(wù)來實現(xiàn)
事務(wù): 一次操作要么成功 要么失敗
topic partition fromOffset untilOffset
spark-kafka01 0 3 3
spark-kafka01 2 2 2
spark-kafka01 1 2 2
? kafka 本身:
? offset 信息存儲在哪?
kafka 某個topic下:
__consumer_offsets =》 spark作業(yè) 消費kafka的offset信息
topic offset 信息存儲的地方
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧