真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何解析Kafka消息丟失與消費(fèi)精確一次性-創(chuàng)新互聯(lián)

今天就跟大家聊聊有關(guān)如何解析Kafka 消息丟失與消費(fèi)精確一次性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

成都創(chuàng)新互聯(lián),為您提供重慶網(wǎng)站建設(shè)、網(wǎng)站制作、網(wǎng)站營銷推廣、網(wǎng)站開發(fā)設(shè)計(jì),對服務(wù)加固等多個(gè)行業(yè)擁有豐富的網(wǎng)站建設(shè)及推廣經(jīng)驗(yàn)。成都創(chuàng)新互聯(lián)網(wǎng)站建設(shè)公司成立于2013年,提供專業(yè)網(wǎng)站制作報(bào)價(jià)服務(wù),我們深知市場的競爭激烈,認(rèn)真對待每位客戶,為客戶提供賞心悅目的作品。 與客戶共同發(fā)展進(jìn)步,是我們永遠(yuǎn)的責(zé)任!

消息丟失的場景

如果Kafka Producer使用“發(fā)后即忘”的方式發(fā)送消息,即調(diào)用producer.send(msg)方法來發(fā)送消息,方法會立即返回,但此時(shí)并不能說明消息已經(jīng)發(fā)送成功。消息發(fā)送方式詳見初次邂逅Kafka生產(chǎn)者。

如果在消息過程中發(fā)生了網(wǎng)絡(luò)抖動(dòng),那么消息就會丟失;或發(fā)送的消息本身不符合要求,如大小超過Broker端的承受能力等(消息太大的情況在生產(chǎn)中實(shí)際遇到過,最后通過在發(fā)送前將消息分包,再依次發(fā)送,解決了該問題)。

