真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Kafka分組消費(fèi)的示例分析

這篇文章將為大家詳細(xì)講解有關(guān)Kafka分組消費(fèi)的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

十載的余江網(wǎng)站建設(shè)經(jīng)驗(yàn),針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。成都營銷網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整余江建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“余江網(wǎng)站設(shè)計”,“余江網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實(shí)執(zhí)行。

1
Kafka消費(fèi)模式

從kafka消費(fèi)消息,kafka客戶端提供兩種模式: 分區(qū)消費(fèi),分組消費(fèi)。

分區(qū)消費(fèi)對應(yīng)的就是我們的DirectKafkaInputDStream

分組消費(fèi)對應(yīng)的就是我們的KafkaInputDStream

消費(fèi)者數(shù)目跟分區(qū)數(shù)目的關(guān)系:

1),一個消費(fèi)者可以消費(fèi)一個到全部分區(qū)數(shù)據(jù)

2),分組消費(fèi),同一個分組內(nèi)所有消費(fèi)者消費(fèi)一份完整的數(shù)據(jù),此時一個分區(qū)數(shù)據(jù)只能被一個消費(fèi)者消費(fèi),而一個消費(fèi)者可以消費(fèi)多個分區(qū)數(shù)據(jù)

3),同一個消費(fèi)組內(nèi),消費(fèi)者數(shù)目大于分區(qū)數(shù)目后,消費(fèi)者會有空余=分區(qū)數(shù)-消費(fèi)者數(shù)

Kafka分組消費(fèi)的示例分析

2
分組消費(fèi)再平衡策略

當(dāng)一個group中,有consumer加入或者離開時,會觸發(fā)partitions均衡partition.assignment.strategy,決定了partition分配給消費(fèi)者的分配策略,有兩種分配策略:

1,org.apache.kafka.clients.consumer.RangeAssignor

默認(rèn)采用的是這種再平衡方式,這種方式分配只是針對消費(fèi)者訂閱的topic的單個topic所有分區(qū)再分配,Consumer Rebalance的算法如下:

1),將目標(biāo)Topic下的所有Partirtion排序,存于TP

2),對某Consumer Group下所有Consumer按照名字根據(jù)字典排序,存于CG,第i個Consumer記為Ci

3),N=size(TP)/size(CG)

4),R=size(TP)%size(CG)

5),Ci獲取的分區(qū)起始位置=N*i+min(i,R)

6),Ci獲取的分區(qū)總數(shù)=N+(if (i+ 1 > R) 0 else 1)

2,org.apache.kafka.clients.consumer.RoundRobinAssignor

這種分配策略是針對消費(fèi)者消費(fèi)的所有topic的所有分區(qū)進(jìn)行分配。當(dāng)有新的消費(fèi)者加入或者有消費(fèi)者退出,就會觸發(fā)rebalance。這種方式有兩點(diǎn)要求

A),在實(shí)例化每個消費(fèi)者時給每個topic指定相同的流數(shù)

B),每個消費(fèi)者實(shí)例訂閱的topic必須相同

Map topicCountMap = new HashMap();topicCountMap.put(topic, new Integer(1));Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

其中,topic對應(yīng)的value就是流數(shù)目。對應(yīng)的kafka源碼是在

在kafka.consumer.ZookeeperConsumerConnector的consume方法里,根據(jù)這個參數(shù)構(gòu)建了相同數(shù)目的KafkaStream。

這種策略的具體分配步驟:

1),對所有topic的所有分區(qū)按照topic+partition轉(zhuǎn)string之后的hash進(jìn)行排序

2),對消費(fèi)者按字典進(jìn)行排序

3),然后輪訓(xùn)的方式將分區(qū)分配給消費(fèi)者

3,舉例對比

舉個例子,比如有兩個消費(fèi)者(c0,c1),兩個topic(t0,t1),每個topic有三個分區(qū)p(0-2),

那么采用RangeAssignor,結(jié)果為:

* C0: [t0p0, t0p1, t1p0, t1p1]

