這篇文章給大家分享的是有關(guān)Kafka配置參數(shù)Consumer的示例分析的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。
成都創(chuàng)新互聯(lián)公司是一家集成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、網(wǎng)站頁(yè)面設(shè)計(jì)、網(wǎng)站優(yōu)化SEO優(yōu)化為一體的專業(yè)網(wǎng)站設(shè)計(jì)公司,已為成都等多地近百家企業(yè)提供網(wǎng)站建設(shè)服務(wù)。追求良好的瀏覽體驗(yàn),以探求精品塑造與理念升華,設(shè)計(jì)最適合用戶的網(wǎng)站頁(yè)面。 合作只是第一步,服務(wù)才是根本,我們始終堅(jiān)持講誠(chéng)信,負(fù)責(zé)任的原則,為您進(jìn)行細(xì)心、貼心、認(rèn)真的服務(wù),與眾多客戶在蓬勃發(fā)展的市場(chǎng)環(huán)境中,互促共生。
#############################Consumer #############################
# Consumer端核心的配置是group.id、zookeeper.connect
# 決定該Consumer歸屬的唯一組ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.
group.id
# 消費(fèi)者的ID,若是沒有設(shè)置的話,會(huì)自增
consumer.id
# 一個(gè)用于跟蹤調(diào)查的ID ,最好同group.id相同
client.id =
# 對(duì)于zookeeper集群的指定,必須和broker使用同樣的zk配置
zookeeper.connect=debugo01:2182,debugo02:2182,debugo03:2182
# zookeeper的心跳超時(shí)時(shí)間,查過這個(gè)時(shí)間就認(rèn)為是無效的消費(fèi)者
zookeeper.session.timeout.ms = 6000
# zookeeper的等待連接時(shí)間
zookeeper.connection.timeout.ms = 6000
# zookeeper的follower同leader的同步時(shí)間
zookeeper.sync.time.ms = 2000
# 當(dāng)zookeeper中沒有初始的offset時(shí),或者超出offset上限時(shí)的處理方式 。
# smallest :重置為最小值
# largest:重置為最大值
# anything else:拋出異常給consumer
auto.offset.reset = largest
# socket的超時(shí)時(shí)間,實(shí)際的超時(shí)時(shí)間為max.fetch.wait + socket.timeout.ms.
socket.timeout.ms= 30 * 1000
# socket的接收緩存空間大小
socket.receive.buffer.bytes=64 * 1024
#從每個(gè)分區(qū)fetch的消息大小限制
fetch.message.max.bytes = 1024 * 1024
# true時(shí),Consumer會(huì)在消費(fèi)消息后將offset同步到zookeeper,這樣當(dāng)Consumer失敗后,新的consumer就能從zookeeper獲取最新的offset
auto.commit.enable = true
# 自動(dòng)提交的時(shí)間間隔
auto.commit.interval.ms = 60 * 1000
# 用于消費(fèi)的最大數(shù)量的消息塊緩沖大小,每個(gè)塊可以等同于fetch.message.max.bytes中數(shù)值
queued.max.message.chunks = 10
# 當(dāng)有新的consumer加入到group時(shí),將嘗試reblance,將partitions的消費(fèi)端遷移到新的consumer中, 該設(shè)置是嘗試的次數(shù)
rebalance.max.retries = 4
# 每次reblance的時(shí)間間隔
rebalance.backoff.ms = 2000
# 每次重新選舉leader的時(shí)間
refresh.leader.backoff.ms
# server發(fā)送到消費(fèi)端的最小數(shù)據(jù),若是不滿足這個(gè)數(shù)值則會(huì)等待直到滿足指定大小。默認(rèn)為1表示立即接收。
fetch.min.bytes = 1
# 若是不滿足fetch.min.bytes時(shí),等待消費(fèi)端請(qǐng)求的最長(zhǎng)等待時(shí)間
fetch.wait.max.ms = 100
# 如果指定時(shí)間內(nèi)沒有新消息可用于消費(fèi),就拋出異常,默認(rèn)-1表示不受限
consumer.timeout.ms = -1
------------------------------------Kafka配置參數(shù)--Consumer詳解-----------------------------
group.id 默認(rèn)值:無
唯一的指明了consumer的group的名字,group名一樣的進(jìn)程屬于同一個(gè)consumer group。
zookeeper.connect 默認(rèn)值:無
指定了ZooKeeper的connect string,以hostname:port的形式,hostname和port就是ZooKeeper集群各個(gè)節(jié)點(diǎn)的hostname和port。 ZooKeeper集群中的某個(gè)節(jié)點(diǎn)可能會(huì)掛掉,所以可以指定多個(gè)節(jié)點(diǎn)的connect string。如下所式:
hostname1:port1,hostname2:port2,hostname3:port3.
ZooKeeper也可以允許你指定一個(gè)"chroot"的路徑,可以讓Kafka集群將需要存儲(chǔ)在ZooKeeper的數(shù)據(jù)存儲(chǔ)到指定的路徑下這可以讓多個(gè)Kafka集群或其他應(yīng)用程序公用同一個(gè)ZooKeeper集群。可以使用如下的connect string:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
consumer.id 默認(rèn)值:null
如果沒有設(shè)置的話則自動(dòng)生成。
socket.timeout.ms 默認(rèn)值:30 * 1000
socket請(qǐng)求的超時(shí)時(shí)間。實(shí)際的超時(shí)時(shí)間為max.fetch.wait + socket.timeout.ms。
socket.receive.buffer.bytes 默認(rèn)值:64 * 1024
socket的receiver buffer的字節(jié)大小。
fetch.message.max.bytes 默認(rèn)值:1024 * 1024
每一個(gè)獲取某個(gè)topic的某個(gè)partition的請(qǐng)求,得到最大的字節(jié)數(shù),每一個(gè)partition的要被讀取的數(shù)據(jù)會(huì)加載入內(nèi)存,所以這可以幫助控制consumer使用的內(nèi)存。這個(gè)值的設(shè)置不能小于在server端設(shè)置的最大消息的字節(jié)數(shù),否則producer可能會(huì)發(fā)送大于consumer可以獲取的字節(jié)數(shù)限制的消息。
auto.commit.enable 默認(rèn)值:true
如果設(shè)為true,consumer會(huì)定時(shí)向ZooKeeper發(fā)送已經(jīng)獲取到的消息的offset。當(dāng)consumer進(jìn)程掛掉時(shí),已經(jīng)提交的offset可以繼續(xù)使用,讓新的consumer繼續(xù)工作。
auto.commit.interval.ms 默認(rèn)值:60 * 1000
consumer向ZooKeeper發(fā)送offset的時(shí)間間隔。
queued.max.message.chunks 默認(rèn)值:10
緩存用來消費(fèi)的消息的chunk的最大數(shù)量,每一個(gè)chunk最大可以達(dá)到fetch.message.max.bytes。
rebalance.max.retries 默認(rèn)值:4
當(dāng)一個(gè)新的consumer加入一個(gè)consumer group時(shí),會(huì)有一個(gè)rebalance的操作,導(dǎo)致每一個(gè)consumer和partition的關(guān)系重新分配。如果這個(gè)重分配失敗的話,會(huì)進(jìn)行重試,此配置就代表最大的重試次數(shù)。
fetch.min.bytes 默認(rèn)值:1
一個(gè)fetch請(qǐng)求最少要返回多少字節(jié)的數(shù)據(jù),如果數(shù)據(jù)量比這個(gè)配置少,則會(huì)等待,知道有足夠的數(shù)據(jù)為止。
fetch.wait.max.ms 默認(rèn)值:100
在server回應(yīng)fetch請(qǐng)求前,如果消息不足,就是說小于fetch.min.bytes時(shí),server最多阻塞的時(shí)間。如果超時(shí),消息將立即發(fā)送給consumer.。
rebalance.backoff.ms 默認(rèn)值:2000
在rebalance重試時(shí)的backoff時(shí)間。
refresh.leader.backoff.ms 默認(rèn)值:200
在consumer發(fā)現(xiàn)失去某個(gè)partition的leader后,在leader選出來前的等待的backoff時(shí)間。
auto.offset.reset 默認(rèn)值:largest
在Consumer在ZooKeeper中發(fā)現(xiàn)沒有初始的offset時(shí)或者發(fā)現(xiàn)offset不在范圍呢,該怎么做:
* smallest : 自動(dòng)把offset設(shè)為最小的offset。
* largest : 自動(dòng)把offset設(shè)為最大的offset。
* anything else: 拋出異常。
consumer.timeout.ms 默認(rèn)值:-1
如果在指定的時(shí)間間隔后,沒有發(fā)現(xiàn)可用的消息可消費(fèi),則拋出一個(gè)timeout異常。
client.id 默認(rèn)值: group id value
每一個(gè)請(qǐng)求中用戶自定義的client id,可幫助追蹤調(diào)用情況。
zookeeper.session.timeout.ms 默認(rèn)值:6000
ZooKeeper的session的超時(shí)時(shí)間,如果在這段時(shí)間內(nèi)沒有收到ZK的心跳,則會(huì)被認(rèn)為該Kafka server掛掉了。如果把這個(gè)值設(shè)置得過低可能被誤認(rèn)為掛掉,如果設(shè)置得過高,如果真的掛了,則需要很長(zhǎng)時(shí)間才能被server得知。
zookeeper.connection.timeout.ms 默認(rèn)值:6000
client連接到ZK server的超時(shí)時(shí)間。
zookeeper.sync.time.ms 默認(rèn)值:2000
一個(gè)ZK follower能落后leader多久。
感謝各位的閱讀!關(guān)于“Kafka配置參數(shù)Consumer的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!