這篇文章給大家介紹怎么解析Kafka中的事務(wù)消息,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
專注于為中小企業(yè)提供成都網(wǎng)站制作、成都做網(wǎng)站服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)葉集免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了成百上千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
為了解決重試導(dǎo)致的消息重復(fù)、亂序問題,kafka引入了冪等消息。冪等消息保證producer在一次會話內(nèi)寫入一個(gè)partition內(nèi)的消息具有冪等性,可以通過重試來確保消息發(fā)布的Exactly Once語義。
實(shí)現(xiàn)邏輯很簡單:
區(qū)分producer會話
producer每次啟動(dòng)后,首先向broker申請一個(gè)全局唯一的pid,用來標(biāo)識本次會話。
消息檢測
message_v2 增加了sequence number字段,producer每發(fā)一批消息,seq就加1。
broker在內(nèi)存維護(hù)(pid,seq)映射,收到消息后檢查seq,如果,
new_seq=old_seq+1: 正常消息;new_seq<=old_seq : 重復(fù)消息;new_seq>old_seq+1: 消息丟失;
producer重試
producer在收到明確的的消息丟失ack,或者超時(shí)后未收到ack,要進(jìn)行重試。
考慮在stream處理的場景中,需要多個(gè)消息的原子寫入語義,要么全部寫入成功,要么全部失敗,這就是kafka事務(wù)消息要解決的問題。
事務(wù)消息是由producer、事務(wù)協(xié)調(diào)器、broker、組協(xié)調(diào)器、consumer共同參與實(shí)現(xiàn)的,
為producer指定固定的TransactionalId,可以穿越producer的多次會話(producer重啟/斷線重連)中,持續(xù)標(biāo)識producer的身份。
使用epoch標(biāo)識producer的每一次"重生",防止同一producer存在多個(gè)會話。
producer遵從冪等消息的行為,并在發(fā)送的recordbatch中增加事務(wù)id和epoch。
引入事務(wù)協(xié)調(diào)器,以兩階段提交的方式,實(shí)現(xiàn)消息的事務(wù)提交。
事務(wù)協(xié)調(diào)器使用一個(gè)特殊的topic:transaction,來做事務(wù)提交日志。
事務(wù)控制器通過RPC調(diào)研,協(xié)調(diào) broker 和 consumer coordinator 實(shí)現(xiàn)事務(wù)的兩階段提交。
每一個(gè)broker都會啟動(dòng)一個(gè)事務(wù)協(xié)調(diào)器,使用hash(TransactionalId)確定producer對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個(gè)集群的負(fù)載均衡。
broker處理在事務(wù)協(xié)調(diào)器的commit/abort控制消息,把控制消息向正常消息一樣寫入topic(和正常消息交織在一起,用來確認(rèn)事務(wù)提交的日志偏移),并向前推進(jìn)消息提交偏移hw。
如果在事務(wù)過程中,提交了消費(fèi)偏移,組協(xié)調(diào)器在offset log中寫入事務(wù)消費(fèi)偏移。當(dāng)事務(wù)提交時(shí),在offset log中寫入事務(wù)offset確認(rèn)消息。
consumer過濾未提交消息和事務(wù)控制消息,使這些消息對用戶不可見。
有兩種實(shí)現(xiàn)方式:
consumer緩存方式
設(shè)置isolation.level=read_uncommitted,此時(shí)topic的所有消息對consumer都可見。
consumer緩存這些消息,直到收到事務(wù)控制消息。若事務(wù)commit,則對外發(fā)布這些消息;若事務(wù)abort,則丟棄這些消息。
broker過濾方式
設(shè)置isolation.level=read_committed,此時(shí)topic中未提交的消息對consumer不可見,只有在事務(wù)結(jié)束后,消息才對consumer可見。
broker給consumer的BatchRecord消息中,會包含以列表,指明哪些是"abort"事務(wù),consumer丟棄abort事務(wù)的消息即可。
事務(wù)消息處理流程如圖1所示,
圖1 事務(wù)消息業(yè)務(wù)流程
流程說明:
事務(wù)協(xié)調(diào)器是分配pid和管理事務(wù)的核心,produer首先對任何一個(gè)broker發(fā)送FindCoordinatorRequest,發(fā)現(xiàn)自己的事務(wù)協(xié)調(diào)器。
緊接著,producer向事務(wù)協(xié)調(diào)器發(fā)送InitPidRequest,申請生成pid。
2a.當(dāng)指定了transactional.id時(shí),事務(wù)協(xié)調(diào)器為producer分區(qū)pid,并更新epoch,把(tid,pid)的映射關(guān)系寫入事務(wù)日志。同時(shí)清理tid任何未完成的事務(wù),丟棄未提交的消息。
啟動(dòng)事務(wù)是producer的本地操作,促使producer更新內(nèi)部狀態(tài),不會和事務(wù)協(xié)調(diào)器發(fā)生關(guān)系。
事務(wù)協(xié)調(diào)器自動(dòng)啟動(dòng)事務(wù),始終處在一個(gè)接一個(gè)的事務(wù)處理狀態(tài)機(jī)中。
對于每一個(gè)要在事務(wù)中寫消息的topic分區(qū),producer應(yīng)當(dāng)在第一次發(fā)消息前,向事務(wù)處理器注冊分區(qū)。
4.1a.事務(wù)處理器把事務(wù)關(guān)聯(lián)的分區(qū)寫入事務(wù)日志。
在提交或終止事務(wù)時(shí),事務(wù)協(xié)調(diào)器需要這些信息,控制事務(wù)涉及的所有分區(qū)leader完成事務(wù)提交或終止。
4.2a. producer向分區(qū)leader寫消息,消息中包含tid,pid,epoch和seq。
4.3. 提交消費(fèi)偏移 -- AddOffsetCommitsToTxnRequest
4.3a. producer向事務(wù)協(xié)調(diào)器發(fā)送消費(fèi)偏移,事務(wù)協(xié)調(diào)器在事務(wù)日志中記錄偏移信息,并把組協(xié)調(diào)器返回給producer。
4.4a. producer向組協(xié)調(diào)器發(fā)送TxnOffsetCommitRequest,組協(xié)調(diào)器把偏移信息寫入偏移日志。但是,要一直等到事務(wù)提交后,這個(gè)偏移才生效,對外部可見。
收到提交或終止事務(wù)的請求時(shí),事務(wù)處理器執(zhí)行下面的操作:
1. 在事務(wù)日志中寫入PREPARE_COMMIT或PREPARE_ABORT消息(5.1a)。
2. 通過WriteTxnMarkerRequest向事務(wù)中的所有broker發(fā)事務(wù)控制消息(5.2)。
3. 在事務(wù)之日中寫入COMMITTED或ABORTED消息(5.3)。
這個(gè)消息由事務(wù)處理器發(fā)給事務(wù)中所涉及分區(qū)的leader。
當(dāng)收到這個(gè)消息后,broker會在分區(qū)log中寫入一個(gè)COMMIT或ABORT控制消息。同時(shí),也會更新該分區(qū)的事務(wù)提交偏移hw。
如果事務(wù)中有提交消費(fèi)偏移, broker也會把控制消息寫入 __consumer-offsets log,并通知組協(xié)調(diào)器使事務(wù)中提交的消費(fèi)偏移生效。
當(dāng)所有的commit或abort消息寫入數(shù)據(jù)日志,事務(wù)協(xié)調(diào)器在事務(wù)日志中寫入事務(wù)日志,標(biāo)志這事務(wù)結(jié)束。
至此,本事務(wù)的所有狀態(tài)信息都可以被刪除,可以開始一個(gè)新的事務(wù)。
在實(shí)現(xiàn)上,還有很多細(xì)節(jié),比如,事務(wù)協(xié)調(diào)器會啟動(dòng)定時(shí)器,用來檢測并終止開始后長時(shí)間不活動(dòng)的事務(wù),具體請參考下面列出的kafka社區(qū)技術(shù)文檔。
我們要認(rèn)識到,雖然kafka事務(wù)消息提供了多個(gè)消息原子寫的保證,但它不保證原子讀。
例如,
1)事務(wù)向topic_a和topic_b兩個(gè)分區(qū)寫入消息,在事務(wù)提交后的某個(gè)時(shí)刻,topic_a的全部副本失效。這時(shí)topic_b中的消息可以正常消費(fèi),但topic_a中的消息就丟失了。2)假如consumer只消費(fèi)了topic_a,沒有消費(fèi)topic_b,這樣也不能讀到完整的事務(wù)消息。3)典型的kafka stream應(yīng)用從多個(gè)topic消費(fèi),然后向一個(gè)或多個(gè)topic寫。在一次故障后,kafka stream應(yīng)用重新開始處理流數(shù)據(jù),由于從多個(gè)topic讀到的數(shù)據(jù)之間不存在穩(wěn)定的順序(即便只有一個(gè)topic,從多個(gè)分區(qū)讀到的數(shù)據(jù)之間也沒有穩(wěn)定的順序),那么兩次處理輸出的結(jié)果就可能會不一樣。
也就是說,雖然kafka log持久化了數(shù)據(jù),也可以通過指定offset多次消費(fèi)數(shù)據(jù),但由于分區(qū)數(shù)據(jù)之間的無序性,導(dǎo)致每次處理輸出的結(jié)果都是不同的。這使得kafka stream不能像hadoop批處理任務(wù)一樣,可以隨時(shí)重新執(zhí)行,保證每次執(zhí)行的結(jié)果相同。除非我們只從一個(gè)topic分區(qū)讀數(shù)據(jù)。
關(guān)于怎么解析Kafka中的事務(wù)消息就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。