小編給大家分享一下springboot中怎么實(shí)現(xiàn)kafa指定offset消費(fèi),相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
成都創(chuàng)新互聯(lián)公司是專業(yè)的樂(lè)山網(wǎng)站建設(shè)公司,樂(lè)山接單;提供成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè),網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行樂(lè)山網(wǎng)站開發(fā)網(wǎng)頁(yè)制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛(ài)的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來(lái)合作!
kafka消費(fèi)過(guò)程難免會(huì)遇到需要重新消費(fèi)的場(chǎng)景,例如我們消費(fèi)到kafka數(shù)據(jù)之后需要進(jìn)行存庫(kù)操作,若某一時(shí)刻數(shù)據(jù)庫(kù)down了,導(dǎo)致kafka消費(fèi)的數(shù)據(jù)無(wú)法入庫(kù),為了彌補(bǔ)數(shù)據(jù)庫(kù)down期間的數(shù)據(jù)損失,有一種做法我們可以指定kafka消費(fèi)者的offset到之前某一時(shí)間的數(shù)值,然后重新進(jìn)行消費(fèi)。
首先創(chuàng)建kafka消費(fèi)服務(wù)
@Service@Slf4j//實(shí)現(xiàn)CommandLineRunner接口,在springboot啟動(dòng)時(shí)自動(dòng)運(yùn)行其run方法。public class TspLogbookAnalysisService implements CommandLineRunner { @Override public void run(String... args) { //do something }}
kafka消費(fèi)模型建立
kafka server中每個(gè)主題存在多個(gè)分區(qū)(partition),每個(gè)分區(qū)自己維護(hù)一個(gè)偏移量(offset),我們的目標(biāo)是實(shí)現(xiàn)kafka consumer指定offset消費(fèi)。
在這里使用consumer-->partition一對(duì)一的消費(fèi)模型,每個(gè)consumer各自管理自己的partition。
@Service@Slf4jpublic class TspLogbookAnalysisService implements CommandLineRunner { //聲明kafka分區(qū)數(shù)相等的消費(fèi)線程數(shù),一個(gè)分區(qū)對(duì)應(yīng)一個(gè)消費(fèi)線程 private static final int consumeThreadNum = 9; //特殊指定每個(gè)分區(qū)開始消費(fèi)的offset private List
kafka consumer對(duì)offset的處理
聲明kafka consumer的配置類
private Properties buildKafkaConfig() { Properties kafkaConfiguration = new Properties(); kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ""); kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, ""); kafkaConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ""); kafkaConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, ""); kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ""); kafkaConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ""); kafkaConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,""); kafkaConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ""); ...更多配置項(xiàng) return kafkaConfiguration;}
創(chuàng)建kafka consumer,處理offset,開始消費(fèi)數(shù)據(jù)任務(wù)#
private void startConsume(int partitionIndex) { //創(chuàng)建kafka consumer KafkaConsumer
消費(fèi)數(shù)據(jù)邏輯,offset操作
private void kafkaRecordConsume(KafkaConsumer
以上是“springboot中怎么實(shí)現(xiàn)kafa指定offset消費(fèi)”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!