真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

kafka-consumer-offset位移-創(chuàng)新互聯(lián)

目錄

在霍林郭勒等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè) 網(wǎng)站設(shè)計制作定制網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計,網(wǎng)絡(luò)營銷推廣,外貿(mào)網(wǎng)站制作,霍林郭勒網(wǎng)站建設(shè)費用合理。

1 offset的默認維護位置

1.1 消費offset案例

2 自動提交offset

3 手動提交offset

3.1 原理

3.2 代碼示例

3.2.1 同步提交

3.2.2 異步提交(生產(chǎn)常用)

4 指定offset消費

5 指定時間消費

6 漏消費和重復消費分析

6.1 重復消費

6.2 漏消費

6.3 消費者事務(wù)

7 數(shù)據(jù)積壓


1 offset的默認維護位置

_consumer_offsets主題里面采用key和 value的方式存儲數(shù)據(jù)。key是 group.id+topic+分區(qū)號,value 就是當前offset的值。每隔一段時間,kafka 內(nèi)部會對這個topic進行compact(壓縮),也就是每個group.id+topic+分區(qū)號就保留最新數(shù)據(jù)。

Kafka0.9版本之前,consumer黑認將offset保存在Zookeeper中。0.9版本開始,consumer默認將offset保存在Kafka一個內(nèi)置的topic中,該topic為_consumer_offsets。

將offset信息存儲在zk中的不足:如果將offset信息存儲在zk中,那么所有的consumer都會訪問zk,會消耗大量的網(wǎng)絡(luò)資源,消費速度慢。

1.1 消費offset案例
  1. 思想:_consumer_offsets為Kafka中的 topic,那就可以通過消費者進行消費。
  2. 在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默認是 true,表示不能消費系統(tǒng)主題。為了查看該系統(tǒng)主題數(shù)據(jù),所以該參數(shù)修改為false。修改以后執(zhí)行分發(fā)命令:xsync consumer.properties。
  3. 采用命令行方式,創(chuàng)建一個新的topic。
    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2
  4. 啟動生產(chǎn)者往atguigu生產(chǎn)數(shù)據(jù)。
    [atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
  5. 啟動消費者消費atguigu數(shù)據(jù)。
    [atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test
    注意:指定消費者組名稱,更好觀察數(shù)據(jù)存儲位置(key是 group.id+topic+分區(qū)號)。
  6. 查看消費者消費主題_consumer_offsets。
    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
2 自動提交offset

為了使我們能夠?qū)W⒂谧约旱臉I(yè)務(wù)邏輯,Kafka提供了自動提交offset的功能。自動提交offset的相關(guān)參數(shù):

  • enable.auto.commit:是否開啟自動提交offset功能,默認是true
  • auto.commit.interval.ms:自動提交offset的時間間隔,默認是5s

消費者配置代碼:

//配置是否是自動提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//提交時間間隔,單位是ms
properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);
3 手動提交offset 3.1 原理

雖然自動提交offset十分簡單便利,但由于其是基于時間提交的,開發(fā)人員難以把握offset提交的時機。因此Kafka還提供了手動提交offset的API。
手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次提交的一批數(shù)據(jù)最高的偏移量提交;不同點是,同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現(xiàn)提交失?。┅r而異步提交則沒有失敗重試機制,故有可能提交失敗。

  • commitSync(同步提交)﹔必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。
  • commitAsync(異步提交):發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了

3.2 代碼示例 3.2.1 同步提交
//手動提交屬性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消費代碼邏輯
XXX
XXX
XXX
//手動提交代碼(處理完數(shù)據(jù)以后,這里為了方便,只展示關(guān)鍵代碼)
//手動提交offset
kafkaConsumer.commitsync();
3.2.2 異步提交(生產(chǎn)常用)
//手動提交屬性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消費代碼邏輯
XXX
XXX
XXX
//手動提交代碼(處理完數(shù)據(jù)以后,這里為了方便,只展示關(guān)鍵代碼)
//手動提交offset
kafkaConsumer.commitAsync();
4 指定offset消費

