真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎么解析Kafka中的事務(wù)消息

這篇文章給大家介紹怎么解析Kafka中的事務(wù)消息,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

專注于為中小企業(yè)提供成都網(wǎng)站制作、成都做網(wǎng)站服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)葉集免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了成百上千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

1.冪等消息

為了解決重試導(dǎo)致的消息重復(fù)、亂序問題,kafka引入了冪等消息。冪等消息保證producer在一次會話內(nèi)寫入一個(gè)partition內(nèi)的消息具有冪等性,可以通過重試來確保消息發(fā)布的Exactly Once語義。

實(shí)現(xiàn)邏輯很簡單:

  • 區(qū)分producer會話

producer每次啟動(dòng)后,首先向broker申請一個(gè)全局唯一的pid,用來標(biāo)識本次會話。

  • 消息檢測

message_v2 增加了sequence number字段,producer每發(fā)一批消息,seq就加1。

broker在內(nèi)存維護(hù)(pid,seq)映射,收到消息后檢查seq,如果,

new_seq=old_seq+1: 正常消息;new_seq<=old_seq : 重復(fù)消息;new_seq>old_seq+1: 消息丟失;
  • producer重試

producer在收到明確的的消息丟失ack,或者超時(shí)后未收到ack,要進(jìn)行重試。

2.事務(wù)消息

考慮在stream處理的場景中,需要多個(gè)消息的原子寫入語義,要么全部寫入成功,要么全部失敗,這就是kafka事務(wù)消息要解決的問題。

事務(wù)消息是由producer、事務(wù)協(xié)調(diào)器、broker、組協(xié)調(diào)器、consumer共同參與實(shí)現(xiàn)的,

1)producer

為producer指定固定的TransactionalId,可以穿越producer的多次會話(producer重啟/斷線重連)中,持續(xù)標(biāo)識producer的身份。

使用epoch標(biāo)識producer的每一次"重生",防止同一producer存在多個(gè)會話。

producer遵從冪等消息的行為,并在發(fā)送的recordbatch中增加事務(wù)id和epoch。

2)事務(wù)協(xié)調(diào)器(Transaction Coordinator)

引入事務(wù)協(xié)調(diào)器,以兩階段提交的方式,實(shí)現(xiàn)消息的事務(wù)提交。

事務(wù)協(xié)調(diào)器使用一個(gè)特殊的topic:transaction,來做事務(wù)提交日志。

事務(wù)控制器通過RPC調(diào)研,協(xié)調(diào) broker 和 consumer coordinator 實(shí)現(xiàn)事務(wù)的兩階段提交。

每一個(gè)broker都會啟動(dòng)一個(gè)事務(wù)協(xié)調(diào)器,使用hash(TransactionalId)確定producer對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個(gè)集群的負(fù)載均衡。

3) broker

broker處理在事務(wù)協(xié)調(diào)器的commit/abort控制消息,把控制消息向正常消息一樣寫入topic(和正常消息交織在一起,用來確認(rèn)事務(wù)提交的日志偏移),并向前推進(jìn)消息提交偏移hw。

4) 組協(xié)調(diào)器

如果在事務(wù)過程中,提交了消費(fèi)偏移,組協(xié)調(diào)器在offset log中寫入事務(wù)消費(fèi)偏移。當(dāng)事務(wù)提交時(shí),在offset log中寫入事務(wù)offset確認(rèn)消息。

5)consumer

consumer過濾未提交消息和事務(wù)控制消息,使這些消息對用戶不可見。

有兩種實(shí)現(xiàn)方式:

  • consumer緩存方式

設(shè)置isolation.level=read_uncommitted,此時(shí)topic的所有消息對consumer都可見。

consumer緩存這些消息,直到收到事務(wù)控制消息。若事務(wù)commit,則對外發(fā)布這些消息;若事務(wù)abort,則丟棄這些消息。

  • broker過濾方式

設(shè)置isolation.level=read_committed,此時(shí)topic中未提交的消息對consumer不可見,只有在事務(wù)結(jié)束后,消息才對consumer可見。

broker給consumer的BatchRecord消息中,會包含以列表,指明哪些是"abort"事務(wù),consumer丟棄abort事務(wù)的消息即可。

事務(wù)消息處理流程如圖1所示,

怎么解析Kafka中的事務(wù)消息

圖1 事務(wù)消息業(yè)務(wù)流程

流程說明:

1. 查找事務(wù)協(xié)調(diào)器 -- FindCoordinatorRequest

事務(wù)協(xié)調(diào)器是分配pid和管理事務(wù)的核心,produer首先對任何一個(gè)broker發(fā)送FindCoordinatorRequest,發(fā)現(xiàn)自己的事務(wù)協(xié)調(diào)器。

2. 申請pid -- InitPidRequest

緊接著,producer向事務(wù)協(xié)調(diào)器發(fā)送InitPidRequest,申請生成pid。

2a.當(dāng)指定了transactional.id時(shí),事務(wù)協(xié)調(diào)器為producer分區(qū)pid,并更新epoch,把(tid,pid)的映射關(guān)系寫入事務(wù)日志。同時(shí)清理tid任何未完成的事務(wù),丟棄未提交的消息。

3. 啟動(dòng)事務(wù)

啟動(dòng)事務(wù)是producer的本地操作,促使producer更新內(nèi)部狀態(tài),不會和事務(wù)協(xié)調(diào)器發(fā)生關(guān)系。

