ConsumerConfig.scala
儲存Consumer的配置
創(chuàng)新互聯(lián)從2013年開始,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目網(wǎng)站設(shè)計、成都網(wǎng)站制作網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元勐海做網(wǎng)站,已為上家服務(wù),為勐海各地企業(yè)和個人服務(wù),聯(lián)系電話:028-86922220
按照我的理解,0.10的Kafka沒有專門的SimpleConsumer,仍然是沿用0.8版本的。
消費的規(guī)則如下:
一個partition只能被同一個ConsumersGroup的一個線程所消費.
線程數(shù)小于partition數(shù),某些線程會消費多個partition.
線程數(shù)等于partition數(shù),一個線程正好消費一個線程.
當(dāng)添加消費者線程時,會觸發(fā)rebalance,partition的分配發(fā)送變化.
同一個partition的offset保證消費有序,不同的partition消費不保證順序.
Consumers編程的用法:
private final KafkaConsumerconsumer; // 與Kafka進(jìn)行通信的consumer... consumer = new KafkaConsumer (props); consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords records = consumer.poll(512); ...
consumer,是一個純粹的單線程程序,后面所講的所有機(jī)制(包括coordinator,rebalance, heartbeat等),都是在這個單線程的poll函數(shù)里面完成的。也因此,在consumer的代碼內(nèi)部,沒有鎖的出現(xiàn)。
從KafkaConsumer的構(gòu)造函數(shù)可以看出,KafkaConsumer有以下幾個核心部件:
Metadata: 存儲Topic/Partion與broker的映射關(guān)系
NetworkClient:網(wǎng)絡(luò)層 A network client for asynchronous request/response network i/o.
ConsumerNetworkClient: Higher level consumer access to the network layer //對NetworkClient的封裝,非線程安全
ConsumerCoordinator:只是client端的類,只是和服務(wù)端的GroupCoordinator通信的介質(zhì)。(broker端的Coordinator 負(fù)責(zé)reblance、Offset提交、心跳)
SubscriptionState: consumer的Topic、Partition的offset狀態(tài)維護(hù)
Fetcher: manage the fetching process with the brokers. //獲取消息
后面會分組件講解Consumers的工作流程
在consumer啟動時或者coordinator節(jié)點故障轉(zhuǎn)移時,consumer發(fā)送ConsumerMetadataRequest給任意一個brokers。在ConsumerMetadataResponse中,它接收對應(yīng)的Consumer Group所屬的Coordinator的位置信息。
Consumer連接Coordinator節(jié)點,并發(fā)送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協(xié)調(diào)節(jié)點已經(jīng)在初始化平衡。消費者就會停止抓取數(shù)據(jù),提交offsets,發(fā)送JoinGroupRequest給協(xié)調(diào)節(jié)點。在JoinGroupResponse,它接收消費者應(yīng)該擁有的topic-partitions列表以及當(dāng)前Consumer Group的新的generation編號。這個時候Consumer Group管理已經(jīng)完成,Consumer就可以開始fetch數(shù)據(jù),并為它擁有的partitions提交offsets。
如果HeartbeatResponse沒有錯誤返回,Consumer會從它上次擁有的partitions列表繼續(xù)抓取數(shù)據(jù),這個過程是不會被中斷的。
見Producer里面的分析。
補充一下,KafkaConsumer、KafkaProducer都是在構(gòu)造函數(shù)中獲取metadata信息,通過調(diào)用metadata.update
方法來獲取信息。
在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進(jìn)行協(xié)同,新航道雅思培訓(xùn)這與后面要講的rebalance有關(guān)。(ConsumerConnector、KafkaStream、ConsumerIterator
) -- package kafka.consumer
0.9之后新的consumer不依賴與Zookeeper,一個consumerGroup內(nèi)的consumer由Coordinator管理.(KafkaConsumer
) -- package org.apache.kafka.clients.consumer
為什么?后面講
提問:為什么在一個group內(nèi)部,1個parition只能被1個consumer擁有?
給定一個topic,有4個partition: p0, p1, p2, p3, 一個group有3個consumer: c0, c1, c2。
那么,如果按RangeAssignor
策略,分配結(jié)果是:
c0: p0, c1: p1, c2: p2, p3
如果按RoundRobinAssignor
策略:
c0: p1, p3, c1: p1, c2: p2
partition.assignment.strategy=RangeAssignor,默認(rèn)值
(到底是哪種分配狀態(tài)呢)
那這整個分配過程是如何進(jìn)行的呢?見下圖所示:
1. 步驟1:對于每1個consumer group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。因此,第1步就是找到這個coordinator。(1個consumer group對應(yīng)一個coordinattor)
GroupCoordinatorRequest: GCR,由ConsumerNetworkClient發(fā)送請求去尋找coordinator。
2. 步驟2:找到coordinator之后,發(fā)送JoinGroup請求
consumer在這里會被劃分leader、follower(無責(zé)任的說:選擇第一個consumer)
leader作用:perform the leader synchronization and send back the assignment for the group(負(fù)責(zé)發(fā)送partition分配的結(jié)果)
follower作用:send follower's sync group with an empty assignment
3. 步驟3:JoinGroup返回之后,發(fā)送SyncGroup,得到自己所分配到的partition
SyncGroupRequest
consumer leader發(fā)送 SyncGroupRequest給Coordinator,Coordinator回給它null
follower發(fā)送 null的 SyncGroupRequest 給Coordinator,Coordinator回給它partition分配的結(jié)果。
注意,在上面3步中,有一個關(guān)鍵點:
partition的分配策略和分配結(jié)果其實是由client決定的,而不是由coordinator決定的。什么意思呢?在第2步,所有consumer都往coordinator發(fā)送JoinGroup消息之后,coordinator會指定其中一個consumer作為leader,其他consumer作為follower。
然后由這個leader進(jìn)行partition分配。
然后在第3步,leader通過SyncGroup消息,把分配結(jié)果發(fā)給coordinator,其他consumer也發(fā)送SyncGroup消息,獲得這個分配結(jié)果。
接下來就到Fetcher
拉取數(shù)據(jù)了
四個步驟
步驟0:獲取consumer的offset
步驟1:生成FetchRequest,并放入發(fā)送隊列
步驟2:網(wǎng)絡(luò)poll
步驟3:獲取結(jié)果
當(dāng)consumer初次啟動的時候,面臨的一個首要問題就是:從offset為多少的位置開始消費。
poll之前,給集群發(fā)送請求,讓集群告知客戶端,當(dāng)前該TopicPartition的offset是多少。通過SubscriptionState
來實現(xiàn), 通過ConsumerCoordinator
if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions());
核心是:向Coordinator發(fā)了一個OffsetFetchRequest,并且是同步調(diào)用,直到獲取到初始的offset,再開始接下來的poll.(也就是說Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)
consumer的每個TopicPartition都有了初始的offset,接下來就可以進(jìn)行不斷循環(huán)取消息了,這也就是Fetch的過程:
fetcher.initFetches(cluster)
核心就是生成FetchRequest: 假設(shè)一個consumer訂閱了3個topic: t0, t1, t2,為其分配的partition分別是: t0: p0; t1: p1, p2; t2: p2
即總共4個TopicPartition,即t0p0, t0p1, t1p1, t2p2。這4個TopicPartition可能分布在2臺機(jī)器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2
則會分別針對每臺機(jī)器生成一個FetchRequest,即Map
。所以會有一個方法把所有屬于同一個Node的TopicPartition放在一起,生成一個FetchRequest。
調(diào)用ConsumerNetworkClient.poll
發(fā)送網(wǎng)絡(luò)請求。向服務(wù)器發(fā) 送響應(yīng)請求和獲取服務(wù)器的響應(yīng)。(默認(rèn)值:executeDelayedTasks=true)
fetcher.fetchedRecords()
獲取Broker返回的Response,里面包含了List
是否自動消費確認(rèn):由參數(shù)auto.xxx.commit=true
控制
手動消費:用于自定義Consumers的消費控制
下面從自動消費確認(rèn)來分析,Offset自動確認(rèn)是由ConsumerCoordinator
的AutoCommitTask
來實現(xiàn)的。
其調(diào)用在ConsumerNetworkClient
的 DelayedTaskQueue delayedTasks
里面,然后被周期性的調(diào)用。 周期性的發(fā)送確認(rèn)消息,類似HeartBeat,其實現(xiàn)機(jī)制也就是前面所講的DelayedQueue + DelayedTask
.
poll
函數(shù)中的注釋:
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
可以這樣理解:第二次poll調(diào)用的時候,提交上一次poll的offset和心跳發(fā)送。
先提交offset,再去拉取record。那么這次Offset其實是上一次poll的Record的offset。
因此,當(dāng)你把按照下面的邏輯寫程序的時候,可能會導(dǎo)致Consumer與Coordinator的心跳超時。
while(true) { consumer.poll();do process message // 假如這個耗時過長,那么這個consumer就無法發(fā)送心跳給coordinator,導(dǎo)致它錯誤認(rèn)為這個consumer失去聯(lián)系了,引起不必要的rebalance。槽糕的情況下,會丟重復(fù)消費數(shù)據(jù)。}
因此,有必要把offset的提交單獨拿出來做一個線程。
到這里,就把整個Consumer的流程走完了。