引言:來(lái)到了新公司,需要對(duì)kafka組件有很深的研究,本人之前對(duì)老版的kafka有過(guò)一定的研究,但是談不上深入,新公司力推kafka,比較kafka作為消息系統(tǒng)在目前的市場(chǎng)上的占有率還是很高的,可以看本人之前kafka的博客中有關(guān)kafka的優(yōu)點(diǎn)和為什么要用kafka。
在眾多優(yōu)點(diǎn)中,我本人認(rèn)為最重要的2個(gè)優(yōu)點(diǎn)如下:
1、削峰
數(shù)據(jù)庫(kù)的處理能力是有限的,在峰值期,過(guò)多的請(qǐng)求落到后臺(tái),一旦超過(guò)系統(tǒng)的處理能力,可能會(huì)使系統(tǒng)掛掉。
如上圖所示,系統(tǒng)的處理能力是 2k/s,MQ 處理能力是 8k/s,峰值請(qǐng)求 5k/s,MQ 的處理能力遠(yuǎn)遠(yuǎn)大于數(shù)據(jù)庫(kù),在高峰期,請(qǐng)求可以先積壓在 MQ 中,系統(tǒng)可以根據(jù)自身的處理能力以 2k/s 的速度消費(fèi)這些請(qǐng)求。
這樣等高峰期一過(guò),請(qǐng)求可能只有 100/s,系統(tǒng)可以很快的消費(fèi)掉積壓在 MQ 中的請(qǐng)求。
注意,上面的請(qǐng)求指的是寫(xiě)請(qǐng)求,查詢請(qǐng)求一般通過(guò)緩存解決。
2、解耦
如下場(chǎng)景,S 系統(tǒng)與 A、B、C 系統(tǒng)緊密耦合。由于需求變動(dòng),A 系統(tǒng)修改了相關(guān)代碼,S 系統(tǒng)也需要調(diào)整 A 相關(guān)的代碼。
過(guò)幾天,C 系統(tǒng)需要?jiǎng)h除,S 緊跟著刪除 C 相關(guān)代碼;又過(guò)了幾天,需要新增 D 系統(tǒng),S 系統(tǒng)又要添加與 D 相關(guān)的代碼;再過(guò)幾天,程序猿瘋了...
這樣各個(gè)系統(tǒng)緊密耦合,不利于維護(hù),也不利于擴(kuò)展?,F(xiàn)在引入 MQ,A 系統(tǒng)變動(dòng),A 自己修改自己的代碼即可;C 系統(tǒng)刪除,直接取消訂閱;D 系統(tǒng)新增,訂閱相關(guān)消息即可。
這樣通過(guò)引入消息中間件,使各個(gè)系統(tǒng)都與 MQ 交互,從而避免它們之間的錯(cuò)綜復(fù)雜的調(diào)用關(guān)系。
kafka架構(gòu)原理:
最經(jīng)典的圖也就是官方的圖了
找了一些其他博主的圖:這里自己就懶的畫(huà)了
詳細(xì)復(fù)雜的kafka架構(gòu)
通俗點(diǎn)講:就是producer ----> kafka cluster(brokers) -----> consumer
生產(chǎn)者生產(chǎn)消息 經(jīng)過(guò) kafka隊(duì)列 被消費(fèi)者消費(fèi)
相關(guān)的組件概念見(jiàn):
topic and logs
廢話不多說(shuō),先見(jiàn)圖
文字解釋如下:
Message 是按照 Topic 來(lái)組織的,每個(gè) Topic 可以分成多個(gè) Partition(對(duì)server.properties/num.partitions)。 本人習(xí)慣性配置文件為num.partitions=broker個(gè)數(shù),人為的分配到各個(gè)節(jié)點(diǎn)上。
Partition 中的每條記錄(Message)包含三個(gè)屬性:Offset,messageSize 和 Data。
其中 Offset 表示消息偏移量;messageSize 表示消息的大小;Data 表示消息的具體內(nèi)容。
Partition 是以文件的形式存儲(chǔ)在文件系統(tǒng)中,位置由 server.properties/log.dirs 指定,其命名規(guī)則為
生產(chǎn)配置文件為:log.dirs=/data/kafka/kafka-logs
[hadoop@kafka03-55-13 kafka-logs]$ pwd
/data/kafka/kafka-logs
[hadoop@kafka03-55-13 kafka-logs]$ ls |grep mjh
topic-by-mjh-0
topic-by-mjh-1
topic-by-mjh-10
topic-by-mjh-11
topic-by-mjh-12
...
...
...
Partition 可能位于不同的 Broker 上,Partition 是分段的,每個(gè)段是一個(gè) Segment 文件。
Partition 目錄下包括了數(shù)據(jù)文件和索引文件
[hadoop@kafka03-55-13 kafka-logs]$ cd topic-by-mjh-0
[hadoop@kafka03-55-13 topic-by-mjh-0]$ ll
total 4
-rw-rw-r-- 1 hadoop hadoop 10485760 Aug 24 20:13 00000000000000000334.index
-rw-rw-r-- 1 hadoop hadoop 0 Aug 13 17:42 00000000000000000334.log
-rw-rw-r-- 1 hadoop hadoop 10485756 Aug 24 20:13 00000000000000000334.timeindex
-rw-rw-r-- 1 hadoop hadoop 4 Aug 16 14:16 leader-epoch-checkpoint
Index 采用稀疏存儲(chǔ)的方式,它不會(huì)為每一條 Message 都建立索引,而是每隔一定的字節(jié)數(shù)建立一條索引,避免索引文件占用過(guò)多的空間。
缺點(diǎn)是沒(méi)有建立索引的 Offset 不能一次定位到 Message 的位置,需要做一次順序掃描,但是掃描的范圍很小。
索引包含兩個(gè)部分(均為 4 個(gè)字節(jié)的數(shù)字),分別為相對(duì) Offset 和 Position。
相對(duì) Offset 表示 Segment 文件中的 Offset,Position 表示 Message 在數(shù)據(jù)文件中的位置。
Segment下的log文件就是存儲(chǔ)消息的地方
每個(gè)消息都會(huì)包含消息體、offset、timestamp、key、size、壓縮編碼器、校驗(yàn)和、消息版本號(hào)等。
在磁盤(pán)上的數(shù)據(jù)格式和producer發(fā)送到broker的數(shù)據(jù)格式一模一樣,也和consumer收到的數(shù)據(jù)格式一模一樣。由于磁盤(pán)格式與consumer以及producer的數(shù)據(jù)格式一模一樣,這樣就使得Kafka可以通過(guò)零拷貝(zero-copy)技術(shù)來(lái)提高傳輸效率。 // 關(guān)于零拷貝技術(shù),后期會(huì)專門(mén)寫(xiě)一遍博客來(lái)解釋
小結(jié):
1、Partition 是一個(gè)順序的追加日志,屬于順序?qū)懘疟P(pán)(順序?qū)懘疟P(pán)效率比隨機(jī)寫(xiě)內(nèi)存要高,保障 Kafka 吞吐率)。
2、Kafka 的 Message 存儲(chǔ)采用了分區(qū)(Partition),磁盤(pán)順序讀寫(xiě),分段(LogSegment)和稀疏索引這幾個(gè)手段來(lái)達(dá)到高效性。
3、在 Kafka 的文件存儲(chǔ)中,同一個(gè) Topic 下有多個(gè)不同的 Partition,每個(gè) Partition 都為一個(gè)目錄,而每一個(gè)目錄又被平均分配成多個(gè)大小相等的 Segment File 中(Segment 大小我們?cè)谏a(chǎn)上設(shè)置成1G或者 500MB ),Segment File 又由 index file 和 data file 組成,他們總是成對(duì)出現(xiàn),后綴 ".index" 和 ".log" 分表表示 Segment 索引文件和數(shù)據(jù)文件。
Partition and Replica
一個(gè) Topic 物理上分為多個(gè) Partition,位于不同的 Broker 上。如果沒(méi)有 Replica,一旦 Broker 宕機(jī),其上所有的 Patition 將不可用。
每個(gè) Partition 可以有多個(gè)Replica(對(duì)應(yīng)server.properties/default.replication.factor),分配到不同的 Broker 上。本人默認(rèn)習(xí)慣為 default.replication.factor=2 也就是默認(rèn)2個(gè)副本,比較合理
其中有一個(gè) Leader 負(fù)責(zé)讀寫(xiě),處理來(lái)自 Producer 和 Consumer 的請(qǐng)求;其他作為 Follower 從 Leader Pull 消息,保持與 Leader 的同步。
如何分配 Partition 和 Replica 到 Broker 上?步驟如下:
1、將所有 Broker(假設(shè)共 n 個(gè) Broker)和待分配的 Partition 排序。
2、將第 i 個(gè) Partition 分配到第(i mod n)個(gè) Broker 上。
3、將第 i 個(gè) Partition 的第 j 個(gè) Replica 分配到第((i + j) mode n)個(gè) Broker 上。
根據(jù)上面的分配規(guī)則,若 Replica 的數(shù)量大于 Broker 的數(shù)量,必定會(huì)有兩個(gè)相同的 Replica 分配到同一個(gè) Broker 上,產(chǎn)生冗余。因此 Replica 的數(shù)量應(yīng)該小于或等于 Broker 的數(shù)量。
//這里kafka硬性規(guī)定了創(chuàng)建的replica不能超過(guò)broker的數(shù)量,必須等于小于broker的數(shù)量
這里有2個(gè)算法函數(shù)解釋一下
1、mod:求余函數(shù);
2、mode:返回在某數(shù)組或數(shù)據(jù)區(qū)域中出現(xiàn)頻率最多的數(shù)值,mode是一個(gè)位置測(cè)量函數(shù)。
我這里只有3個(gè)broker 創(chuàng)建4個(gè)replica就出現(xiàn)報(bào)錯(cuò) 具體見(jiàn)下
[root@kafka02-55-12 ~]# kafka-topics.sh --zookeeper 10.211.55.11:2181,10.211.55.12:2181,10.211.55.13:2181/kafkagroup --replication-factor 4 --partitions 9 --create --topic topic-zhuhair
**Error while executing topic command : Replication factor: 4 larger than available brokers: 3.**
[2019-08-24 20:41:40,611] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
pratition的leader是如何選舉的---broker failover故障轉(zhuǎn)移
//通俗點(diǎn)講也就是當(dāng)broker發(fā)生宕機(jī)了,如何保證高可用的
文字描述如下:
Kafka 在 Zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas)。
ISR 里面的所有 Replica 都"跟上"了 Leader,Controller 將會(huì)從 ISR 里選一個(gè)做 Leader。
具體流程文字描述如下:
1、Controller 在 Zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊(cè) Watcher,
2、當(dāng) Broker 宕機(jī)時(shí) Zookeeper 會(huì) Fire Watch。
3、Controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用 Broker。
4、Controller 決定 set_p,該集合包含宕機(jī) Broker 上的所有 Partition。
5、對(duì) set_p 中的每一個(gè) Partition,從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR,決定新 Leader,將新 Leader、ISR、controller_epoch 和 leader_epoch 等信息寫(xiě)入 State 節(jié)點(diǎn)。
6、zk通過(guò) RPC 向相關(guān) Broker 發(fā)送 leaderAndISRRequest 命令。
極端情況下需要考慮的是:
當(dāng) ISR 為空時(shí),會(huì)選一個(gè) Replica(不一定是 ISR 成員)作為 Leader;
當(dāng)所有的 Replica 都歇菜了,會(huì)等任意一個(gè) Replica 復(fù)活,將其作為 Leader。
//
這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的折衷。如果一定要等待ISR中的Replica“活”過(guò)來(lái),那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中的所有Replica都無(wú)法“活”過(guò)來(lái)了,或者數(shù)據(jù)都丟失了,這個(gè)Partition將永遠(yuǎn)不可用。選擇第一個(gè)“活”過(guò)來(lái)的Replica作為L(zhǎng)eader,而這個(gè)Replica不是ISR中的Replica,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會(huì)成為L(zhǎng)eader而作為consumer的數(shù)據(jù)源(前文有說(shuō)明,所有讀寫(xiě)都由Leader完成)。Kafka0.8.*使用了第二種方式。根據(jù)Kafka的文檔,在以后的版本中,Kafka支持用戶通過(guò)配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性。
ISR(同步列表)中的 Follower 都"跟上"了Leader,"跟上"并不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置。表示 Leader 等待 Follower 同步消息的大時(shí)間,如果超時(shí),Leader 將 Follower 移除 ISR。配置項(xiàng) replica.lag.max.messages 已經(jīng)移除。
Replica 副本如何同步 消息傳遞同步策略
1、Producer在發(fā)布消息到某個(gè)Partition時(shí),先通過(guò)ZooKeeper找到該P(yáng)artition的Leader,
2、無(wú)論該Topic的Replication Factor為多少,Producer只將該消息發(fā)送到該P(yáng)artition的Leader。
3、Leader會(huì)將該消息寫(xiě)入其本地Log。
4、每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。
5、Follower在收到該消息并寫(xiě)入其Log后,向Leader發(fā)送ACK。
6、一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW并且向Producer發(fā)送ACK。
為了提高性能,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK,而非等到數(shù)據(jù)寫(xiě)入Log中。
因此,對(duì)于已經(jīng)commit的消息,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤(pán)中,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)。
Consumer讀消息也是從Leader讀取,只有被commit過(guò)的消息才會(huì)暴露給Consumer。
具體的可靠性,是由生產(chǎn)者(根據(jù)配置項(xiàng) producer.properties/acks)來(lái)決定的。
有資料說(shuō) 最新的文檔 2.2.x request.required.acks 已經(jīng)不存在了,這一點(diǎn)有待我去確認(rèn)
通俗一點(diǎn)講對(duì)ack的三個(gè)參數(shù)的含義為
Kafka?producer有三種ack機(jī)制 ?初始化producer時(shí)在config中進(jìn)行配置
0?:意味著producer不等待broker同步完成的確認(rèn),繼續(xù)發(fā)送下一條(批)信息
提供了最低的延遲。但是最弱的持久性,當(dāng)服務(wù)器發(fā)生故障時(shí),就很可能發(fā)生數(shù)據(jù)丟失。例如leader已經(jīng)死亡,producer不知情,還會(huì)繼續(xù)發(fā)送消息broker接收不到數(shù)據(jù)就會(huì)數(shù)據(jù)丟失
1:意味著producer要等待leader成功收到數(shù)據(jù)并得到確認(rèn),才發(fā)送下一條message。此選項(xiàng)提供了較好的持久性較低的延遲性。Partition的Leader死亡,follwer尚未復(fù)制,數(shù)據(jù)就會(huì)丟失
-1:意味著producer得到follwer確認(rèn),才發(fā)送下一條數(shù)據(jù)
持久性最好,延時(shí)性最差。
在這里強(qiáng)調(diào)的一點(diǎn)是,在kafak的partition中的fllower和leader中的復(fù)制不是完全的同步復(fù)制,也不是單純的異步復(fù)制
同步復(fù)制:所有的fllower復(fù)制完才提交 這樣的缺點(diǎn)是極大的影響了吞吐率
異步復(fù)制:Follower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫(xiě)入log就被認(rèn)為已經(jīng)commit,這種情況下如果Follower都復(fù)制完都落后于Leader,而如果Leader突然宕機(jī),則會(huì)丟失數(shù)據(jù)。
所有 kafak采用的是ISR 的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower可以批量的從Leader復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫(xiě)磁盤(pán)),極大減少了Follower與Leader的差距。
producer如何發(fā)送消息
Producer 首先將消息封裝進(jìn)一個(gè) ProducerRecord 實(shí)例中。
寫(xiě)消息的路由模式
1、 指定了 patition,則直接使用;
2、 未指定 patition 但指定 key,通過(guò)對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition
這個(gè) Hash(即分區(qū)機(jī)制)由 producer.properties/partitioner.class 指定的類實(shí)現(xiàn),這個(gè)路由類需要實(shí)現(xiàn) Partitioner 接口。
3、 patition 和 key 都未指定,使用輪詢選出一個(gè) patition。
備注:消息并不會(huì)立即發(fā)送,而是先進(jìn)行序列化后,發(fā)送給 Partitioner,
也就是上面提到的 Hash 函數(shù),由 Partitioner 確定目標(biāo)分區(qū)后,發(fā)送到一塊內(nèi)存緩沖區(qū)中(發(fā)送隊(duì)列)。Producer 的另一個(gè)工作線程(即 Sender 線程),
則負(fù)責(zé)實(shí)時(shí)地從該緩沖區(qū)中提取出準(zhǔn)備好的消息封裝到一個(gè)批次內(nèi),統(tǒng)一發(fā)送到對(duì)應(yīng)的 Broker 中。
具體寫(xiě)數(shù)據(jù)流程如下:
具體流程如下:
1、 producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader
2、 producer 將消息發(fā)送給該 leader
3、 leader 將消息寫(xiě)入本地 log
4、 followers 從 leader pull 消息,寫(xiě)入本地 log 后 leader 發(fā)送 ACK
5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK
參考網(wǎng)上的資料,有的文字版的解釋是這樣的 個(gè)人覺(jué)得下面的這樣的文字解釋的更加通俗易懂
流程如下:
1、首先,我們需要?jiǎng)?chuàng)建一個(gè)ProducerRecord,這個(gè)對(duì)象需要包含消息的主題(topic)和值(value),可以選擇性指定一個(gè)鍵值(key)或者分區(qū)(partition)。
2、發(fā)送消息時(shí),生產(chǎn)者會(huì)對(duì)鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器(partitioner)。
3、如果我們指定了分區(qū),那么分配器返回該分區(qū)即可;否則,分配器將會(huì)基于鍵值來(lái)選擇一個(gè)分區(qū)并返回。
4、選擇完分區(qū)后,生產(chǎn)者知道了消息所屬的主題和分區(qū),它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個(gè)線程負(fù)責(zé)發(fā)送這些批量消息到對(duì)應(yīng)的Kafka broker。
5、當(dāng)broker接收到消息后,如果成功寫(xiě)入則返回一個(gè)包含消息的主題、分區(qū)及位移的RecordMetadata對(duì)象,否則返回異常。
6、生產(chǎn)者接收到結(jié)果后,對(duì)于異??赡軙?huì)進(jìn)行重試。
參考鏈接:
架構(gòu)成長(zhǎng)之路:Kafka設(shè)計(jì)原理看了又忘,忘了又看?一文讓你掌握: https://www.toutiao.com/i6714606866355192328/
Kafka的ACK機(jī)制有三種,是哪三種 : https://blog.csdn.net/Sun1181342029/article/details/87806207
kafka原理系列ACK機(jī)制(數(shù)據(jù)可靠性和持久性保證) https://blog.csdn.net/bluehawksk/article/details/96120803
kafka入門(mén)介紹 https://www.orchome.com/5
Kafka學(xué)習(xí)之路 (三)Kafka的高可用 https://www.cnblogs.com/qingyunzong/p/9004703.html
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。