這篇文章主要介紹“Kafka多線程Consumer的實(shí)例代碼”,在日常操作中,相信很多人在Kafka多線程Consumer的實(shí)例代碼問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Kafka多線程Consumer的實(shí)例代碼”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
公司主營業(yè)務(wù):成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出鹿寨免費(fèi)做網(wǎng)站回饋大家。
這里要根據(jù)自身需求開發(fā),我這里只舉一個簡單的例子,就是幾個分區(qū)就啟動幾個consumer,一一對應(yīng)。
三個類:
Main:
public static void main(String[] args) {
String bootstrapServers = "kafka01:9092,kafka02:9092";
String groupId = "test";
String topic = "testtopic";
int consumerNum = 3;
ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
cg.execute();
}
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List consumers;
public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){
consumers = new ArrayList<>(consumerNum);
for(int i=0;i < consumerNum;i++){
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
consumers.add(ConsumerRunnable);
}
}
public void execute(){
for(ConsumerRunnable consumerRunnable:consumers){
new Thread(consumerRunnable).start();
}
}
}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerRunnable implements Runnable{
private final KafkaConsumer consumer;
public ConsumerRunnable(String bootstrapServers,String groupId,String topic){
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords records = consumer.poll(10);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
(舊版本:多分區(qū)多線程 新版本:一個線程管理多個socket連接)
但新版本KafkaConsumer是雙線程的,主線程負(fù)責(zé):消息獲取,rebalance,coordinator,位移提交等等,
另一個是后臺心跳線程。
根據(jù)上邊的各種配置,poll方法會找到offset,當(dāng)獲取了足夠多的可用數(shù)據(jù),或者等待時間超過了指定的超時時間,就會返回。
java consumer不是線程安全的,同一個KafkaConsumer用在了多個線程中,將會報(bào)Kafka Consumer is not safe for multi-threaded assess異常。可以加一個同步鎖進(jìn)行保護(hù)。
poll的超時參數(shù),已經(jīng)說過1000的話是超時設(shè)定,如果沒有很多數(shù)據(jù),也就等一秒,就返回了,比如定時5秒的將消息寫入,就可以將超時參數(shù)設(shè)置為5000,達(dá)到效率最大化。
如果沒有定時任務(wù)呢,那就設(shè)置為 Long.MAX_VALUE 未獲取足夠多的數(shù)據(jù)就無限等待。這里要捕獲一下WakeupException。
到此,關(guān)于“Kafka多線程Consumer的實(shí)例代碼”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!