這篇文章主要講解了“Kafka的原理以及分區(qū)分配策略”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Kafka的原理以及分區(qū)分配策略”吧!
創(chuàng)新互聯(lián)建站客戶idc服務(wù)中心,提供服務(wù)器主機(jī)托管、成都服務(wù)器、成都主機(jī)托管、成都雙線服務(wù)器等業(yè)務(wù)的一站式服務(wù)。通過各地的服務(wù)中心,我們向成都用戶提供優(yōu)質(zhì)廉價(jià)的產(chǎn)品以及開放、透明、穩(wěn)定、高性價(jià)比的服務(wù),資深網(wǎng)絡(luò)工程師在機(jī)房提供7*24小時(shí)標(biāo)準(zhǔn)級技術(shù)保障。
Apache Kafka 是一個(gè)分布式的流處理平臺(分布式的基于發(fā)布/訂閱模式的消息隊(duì)列【Message Queue】)。
流處理平臺有以下3個(gè)特性:
可以讓你發(fā)布和訂閱流式的記錄。這一方面與消息隊(duì)列或者企業(yè)消息系統(tǒng)類似。
可以儲存流式的記錄,并且有較好的容錯(cuò)性。
可以在流式記錄產(chǎn)生時(shí)就進(jìn)行處理。
生產(chǎn)者將消息發(fā)送到queue中,然后消費(fèi)者從queue中取出并且消費(fèi)消息。消息被消費(fèi)以后,queue中不再存儲,所以消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消費(fèi)者,但是對一個(gè)消息而言,只能被一個(gè)消費(fèi)者消費(fèi)。
生產(chǎn)者將消息發(fā)布到topic中,同時(shí)可以有多個(gè)消費(fèi)者訂閱該消息。和點(diǎn)對點(diǎn)方式不同,發(fā)布到topic的消息會被所有訂閱者消費(fèi)。
它可以用于兩大類別的應(yīng)用:
構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。(相當(dāng)于message queue)。
構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。(就是流處理,通過kafka stream topic和topic之間內(nèi)部進(jìn)行變化)。
為了理解Kafka是如何做到以上所說的功能,從下面開始,我們將深入探索Kafka的特性。
首先是一些概念:
Kafka作為一個(gè)集群,運(yùn)行在一臺或者多臺服務(wù)器上。
Kafka 通過 topic 對存儲的流數(shù)據(jù)進(jìn)行分類。
每條記錄中包含一個(gè)key,一個(gè)value和一個(gè)timestamp(時(shí)間戳)。
Kafka的消息通過主題(Topic)進(jìn)行分類,就好比是數(shù)據(jù)庫的表,或者是文件系統(tǒng)里的文件夾。主題可以被分為若干個(gè)分區(qū)(Partition),一個(gè)分區(qū)就是一個(gè)提交日志。消息以追加的方式寫入分區(qū),然后以先進(jìn)先出的順序讀取。注意,由于一個(gè)主題一般包含幾個(gè)分區(qū),因此無法在整個(gè)主題范圍內(nèi)保證消息的順序,但可以保證消息在單個(gè)分區(qū)內(nèi)的順序。主題是邏輯上的概念,在物理上,一個(gè)主題是橫跨多個(gè)服務(wù)器的。
Kafka 集群保留所有發(fā)布的記錄(無論他們是否已被消費(fèi)),并通過一個(gè)可配置的參數(shù)——保留期限來控制(可以同時(shí)配置時(shí)間和消息大小,以較小的那個(gè)為準(zhǔn))。舉個(gè)例子, 如果保留策略設(shè)置為2天,一條記錄發(fā)布后兩天內(nèi),可以隨時(shí)被消費(fèi),兩天過后這條記錄會被拋棄并釋放磁盤空間。
有時(shí)候我們需要增加分區(qū)的數(shù)量,比如為了擴(kuò)展主題的容量、降低單個(gè)分區(qū)的吞吐量或者要在單個(gè)消費(fèi)者組內(nèi)運(yùn)行更多的消費(fèi)者(因?yàn)橐粋€(gè)分區(qū)只能由消費(fèi)者組里的一個(gè)消費(fèi)者讀?。?。從消費(fèi)者的角度來看,基于鍵的主題添加分區(qū)是很困難的,因?yàn)榉謪^(qū)數(shù)量改變,鍵到分區(qū)的映射也會變化,所以對于基于鍵的主題來說,建議在一開始就設(shè)置好分區(qū),避免以后對其進(jìn)行調(diào)整。
(注意:不能減少分區(qū)的數(shù)量,因?yàn)槿绻麆h除了分區(qū),分區(qū)里面的數(shù)據(jù)也一并刪除了,導(dǎo)致數(shù)據(jù)不一致。如果一定要減少分區(qū)的數(shù)量,只能刪除topic重建)
生產(chǎn)者(發(fā)布者)創(chuàng)建消息,一般情況下,一個(gè)消息會被發(fā)布到一個(gè)特定的主題上。生產(chǎn)者在默認(rèn)情況下把消息均衡的分布到主題的所有分區(qū)上,而并不關(guān)心特定消息會被寫入哪個(gè)分區(qū)。不過,生產(chǎn)者也可以把消息直接寫到指定的分區(qū)。這通常通過消息鍵和分區(qū)器來實(shí)現(xiàn),分區(qū)器為鍵生成一個(gè)散列值,并將其映射到指定的分區(qū)上。生產(chǎn)者也可以自定義分區(qū)器,根據(jù)不同的業(yè)務(wù)規(guī)則將消息映射到分區(qū)。
消費(fèi)者(訂閱者)讀取消息,消費(fèi)者可以訂閱一個(gè)或者多個(gè)主題,并按照消息生成的順序讀取它們。消費(fèi)者通過檢查消息的偏移量來區(qū)分已經(jīng)讀取過的消息。偏移量是一種元數(shù)據(jù),它是一個(gè)不斷遞增的整數(shù)值,在創(chuàng)建消息時(shí),kafka會把它添加到消息里。在給定的分區(qū)里,每個(gè)消息的偏移量都是唯一的。消費(fèi)者把每個(gè)分區(qū)最后讀取的消息偏移量保存在zookeeper或者kafka上,如果消費(fèi)者關(guān)閉或者重啟,它的讀取狀態(tài)不會丟失。
消費(fèi)者是消費(fèi)者組的一部分,也就是說,會有一個(gè)或者多個(gè)消費(fèi)共同讀取一個(gè)主題。消費(fèi)者組保證每個(gè)分區(qū)只能被同一個(gè)組內(nèi)的一個(gè)消費(fèi)者使用。如果一個(gè)消費(fèi)者失效,群組里的其他消費(fèi)者可以接管失效消費(fèi)者的工作。
broker:一個(gè)獨(dú)立的kafka服務(wù)器被稱為broker。broker接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤保存。broker為消費(fèi)者提供服務(wù),對讀取分區(qū)的請求作出相應(yīng),返回已經(jīng)提交到磁盤上的消息。
集群:交給同一個(gè)zookeeper集群來管理的broker節(jié)點(diǎn)就組成了kafka的集群。
broker是集群的組成部分,每個(gè)集群都有一個(gè)broker同時(shí)充當(dāng)集群控制器的角色??刂破髫?fù)責(zé)管理工作,包括將分區(qū)分配給broker和監(jiān)控broker。在broker中,一個(gè)分區(qū)從屬于一個(gè)broker,該broker被稱為分區(qū)的首領(lǐng)。一個(gè)分區(qū)可以分配給多個(gè)broker(Topic設(shè)置了多個(gè)副本的時(shí)候),這時(shí)會發(fā)生分區(qū)復(fù)制。如下圖:
broker如何處理請求:broker會在它所監(jiān)聽的每個(gè)端口上運(yùn)行一個(gè)Acceptor線程,這個(gè)線程會創(chuàng)建一個(gè)連接并把它交給Processor線程去處理。Processor線程(也叫網(wǎng)絡(luò)線程)的數(shù)量是可配的,Processor線程負(fù)責(zé)從客戶端獲取請求信息,把它們放進(jìn)請求隊(duì)列,然后從響應(yīng)隊(duì)列獲取響應(yīng)信息,并發(fā)送給客戶端。如下圖所示:
生產(chǎn)請求和獲取請求都必須發(fā)送給分區(qū)的首領(lǐng)副本(分區(qū)Leader)。如果broker收到一個(gè)針對特定分區(qū)的請求,而該分區(qū)的首領(lǐng)在另外一個(gè)broker上,那么發(fā)送請求的客戶端會收到一個(gè)“非分區(qū)首領(lǐng)”的錯(cuò)誤響應(yīng)。Kafka客戶端要自己負(fù)責(zé)把生產(chǎn)請求和獲取請求發(fā)送到正確的broker上。
客戶端如何知道該往哪里發(fā)送請求呢?客戶端使用了另外一種請求類型——元數(shù)據(jù)請求。這種請求包含了客戶端感興趣的主題列表,服務(wù)器的響應(yīng)消息里指明了這些主題所包含的分區(qū)、每個(gè)分區(qū)都有哪些副本,以及哪個(gè)副本是首領(lǐng)。元數(shù)據(jù)請求可以發(fā)給任意一個(gè)broker,因?yàn)樗械腷roker都緩存了這些信息??蛻舳司彺孢@些元數(shù)據(jù),并且會定時(shí)從broker請求刷新這些信息。此外如果客戶端收到“非首領(lǐng)”錯(cuò)誤,它會在嘗試重新發(fā)送請求之前,先刷新元數(shù)據(jù)。
Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向topic的。
Topic是邏輯上的概念,而partition(分區(qū))是物理上的概念,每個(gè)partition對應(yīng)于一個(gè)log文件,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。Producer生產(chǎn)的數(shù)據(jù)會被不斷追加到該log文件末端,且每條數(shù)據(jù)都有自己的offset。消費(fèi)者組中的每個(gè)消費(fèi)者,都會實(shí)時(shí)記錄自己消費(fèi)到哪個(gè)offset,以便出錯(cuò)恢復(fù)時(shí),從上次的位置繼續(xù)消費(fèi)。
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到log文件末尾,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引的機(jī)制,將每個(gè)partition分為多個(gè)segment。(由log.segment.bytes決定,控制每個(gè)segment的大小,也可通過log.segment.ms控制,指定多長時(shí)間后日志片段會被關(guān)閉)每個(gè)segment對應(yīng)兩個(gè)文件——“.index”文件和“.log”文件。這些文件位于一個(gè)文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。例如:bing這個(gè)topic有3個(gè)分區(qū),則其對應(yīng)的文件夾為:bing-0、bing-1和bing-2。
索引文件和日志文件命名規(guī)則:每個(gè) LogSegment 都有一個(gè)基準(zhǔn)偏移量,用來表示當(dāng)前 LogSegment 中第一條消息的 offset。偏移量是一個(gè) 64位的長整形數(shù),固定是20位數(shù)字,長度未達(dá)到,用 0 進(jìn)行填補(bǔ)。如下圖所示:
index和log文件以當(dāng)前segment的第一條消息的offset命名。index文件記錄的是數(shù)據(jù)文件的offset和對應(yīng)的物理位置,正是有了這個(gè)index文件,才能對任一數(shù)據(jù)寫入和查看擁有O(1)的復(fù)雜度,index文件的粒度可以通過參數(shù)log.index.interval.bytes來控制,默認(rèn)是是每過4096字節(jié)記錄一條index。下圖為index文件和log文件的結(jié)構(gòu)示意圖:
查找message的流程(比如要查找offset為170417的message):
首先用二分查找確定它是在哪個(gè)Segment文件中,其中0000000000000000000.index為最開始的文件,第二個(gè)文件為0000000000000170410.index(起始偏移為170410+1 = 170411),而第三個(gè)文件為0000000000000239430.index(起始偏移為239430+1 = 239431)。所以這個(gè)offset = 170417就落在第二個(gè)文件中。其他后續(xù)文件可以依此類推,以起始偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置。
用該offset減去索引文件的編號,即170417 - 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那個(gè)編號??梢钥闯鑫覀兡軌蛘业絒4,476]這組數(shù)據(jù),476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。
打開數(shù)據(jù)文件(0000000000000170410.log),從位置為476的那個(gè)地方開始順序掃描直到找到offset為170417的那條Message。
當(dāng)日志片段大小達(dá)到log.segment.bytes指定的上限(默認(rèn)是1GB)或者日志片段打開時(shí)長達(dá)到log.segment.ms時(shí),當(dāng)前日志片段就會被關(guān)閉,一個(gè)新的日志片段被打開。如果一個(gè)日志片段被關(guān)閉,就開始等待過期。當(dāng)前正在寫入的片段叫做活躍片段,活躍片段永遠(yuǎn)不會被刪除,所以如果你要保留數(shù)據(jù)1天,但是片段包含5天的數(shù)據(jù),那么這些數(shù)據(jù)就會被保留5天,因?yàn)槠伪魂P(guān)閉之前,這些數(shù)據(jù)無法被刪除。
多Partition分布式存儲,利于集群數(shù)據(jù)的均衡。
并發(fā)讀寫,加快讀寫速度。
加快數(shù)據(jù)恢復(fù)的速率:當(dāng)某臺機(jī)器掛了,每個(gè)Topic僅需恢復(fù)一部分的數(shù)據(jù),多機(jī)器并發(fā)。
分區(qū)的原則
指明partition的情況下,使用指定的partition;
沒有指明partition,但是有key的情況下,將key的hash值與topic的partition數(shù)進(jìn)行取余得到partition值;
既沒有指定partition,也沒有key的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增),將這個(gè)值與topic可用的partition數(shù)取余得到partition值,也就是常說的round-robin算法。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Listpartitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { //key為空時(shí),獲取一個(gè)自增的計(jì)數(shù),然后對分區(qū)做取模得到分區(qū)編號 int nextValue = nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition // key不為空時(shí),通過key的hash對分區(qū)取模(疑問:為什么這里不像上面那樣,使用availablePartitions呢?) // 根據(jù)《Kafka權(quán)威指南》Page45理解:為了保證相同的鍵,總是能路由到固定的分區(qū),如果使用可用分區(qū),那么因?yàn)榉謪^(qū)數(shù)變化,會導(dǎo)致相同的key,路由到不同分區(qū) // 所以如果要使用key來映射分區(qū),最好在創(chuàng)建主題的時(shí)候就把分區(qū)規(guī)劃好 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { //為每個(gè)topic維護(hù)了一個(gè)AtomicInteger對象,每次獲取時(shí)+1 AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); }
kafka提供了哪些方面的保證
kafka可以保證分區(qū)消息的順序。如果使用同一個(gè)生產(chǎn)者往同一個(gè)分區(qū)寫入消息,而且消息B在消息A之后寫入,那么kafka可以保證消息B的偏移量比消息A的偏移量大,而且消費(fèi)者會先讀取到消息A再讀取消息B。
只有當(dāng)消息被寫入分區(qū)的所有副本時(shí),它才被認(rèn)為是“已提交”的。生產(chǎn)者可以選擇接收不同類型的確認(rèn),比如在消息被完全提交時(shí)的確認(rèn)、在消息被寫入分區(qū)首領(lǐng)時(shí)的確認(rèn),或者在消息被發(fā)送到網(wǎng)絡(luò)時(shí)的確認(rèn)。
只要還有一個(gè)副本是活躍的,那么已經(jīng)提交的信息就不會丟失。
消費(fèi)者只能讀取到已經(jīng)提交的消息。
復(fù)制
Kafka的復(fù)制機(jī)制和分區(qū)的多副本架構(gòu)是kafka可靠性保證的核心。把消息寫入多個(gè)副本可以使kafka在發(fā)生奔潰時(shí)仍能保證消息的持久性。
kafka的topic被分成多個(gè)分區(qū),分區(qū)是基本的數(shù)據(jù)塊。每個(gè)分區(qū)可以有多個(gè)副本,其中一個(gè)是首領(lǐng)。所有事件都是發(fā)給首領(lǐng)副本,或者直接從首領(lǐng)副本讀取事件。其他副本只需要與首領(lǐng)副本保持同步,并及時(shí)復(fù)制最新的事件。
Leader維護(hù)了一個(gè)動態(tài)的in-sync replica set(ISR),意為和leader保持同步的follower集合。當(dāng)ISR中的follower完成數(shù)據(jù)同步后,leader就會發(fā)送ack。如果follower長時(shí)間未向leader同步數(shù)據(jù),則該follower將被踢出ISR,該時(shí)間閾值由replica.lag.time.max.ms參數(shù)設(shè)定。Leader不可用時(shí),將會從ISR中選舉新的leader。滿足以下條件才能被認(rèn)為是同步的:
與zookeeper之間有一個(gè)活躍的會話,也就是說,它在過去的6s(可配置)內(nèi)向zookeeper發(fā)送過心跳。
在過去的10s(可配置)內(nèi)從首領(lǐng)那里獲取過最新的數(shù)據(jù)。
影響Kafka消息存儲可靠性的配置
ack應(yīng)答機(jī)制
對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒有必要等ISR中的follower全部接收成功。所以Kafka提供了三種可靠性級別,用戶可以根據(jù)對可靠性和延遲的要求進(jìn)行權(quán)衡。acks:
0: producer不等待broker的ack,這一操作提供了一個(gè)最低的延遲,broker一接收到還沒寫入磁盤就已經(jīng)返回,當(dāng)broker故障時(shí)可能丟失數(shù)據(jù);
1: producer等待leader的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障,那么將會丟失數(shù)據(jù);
-1(all):producer等待broker的ack,partition的leader和ISR里的follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成重復(fù)數(shù)據(jù)。(極端情況下也有可能丟數(shù)據(jù):ISR中只有一個(gè)Leader時(shí),相當(dāng)于1的情況)。
消費(fèi)一致性保證
(1)follower故障
follower發(fā)生故障后會被臨時(shí)踢出ISR,待該follower恢復(fù)后,follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向leader進(jìn)行同步。
等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader發(fā)生故障后,會從ISR中選出一個(gè)新的leader,之后為了保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的follower會先將各自的log文件高于HW的部分截掉,然后從新的leader同步數(shù)據(jù)。
注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
Kafka 的producer 發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送過程中,涉及到了兩個(gè)線程——main線程和sender線程,以及一個(gè)線程共享變量——RecordAccumulator。main線程將消息發(fā)送給RecordAccumulator,sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。
為了提高效率,消息被分批次寫入kafka。批次就是一組消息,這些消息屬于同一個(gè)主題和分區(qū)。(如果每一個(gè)消息都單獨(dú)穿行于網(wǎng)絡(luò),會導(dǎo)致大量的網(wǎng)絡(luò)開銷,把消息分成批次傳輸可以減少網(wǎng)絡(luò)開銷。不過要在時(shí)間延遲和吞吐量之間做出權(quán)衡:批次越大,單位時(shí)間內(nèi)處理的消息就越多,單個(gè)消息的傳輸時(shí)間就越長)。批次數(shù)據(jù)會被壓縮,這樣可以提升數(shù)據(jù)的傳輸和存儲能力,但要做更多的計(jì)算處理。
相關(guān)參數(shù):
batch.size:只有數(shù)據(jù)積累到batch.size后,sender才會發(fā)送數(shù)據(jù)。(單位:字節(jié),注意:不是消息個(gè)數(shù))。
linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待 linger.ms之后也會發(fā)送數(shù)據(jù)。(單位:毫秒)。
client.id:該參數(shù)可以是任意字符串,服務(wù)器會用它來識別消息的來源,還可用用在日志和配額指標(biāo)里。
max.in.flight.requests.per.connection:該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息。它的值越高,就會占用越多的內(nèi)存,不過也會提升吞吐量。把它設(shè)置為1可以保證消息時(shí)按發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。
consumer采用pull(拉)的模式從broker中讀取數(shù)據(jù)。
push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快的速度傳遞消息,但是這樣容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
pull模式的不足之處是,如果kafka沒有數(shù)據(jù),消費(fèi)者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。針對這一點(diǎn),kafka的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會傳入一個(gè)時(shí)長參數(shù)timeout,如果當(dāng)前沒有數(shù)據(jù)可消費(fèi),consumer會等待一段時(shí)間后再返回。
一個(gè)consumer group中有多個(gè)consumer,一個(gè)topic有多個(gè)partition,所以必然會涉及到partition的分配問題,即確定哪個(gè)partition由哪個(gè)consumer來消費(fèi)。Kafka提供了3種消費(fèi)者分區(qū)分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。
PartitionAssignor接口用于用戶定義實(shí)現(xiàn)分區(qū)分配算法,以實(shí)現(xiàn)Consumer之間的分區(qū)分配。消費(fèi)組的成員訂閱它們感興趣的Topic并將這種訂閱關(guān)系傳遞給作為訂閱組協(xié)調(diào)者的Broker。協(xié)調(diào)者選擇其中的一個(gè)消費(fèi)者來執(zhí)行這個(gè)消費(fèi)組的分區(qū)分配并將分配結(jié)果轉(zhuǎn)發(fā)給消費(fèi)組內(nèi)所有的消費(fèi)者。Kafka默認(rèn)采用RangeAssignor的分配算法。
RangeAssignor對每個(gè)Topic進(jìn)行獨(dú)立的分區(qū)分配。對于每一個(gè)Topic,首先對分區(qū)按照分區(qū)ID進(jìn)行排序,然后訂閱這個(gè)Topic的消費(fèi)組的消費(fèi)者再進(jìn)行排序,之后盡量均衡的將分區(qū)分配給消費(fèi)者。這里只能是盡量均衡,因?yàn)榉謪^(qū)數(shù)可能無法被消費(fèi)者數(shù)量整除,那么有一些消費(fèi)者就會多分配到一些分區(qū)。分配示意圖如下:
分區(qū)分配的算法如下:
@Override public Map> assign(Map partitionsPerTopic, Map subscriptions) { Map > consumersPerTopic = consumersPerTopic(subscriptions); Map > assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList ()); //for循環(huán)對訂閱的多個(gè)topic分別進(jìn)行處理 for (Map.Entry > topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List consumersForTopic = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; //對消費(fèi)者進(jìn)行排序 Collections.sort(consumersForTopic); //計(jì)算平均每個(gè)消費(fèi)者分配的分區(qū)數(shù) int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //計(jì)算平均分配后多出的分區(qū)數(shù) int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { //計(jì)算第i個(gè)消費(fèi)者,分配分區(qū)的起始位置 int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); //計(jì)算第i個(gè)消費(fèi)者,分配到的分區(qū)數(shù)量 int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; }
這種分配方式明顯的一個(gè)問題是隨著消費(fèi)者訂閱的Topic的數(shù)量的增加,不均衡的問題會越來越嚴(yán)重,比如上圖中4個(gè)分區(qū)3個(gè)消費(fèi)者的場景,C0會多分配一個(gè)分區(qū)。如果此時(shí)再訂閱一個(gè)分區(qū)數(shù)為4的Topic,那么C0又會比C1、C2多分配一個(gè)分區(qū),這樣C0總共就比C1、C2多分配兩個(gè)分區(qū)了,而且隨著Topic的增加,這個(gè)情況會越來越嚴(yán)重。分配結(jié)果:
訂閱2個(gè)Topic,每個(gè)Topic4個(gè)分區(qū),共3個(gè)Consumer
C0:[T0P0,T0P1,T1P0,T1P1]
C1:[T0P2,T1P2]
C2:[T0P3,T1P3]
RoundRobinAssignor的分配策略是將消費(fèi)組內(nèi)訂閱的所有Topic的分區(qū)及所有消費(fèi)者進(jìn)行排序后盡量均衡的分配(RangeAssignor是針對單個(gè)Topic的分區(qū)進(jìn)行排序分配的)。如果消費(fèi)組內(nèi),消費(fèi)者訂閱的Topic列表是相同的(每個(gè)消費(fèi)者都訂閱了相同的Topic),那么分配結(jié)果是盡量均衡的(消費(fèi)者之間分配到的分區(qū)數(shù)的差值不會超過1)。如果訂閱的Topic列表是不同的,那么分配結(jié)果是不保證“盡量均衡”的,因?yàn)槟承┫M(fèi)者不參與一些Topic的分配。
以上兩個(gè)topic的情況,相比于之前RangeAssignor的分配策略,可以使分區(qū)分配的更均衡。不過考慮這種情況,假設(shè)有三個(gè)消費(fèi)者分別為C0、C1、C2,有3個(gè)Topic T0、T1、T2,分別擁有1、2、3個(gè)分區(qū),并且C0訂閱T0,C1訂閱T0和T1,C2訂閱T0、T1、T2,那么RoundRobinAssignor的分配結(jié)果如下:
看上去分配已經(jīng)盡量的保證均衡了,不過可以發(fā)現(xiàn)C2承擔(dān)了4個(gè)分區(qū)的消費(fèi)而C1訂閱了T1,是不是把T1P1交給C1消費(fèi)能更加的均衡呢?
StickyAssignor分區(qū)分配算法,目的是在執(zhí)行一次新的分配時(shí),能在上一次分配的結(jié)果的基礎(chǔ)上,盡量少的調(diào)整分區(qū)分配的變動,節(jié)省因分區(qū)分配變化帶來的開銷。Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動。其目標(biāo)有兩點(diǎn):
分區(qū)的分配盡量的均衡。
每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致。
當(dāng)這兩個(gè)目標(biāo)發(fā)生沖突時(shí),優(yōu)先保證第一個(gè)目標(biāo)。第一個(gè)目標(biāo)是每個(gè)分配算法都盡量嘗試去完成的,而第二個(gè)目標(biāo)才真正體現(xiàn)出StickyAssignor特性的。
StickyAssignor算法比較復(fù)雜,下面舉例來說明分配的效果(對比RoundRobinAssignor),前提條件:
有4個(gè)Topic:T0、T1、T2、T3,每個(gè)Topic有2個(gè)分區(qū)。
有3個(gè)Consumer:C0、C1、C2,所有Consumer都訂閱了這4個(gè)分區(qū)。
上面紅色的箭頭代表的是有變動的分區(qū)分配,可以看出,StickyAssignor的分配策略,變動較小。
由于Consumer在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,Consumer恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi),所以Consumer需要實(shí)時(shí)記錄自己消費(fèi)到哪個(gè)位置,以便故障恢復(fù)后繼續(xù)消費(fèi)。Kafka0.9版本之前,Consumer默認(rèn)將offset保存在zookeeper中,從0.9版本開始,Consumer默認(rèn)將offset保存在Kafka一個(gè)內(nèi)置的名字叫_consumeroffsets的topic中。默認(rèn)是無法讀取的,可以通過設(shè)置consumer.properties中的exclude.internal.topics=false來讀取。
順序?qū)懘疟P
Kafka 的 producer生產(chǎn)數(shù)據(jù),要寫入到log文件中,寫的過程是一直追加到文件末端,為順序?qū)?。?shù)據(jù)表明,同樣的磁盤,順序?qū)懩艿?00M/s,而隨機(jī)寫只有100K/s。這與磁盤的機(jī)械結(jié)構(gòu)有關(guān),順序?qū)懼钥?,是因?yàn)槠涫∪チ舜罅看蓬^尋址的時(shí)間。
零拷貝技術(shù)
零拷貝主要的任務(wù)就是避免CPU將數(shù)據(jù)從一塊存儲拷貝到另外一塊存儲,主要就是利用各種零拷貝技術(shù),避免讓CPU做大量的數(shù)據(jù)拷貝任務(wù),減少不必要的拷貝,或者讓別的組件來做這一類簡單的數(shù)據(jù)傳輸任務(wù),讓CPU解脫出來專注于別的任務(wù)。這樣就可以讓系統(tǒng)資源的利用更加有效。
感謝各位的閱讀,以上就是“Kafka的原理以及分區(qū)分配策略”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Kafka的原理以及分區(qū)分配策略這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!