* C1: [t0p2, t1p2]

采用RoundRobinAssignor,結(jié)果為:

* C0: [t0p0, t0p2, t1p1]

* C1: [t0p1, t1p0, t1p2]

4
分組成員的存活檢測

分組消費(fèi)有一個比較好的功能就是自動檢測失敗的消費(fèi)者并將其踢出分組,然后重新進(jìn)行分區(qū)分配。那么kafka是如何檢測失敗的消費(fèi)者的呢。我們就拿0.10.x為例進(jìn)行講解說明。

消費(fèi)著訂閱了一組的topic后,會在調(diào)用poll(long)函數(shù)的時候加入分組,分組內(nèi)新增消費(fèi)者就會進(jìn)行再平衡。Poll 函數(shù)的設(shè)計目標(biāo)就是來保證消費(fèi)者存活的。只要持續(xù)不斷的調(diào)用poll函數(shù),消費(fèi)者就會留在分組里,連續(xù)的從分配給他的分區(qū)里消費(fèi)消息。消費(fèi)者也會使用一個后臺線程發(fā)送周期性的心跳給broker。如果消費(fèi)者掛掉或者無法在session.timeout.ms時間范圍內(nèi)發(fā)送心跳,消費(fèi)者會被視為死亡,它的分區(qū)就會被重新分配。session.timeout.ms默認(rèn)是10000ms。該值要在group.max.session.timeout.ms=300000ms和group.min.session.timeout.ms=6000ms之間。

由于心跳是后臺線程周期性發(fā)送的,那么會存在消費(fèi)者心跳正常發(fā)送,但是不消費(fèi)消息的情況。為了避免這種消費(fèi)者無限期的占用分配給他的分區(qū)這種情況,kafka提供了一種存活檢測機(jī)制,使用max.poll.interval.ms配置。根本上來說,兩次調(diào)用poll函數(shù)的間隔大于該值,消費(fèi)者就會離開分組,然后它的分區(qū)會被其它消費(fèi)著消費(fèi)。當(dāng)發(fā)生這種情況時,你會收到一個offset提交失敗的異常。這種機(jī)制確保了只有活躍的消費(fèi)者才能提交offset。

消費(fèi)者有兩個配置來控制poll函數(shù)的行為:

  1. max.poll.interval.ms:增加兩次調(diào)用poll的間隔,實(shí)際上就是增加消費(fèi)者處理上次poll所拉取消息的時間。當(dāng)然,弊端是增加該值會增加消費(fèi)者組再平衡的時間,因?yàn)閮H僅在調(diào)用poll的過程中消費(fèi)者才能參與再平衡。要注意一點(diǎn),request.timeout.ms=305000,默認(rèn)值要修改比max.poll.interval.ms大,也即是大于5min。該值是當(dāng)消費(fèi)者進(jìn)行再平衡時,JoinGroup請求在server端的阻塞時間。

  2. max.poll.records:限制每次調(diào)用poll返回消息的最大數(shù)。有了該參數(shù)我們就可以預(yù)估兩次

有些情況下,數(shù)據(jù)處理時間不可預(yù)期,上面的兩個參數(shù)并不難滿足需求。這種情況下,推薦將消息處理放到其它后臺線程中執(zhí)行,這樣消費(fèi)者就可以持續(xù)的調(diào)用poll函數(shù)了。但是這中情況下,要處理好offset提交的問題。典型做法就是禁止掉自動提交offset,改為手動再消息處理結(jié)束后提交offset。這種情況下,需要對消費(fèi)的分區(qū)調(diào)用pause函數(shù),這樣在調(diào)用poll函數(shù)的時候就不會接受新的數(shù)據(jù),然后處理完之后調(diào)用resume(Collection)即可恢復(fù)消費(fèi)。

關(guān)于Kafka分組消費(fèi)的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


名稱欄目:Kafka分組消費(fèi)的示例分析
網(wǎng)站路徑:http://weahome.cn/article/ghspdh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部