這篇文章將為大家詳細(xì)講解有關(guān)Kafka分組消費(fèi)的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
十載的余江網(wǎng)站建設(shè)經(jīng)驗(yàn),針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。成都營銷網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整余江建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“余江網(wǎng)站設(shè)計”,“余江網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實(shí)執(zhí)行。
從kafka消費(fèi)消息,kafka客戶端提供兩種模式: 分區(qū)消費(fèi),分組消費(fèi)。
分區(qū)消費(fèi)對應(yīng)的就是我們的DirectKafkaInputDStream
分組消費(fèi)對應(yīng)的就是我們的KafkaInputDStream
消費(fèi)者數(shù)目跟分區(qū)數(shù)目的關(guān)系:
1),一個消費(fèi)者可以消費(fèi)一個到全部分區(qū)數(shù)據(jù)
2),分組消費(fèi),同一個分組內(nèi)所有消費(fèi)者消費(fèi)一份完整的數(shù)據(jù),此時一個分區(qū)數(shù)據(jù)只能被一個消費(fèi)者消費(fèi),而一個消費(fèi)者可以消費(fèi)多個分區(qū)數(shù)據(jù)
3),同一個消費(fèi)組內(nèi),消費(fèi)者數(shù)目大于分區(qū)數(shù)目后,消費(fèi)者會有空余=分區(qū)數(shù)-消費(fèi)者數(shù)
當(dāng)一個group中,有consumer加入或者離開時,會觸發(fā)partitions均衡partition.assignment.strategy,決定了partition分配給消費(fèi)者的分配策略,有兩種分配策略:
1,org.apache.kafka.clients.consumer.RangeAssignor
默認(rèn)采用的是這種再平衡方式,這種方式分配只是針對消費(fèi)者訂閱的topic的單個topic所有分區(qū)再分配,Consumer Rebalance的算法如下:
1),將目標(biāo)Topic下的所有Partirtion排序,存于TP
2),對某Consumer Group下所有Consumer按照名字根據(jù)字典排序,存于CG,第i個Consumer記為Ci
3),N=size(TP)/size(CG)
4),R=size(TP)%size(CG)
5),Ci獲取的分區(qū)起始位置=N*i+min(i,R)
6),Ci獲取的分區(qū)總數(shù)=N+(if (i+ 1 > R) 0 else 1)
2,org.apache.kafka.clients.consumer.RoundRobinAssignor
這種分配策略是針對消費(fèi)者消費(fèi)的所有topic的所有分區(qū)進(jìn)行分配。當(dāng)有新的消費(fèi)者加入或者有消費(fèi)者退出,就會觸發(fā)rebalance。這種方式有兩點(diǎn)要求
A),在實(shí)例化每個消費(fèi)者時給每個topic指定相同的流數(shù)
B),每個消費(fèi)者實(shí)例訂閱的topic必須相同
Map
其中,topic對應(yīng)的value就是流數(shù)目。對應(yīng)的kafka源碼是在
在kafka.consumer.ZookeeperConsumerConnector的consume方法里,根據(jù)這個參數(shù)構(gòu)建了相同數(shù)目的KafkaStream。
這種策略的具體分配步驟:
1),對所有topic的所有分區(qū)按照topic+partition轉(zhuǎn)string之后的hash進(jìn)行排序
2),對消費(fèi)者按字典進(jìn)行排序
3),然后輪訓(xùn)的方式將分區(qū)分配給消費(fèi)者
3,舉例對比
舉個例子,比如有兩個消費(fèi)者(c0,c1),兩個topic(t0,t1),每個topic有三個分區(qū)p(0-2),
那么采用RangeAssignor,結(jié)果為:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
采用RoundRobinAssignor,結(jié)果為:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
分組消費(fèi)有一個比較好的功能就是自動檢測失敗的消費(fèi)者并將其踢出分組,然后重新進(jìn)行分區(qū)分配。那么kafka是如何檢測失敗的消費(fèi)者的呢。我們就拿0.10.x為例進(jìn)行講解說明。
消費(fèi)著訂閱了一組的topic后,會在調(diào)用poll(long)函數(shù)的時候加入分組,分組內(nèi)新增消費(fèi)者就會進(jìn)行再平衡。Poll 函數(shù)的設(shè)計目標(biāo)就是來保證消費(fèi)者存活的。只要持續(xù)不斷的調(diào)用poll函數(shù),消費(fèi)者就會留在分組里,連續(xù)的從分配給他的分區(qū)里消費(fèi)消息。消費(fèi)者也會使用一個后臺線程發(fā)送周期性的心跳給broker。如果消費(fèi)者掛掉或者無法在session.timeout.ms時間范圍內(nèi)發(fā)送心跳,消費(fèi)者會被視為死亡,它的分區(qū)就會被重新分配。session.timeout.ms默認(rèn)是10000ms。該值要在group.max.session.timeout.ms=300000ms和group.min.session.timeout.ms=6000ms之間。
由于心跳是后臺線程周期性發(fā)送的,那么會存在消費(fèi)者心跳正常發(fā)送,但是不消費(fèi)消息的情況。為了避免這種消費(fèi)者無限期的占用分配給他的分區(qū)這種情況,kafka提供了一種存活檢測機(jī)制,使用max.poll.interval.ms配置。根本上來說,兩次調(diào)用poll函數(shù)的間隔大于該值,消費(fèi)者就會離開分組,然后它的分區(qū)會被其它消費(fèi)著消費(fèi)。當(dāng)發(fā)生這種情況時,你會收到一個offset提交失敗的異常。這種機(jī)制確保了只有活躍的消費(fèi)者才能提交offset。
消費(fèi)者有兩個配置來控制poll函數(shù)的行為:
max.poll.interval.ms:增加兩次調(diào)用poll的間隔,實(shí)際上就是增加消費(fèi)者處理上次poll所拉取消息的時間。當(dāng)然,弊端是增加該值會增加消費(fèi)者組再平衡的時間,因?yàn)閮H僅在調(diào)用poll的過程中消費(fèi)者才能參與再平衡。要注意一點(diǎn),request.timeout.ms=305000,默認(rèn)值要修改比max.poll.interval.ms大,也即是大于5min。該值是當(dāng)消費(fèi)者進(jìn)行再平衡時,JoinGroup請求在server端的阻塞時間。
max.poll.records:限制每次調(diào)用poll返回消息的最大數(shù)。有了該參數(shù)我們就可以預(yù)估兩次
有些情況下,數(shù)據(jù)處理時間不可預(yù)期,上面的兩個參數(shù)并不難滿足需求。這種情況下,推薦將消息處理放到其它后臺線程中執(zhí)行,這樣消費(fèi)者就可以持續(xù)的調(diào)用poll函數(shù)了。但是這中情況下,要處理好offset提交的問題。典型做法就是禁止掉自動提交offset,改為手動再消息處理結(jié)束后提交offset。這種情況下,需要對消費(fèi)的分區(qū)調(diào)用pause函數(shù),這樣在調(diào)用poll函數(shù)的時候就不會接受新的數(shù)據(jù),然后處理完之后調(diào)用resume(Collection)即可恢復(fù)消費(fèi)。
關(guān)于Kafka分組消費(fèi)的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。