auto.offset.reset = earliest | latest | none 默認是latest。
當Kafka 中沒有初始偏移量(消費者組第一次消費)或服務(wù)器上不再存在當前偏移量時(例如該數(shù)據(jù)已被刪除),該怎么辦?

  1. earliest:自動將偏移量重置為最早的偏移量,--from-beginning。
  2. latest(默認值):自動將偏移量重置為最新偏移量。
  3. none:如果未找到消費者組的先前偏移量,則向消費者拋出異常。
  4. 任意指定offset位移開始消費。
    //1創(chuàng)建消費者
    KafkaConsumerkafkaConsumer = new KafkaConsumer<>(properties);
    // 2訂閱主題
    ArrayListtopics = new ArrayList<>(;topics.add( "first");
    kafkaConsumer.subscribe(topics);
    
    //指定位置進行消費
    setassignment = kafkaConsumer.assignment();//獲取所有分區(qū)信息
    //保證分區(qū)分配方案已經(jīng)制定完畢,因為由于leader消費者制定分配方案會消耗一定時間,有可能此時獲取不到分區(qū)信息,所以加一層分區(qū)空間判斷
    while (assignment.size() == 0){
        //促使獲取的分區(qū)數(shù)量不為0
        kafkaConsumer.poll(Duration.ofSeconds(1));
        assignment = kafkaConsumer.assignment();
    }
    
    //遍歷所有分區(qū),指定消費的offset
    for (TopicPartition topicPartition : assignment) {
        kafkaConsumer.seek(topicPartition, 100);
    }
    
    // 3消費數(shù)據(jù)
    while (true){

5 指定時間消費

需求:在生產(chǎn)環(huán)境中,會遇到最近消費的幾個小時數(shù)據(jù)異常,想重新按照時間消費。例如要求按照時間消費前一天的數(shù)據(jù),怎么處理?

//1創(chuàng)建消費者
KafkaConsumerkafkaConsumer = new KafkaConsumer<>(properties);
// 2訂閱主題
ArrayListtopics = new ArrayList<>(;topics.add( "first");
kafkaConsumer.subscribe(topics);

//指定位置進行消費
setassignment = kafkaConsumer.assignment();//獲取所有分區(qū)信息
//保證分區(qū)分配方案已經(jīng)制定完畢,因為由于leader消費者制定分配方案會消耗一定時間,有可能此時獲取不到分區(qū)信息,所以加一層分區(qū)空間判斷
while (assignment.size() == 0){
    //促使獲取的分區(qū)數(shù)量不為0
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
//希望把時間轉(zhuǎn)換為對應(yīng)的offset
HashMaptopicPartitionLongHashMap = new HashMap<>();
//封裝對應(yīng)集合
for (TopicPartition topicPartition : assignment) {
    //希望獲取當前系統(tǒng)時間一天前的數(shù)據(jù)。
    topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
NaptopioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap);


//遍歷所有分區(qū),指定消費的offset
//指定消費的offset
for (TopicPartition topicPartition : assignment) {
    OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition);
    kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}

// 3消費數(shù)據(jù)
while (true){
6 漏消費和重復消費分析 6.1 重復消費

場景1:重復消費。自動提交offset引起。

6.2 漏消費

場景1:漏消費。設(shè)置offset為手動提交,當offset被提交時,數(shù)據(jù)還在內(nèi)存中未落盤,此時剛好消費者線程被kill掉,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理,導致這部分內(nèi)存中的數(shù)據(jù)丟失。

6.3 消費者事務(wù)

如果想完成Consumer端的精準一次性消費,那么需要Kafka消費端將消費過程和提交offset過程做原子綁定。此時我們需要將Kafka的offset保存到支持事務(wù)的自定義介質(zhì)(比如MySQL)。這部分知識會在后續(xù)項目部分涉及。

7 數(shù)據(jù)積壓

方案1:如果是Kafka消費能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費組的消費者數(shù)量,消費者數(shù)=分區(qū)數(shù)。(兩者缺一不可)

方案2:如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧


網(wǎng)頁標題:kafka-consumer-offset位移-創(chuàng)新互聯(lián)
鏈接URL:http://weahome.cn/article/jcdds.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部