對(duì)于一個(gè)成熟的消息中間件而言,消息格式不僅關(guān)系到功能維度的擴(kuò)展,還牽涉到性能維度的優(yōu)化。隨著Kafka的迅猛發(fā)展,其消息格式也在不斷的升級(jí)改進(jìn),從0.8.x版本開始到現(xiàn)在的1.1.x版本,Kafka的消息格式也經(jīng)歷了3個(gè)版本。本文這里主要來講述Kafka的三個(gè)版本的消息格式的演變,文章偏長(zhǎng),建議先關(guān)注后鑒定。
成都創(chuàng)新互聯(lián)致力于互聯(lián)網(wǎng)品牌建設(shè)與網(wǎng)絡(luò)營(yíng)銷,包括成都網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、SEO優(yōu)化、網(wǎng)絡(luò)推廣、整站優(yōu)化營(yíng)銷策劃推廣、電子商務(wù)、移動(dòng)互聯(lián)網(wǎng)營(yíng)銷等。成都創(chuàng)新互聯(lián)為不同類型的客戶提供良好的互聯(lián)網(wǎng)應(yīng)用定制及解決方案,成都創(chuàng)新互聯(lián)核心團(tuán)隊(duì)十載專注互聯(lián)網(wǎng)開發(fā),積累了豐富的網(wǎng)站經(jīng)驗(yàn),為廣大企業(yè)客戶提供一站式企業(yè)網(wǎng)站建設(shè)服務(wù),在網(wǎng)站建設(shè)行業(yè)內(nèi)樹立了良好口碑。Kafka根據(jù)topic(主題)對(duì)消息進(jìn)行分類,發(fā)布到Kafka集群的每條消息都需要指定一個(gè)topic,每個(gè)topic將被分為多個(gè)partition(分區(qū))。每個(gè)partition在存儲(chǔ)層面是追加log(日志)文件,任何發(fā)布到此partition的消息都會(huì)被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個(gè)long型的數(shù)值,它唯一標(biāo)記一條消息。
每一條消息被發(fā)送到Kafka中,其會(huì)根據(jù)一定的規(guī)則選擇被存儲(chǔ)到哪一個(gè)partition中。如果規(guī)則設(shè)置的合理,所有的消息可以均勻分布到不同的partition里,這樣就實(shí)現(xiàn)了水平擴(kuò)展。如上圖,每個(gè)partition由其上附著的每一條消息組成,如果消息格式設(shè)計(jì)的不夠精煉,那么其功能和性能都會(huì)大打折扣。比如有冗余字段,勢(shì)必會(huì)使得partition不必要的增大,進(jìn)而不僅使得存儲(chǔ)的開銷變大、網(wǎng)絡(luò)傳輸?shù)拈_銷變大,也會(huì)使得Kafka的性能下降;又比如缺少字段,在最初的Kafka消息版本中沒有timestamp字段,對(duì)內(nèi)部而言,其影響了日志保存、切分策略,對(duì)外部而言,其影響了消息審計(jì)、端到端延遲等功能的擴(kuò)展,雖然可以在消息體內(nèi)部添加一個(gè)時(shí)間戳,但是解析變長(zhǎng)的消息體會(huì)帶來額外的開銷,而存儲(chǔ)在消息體(參考下圖中的value字段)前面可以通過指針偏量獲取其值而容易解析,進(jìn)而減少了開銷(可以查看v1版本),雖然相比于沒有timestamp字段的開銷會(huì)差一點(diǎn)。如此分析,僅在一個(gè)字段的一增一減之間就有這么多門道,那么Kafka具體是怎么做的呢?本文只針對(duì)Kafka 0.8.x版本開始做相應(yīng)說明,對(duì)于之前的版本不做陳述。
對(duì)于Kafka消息格式的第一個(gè)版本,我們把它稱之為v0,在Kafka 0.10.0版本之前都是采用的這個(gè)消息格式。注意如無特殊說明,我們只討論消息未壓縮的情形。
上左圖中的“RECORD”部分就是v0版本的消息格式,大多數(shù)人會(huì)把左圖中的整體,即包括offset和message size字段都都看成是消息,因?yàn)槊總€(gè)Record(v0和v1版)必定對(duì)應(yīng)一個(gè)offset和message size。每條消息都一個(gè)offset用來標(biāo)志它在partition中的偏移量,這個(gè)offset是邏輯值,而非實(shí)際物理偏移值,message size表示消息的大小,這兩者的一起被稱之為日志頭部(LOG_OVERHEAD),固定為12B。LOG_OVERHEAD和RECORD一起用來描述一條消息。與消息對(duì)應(yīng)的還有消息集的概念,消息集中包含一條或者多條消息,消息集不僅是存儲(chǔ)于磁盤以及在網(wǎng)絡(luò)上傳輸(Produce & Fetch)的基本形式,而且是kafka中壓縮的基本單元,詳細(xì)結(jié)構(gòu)參考上右圖。
下面來具體陳述一下消息(Record)格式中的各個(gè)字段,從crc32開始算起,各個(gè)字段的解釋如下:
v0版本中一個(gè)消息的最小長(zhǎng)度(RECORD_OVERHEAD_V0)為crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B,也就是說v0版本中一條消息的最小長(zhǎng)度為14B,如果小于這個(gè)值,那么這就是一條破損的消息而不被接受。
這里我們來做一個(gè)測(cè)試,首先創(chuàng)建一個(gè)partition數(shù)和副本數(shù)都為1的topic,名稱為“msg_format_v0”,然后往msg_format_v0中發(fā)送一條key=”key”,value=”value”的消息,之后查看對(duì)應(yīng)的日志:
[root@node1 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /tmp/kafka-logs-08/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3
查看消息的大小,即00000000000000000000.log文件的大小為34B,其值正好等于LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 3B的key + 5B的value = 12B + 14B + 3B + 5B = 34B。
[root@node1 msg_format_v0-0]# ll *.log
-rw-r--r-- 1 root root 34 Apr 26 02:52 00000000000000000000.log
我們?cè)侔l(fā)送一條key=null, value=”value”的消息,之后查看日志的大?。?/p>
[root@node3 msg_format_v0-0]# ll *.log
-rw-r--r-- 1 root root 65 Apr 26 02:56 00000000000000000000.log
日志大小為65B,減去上一條34B的消息,可以得知本條消息的大小為31B,正好等于LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 5B的value = 12B + 14B+ 5B = 31B。
kafka從0.10.0版本開始到0.11.0版本之前所使用的消息格式版本為v1,其比v0版本就多了一個(gè)timestamp字段,表示消息的時(shí)間戳。v1版本的消息結(jié)構(gòu)圖如下所示:
v1版本的magic字段值為1。v1版本的attributes字段中的低3位和v0版本的一樣,還是表示壓縮類型,而第4個(gè)bit也被利用了起來:0表示timestamp類型為CreateTime,而1表示tImestamp類型為L(zhǎng)ogAppendTime,其他位保留。v1版本的最小消息(RECORD_OVERHEAD_V1)大小要比v0版本的要大8個(gè)字節(jié),即22B。如果像v0版本介紹的一樣發(fā)送一條key=”key”,value=”value”的消息,那么此條消息在v1版本中會(huì)占用42B,具體測(cè)試步驟參考v0版的相關(guān)介紹。
常見的壓縮算法是數(shù)據(jù)量越大壓縮效果越好,一條消息通常不會(huì)太大,這就導(dǎo)致壓縮效果并不太好。而kafka實(shí)現(xiàn)的壓縮方式是將多條消息一起進(jìn)行壓縮,這樣可以保證較好的壓縮效果。而且在一般情況下,生產(chǎn)者發(fā)送的壓縮數(shù)據(jù)在kafka broker中也是保持壓縮狀態(tài)進(jìn)行存儲(chǔ),消費(fèi)者從服務(wù)端獲取也是壓縮的消息,消費(fèi)者在處理消息之前才會(huì)解壓消息,這樣保持了端到端的壓縮。
壓縮率是壓縮后的大小與壓縮前的對(duì)比。例如:把100MB的文件壓縮后是90MB,壓縮率為90/100*100%=90%,壓縮率一般是越小壓縮效果越好。一般口語化陳述時(shí)會(huì)誤描述為壓縮率越高越好,為了避免混淆,本文不引入學(xué)術(shù)上的壓縮率而引入壓縮效果,這樣容易達(dá)成共識(shí)。
講解到這里都是針對(duì)消息未壓縮的情況,而當(dāng)消息壓縮時(shí)是將整個(gè)消息集進(jìn)行壓縮而作為內(nèi)層消息(inner message),內(nèi)層消息整體作為外層(wrapper message)的value,其結(jié)構(gòu)圖如下所示:
壓縮后的外層消息(wrapper message)中的key為null,所以圖右部分沒有畫出key這一部分。當(dāng)生產(chǎn)者創(chuàng)建壓縮消息的時(shí)候,對(duì)內(nèi)部壓縮消息設(shè)置的offset是從0開始為每個(gè)內(nèi)部消息分配offset,詳細(xì)可以參考下圖右部:
其實(shí)每個(gè)從生產(chǎn)者發(fā)出的消息集中的消息offset都是從0開始的,當(dāng)然這個(gè)offset不能直接存儲(chǔ)在日志文件中,對(duì)offset進(jìn)行轉(zhuǎn)換時(shí)在服務(wù)端進(jìn)行的,客戶端不需要做這個(gè)工作。外層消息保存了內(nèi)層消息中最后一條消息的絕對(duì)位移(absolute offset),絕對(duì)位移是指相對(duì)于整個(gè)partition而言的。參考上圖,對(duì)于未壓縮的情形,圖右內(nèi)層消息最后一條的offset理應(yīng)是1030,但是被壓縮之后就變成了5,而這個(gè)1030被賦予給了外層的offset。當(dāng)消費(fèi)者消費(fèi)這個(gè)消息集的時(shí)候,首先解壓縮整個(gè)消息集,然后找到內(nèi)層消息中最后一條消息的inner offset,然后根據(jù)如下公式找到內(nèi)層消息中最后一條消息前面的消息的absolute offset(RO表示Relative Offset,IO表示Inner Offset,而AO表示Absolute Offset):
RO = IO_of_a_message - IO_of_the_last_message
AO = AO_Of_Last_Inner_Message + RO
注意這里RO是前面的消息相對(duì)于最后一條消息的IO而言的,所以其值小于等于0,0表示最后一條消息自身。
壓縮消息,英文是compress message,Kafka中還有一個(gè)compact message,常常也會(huì)被人們直譯成壓縮消息,需要注意兩者的區(qū)別。compact message是針對(duì)日志清理策略而言的(cleanup.policy=compact),是指日志壓縮(log compaction)后的消息,這個(gè)后續(xù)的系列文章中會(huì)有介紹。本文中的壓縮消息單指compress message,即采用GZIP、LZ4等壓縮工具壓縮的消息。
在講述v1版本的消息時(shí),我們了解到v1版本比v0版的消息多了個(gè)timestamp的字段。對(duì)于壓縮的情形,外層消息的timestamp設(shè)置為:
內(nèi)層消息的timestamp設(shè)置為:
對(duì)于attributes字段而言,它的timestamp位只在外層消息(wrapper message)中設(shè)置,內(nèi)層消息(inner message)中的timestamp類型一直都是CreateTime。
kafka從0.11.0版本開始所使用的消息格式版本為v2,這個(gè)版本的消息相比于v0和v1的版本而言改動(dòng)很大,同時(shí)還參考了Protocol Buffer而引入了變長(zhǎng)整型(Varints)和ZigZag編碼。Varints是使用一個(gè)或多個(gè)字節(jié)來序列化整數(shù)的一種方法,數(shù)值越小,其所占用的字節(jié)數(shù)就越少。ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭于正負(fù)整數(shù)之間,以使得帶符號(hào)整數(shù)映射為無符號(hào)整數(shù),這樣可以使得絕對(duì)值較小的負(fù)數(shù)仍然享有較小的Varints編碼值,比如-1編碼為1,1編碼為2,-2編碼為3。詳細(xì)可以參考:https://developers.google.com/protocol-buffers/docs/encoding。
回顧一下kafka v0和v1版本的消息格式,如果消息本身沒有key,那么key length字段為-1,int類型的需要4個(gè)字節(jié)來保存,而如果采用Varints來編碼則只需要一個(gè)字節(jié)。根據(jù)Varints的規(guī)則可以推導(dǎo)出0-63之間的數(shù)字占1個(gè)字節(jié),64-8191之間的數(shù)字占2個(gè)字節(jié),8192-1048575之間的數(shù)字占3個(gè)字節(jié)。而kafka broker的配置message.max.bytes的默認(rèn)大小為1000012(Varints編碼占3個(gè)字節(jié)),如果消息格式中與長(zhǎng)度有關(guān)的字段采用Varints的編碼的話,絕大多數(shù)情況下都會(huì)節(jié)省空間,而v2版本的消息格式也正是這樣做的。不過需要注意的是Varints并非一直會(huì)省空間,一個(gè)int32最長(zhǎng)會(huì)占用5個(gè)字節(jié)(大于默認(rèn)的4字節(jié)),一個(gè)int64最長(zhǎng)會(huì)占用10字節(jié)(大于默認(rèn)的8字節(jié))。
v2版本中消息集謂之為Record Batch,而不是先前的Message Set了,其內(nèi)部也包含了一條或者多條消息,消息的格式參見下圖中部和右部。在消息壓縮的情形下,Record Batch Header部分(參見下圖左部,從first offset到records count字段)是不被壓縮的,而被壓縮的是records字段中的所有內(nèi)容。
先來講述一下消息格式Record的關(guān)鍵字段,可以看到內(nèi)部字段大量采用了Varints,這樣Kafka可以根據(jù)具體的值來確定需要幾個(gè)字節(jié)來保存。v2版本的消息格式去掉了crc字段,另外增加了length(消息總長(zhǎng)度)、timestamp delta(時(shí)間戳增量)、offset delta(位移增量)和headers信息,并且attributes被棄用了,筆者對(duì)此做如下分析(對(duì)于key、key length、value、value length字段和v0以及v1版本的一樣,這里不再贅述):
1. length:消息總長(zhǎng)度。
2. attributes:棄用,但是還是在消息格式中占據(jù)1B的大小,以備未來的格式擴(kuò)展。
3. timestamp delta:時(shí)間戳增量。通常一個(gè)timestamp需要占用8個(gè)字節(jié),如果像這里保存與RecordBatch的其實(shí)時(shí)間戳的差值的話可以進(jìn)一步的節(jié)省占用的字節(jié)數(shù)。
4. offset delta:位移增量。保存與RecordBatch起始位移的差值,可以節(jié)省占用的字節(jié)數(shù)。
5. headers:這個(gè)字段用來支持應(yīng)用級(jí)別的擴(kuò)展,而不需要像v0和v1版本一樣不得不將一些應(yīng)用級(jí)別的屬性值嵌入在消息體里面。Header的格式如上圖最有,包含key和value,一個(gè)Record里面可以包含0至多個(gè)Header。
如果對(duì)于v1版本的消息,如果用戶指定的timestamp類型是LogAppendTime而不是CreateTime,那么消息從發(fā)送端(Producer)進(jìn)入broker端之后timestamp字段會(huì)被更新,那么此時(shí)消息的crc值將會(huì)被重新計(jì)算,而此值在Producer端已經(jīng)被計(jì)算過一次;再者,broker端在進(jìn)行消息格式轉(zhuǎn)換時(shí)(比如v1版轉(zhuǎn)成v0版的消息格式)也會(huì)重新計(jì)算crc的值。在這些類似的情況下,消息從發(fā)送端到消費(fèi)端(Consumer)之間流動(dòng)時(shí),crc的值是變動(dòng)的,需要計(jì)算兩次crc的值,所以這個(gè)字段的設(shè)計(jì)在v0和v1版本中顯得比較雞肋。在v2版本中將crc的字段從Record中轉(zhuǎn)移到了RecordBatch中。
v2版本對(duì)于消息集(RecordBatch)做了徹底的修改,參考上圖左部,除了剛剛提及的crc字段,還多了如下字段:
1. first offset:表示當(dāng)前RecordBatch的起始位移。
2. length:計(jì)算partition leader epoch到headers之間的長(zhǎng)度。
3. partition leader epoch:用來確保數(shù)據(jù)可靠性。
4. magic:消息格式的版本號(hào),對(duì)于v2版本而言,magic等于2。
5. attributes:消息屬性,注意這里占用了兩個(gè)字節(jié)。低3位表示壓縮格式,可以參考v0和v1;第4位表示時(shí)間戳類型;第5位表示此RecordBatch是否處于事務(wù)中,0表示非事務(wù),1表示事務(wù)。第6位表示是否是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用來支持事務(wù)功能。
6. last offset delta:RecordBatch中最后一個(gè)Record的offset與first offset的差值。主要被broker用來確認(rèn)RecordBatch中Records的組裝正確性。
7. first timestamp:RecordBatch中第一條Record的時(shí)間戳。
8. max timestamp:RecordBatch中大的時(shí)間戳,一般情況下是指最后一個(gè)Record的時(shí)間戳,和last offset delta的作用一樣,用來確保消息組裝的正確性。
9. producer id:用來支持冪等性。
10. producer epoch:和producer id一樣,用來支持冪等性。
11. first sequence:和producer id、producer epoch一樣,用來支持冪等性。
12. records count:RecordBatch中Record的個(gè)數(shù)。
這里我們?cè)賮碜鲆粋€(gè)測(cè)試,在1.0.0的kafka中創(chuàng)建一個(gè)partition數(shù)和副本數(shù)都為1的topic,名稱為“msg_format_v2”。然后同樣插入一條key=”key”,value=”value”的消息,查看日志結(jié)果如下:
[root@node1 kafka_2.12-1.0.0]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1524709879130 isvalid: true size: 76 magic: 2 compresscodec: NONE crc: 2857248333
可以看到size字段為76,我們根據(jù)上圖中的v2版本的日志格式來驗(yàn)證一下,Record Batch Header部分共61B。Record部分中attributes占1B;timestamp delta值為0,占1B;offset delta值為0,占1B;key length值為3,占1B,key占3B;value length值為5,占1B,value占5B;headers count值為0,占1B, 無headers。Record部分的總長(zhǎng)度=1B+1B+1B+1B+3B+1B+5B+1B=14B,所以Record的length字段值為14,編碼為變長(zhǎng)整型占1B。最后推到出這條消息的占用字節(jié)數(shù)=61B+14B+1B=76B,符合測(cè)試結(jié)果。同樣再發(fā)一條key=null,value=”value”的消息的話,可以計(jì)算出這條消息占73B。
這么看上去好像v2版本的消息比之前版本的消息占用空間要大很多,的確對(duì)于單條消息而言是這樣的,如果我們連續(xù)往msg_format_v2中再發(fā)送10條value長(zhǎng)度為6,key為null的消息,可以得到:
baseOffset: 2 lastOffset: 11 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 149 CreateTime: 1524712213771 isvalid: true size: 191 magic: 2 compresscodec: NONE crc: 820363253
本來應(yīng)該占用740B大小的空間,實(shí)際上只占用了191B,如果在v0版本中這10條消息則需要占用320B的空間,v1版本則需要占用400B的空間,這樣看來v2版本又節(jié)省了很多的空間,因?yàn)槠鋵⒍鄠€(gè)消息(Record)打包存放到單個(gè)RecordBatch中,又通過Varints編碼極大的節(jié)省了空間。
就以v1和v2版本對(duì)比而立,至于哪個(gè)消息格式占用空間大是不確定的,要根據(jù)具體情況具體分析。比如每條消息的大小為16KB,那么一個(gè)消息集中只能包含有一條消息(參數(shù)batch.size默認(rèn)大小為16384),所以v1版本的消息集大小為12B+22B+16384B=16418B。而對(duì)于v2版本而言,其消息集大小為61B+11B+16384B=17086B(length值為16384+,占用3B,value length值為16384,占用大小為3B,其余數(shù)值型的字段都可以只占用1B的空間)。可以看到v1版本又會(huì)比v2版本節(jié)省些許空間。
其實(shí)可以思考一下:當(dāng)消息體越小,v2版本中的Record字段的占用會(huì)比v1版本的LogHeader+Record占用越小,以至于某個(gè)臨界點(diǎn)可以完全忽略到v2版本中Record Batch Header的61B大小的影響。就算消息體很大,v2版本的空間占用也不會(huì)比v1版本的空間占用大太多,幾十個(gè)字節(jié)內(nèi),反觀對(duì)于這種大消息體的大小而言,這幾十個(gè)字節(jié)的大小從某種程度上又可以忽略。
由此可見,v2版本的消息不僅提供了類似事務(wù)、冪等等更多的功能,還對(duì)空間占用提供了足夠的優(yōu)化,總體提升很大。也由此體現(xiàn)一個(gè)優(yōu)秀的設(shè)計(jì)是多么的重要,雖然說我們不要過度的設(shè)計(jì)和優(yōu)化,那么是否可以著眼于前來思考一下?kafka為我們做了一個(gè)很好的榜樣。
本文的重點(diǎn)是你有沒有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。