真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何進(jìn)行Kafka源碼分析及Broker端

這篇文章給大家介紹如何進(jìn)行Kafka源碼分析及Broker端,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

公司主營業(yè)務(wù):網(wǎng)站制作、成都網(wǎng)站設(shè)計、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。成都創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。成都創(chuàng)新互聯(lián)公司推出??诿赓M(fèi)做網(wǎng)站回饋大家。

首先從kafka如何創(chuàng)建一個topic來開始:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

其中有這么幾個參數(shù):

  • --zookeeper:zookeeper的地址

  • --replication-factor:副本因子

  • --partitions:分區(qū)個數(shù)(默認(rèn)是1)

  • --topic:topic名稱

二.什么是分區(qū)

一個topic可以有多個分區(qū),每個分區(qū)的消息都是不同的。  雖然分區(qū)可以提供更高的吞吐量,但是分區(qū)不是越多越好。一般分區(qū)數(shù)不要超過kafka集群的機(jī)器數(shù)量。分區(qū)越多占用的內(nèi)存和文件句柄。  一般分區(qū)設(shè)置為3-10個。比如現(xiàn)在集群有3個機(jī)器,要創(chuàng)建一個名為test的topic,分區(qū)數(shù)為2,那么如圖:

如何進(jìn)行Kafka源碼分析及Broker端

partiton都是有序切順序不可變的記錄集,并且不斷追加到log文件,partition中的每一個消息都回分配一個id,也就是offset(偏移量),offset用來標(biāo)記分區(qū)的一條記錄  ,這里就用官網(wǎng)的圖了,我畫的不好:

如何進(jìn)行Kafka源碼分析及Broker端

2.1 producer端和分區(qū)關(guān)系

就圖上的情況,producer端會把mq給哪個分區(qū)呢?這也是上一節(jié)我們提到的一個參數(shù)partitioner.class。  默認(rèn)分區(qū)器的處理是:有key則用murmur2算法計算key的哈希值,對總分區(qū)取模算出分區(qū)號,無key則輪詢。(org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition)。當(dāng)然了我們也可以自定義分區(qū)策略,只要實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口即可:

