在前面的文章《Kafka的Lag計算誤區(qū)及正確實現(xiàn)》中介紹了如何計算消費者的消費滯后量(Lag),并且講解了如何調(diào)用Kafka的kafka.admin.ConsumerGroupCommand文件中的KafkaConsumerGroupService來發(fā)送OffsetRequest和OffsetFetchRequest兩個請求,進而通過兩個請求結(jié)果之間的差值來獲得結(jié)果。不過如果你不想修改kafka-core的代碼并重新編譯的話,這種實現(xiàn)方式無法成功,所以本文的主要目的就是通過調(diào)用更底層的API來實現(xiàn)不修改kafka-core的代碼來實現(xiàn)KafkaConsumerGroupService的功能,即通過Java調(diào)用Scala的代碼來實現(xiàn)獲取Kafka的消費者詳情的功能。
成都創(chuàng)新互聯(lián)公司主營洛扎網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都APP應(yīng)用開發(fā),洛扎h5小程序開發(fā)搭建,洛扎網(wǎng)站營銷推廣歡迎洛扎等地區(qū)企業(yè)咨詢實現(xiàn)如同 bin/kafka-consumer-group.sh –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID的效果:
[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
KafkaConsumerGroupService的核心方法是CollectGroupAssignment,其方法參數(shù)為一個consumer group的groupId,方法輸出為上面示例中的列表信息。CollectGroupAssignment方法主要有以下幾個步驟:
對應(yīng)Java版的KafkaConsumerGroupService改造代碼可以參見代碼,目錄結(jié)構(gòu)如下圖所示:
其中model中的ConsumerGroupSummary、ConsumerSummary和PartitionAssignmentState是簡單的JavaBean, PartitionAssignmentState是用來保存每個TopicPartition的消費者信息的,具體內(nèi)容參考如下。KafkaConsumerGroupCustomService就是本文所要陳述的Java改造辦的KafkaConsumerGroupSerivice,ConsumerGroupUtils用來存放一些公用的代碼。
@Data
@Builder
public class PartitionAssignmentState {
private String group; // groupId
private Node coordinator; // consumer coodinator節(jié)點信息
private String topic;
private int partition;
private long offset;
private long lag;
private String consumerId;
private String host;
private String clientId;
private long logEndOffset;
}
初始化KafkaConsumerGroupCustomService需要Kafka的服務(wù)端地址,然后初始化AdminClient和KafkaConsumer,AdminClient中包含了眾多管理類方法,主要是通過發(fā)送各種自定義協(xié)議請求來完成,上面步驟中所說的describeConsumerGroup和listGroupOffsets方法也是通過AdminClient來實現(xiàn)的;KafkaConsumer主要是用來獲取TopicPartition對應(yīng)的HW(消費者可見的LogEndOffsets)的。
KafkaConsumerGroupCustomService中與scala版對應(yīng)的collectGroupAssignment方法如下(詳細步驟參考代碼注釋):
public List collectGroupAssignment(
AdminClient adminClient, KafkaConsumer consumer,
String group) {
//1. 獲取consumer group的基本信息,包括CONSUMER-ID、HOST、
// CLIENT-ID以及TopicPartition信息
AdminClient.ConsumerGroupSummary consumerGroupSummary
= adminClient.describeConsumerGroup(group, 0);
List assignedTopicPartitions = new ArrayList<>();
List rowsWithConsumer = new ArrayList<>();
scala.collection.immutable.List consumers
= consumerGroupSummary.consumers().get();
if (consumers != null) {
//2. 獲取各個分區(qū)(Partition)的對應(yīng)的消費位移CURRENT-OFFSET
scala.collection.immutable.Map offsets
= adminClient.listGroupOffsets(group);
if (offsets.nonEmpty()) {
String state = consumerGroupSummary.state();
// 3. 還有一個狀態(tài)是Dead表示"group"對應(yīng)的consumer group不存在
if (state.equals("Stable") || state.equals("Empty")
|| state.equals("PreparingRebalance")
|| state.equals("AwaitingSync")) {
List consumerList = changeToJavaList(consumers);
// 4. 獲取當前有消費者的消費信息,即包含CONSUMER-ID、HOST、CLIENT-ID
rowsWithConsumer = getRowsWithConsumer(consumerGroupSummary, offsets,
consumer, consumerList, assignedTopicPartitions, group);
}
}
//5. 獲取當前沒有消費者的消費信息
List rowsWithoutConsumer =
getRowsWithoutConsumer(consumerGroupSummary,
offsets, consumer, assignedTopicPartitions, group);
//6. 合并結(jié)果
rowsWithConsumer.addAll(rowsWithoutConsumer);
}
return rowsWithConsumer;
}
KafkaConsumerGroupCustomService類中包含有g(shù)etRowsWithConsumer()、getRowsWithoutConsumer()、changeToJavaList等私有方法也都是在Scala語言與Java語言之間進行切換,這樣可以不需要修改kafka-core的原生代碼而通過外部的封裝調(diào)用既可以實現(xiàn)獲取Kafka消費者詳情的功能。光看代碼比較抽象,建議對此感興趣的同學可以親自對比一下kafka-core包中kafka.admin.ConsumerGroupCommand的KafkaConsumerGroupSerivice與筆者自定義的KafkaConsumerGroupCustomService的實現(xiàn)來了解下Scala語言到Java語言的轉(zhuǎn)換。
如果需要打印詳情可以調(diào)用KafkaConsumerGroupCustomService同目錄的ConsumerGroupUtils類中的printPasList(List list)方法。注意要運行這些代碼需要JDK8的環(huán)境,筆者為了讓代碼顯得“騷氣”一點就用來一點Java8的語法,如果需要Java7的代碼實現(xiàn)可以關(guān)注私聊。
或許有些同學對于Scala和Java交叉的代碼并不感冒,想要尋求一種存Java式的實現(xiàn)方式,那么在這里怎么實現(xiàn)呢?答案是通過KafkaAdminClient,它是AdminClient的Java版實現(xiàn),從Kafka0.11.0.0版本開始引入的,不過KafkaAdminClient本身并沒有提供describeConsumerGroup、listGroupOffsets之類的方法給我們直接使用,擴展一下也很方便,由于篇幅限制,這部分的內(nèi)容將在下一篇文章中進行介紹,如果想要先一睹為快,可以參考下代碼實現(xiàn),詳細的邏輯解析敬請期待….
本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹記這一點。同時我經(jīng)過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限于:分布式架構(gòu)、高可擴展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構(gòu)師的朋友有一定的參考和幫助
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。