最近和一些同學(xué)交流的時(shí)候反饋說,在面試Kafka時(shí),被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細(xì)作答。但是,問到一個(gè)平時(shí)不注意的問題,就是Kafka的冪等性,被卡主了。那么,今天筆者就為大家來剖析一下Kafka的冪等性原理及實(shí)現(xiàn)。
創(chuàng)新互聯(lián)"三網(wǎng)合一"的企業(yè)建站思路。企業(yè)可建設(shè)擁有電腦版、微信版、手機(jī)版的企業(yè)網(wǎng)站。實(shí)現(xiàn)跨屏營銷,產(chǎn)品發(fā)布一步更新,電腦網(wǎng)絡(luò)+移動(dòng)網(wǎng)絡(luò)一網(wǎng)打盡,滿足企業(yè)的營銷需求!創(chuàng)新互聯(lián)具備承接各種類型的成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)項(xiàng)目的能力。經(jīng)過10年的努力的開拓,為不同行業(yè)的企事業(yè)單位提供了優(yōu)質(zhì)的服務(wù),并獲得了客戶的一致好評。
Producer在生產(chǎn)發(fā)送消息時(shí),難免會重復(fù)發(fā)送消息。Producer進(jìn)行retry時(shí)會產(chǎn)生重試機(jī)制,發(fā)生消息重復(fù)發(fā)送。而引入冪等性后,重復(fù)發(fā)送只會生成一條有效的消息。Kafka作為分布式消息系統(tǒng),它的使用場景常見與分布式系統(tǒng)中,比如消息推送系統(tǒng)、業(yè)務(wù)平臺系統(tǒng)(如物流平臺、銀行結(jié)算平臺等)。以銀行結(jié)算平臺來說,業(yè)務(wù)方作為上游把數(shù)據(jù)上報(bào)到銀行結(jié)算平臺,如果一份數(shù)據(jù)被計(jì)算、處理多次,那么產(chǎn)生的影響會很嚴(yán)重。
在使用Kafka時(shí),需要確保Exactly-Once語義。分布式系統(tǒng)中,一些不可控因素有很多,比如網(wǎng)絡(luò)、OOM、FullGC等。在Kafka Broker確認(rèn)Ack時(shí),出現(xiàn)網(wǎng)絡(luò)異常、FullGC、OOM等問題時(shí)導(dǎo)致Ack超時(shí),Producer會進(jìn)行重復(fù)發(fā)送。可能出現(xiàn)的情況如下:
Kafka為了實(shí)現(xiàn)冪等性,它在底層設(shè)計(jì)架構(gòu)中引入了ProducerID和SequenceNumber。那這兩個(gè)概念的用途是什么呢?
Kafka在引入冪等性之前,Producer向Broker發(fā)送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實(shí)現(xiàn)流程如下:
上圖的實(shí)現(xiàn)流程是一種理想狀態(tài)下的消息發(fā)送情況,但是實(shí)際情況中,會出現(xiàn)各種不確定的因素,比如在Producer在發(fā)送給Broker的時(shí)候出現(xiàn)網(wǎng)絡(luò)異常。比如以下這種異常情況的出現(xiàn):
上圖這種情況,當(dāng)Producer第一次發(fā)送消息給Broker時(shí),Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時(shí)失敗了(比如網(wǎng)絡(luò)異常) 。此時(shí),Producer端觸發(fā)重試機(jī)制,將消息(x2,y2)重新發(fā)送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復(fù)追加了兩條相同的(x2,y2)的消息。
面對這樣的問題,Kafka引入了冪等性。那么冪等性是如何解決這類重復(fù)發(fā)送消息的問題的呢?下面我們可以先來看看流程圖:
?同樣,這是一種理想狀態(tài)下的發(fā)送流程。實(shí)際情況下,會有很多不確定的因素,比如Broker在發(fā)送Ack信號給Producer時(shí)出現(xiàn)網(wǎng)絡(luò)異常,導(dǎo)致發(fā)送失敗。異常情況如下圖所示:
?當(dāng)Producer發(fā)送消息(x2,y2)給Broker時(shí),Broker接收到消息并將其追加到消息流中。此時(shí),Broker返回Ack信號給Producer時(shí),發(fā)生異常導(dǎo)致Producer接收Ack信號失敗。對于Producer來說,會觸發(fā)重試機(jī)制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會出現(xiàn)重復(fù)發(fā)送的情況。
客戶端在生成Producer時(shí),會實(shí)例化如下代碼:
// 實(shí)例化一個(gè)Producer對象
Producer producer = new KafkaProducer<>(props);
在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個(gè)maybeWaitForPid()方法,用來生成一個(gè)ProducerID,實(shí)現(xiàn)代碼如下:
private void maybeWaitForPid() {
if (transactionState == null)
return;
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. " +
"We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. " +
"We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
與冪等性有關(guān)的另外一個(gè)特性就是事務(wù)。Kafka中的事務(wù)與數(shù)據(jù)庫的事務(wù)類似,Kafka中的事務(wù)屬性是指一系列的Producer生產(chǎn)消息和消費(fèi)消息提交Offsets的操作在一個(gè)事務(wù)中,即原子性操作。對應(yīng)的結(jié)果是同時(shí)成功或者同時(shí)失敗。
這里需要與數(shù)據(jù)庫中事務(wù)進(jìn)行區(qū)別,操作數(shù)據(jù)庫中的事務(wù)指一系列的增刪查改,對Kafka來說,操作事務(wù)是指一系列的生產(chǎn)和消費(fèi)等原子性操作。
在事務(wù)屬性引入之前,先引入Producer的冪等性,它的作用為:
產(chǎn)生的場景有:
比如,在Consumer中Commit Offsets時(shí),當(dāng)Consumer在消費(fèi)完成時(shí)Commit的Offsets為100(假設(shè)最近一次Commit的Offsets為50),那么執(zhí)行觸發(fā)Balance時(shí),其他Consumer就會重復(fù)消費(fèi)消息(消費(fèi)的Offsets介于50~100之間的消息)。
Producer提供了五種事務(wù)方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer
// 初始化事務(wù),需要注意確保transation.id屬性被分配
void initTransactions();
// 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 為Consumer提供的在事務(wù)內(nèi)Commit Offsets的操作
void sendOffsetsToTransaction(Map offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 放棄事務(wù),類似于回滾事務(wù)的操作
void abortTransaction() throws ProducerFencedException;
在Kafka事務(wù)中,一個(gè)原子性操作,根據(jù)操作類型可以分為3種情況。情況如下:
Kafka的冪等性和事務(wù)是比較重要的特性,特別是在數(shù)據(jù)丟失和數(shù)據(jù)重復(fù)的問題上非常重要。Kafka引入冪等性,設(shè)計(jì)的原理也比較好理解。而事務(wù)與數(shù)據(jù)庫的事務(wù)特性類似,有數(shù)據(jù)庫使用的經(jīng)驗(yàn)對理解Kafka的事務(wù)也比較容易接受。