/**  * Compute the partition for the given record.  *  * @param topic The topic name  * @param key The key to partition on (or null if no key)  * @param keyBytes serialized key to partition on (or null if no key)  * @param value The value to partition on or null  * @param valueBytes serialized value to partition on or null  * @param cluster The current cluster metadata  */  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {  List partitions = cluster.partitionsForTopic(topic);  int numPartitions = partitions.size();  if (keyBytes == null) {  int nextValue = nextValue(topic);  List availablePartitions = cluster.availablePartitionsForTopic(topic);  if (availablePartitions.size() > 0) {  int part = Utils.toPositive(nextValue) % availablePartitions.size();  return availablePartitions.get(part).partition();  } else {  // no partitions are available, give a non-available partition  return Utils.toPositive(nextValue) % numPartitions;  }  } else {  // hash the keyBytes to choose a partition  return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;  }  }

2.2 consumer端和分區(qū)關(guān)系

先來看下官網(wǎng)對于消費(fèi)組的定義:Consumers label themselves with a consumer group name, and  each record published to a topic is delivered to one consumer instance within  each subscribing consumer group.

翻譯:消費(fèi)者使用一個消費(fèi)者組名來標(biāo)記自己,一個topic的消息會被發(fā)送到訂閱它的消費(fèi)者組的 一個 消費(fèi)者實例上。

consumer group是用于實現(xiàn)高伸縮性,高容錯性的consumer機(jī)制。如果有consumer掛了或者新增一個consumer,consumer  group會進(jìn)行重平衡(rebalance),重平衡機(jī)制會在consumer篇具體講解,本節(jié)不講。那么按照上面的圖繼續(xù)畫消費(fèi)者端:

如何進(jìn)行Kafka源碼分析及Broker端

這里是最好的情況,2個partition對應(yīng)1個group中的2個consumer。那么思考,如果一個消費(fèi)組的消費(fèi)者大于分區(qū)數(shù)呢?或者小于分區(qū)數(shù)呢?

如果一個消費(fèi)組的消費(fèi)者大于分區(qū)數(shù),那么相當(dāng)于多余的消費(fèi)者是一種浪費(fèi),多余的消費(fèi)者將無法消費(fèi)消息。

如果一個消費(fèi)組的消費(fèi)者小于分區(qū)數(shù),會有對應(yīng)的消費(fèi)者分區(qū)分配策略。一種是Range(默認(rèn)),一種是RoundRobin(輪詢),當(dāng)然也可以自定義策略。  其實思想換湯不換藥的啊,每個消費(fèi)者能負(fù)載均衡的工作。 具體會在消費(fèi)者篇講解,這里不講。

建議:配置分區(qū)數(shù)是消費(fèi)者數(shù)的整數(shù)倍

三.副本與ISR設(shè)計

3.1 什么是副本

在創(chuàng)建topic的時候有個參數(shù)是--replication-factor來設(shè)定副本數(shù)。Kafka利用多份相同的備份保持系統(tǒng)的高可用性,這些備份在Kafka中被稱為副本(replica)。副本分為3類:

  • leader副本:響應(yīng)producer端的讀寫請求

  • follower副本:備份leader副本的數(shù)據(jù), 不響應(yīng)producer端的讀寫請求!

  • ISR副本集合:包含1個leader副本和所有follower副本(也可能沒有follower副本)

Kafka會把所有的副本均勻分配到kafka-cluster中的所有broker上,并從這些副本中挑選一個作為leader副本,其他成為follow副本。如果leader副本所在的broker宕機(jī)了,那么其中的一個follow副本就會成為leader副本。leader副本接收producer端的讀寫請求,而follow副本只是向leader副本請求數(shù)據(jù)不會接收讀寫請求!

如何進(jìn)行Kafka源碼分析及Broker端

3.2 副本同步機(jī)制

上面說了ISR就是動態(tài)維護(hù)一組同步副本集合,leader副本總是包含在ISR集合中。只有ISR中的副本才有資格被選舉為leader副本。當(dāng)producer端的ack參數(shù)配置為all(-1)時,producer寫入的mq需要ISR所有副本都接收到,才被視為已提交。當(dāng)然了,上一節(jié)就提到了,使用ack參數(shù)必須配合broker端的min.insync.replicas(默認(rèn)是1)參數(shù)一起用才能達(dá)到效果,該參數(shù)控制寫入isr中的多少副本才算成功。  如果ISR中的副本數(shù)少于min.insync.replicas時,客戶端會返回異常org.apache.kafka.common.errors.NotEnoughReplicasExceptoin:  Messages are rejected since there are fewer in-sync replicas than required。

要了解副本同步機(jī)制需要先學(xué)習(xí)幾個術(shù)語:

  • High Watermark:副本高水位值,簡稱HW, 小于HW或者說在HW以下的消息都被認(rèn)為是“已備份的”,HW指向的也是下一條消息!  leader副本的HW值決定consumer能poll的消息數(shù)量!consumer只能消費(fèi)小于HW值的消息!

  • LEO:log end offset,下一條消息的位移。 也就是說LEO指向的位置是沒有消息的!

  • remote  LEO:嚴(yán)格來說這是一個集合。leader副本所在broker的內(nèi)存中維護(hù)了一個Partition對象來保存對應(yīng)的分區(qū)信息,這個Partition中維護(hù)了一個Replica列表,保存了該分區(qū)所有的副本對象。除了leader  Replica副本之外,該列表中其他Replica對象的LEO就被稱為remote LEO。

如何進(jìn)行Kafka源碼分析及Broker端

下面舉個一個實際的例子(本例子參考胡夕博客),該例子中的topic是單分區(qū),副本因子是2。也就是說一個leader副本,一個follower副本,ISR中包含這2個副本集合。我們首先看下當(dāng)producer發(fā)送一條消息時,leader/follower端broker的副本對象到底會發(fā)生什么事情以及分區(qū)HW是如何被更新的。首先是初始狀態(tài):

如何進(jìn)行Kafka源碼分析及Broker端

此時producer給該topic分區(qū)發(fā)送了一條消息。此時的狀態(tài)如下圖所示:

如何進(jìn)行Kafka源碼分析及Broker端

如上圖所見,producer發(fā)送消息成功后(假設(shè)acks=1,  leader成功寫入即返回),follower發(fā)來了新的FECTH請求,依然請求fetchOffset =  0的數(shù)據(jù)。和上次不同的是,這次是有數(shù)據(jù)可以讀取的,因此整個處理流程如下圖:

如何進(jìn)行Kafka源碼分析及Broker端

顯然,現(xiàn)在leader和follower都保存了位移是0的這條消息,但兩邊的HW值都沒有被更新,它們需要在下一輪FETCH請求處理中被更新,如下圖所示:

如何進(jìn)行Kafka源碼分析及Broker端

簡單解釋一下, 第二輪FETCH請求中,follower發(fā)送fetchOffset = 1的FETCH請求——因為fetchOffset =  0的消息已經(jīng)成功寫入follower本地日志了,所以這次請求fetchOffset =  1的數(shù)據(jù)了。Leader端broker接收到FETCH請求后首先會更新other replicas中的LEO值,即將remote  LEO更新成1,然后更新分區(qū)HW值為1——具體的更新規(guī)則參見上面的解釋。做完這些之后將當(dāng)前分區(qū)HW值(1)封裝進(jìn)FETCH  response發(fā)送給follower。Follower端broker接收到FETCH  response之后從中提取出當(dāng)前分區(qū)HW值1,然后與自己的LEO值比較,從而將自己的HW值更新成1,至此完整的HW、LEO更新周期結(jié)束。

3.3 ISR維護(hù)

在0.9.0.0版本之后,只有一個參數(shù):replica.lag.time.max.ms來判定該副本是否應(yīng)該在ISR集合中,這個參數(shù)默認(rèn)值為10s。意思是如果一個follower副本響應(yīng)leader副本的時間超過10s,kafka會認(rèn)為這個副本走遠(yuǎn)了從同步副本列表移除。

四.日志設(shè)計

Kafka的每個主題相互隔離,每個主題可以有一個或者多個分區(qū),每個分區(qū)都有記錄消息數(shù)據(jù)的日志文件:

如何進(jìn)行Kafka源碼分析及Broker端

圖中有個demo-topic的主題,這個topic有8個分區(qū),每一個分區(qū)都存在[topic-partition]命名的消息日志文件  。在分區(qū)日志文件中,可以看到前綴一樣,但是文件類型不一樣的幾個文件。比如圖中的3個文件,(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。這稱之為一個LogSegment(日志分段)。

4.1 LogSegment

以一個測試環(huán)境的具體例子來講,一個名為ALC.ASSET.EQUITY.SUBJECT.CHANGE的topic,我們看partition0的日志文件:

如何進(jìn)行Kafka源碼分析及Broker端

每一個LogSegment都包含一些文件名一致的文件集合。文件名的固定是20位數(shù)字,如果文件名是00000000000000000000代表當(dāng)前LogSegment的第一條消息的offset(偏移量)為0,如果文件名是00000000000000000097代表當(dāng)前LogSegment的第一條消息的offset(偏移量)為97。日志文件有多種后綴的文件,重點(diǎn)關(guān)注.index、.timestamp、.log三種類型文件即可。

  • .index:偏移量索引文件

  • .timeindex:時間索引文件

  • .log:日志文件

  • .snapshot:快照文件

  • .swap:Log Compaction之后的臨時文件

4.2 索引與日志文件

kafka有2種索引文件,第一種是offset(偏移量)索引文件,也就是.index結(jié)尾的文件。第二種是時間戳索引文件,也就是.timeindex結(jié)尾的文件。

我們可以用kafka-run-class.sh來查看offset(偏移量)索引文件的內(nèi)容:

如何進(jìn)行Kafka源碼分析及Broker端

可以看到每一行都是offset:xxx position:xxxx。 這兩者沒有直接關(guān)系。

  • offset:相對偏移量

  • position:物理地址

那么第一行的offset:12 position:4423是什么意思呢?它代表偏移量從0-12的消息的物理地址在0-4423。

同理第二行的offset:24 position:8773的意思也能猜得出來:它代表偏移量從13-24的消息的物理地址在4424-8773。

我們可以再用kafka-run-class.sh來看下.log文件的文件內(nèi)容,關(guān)注里面的baseOffset和postion的值。你看看和上面說的對應(yīng)的上嗎。

如何進(jìn)行Kafka源碼分析及Broker端

4.3 如何用offset查找

按上面的例子,如何查詢偏移量為60的消息

根據(jù)offset首先找到對應(yīng)的LogSegment,這里找到00000000000000000000.index

通過二分法找到不大于offset的最大索引項,這里找到offset:24 position:8773

打開00000000000000000000.log文件,從position為8773的那個地方開始順序掃描直到找到offset=60的消息

如何進(jìn)行Kafka源碼分析及Broker端

關(guān)于如何進(jìn)行Kafka源碼分析及Broker端就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


網(wǎng)頁題目:如何進(jìn)行Kafka源碼分析及Broker端
瀏覽路徑:http://weahome.cn/article/jidigj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部