解決該問題的方法就是:Producer要使用帶回調(diào)通知的方法發(fā)送消息,即producer.send(msg, callback)?;卣{(diào)方法callback可以告訴我們消息是否真的提交成功了,一旦出現(xiàn)消息發(fā)送失敗的情況,可以使用代碼進(jìn)行容錯(cuò)及補(bǔ)救。

例如:網(wǎng)絡(luò)抖動(dòng)導(dǎo)致的消息丟失,可以使Producer重試;消息不合格,則將消息格式進(jìn)行調(diào)整,再發(fā)送。Producer使用帶回調(diào)的消息發(fā)送API,可以及時(shí)發(fā)現(xiàn)消息是否發(fā)送失敗并作相應(yīng)處理。

消費(fèi)者丟失數(shù)據(jù)

Consumer端丟失數(shù)據(jù)主要體現(xiàn)在:拉取了消息,并提交了消費(fèi)位移,但是在消息處理結(jié)束之前突然發(fā)生了宕機(jī)等故障。消費(fèi)者重生后,會從之前已提交的位移的下一個(gè)位置重新開始消費(fèi),之前未處理完成的消息不會再次處理,即相當(dāng)于消費(fèi)者丟失了消息。

解決Consumer端丟失消息的方法也很簡單:將位移提交的時(shí)機(jī)改為消息處理完成后,確認(rèn)消費(fèi)完成了一批消息再提交相應(yīng)的位移。這樣做,即使處理消息的過程中發(fā)生了異常,由于沒有提交位移,下次消費(fèi)時(shí)還會從上次的位移處重新拉取消息,不會發(fā)生消息丟失的情況。

具體的實(shí)現(xiàn)方法為,Consumer在消費(fèi)消息時(shí),關(guān)閉自動(dòng)提交位移,由應(yīng)用程序手動(dòng)提交位移。

Broker端丟失數(shù)據(jù)

Broker端丟失數(shù)據(jù)主要有以下幾種情況:

原來的Broker宕機(jī)了,卻選舉了一個(gè)落后Leader太多的Broker成為新的Leader,那么落后的這些消息就都丟失了,可以禁止這些“unclean”的Broker競選成為Leader;

Kafka使用頁緩存機(jī)制,將消息寫入頁緩存而非直接持久化至磁盤,將刷盤工作交由操作系統(tǒng)來調(diào)度,以此來保證高效率和高吞吐量。如果某一部分消息還在內(nèi)存頁中,未持久化至磁盤,此時(shí)Broker宕機(jī),重啟后則這部分消息丟失,使用多副本機(jī)制可以避免Broker端丟失消息;

避免消息丟失的最佳實(shí)踐

不使用producer.send(msg),而使用帶回調(diào)的producer.send(msg, callback)方法;

設(shè)置acks = all。acks參數(shù)是Producer的一個(gè)參數(shù),代表了對消息“已提交”的定義。如果設(shè)置成all,則表示所有的Broker副本都要接收到消息,才算消息“已提交”,是最高等級的“已提交”標(biāo)準(zhǔn);

設(shè)置retries為一個(gè)較大的值,retries表示Producer發(fā)送消息失敗后的重試次數(shù),如果發(fā)生了網(wǎng)絡(luò)抖動(dòng)等瞬時(shí)故障,可以通過重試機(jī)制重新發(fā)送消息,避免消息丟失;

設(shè)置unclean.leader.election.enable = false。這是一個(gè)Broker端參數(shù),表示哪些Broker有資格競選為分區(qū)的Leader。如果一個(gè)落后Leader太多的Follower所在Broker成為了新的Leader,則必然會導(dǎo)致消息的丟失,故將該參數(shù)設(shè)置為false,即不允許這種情況的發(fā)生;

設(shè)置replication.factor >= 3。Broker端參數(shù),表示每個(gè)分區(qū)的副本數(shù)大于等于3,使用冗余的機(jī)制來防止消息丟失;

設(shè)置min.insync.replicas > 1。Broker端參數(shù),控制的是消息至少被寫入多少個(gè)副本蔡栓是“已提交”,將該參數(shù)設(shè)置成大于1可以提升消息持久性;

確保replication.factor > min.insync.replicas。若兩者相等,則如果有一個(gè)副本掛了,整個(gè)分區(qū)就無法正常工作了。推薦設(shè)置為:replication.factor = min.insync.replicas + 1;

確保消息消費(fèi)完再提交位移,將Consumer端參數(shù)enable.auto.commit設(shè)置為fasle,關(guān)閉位移自動(dòng)提交,使用手動(dòng)提交位移的形式。

精確一次消費(fèi)

目前Kafka默認(rèn)提供的消息可靠機(jī)制是“至少一次”,即消息不會丟失。上一節(jié)中我們知道,Producer如果發(fā)送消息失敗,則可以通過重試解決,若Broker端的應(yīng)答未成功發(fā)送給Producer(如網(wǎng)絡(luò)抖動(dòng)),Producer此時(shí)也會進(jìn)行重試,再次發(fā)送原來的消息。這就是Kafka默認(rèn)提供消息至少一次性的原因,不過這可能會導(dǎo)致消息重復(fù)發(fā)送。

如果需要保證消息消費(fèi)的“最多一次”,那么禁止Producer的重試即可。但是寫入失敗的消息如果不重試則會永遠(yuǎn)丟失。是否有其他方法來保證消息的發(fā)送既不丟失,也不重復(fù)消費(fèi)?或者說即使Producer重復(fù)發(fā)送了某些消息,Broker端也能夠自動(dòng)去重。

Kafka實(shí)際上通過兩種機(jī)制來確保消息消費(fèi)的精確一次:

冪等性(Idempotence)

事務(wù)(Transaction)

冪等性

所謂的冪等,簡單說就是對接口的多次調(diào)用所產(chǎn)生的結(jié)果和調(diào)用一次是一致的。在Kafka中,Producer默認(rèn)不是冪等性的,Kafka于0.11.0.0版本引入該特性。設(shè)置參數(shù)enable.idempotence為true即可指定Producer的冪等性。開啟冪等生產(chǎn)者后,Kafka會自動(dòng)進(jìn)行消息的去重發(fā)送。為了實(shí)現(xiàn)生產(chǎn)者的冪等性,Kafka引入了producer id(后簡稱PID)和序列號(sequence number)兩個(gè)概念。

生產(chǎn)者實(shí)例在被創(chuàng)建的時(shí)候,會分配一個(gè)PID,這個(gè)PID對用戶完全透明。對于每個(gè)PID,消息發(fā)送到的每一個(gè)分區(qū)都有對應(yīng)的序列號,這些序列號從0開始單調(diào)遞增。生產(chǎn)者每發(fā)送一條消息就會將****對應(yīng)的序列號值加1。

Broker端在內(nèi)存中為每一對維護(hù)一個(gè)序列號SN_old。針對生產(chǎn)者發(fā)送來的每一條消息,對其序列號SN_new進(jìn)行判斷,并作相應(yīng)處理。

只有SN_new比SN_old大1時(shí),即SN_new = SN_old + 1時(shí),broker才會接受這條消息;

SN_new < SN_old + 1,說明消息被重復(fù)寫入,broker直接丟棄該條消息;

SN_new > SN_old + 1,說明中間有數(shù)據(jù)尚未寫入,出現(xiàn)了消息亂序,可能存在消息丟失的現(xiàn)象,對應(yīng)的生產(chǎn)者會拋出OutOfOrderSequenceException。

注意:序列號針對,這意味著冪等生產(chǎn)者只能保證單個(gè)主題的單一分區(qū)內(nèi)消息不重復(fù);其次,它只能實(shí)現(xiàn)單會話上的冪等性,不能實(shí)現(xiàn)跨會話的冪等性,這里的會話即可以理解為:Producer進(jìn)程的一次運(yùn)行。當(dāng)重啟了Producer進(jìn)程之后,則冪等性保證就失效了。

事務(wù)

冪等性并不能跨多個(gè)分區(qū)運(yùn)作,而Kafka事務(wù)則可以彌補(bǔ)這個(gè)缺陷。Kafka從0.11版本開始提供了對事務(wù)的支持,主要在read committed隔離級別。它能保證多條消息原子性地寫入到目標(biāo)分區(qū),同時(shí)也能寶恒Consumer只能看到事務(wù)成功提交的消息。

Producer端配置

事務(wù)型Producer能保證消息原子性地寫入多個(gè)分區(qū)。批量的消息要么全部寫入成功,要么全部失敗。并且,事務(wù)型Producer在重啟后,Kafka依然保證它們發(fā)送消息的精確一次處理。開啟事務(wù)型Producer的配置如下:

和冪等性Producer一樣,開啟enable.idempotence = true。

設(shè)置Producer端參數(shù)transcational.id。最好為其設(shè)置一個(gè)有意義的名字。

設(shè)置了事務(wù)型的Producer可以調(diào)用一些事務(wù)API,如下:initTransaction、beginTransaction、commitTransaction和abortTransaction,分別對應(yīng)事務(wù)的初始化、事務(wù)開啟、事務(wù)提交和事務(wù)終止。

producer.initTransactions();
try {
 producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
  }
 catch (KafkaExecption e) {
 producer.abortTransaction();
 }

上述代碼中,事務(wù)型Producer可以保證record1和record2要么全部提交成功,要么全部寫入失敗。實(shí)際上,即使寫入失敗,Kafka也會將它們寫入到底層的日志中,也就是說Consumer還是會看到這些消息,具體Consumer端讀取事務(wù)型Producer發(fā)送的消息需要另行配置。

Consumer端配置

讀取事務(wù)型Producer發(fā)送的消息時(shí),Consumer端的isolation.level參數(shù)表征著事務(wù)的隔離級別,即決定了Consumer以怎樣的級別去讀取消息。該參數(shù)有以下兩個(gè)取值: read_uncommitted:默認(rèn)值,表面Consumer能夠讀到Kafka寫入的任何消息,不論事務(wù)型Producer是否正常提交了事務(wù)。顯然,如果啟用了事務(wù)型的Producer,則Consumer端參數(shù)就不要使用該值,否則事務(wù)是無效的。 read_committed:表面Consumer只會讀取事務(wù)型Producer成功提交的事務(wù)中寫入的消息,同時(shí),非事務(wù)型Producer寫入的所有消息對Consumer也是可見的。

看完上述內(nèi)容,你們對如何解析Kafka 消息丟失與消費(fèi)精確一次性有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道,感謝大家的支持。


分享文章:如何解析Kafka消息丟失與消費(fèi)精確一次性-創(chuàng)新互聯(lián)
鏈接地址:http://weahome.cn/article/egepc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部