事務(wù)協(xié)調(diào)器自動(dòng)啟動(dòng)事務(wù),始終處在一個(gè)接一個(gè)的事務(wù)處理狀態(tài)機(jī)中。

4. consume-transform-produce 事務(wù)循環(huán)

4.1. 注冊partition -- AddPartitionsToTxnRequest

對于每一個(gè)要在事務(wù)中寫消息的topic分區(qū),producer應(yīng)當(dāng)在第一次發(fā)消息前,向事務(wù)處理器注冊分區(qū)。

4.1a.事務(wù)處理器把事務(wù)關(guān)聯(lián)的分區(qū)寫入事務(wù)日志。

在提交或終止事務(wù)時(shí),事務(wù)協(xié)調(diào)器需要這些信息,控制事務(wù)涉及的所有分區(qū)leader完成事務(wù)提交或終止。

4.2. 寫消息 -- ProduceRequest

4.2a. producer向分區(qū)leader寫消息,消息中包含tid,pid,epoch和seq。

4.3. 提交消費(fèi)偏移 -- AddOffsetCommitsToTxnRequest

4.3a. producer向事務(wù)協(xié)調(diào)器發(fā)送消費(fèi)偏移,事務(wù)協(xié)調(diào)器在事務(wù)日志中記錄偏移信息,并把組協(xié)調(diào)器返回給producer。

4.4. 提交消費(fèi)偏移 -- TxnOffsetCommitRequest

4.4a. producer向組協(xié)調(diào)器發(fā)送TxnOffsetCommitRequest,組協(xié)調(diào)器把偏移信息寫入偏移日志。但是,要一直等到事務(wù)提交后,這個(gè)偏移才生效,對外部可見。

5. 提交或終止事務(wù)

5.1. EndTxnRequest

收到提交或終止事務(wù)的請求時(shí),事務(wù)處理器執(zhí)行下面的操作:

1. 在事務(wù)日志中寫入PREPARE_COMMIT或PREPARE_ABORT消息(5.1a)。

2. 通過WriteTxnMarkerRequest向事務(wù)中的所有broker發(fā)事務(wù)控制消息(5.2)。

3. 在事務(wù)之日中寫入COMMITTED或ABORTED消息(5.3)。

5.2. WriteTxnMarkerRequest

這個(gè)消息由事務(wù)處理器發(fā)給事務(wù)中所涉及分區(qū)的leader。

當(dāng)收到這個(gè)消息后,broker會在分區(qū)log中寫入一個(gè)COMMIT或ABORT控制消息。同時(shí),也會更新該分區(qū)的事務(wù)提交偏移hw。

如果事務(wù)中有提交消費(fèi)偏移, broker也會把控制消息寫入 __consumer-offsets log,并通知組協(xié)調(diào)器使事務(wù)中提交的消費(fèi)偏移生效。

5.3. 寫最終的commit或abort消息

當(dāng)所有的commit或abort消息寫入數(shù)據(jù)日志,事務(wù)協(xié)調(diào)器在事務(wù)日志中寫入事務(wù)日志,標(biāo)志這事務(wù)結(jié)束。

至此,本事務(wù)的所有狀態(tài)信息都可以被刪除,可以開始一個(gè)新的事務(wù)。

在實(shí)現(xiàn)上,還有很多細(xì)節(jié),比如,事務(wù)協(xié)調(diào)器會啟動(dòng)定時(shí)器,用來檢測并終止開始后長時(shí)間不活動(dòng)的事務(wù),具體請參考下面列出的kafka社區(qū)技術(shù)文檔。

我們要認(rèn)識到,雖然kafka事務(wù)消息提供了多個(gè)消息原子寫的保證,但它不保證原子讀。

例如,

1)事務(wù)向topic_a和topic_b兩個(gè)分區(qū)寫入消息,在事務(wù)提交后的某個(gè)時(shí)刻,topic_a的全部副本失效。這時(shí)topic_b中的消息可以正常消費(fèi),但topic_a中的消息就丟失了。2)假如consumer只消費(fèi)了topic_a,沒有消費(fèi)topic_b,這樣也不能讀到完整的事務(wù)消息。3)典型的kafka stream應(yīng)用從多個(gè)topic消費(fèi),然后向一個(gè)或多個(gè)topic寫。在一次故障后,kafka stream應(yīng)用重新開始處理流數(shù)據(jù),由于從多個(gè)topic讀到的數(shù)據(jù)之間不存在穩(wěn)定的順序(即便只有一個(gè)topic,從多個(gè)分區(qū)讀到的數(shù)據(jù)之間也沒有穩(wěn)定的順序),那么兩次處理輸出的結(jié)果就可能會不一樣。

也就是說,雖然kafka log持久化了數(shù)據(jù),也可以通過指定offset多次消費(fèi)數(shù)據(jù),但由于分區(qū)數(shù)據(jù)之間的無序性,導(dǎo)致每次處理輸出的結(jié)果都是不同的。這使得kafka stream不能像hadoop批處理任務(wù)一樣,可以隨時(shí)重新執(zhí)行,保證每次執(zhí)行的結(jié)果相同。除非我們只從一個(gè)topic分區(qū)讀數(shù)據(jù)。

關(guān)于怎么解析Kafka中的事務(wù)消息就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。


網(wǎng)頁標(biāo)題:怎么解析Kafka中的事務(wù)消息
標(biāo)題URL:http://weahome.cn/article/jhcdpe.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部