Kafka 本質(zhì)上是?個(gè)消息隊(duì)列。與zeromq不同的是,Kafka是一個(gè)獨(dú)立的框架而不是一個(gè)庫。這里主要介紹其原理,至于具體的安裝等操作不做介紹,只是提示一下,第一次運(yùn)行時(shí),先設(shè)置前臺(tái)運(yùn)行,看會(huì)不會(huì)報(bào)錯(cuò)。
成都創(chuàng)新互聯(lián)專注于路橋企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,商城建設(shè)。路橋網(wǎng)站建設(shè)公司,為路橋等地區(qū)提供建站服務(wù)。全流程定制網(wǎng)站制作,專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)架構(gòu)注意下圖沒有畫上zookeeper,請自行腦補(bǔ)。kafka需要連接到zookeeper,來完成注冊發(fā)現(xiàn)等集群操作。broker都是由zookeeper管理。
先給出 Kafka ?些重要概念,讓?家對 Kafka 有個(gè)整體的認(rèn)識(shí)和感知,后?還會(huì)詳細(xì)的解析每?個(gè)概念的作?以及更深?的原理:
kafka 存儲(chǔ)的消息來?任意多被稱為 Producer ?產(chǎn)者的進(jìn)程。數(shù)據(jù)從?可以被發(fā)布到不同的Topic 主題下的不同 Partition 分區(qū)。在?個(gè)分區(qū)內(nèi),這些消息被索引并連同時(shí)間戳存儲(chǔ)在?起。其它被稱為 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱消息。
Kafka 運(yùn)?在?個(gè)由?臺(tái)或多臺(tái)服務(wù)器組成的集群上,并且分區(qū)可以跨集群結(jié)點(diǎn)分布。
Kafka集群將 Record 流存儲(chǔ)在稱為 Topic 的類中,每個(gè)記錄由?個(gè)鍵、?個(gè)值和?個(gè)時(shí)間戳組成。
Kafka 中消息是以 Topic 進(jìn)?分類的,?產(chǎn)者?產(chǎn)消息,消費(fèi)者消費(fèi)消息,?向的都是同?個(gè)Topic。Topic 是邏輯上的概念,? Partition 是物理上的概念,每個(gè) Partition 對應(yīng)于?個(gè) log ?件,該log ?件中存儲(chǔ)的就是 Producer ?產(chǎn)的數(shù)據(jù)。Producer ?產(chǎn)的數(shù)據(jù)會(huì)不斷追加到該 log ?件末端,且每條數(shù)據(jù)都有??的 Offset。消費(fèi)者組中的每個(gè)消費(fèi)者,都會(huì)實(shí)時(shí)記錄??消費(fèi)到了哪個(gè) Offset,以便出錯(cuò)恢復(fù)時(shí),從上次的位置繼續(xù)消費(fèi)。
由于?產(chǎn)者?產(chǎn)的消息會(huì)不斷追加到 log ?件末尾,為防? log ?件過?導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采取了分?和索引機(jī)制。它將每個(gè) Partition 分為多個(gè) Segment,每個(gè) Segment 對應(yīng)兩個(gè)?件:“.index” 索引?件和“.log” 數(shù)據(jù)?件。這種索引思想值得我們學(xué)習(xí)應(yīng)用到平時(shí)的開發(fā)中。
這些?件位于同??件下,該?件夾的命名規(guī)則為:topic 名-分區(qū)號(hào)。例如,test這個(gè) topic 有三個(gè)分區(qū),則其對應(yīng)的?件夾為 test-0,test-1,test-2。
$ ls /tmp/kafka-logs/test-1
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
leader-epoch-checkpoint
index 和 log ?件以當(dāng)前 Segment 的第?條消息的 Offset 命名。下圖為 index ?件和 log ?件的結(jié)構(gòu)示意圖
“.index” ?件存儲(chǔ)?量的索引信息,“.log” ?件存儲(chǔ)?量的數(shù)據(jù),索引?件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)?件中 Message 的物理偏移量。
使用shell命令查看索引
./kafka-dump-log.sh --files /tmp/kafka-logs/test-1/00000000000000000000.index
分區(qū)機(jī)制分區(qū)原因:
分區(qū)原則:我們需要將 Producer 發(fā)送的數(shù)據(jù)封裝成?個(gè) ProducerRecord 對象。該對象需要指定?些參數(shù):
指明 Partition 的情況下,直接將給定的 Value 作為 Partition 的值;沒有指明 Partition 但有 Key 的情況下,將 Key 的 Hash 值與分區(qū)數(shù)取余得到 Partition 值;既沒有 Partition 又沒有 Key 的情況下,第?次調(diào)?時(shí)隨機(jī)?成?個(gè)整數(shù)(后?每次調(diào)?都在這個(gè)整數(shù)上?增),將這個(gè)值與可?的分區(qū)數(shù)取余,得到 Partition 值,也就是常說的 Round-Robin輪詢算法。
生產(chǎn)者Producer?產(chǎn)者,是數(shù)據(jù)的??。Producer在寫?數(shù)據(jù)的時(shí)候永遠(yuǎn)的找leader,不會(huì)直接將數(shù)據(jù)寫?follower。下圖很好地闡釋了生產(chǎn)者的工作流程。
這里獲取分區(qū)信息,是從zookeeper中獲取的。
生產(chǎn)者不會(huì)每個(gè)消息都調(diào)用一次send(),這樣效率太低,默認(rèn)是數(shù)據(jù)攢到16K或是超時(shí)(如10ms)會(huì)send()一次。注意這里發(fā)消息是異步操作。
producer端設(shè)置request.required.acks=0;
只要請求已發(fā)送出去,就算是發(fā)送完了,不關(guān)心有沒有寫成功。性能很好,如果是對一些日志進(jìn)行分析,可以承受丟數(shù)據(jù)的情況,用這個(gè)參數(shù),性能會(huì)很好。request.required.acks=1;
發(fā)送一條消息,當(dāng)leader partition寫入成功以后,才算寫入成功。不過這種方式也有丟數(shù)據(jù)的可能。request.required.acks=-1;
需要ISR列表里面,所有副本都寫完以后,這條消息才算寫入成功。
設(shè)計(jì)一個(gè)不丟數(shù)據(jù)的方案:數(shù)據(jù)不丟失的方案:1)分區(qū)副本 >=2 2)acks = -1 3)min.insync.replicas >=2。
下面給出此時(shí)leader出現(xiàn)故障的情況,可以看出,此時(shí)數(shù)據(jù)可能重復(fù)。
解釋上面出現(xiàn)的幾個(gè)名詞。Leader維護(hù)了?個(gè)動(dòng)態(tài)的 in-sync replica set(ISR):和 Leader 保持同步的 Follower 集合。當(dāng) ISR 集合中的 Follower 完成數(shù)據(jù)的同步之后,Leader 就會(huì)給 Follower 發(fā)送 ACK。如果 Follower ?時(shí)間未向 Leader 同步數(shù)據(jù),則該 Follower 將被踢出 ISR 集合,該時(shí)間閾值由replica.lag.time.max.ms 參數(shù)設(shè)定。Leader 發(fā)?故障后,就會(huì)從 ISR 中選舉出新的 Leader。
kafka服務(wù)端中min.insync.replicas。 如果我們不設(shè)置的話,默認(rèn)這個(gè)值是1。一個(gè)leader partition會(huì)維護(hù)一個(gè)ISR列表,這個(gè)值就是限制ISR列表里面 至少得有幾個(gè)副本,比如這個(gè)值是2,那么當(dāng)ISR列表里面只有一個(gè)副本的時(shí)候,往這個(gè)分區(qū)插入數(shù)據(jù)的時(shí)候會(huì)報(bào)錯(cuò)。
Consumer 采? Pull(拉取)模式從 Broker 中讀取數(shù)據(jù)。Pull 模式則可以根據(jù) Consumer 的消費(fèi)能?以適當(dāng)?shù)乃俾氏M(fèi)消息。Pull 模式不?之處是,如果Kafka 沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷?循環(huán)中,?直返回空數(shù)據(jù)。因?yàn)橄M(fèi)者從 Broker 主動(dòng)拉取數(shù)據(jù),需要維護(hù)?個(gè)?輪詢,針對這?點(diǎn), Kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳??個(gè)時(shí)?參數(shù) timeout。如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),Consumer 會(huì)等待?段時(shí)間之后再返回,這段時(shí)?即為 timeout。
分區(qū)分配策略?個(gè) Consumer Group 中有多個(gè) Consumer,?個(gè) Topic 有多個(gè) Partition。不同組間的消費(fèi)者是相互獨(dú)立的,相同組內(nèi)的消費(fèi)者才會(huì)協(xié)作,這就必然會(huì)涉及到Partition 的分配問題,即確定哪個(gè) Partition 由哪個(gè) Consumer 來消費(fèi)。
Kafka 有三種分配策略:
當(dāng)消費(fèi)者組內(nèi)消費(fèi)者發(fā)?變化時(shí),會(huì)觸發(fā)分區(qū)分配策略(?法重新分配),在分配完成前,kafka會(huì)暫停對外服務(wù)。注意為了盡量確保消息的有序執(zhí)行,一個(gè)分區(qū)只能對應(yīng)一個(gè)消費(fèi)者,這也說明消費(fèi)者的數(shù)量不能超過分區(qū)的數(shù)量。
range方式Range ?式是按照主題來分的,不會(huì)產(chǎn)?輪詢?式的消費(fèi)混亂問題,但是也有不足。
注意圖文不符,圖片是一個(gè)例子,文字再給一個(gè)例子,以便理解。假設(shè)我們有10個(gè)分區(qū),3個(gè)消費(fèi)者,排完序的分區(qū)將會(huì)是0,1,2,3,4,5,6,7,8,9;消費(fèi)者線程排完序?qū)?huì)是C1-0,C2-0,C3-0。然后將partitions的個(gè)數(shù)除于消費(fèi)者線程的總數(shù)來決定每個(gè)消費(fèi)者線程消費(fèi)?個(gè)分區(qū)。如果除不盡,那么前??個(gè)消費(fèi)者線程將會(huì)多消費(fèi)?個(gè)分區(qū)。
在我們的例???,我們有10個(gè)分區(qū),3個(gè)消費(fèi)者線程, 10/3 = 3,?且除不盡,那么消費(fèi)者線程 C1-0將會(huì)多消費(fèi)?個(gè)分區(qū):C1-0 將消費(fèi) 0, 1, 2, 3 分區(qū);C2-0將消費(fèi) 4,5,6分區(qū);C3-0將消費(fèi) 7,8,9分區(qū)。
假如我們有11個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:
C1-0將消費(fèi) 0,1,2,3分區(qū);C2-0將消費(fèi) 4,5,6,7分區(qū);C3-0 將消費(fèi) 8, 9, 10 分區(qū)。
假如我們有2個(gè)主題(T1和T2),分別有10個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:
C1-0 將消費(fèi) T1主題的 0, 1, 2, 3 分區(qū)以及 T2主題的 0, 1, 2, 3分區(qū)
C2-0將消費(fèi) T1主題的 4,5,6分區(qū)以及 T2主題的 4,5,6分區(qū)
C3-0將消費(fèi) T1主題的 7,8,9分區(qū)以及 T2主題的 7,8,9分區(qū)
這就可以看出,C1-0 消費(fèi)者線程?其他消費(fèi)者線程多消費(fèi)了2個(gè)分區(qū),這就是Range strategy的?個(gè)很明顯的弊端。如下圖所示,Consumer0、Consumer1 同時(shí)訂閱了主題 A 和 B,可能造成消息分配不對等問題,當(dāng)消費(fèi)者組內(nèi)訂閱的主題越多,分區(qū)分配可能越不均衡。
RoundRobinRoundRobin 輪詢?式將分區(qū)所有作為?個(gè)整體進(jìn)? Hash 排序,消費(fèi)者組內(nèi)分配分區(qū)個(gè)數(shù)最?差別為 1,是按照組來分的,可以解決多個(gè)消費(fèi)者消費(fèi)數(shù)據(jù)不均衡的問題。
輪詢分區(qū)策略是把所有partition和所有consumer線程都列出來,然后按照hashcode進(jìn)?排序。最后通過輪詢算法分配partition給消費(fèi)線程。如果所有consumer實(shí)例的訂閱是相同的,那么partition會(huì)均勻分布。
在上面的例???,假如按照 hashCode排序完的topic-partitions組依次為T1-5,T1-3,T1-0,T1-8,T1-2,T1-1,T1-4,T1-7,T1-6,T1-9,我們的消費(fèi)者線程排序?yàn)镃1-0,C1-1,C2-0,C2-1,最后分區(qū)分配的結(jié)果為:
C1-0將消費(fèi) T1-5,T1-2,T1-6分區(qū);
C1-1將消費(fèi) T1-3,T1-1,T1-9分區(qū);
C2-0將消費(fèi) T1-0,T1-4分區(qū);
C2-1將消費(fèi) T1-8,T1-7分區(qū)。
圖文不符。
但是,當(dāng)消費(fèi)者組內(nèi)訂閱不同主題時(shí),可能造成消費(fèi)混亂,如下圖所示,Consumer0 訂閱主題A,Consumer1 訂閱主題 B。
將 A、B 主題的分區(qū)排序后分配給消費(fèi)者組,TopicB 分區(qū)中的數(shù)據(jù)可能分配到 Consumer0 中。
因此,使?輪詢分區(qū)策略必須滿?兩個(gè)條件:
注意,其實(shí)對于生產(chǎn)者而言,可以自定義push但哪個(gè)分區(qū)中,也可以使用如hash等方法。
Sticky前兩種rebalance方式需要重新映射,代價(jià)較大,特別是由于rebalance期間會(huì)暫停服務(wù),這就要求該過程盡量短。Sticky在沒有rebalance時(shí)采用輪詢方式,發(fā)生rebalance時(shí),盡量保持原映射關(guān)系,只是改變與宕機(jī)相關(guān)的映射,依然采用輪詢的方式。
可靠性保證在前面ack保障消息到了broker之后,消費(fèi)者也需要有?定的保證,因?yàn)橄M(fèi)者也可能出現(xiàn)某些問題導(dǎo)致消息沒有消費(fèi)到。
這里介紹一下偏移量。每個(gè)consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對每個(gè)topic的每個(gè)分區(qū)的消費(fèi)offset,定期會(huì)提交offset,0.9版本以后,提交offset發(fā)送給kafka內(nèi)部額外生成的一個(gè)topic:__consumer_offsets,提交過去的時(shí)候, key是group.id+topic+分區(qū)號(hào),value就是當(dāng)前offset的值,每隔一段時(shí)間,kafka內(nèi)部會(huì)對這個(gè)topic進(jìn)行compact(合并),也就是每個(gè)group.id+topic+分區(qū)號(hào)就保留最新數(shù)據(jù)。
這里引入enable.auto.commit,默認(rèn)為true,也就是?動(dòng)提交offset,?動(dòng)提交是批量執(zhí)?的,有?個(gè)時(shí)間窗?,這種?式會(huì)帶來重復(fù)提交或者消息丟失的問題,所以對于?可靠性要求的程序,要使??動(dòng)提交。對于?可靠要求的應(yīng)?來說,寧愿重復(fù)消費(fèi)也不應(yīng)該因?yàn)橄M(fèi)異常?導(dǎo)致消息丟失。當(dāng)然,我們也可以使用策略來避免消息的重復(fù)消費(fèi)與丟失,比如使用事務(wù),將offset與消息執(zhí)行放在同一數(shù)據(jù)庫中。
最后再簡單介紹一個(gè)應(yīng)用。kafka可以用在分布式延時(shí)隊(duì)列中。創(chuàng)建一個(gè)額外的主題和一個(gè)定時(shí)進(jìn)程,檢測這個(gè)主題中是否有消息過期,過期后放在常規(guī)的消息隊(duì)列中,消費(fèi)者從這個(gè)常規(guī)的隊(duì)列中獲取消息來消費(fèi)。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