這篇文章主要介紹了kafka的編程模型有哪些,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
創(chuàng)新互聯(lián)公司專注于棗強(qiáng)企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,商城網(wǎng)站建設(shè)。棗強(qiáng)網(wǎng)站建設(shè)公司,為棗強(qiáng)等地區(qū)提供建站服務(wù)。全流程按需求定制設(shè)計(jì),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)
1.kafka消費(fèi)者編程模型
分區(qū)消費(fèi)模型
組(group)消費(fèi)模型
1.1.1.分區(qū)消費(fèi)架構(gòu)圖,每個(gè)分區(qū)對應(yīng)一個(gè)消費(fèi)者。
1.1.2.分區(qū)消費(fèi)模型偽代碼描述
指定偏移量,用于從上次消費(fèi)的地方開始消費(fèi).
提交offset ,java客戶端會自動(dòng)提交的集群,所以這一步可選。
1.2.1.組消費(fèi)模型架構(gòu)圖
每個(gè)組都消費(fèi)該topic的全量數(shù)據(jù),一條消息會發(fā)給groupA和groupB.
1.2.2.組消費(fèi)模型偽代碼:
流數(shù)N:表示一個(gè)consumer組里面有幾個(gè)consumer 實(shí)例,上例中組A創(chuàng)建2個(gè)流,組B創(chuàng)建4個(gè)流。
1.2.3.consumer分配算法
當(dāng)kafka的分區(qū)個(gè)數(shù)大于組A里consumer實(shí)例個(gè)數(shù)時(shí),怎么去分配,以下為分配步驟:
Partition消費(fèi)模型更加靈活但是:
(1)需要自己處理各種異常情況;
(2)需要自己管理offset(以實(shí)現(xiàn)消息傳遞的其他語義);
Group消費(fèi)模型更加簡單,但是不靈活:
(1)不需要自己處理異常情況,不需要自己管理offset;
(2)只能實(shí)現(xiàn)kafka默認(rèn)的最少一次消息傳遞語義;
知識補(bǔ)充:消息傳遞的3中語義:
至少一次,(消息不會丟,消息者至少得到一次,但有可能會重復(fù),生產(chǎn)者向消費(fèi)者發(fā)送之后,會等待消費(fèi)者確認(rèn),沒收到確認(rèn)會再發(fā)) (kafka 默認(rèn)實(shí)現(xiàn)的語義)。
至多一次,(消息會丟)
有且只有一次。
fetchSize: 從服務(wù)器獲取單包大小;
bufferSize: kafka客戶端緩沖區(qū)大小;
group.id: 分組消費(fèi)時(shí)分組名 (指定的每個(gè)組將獲得全量的數(shù)據(jù))
同步生產(chǎn)模型
異步生產(chǎn)模型
至少成功一次 , 發(fā)送給kafka消費(fèi)者
打包發(fā)送給kafka broker。
main()
創(chuàng)建到kafka broker的連接:KafkaClient(host,port)
選擇或者自定義生產(chǎn)者負(fù)載均衡算法 partitioner (算法有:hash,輪詢,隨機(jī))
設(shè)置生產(chǎn)者參數(shù) (緩存隊(duì)列長度,發(fā)送時(shí)間,同步/異步參數(shù)設(shè)置)
根據(jù)負(fù)載均衡算法和設(shè)置的生產(chǎn)者參數(shù)構(gòu)造Producer對象
while True
getMessage:從上游獲得一條消息
按照kafka要求的消息格式構(gòu)造kafka消息
根據(jù)分區(qū)算法得到分區(qū)
發(fā)送消息
處理異常
同步生產(chǎn)模型:
(1)低消息丟失率;
(2)高消息重復(fù)率(由于網(wǎng)絡(luò)原因,回復(fù)確認(rèn)未收到);
(3)高延遲 (每發(fā)一條消息需要確認(rèn))
(使用在不丟消息場景)
異步生產(chǎn)模型:
(1)低延遲;
(2)高發(fā)送性能;(每秒一個(gè)分區(qū)發(fā)50萬條)
(3)高消息丟失率(無確認(rèn)機(jī)制,發(fā)送端隊(duì)列滿了,消息會丟掉;整個(gè)隊(duì)列發(fā)送給)
(使用在允許丟消息場景,偶爾丟一條)
//同步配置參數(shù):
默認(rèn)的序列化方式:字節(jié)序列化。
設(shè)定分區(qū)算法:默認(rèn)是對key進(jìn)行hash分區(qū)算法,可以自定義分區(qū)算法。
確認(rèn)機(jī)制 request.require.acks: 合理設(shè)置為1; 0: 絕不等確認(rèn) 1: leader的一個(gè)副本收到這條消息,并發(fā)回確認(rèn) -1: leader的所有副本都收到這條消息,并發(fā)回確認(rèn)
消息是以key-value的形式發(fā)送的,key必須要設(shè)置。
message.send.max.retries: 發(fā)送失敗重試次數(shù);
retry.backoff.ms :未接到確認(rèn),認(rèn)為發(fā)送失敗的時(shí)間;
producer.type: 同步發(fā)送或者異步發(fā)送;
batch.num.messages: 異步發(fā)送時(shí),累計(jì)最大消息數(shù);
queue.buffering.max.ms:異步發(fā)送時(shí),累計(jì)最大時(shí)間;
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“kafka的編程模型有哪些”這篇文章對大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!