跟生產(chǎn)者一樣,消費者也屬于kafka的客戶端,不過kafka消費者是從kafka讀取數(shù)據(jù)的應(yīng)用,側(cè)重于讀數(shù)據(jù)。一個或多個消費者訂閱kafka集群中的topic,并從broker接收topic消息,從而進(jìn)行業(yè)務(wù)處理。今天來學(xué)習(xí)下kafka consumer基本使用。
創(chuàng)新互聯(lián)建站-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比鎮(zhèn)海網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式鎮(zhèn)海網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋鎮(zhèn)海地區(qū)。費用合理售后完善,十載實體公司更值得信賴。消費者example 組件版本org.apache.kafka kafka-clients 3.3.1
消費者代碼public static void main(String[] args){String topicName = "consumer-topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_01");
props.put("enable.auto.commit", true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumerconsumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
try {while (true) {ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->{System.out.println("Message received " + record.value());
});
}
}finally {consumer.close();
}
}
測試驗證創(chuàng)建topic
./bin/kafka-topics.sh --create --topic consumer-topic --bootstrap-server localhost:9092
啟動生產(chǎn)者 - 這里使用kafka自帶的生產(chǎn)者腳本進(jìn)行測試
./bin/kafka-console-producer.sh --topic consumer-topic --bootstrap-server localhost:9092
測試結(jié)果
至此 一個簡單的kafka消費者程序已經(jīng)開發(fā)完成,代碼不多,開發(fā)起來也快。但是關(guān)于kafka 消費者內(nèi)部有很多的原理、細(xì)節(jié)需要去梳理,否則出現(xiàn)問題就會茫然失措,不知所以。
pull VS poll上面的消費者程序有一個很核心的細(xì)節(jié)需要關(guān)注,即kafka 消費者以什么的方式對數(shù)據(jù)進(jìn)行消費。對比其他傳統(tǒng)的消息中間件,消息消費的方式主要有兩種:
kafka在設(shè)計之處,就考慮這個問題:消費者從broker拉取數(shù)據(jù),還是broker主動推送數(shù)據(jù)給消費者。在這方面kafka采用更為傳統(tǒng)的設(shè)計:消費者主動拉取,其優(yōu)勢如下:
在介紹消息傳遞語義之前,首先要了解下kafka 消費者位置(也叫做偏移量)管理。
位移管理kafka 消費者端需要為每個讀取的topic 分區(qū)保存消費進(jìn)度,即當(dāng)前分區(qū)中消費者消費消息的最新位置。該位置也叫做偏移量- offset。消費者需要定期地想kafka提交自己的位置信息,實際上,偏移量通常是下一條待消費消息的位置。如下圖
從kafka broker讀取消息,開發(fā)者可以選擇提交偏移量的時間,消費者默認(rèn)自動提交偏移量,這可能會帶來一些風(fēng)險。
最多一次在這種情況下,在調(diào)用poll()后,一旦收到消息批,就立即提交偏移量。如果后續(xù)處理失?。ㄈ鐦I(yè)務(wù)處理過程中發(fā)生異常,數(shù)據(jù)只是被從Broker讀取出來,并沒有真正的處理),消息將丟失。它不會被再次讀取,因為這些消息的偏移量已經(jīng)提交。
在至少一次語義定義中,broker消息的每一個消息都會被傳遞到消費者,但是可能會存在重復(fù)拉取的場景,從而導(dǎo)致消息被重復(fù)處理。跟最多一次提交位置偏移量的時機(jī)不同,至少一次在處理消息后提交偏移量。
因此需要確保消息處理的冪等性,如對數(shù)據(jù)進(jìn)行插入、更新操作;防止重復(fù)消費導(dǎo)致數(shù)據(jù)出現(xiàn)錯亂。
至少一次消息處理的流程大致如下:
批量拉取數(shù)據(jù)
此時消費者并不提交偏移量
對消息進(jìn)行業(yè)務(wù)處理
3.1 處理完成 提交偏移量 進(jìn)行下一次拉取數(shù)據(jù)
3.2 消息處理失?。ù藭r可能有一部分?jǐn)?shù)據(jù)處理完成,還有一部分?jǐn)?shù)據(jù)尚未處理)
重啟應(yīng)用 拉取數(shù)據(jù),又會拉取之前的數(shù)據(jù) 導(dǎo)致消息被重復(fù)處理
有些場景不僅需要至少一次語義(保證數(shù)據(jù)不丟失),還需要精確一次語義。每條消息只投遞一次,這需要消費者應(yīng)用程序跟kafka相互配合、相互合作就可以實現(xiàn)精確一次語義
props.put("enable.auto.commit", true);
enable.auto.commit 參數(shù)默認(rèn)值為true,kafka默認(rèn)在后臺線程中周期性的提交消費者偏移量
auto.commit.interval.ms默認(rèn)為5秒,如果enable.auto.commit參數(shù)設(shè)置為true,即消費者5秒提交一次位移。
在至少一次、精確一次語義中 需要將該參數(shù)設(shè)置為false,由應(yīng)用程序手動提交偏移量
//...
props.put("enable.auto.commit", false);
//...
while (true) {ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->{System.out.println("Message received " + record.value());
});
//提交偏移量
consumer.commitSync();
}
根據(jù)不用的應(yīng)用場景,kakfa提供了多個API讓開發(fā)者對消費者位移進(jìn)行手動管理
auto.offset.reset指定消費者從broker拉取數(shù)據(jù)的位置,有以下幾個選項可以配置
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_01");
在開發(fā)kafka消費者代碼時,必須指定消費者組,否則會報錯,那么該參數(shù)有什么作用呢。在回答這個問題之前,先假設(shè)兩個應(yīng)用場景
借用RocketMQ中的概念(個人覺得比較合適),以上兩種應(yīng)用場景叫做集群消費、廣播消費
kafka 內(nèi)部以消費者組的方式實現(xiàn)以上兩點要求
在開發(fā)代碼時,只需要按需更改一下配置即可
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_B");
props.put("client.id", "client_02");
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