本篇內(nèi)容主要講解“Kafka丟失數(shù)據(jù)問(wèn)題優(yōu)化分析”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Kafka丟失數(shù)據(jù)問(wèn)題優(yōu)化分析”吧!
目前創(chuàng)新互聯(lián)已為上千余家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁(yè)空間、成都網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計(jì)、織金網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
數(shù)據(jù)丟失是一件非常嚴(yán)重的事情事,針對(duì)數(shù)據(jù)丟失的問(wèn)題我們需要有明確的思路來(lái)確定問(wèn)題所在,針對(duì)這段時(shí)間的總結(jié),我個(gè)人面對(duì)kafka 數(shù)據(jù)丟失問(wèn)題的解決思路如下:
1、是否真正的存在數(shù)據(jù)丟失問(wèn)題,比如有很多時(shí)候可能是其他同事操作了測(cè)試環(huán)境,所以首先確保數(shù)據(jù)沒(méi)有第三方干擾。
2、理清你的業(yè)務(wù)流程,數(shù)據(jù)流向,數(shù)據(jù)到底是在什么地方丟失的數(shù)據(jù),在kafka 之前的環(huán)節(jié)或者kafka之后的流程丟失?比如kafka的數(shù)據(jù)是由flume提供的,也許是flume丟失了數(shù)據(jù),kafka 自然就沒(méi)有這一部分?jǐn)?shù)據(jù)。
3、如何發(fā)現(xiàn)有數(shù)據(jù)丟失,又是如何驗(yàn)證的。從業(yè)務(wù)角度考慮,例如:教育行業(yè),每年高考后數(shù)據(jù)量巨大,但是卻反常的比高考前還少,或者源端數(shù)據(jù)量和目的端數(shù)據(jù)量不符
4、 定位數(shù)據(jù)是否在kafka之前就已經(jīng)丟失還事消費(fèi)端丟失數(shù)據(jù)的
kafka支持?jǐn)?shù)據(jù)的重新回放功能(換個(gè)消費(fèi)group),清空目的端所有數(shù)據(jù),重新消費(fèi)。
如果是在消費(fèi)端丟失數(shù)據(jù),那么多次消費(fèi)結(jié)果完全一模一樣的幾率很低。
如果是在寫入端丟失數(shù)據(jù),那么每次結(jié)果應(yīng)該完全一樣(在寫入端沒(méi)有問(wèn)題的前提下)。
5、kafka環(huán)節(jié)丟失數(shù)據(jù),常見(jiàn)的kafka環(huán)節(jié)丟失數(shù)據(jù)的原因有:
如果auto.commit.enable=true,當(dāng)consumer fetch了一些數(shù)據(jù)但還沒(méi)有完全處理掉的時(shí)候,剛好到commit interval出發(fā)了提交offset操作,接著consumer crash掉了。這時(shí)已經(jīng)fetch的數(shù)據(jù)還沒(méi)有處理完成但已經(jīng)被commit掉,因此沒(méi)有機(jī)會(huì)再次被處理,數(shù)據(jù)丟失。
網(wǎng)絡(luò)負(fù)載很高或者磁盤很忙寫入失敗的情況下,沒(méi)有自動(dòng)重試重發(fā)消息。沒(méi)有做限速處理,超出了網(wǎng)絡(luò)帶寬限速。kafka一定要配置上消息重試的機(jī)制,并且重試的時(shí)間間隔一定要長(zhǎng)一些,默認(rèn)1秒鐘并不符合生產(chǎn)環(huán)境(網(wǎng)絡(luò)中斷時(shí)間有可能超過(guò)1秒)。
如果磁盤壞了,會(huì)丟失已經(jīng)落盤的數(shù)據(jù)
單批數(shù)據(jù)的長(zhǎng)度超過(guò)限制會(huì)丟失數(shù)據(jù),報(bào)kafka.common.MessageSizeTooLargeException異常解決:
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
6、partition leader在未完成副本數(shù)follows的備份時(shí)就宕機(jī)的情況,即使選舉出了新的leader但是已經(jīng)push的數(shù)據(jù)因?yàn)槲磦浞菥蛠G失了!kafka是多副本的,當(dāng)你配置了同步復(fù)制之后。多個(gè)副本的數(shù)據(jù)都在PageCache里面,出現(xiàn)多個(gè)副本同時(shí)掛掉的概率比1個(gè)副本掛掉的概率就很小了。(官方推薦是通過(guò)副本來(lái)保證數(shù)據(jù)的完整性的)
7、kafka的數(shù)據(jù)一開(kāi)始就是存儲(chǔ)在PageCache上的,定期flush到磁盤上的,也就是說(shuō),不是每個(gè)消息都被存儲(chǔ)在磁盤了,如果出現(xiàn)斷電或者機(jī)器故障等,PageCache上的數(shù)據(jù)就丟失了??梢酝ㄟ^(guò)log.flush.interval.messages和log.flush.interval.ms來(lái)配置flush間隔,interval大丟的數(shù)據(jù)多些,小會(huì)影響性能但在0.8版本,可以通過(guò)replica機(jī)制保證數(shù)據(jù)不丟,代價(jià)就是需要更多資源,尤其是磁盤資源,kafka當(dāng)前支持GZip和Snappy壓縮,來(lái)緩解這個(gè)問(wèn)題 是否使用replica取決于在可靠性和資源代價(jià)之間的balance。
同時(shí)kafka也提供了相關(guān)的配置參數(shù),來(lái)讓你在性能與可靠性之間權(quán)衡(一般默認(rèn)):
當(dāng)達(dá)到下面的消息數(shù)量時(shí),會(huì)將數(shù)據(jù)flush到日志文件中。默認(rèn)10000
log.flush.interval.messages=10000
當(dāng)達(dá)到下面的時(shí)間(ms)時(shí),執(zhí)行一次強(qiáng)制的flush操作。interval.ms和interval.messages無(wú)論哪個(gè)達(dá)到,都會(huì)flush。默認(rèn)3000ms
log.flush.interval.ms=1000
檢查是否需要將日志flush的時(shí)間間隔
log.flush.scheduler.interval.ms = 3000
producer端
設(shè)計(jì)上保證數(shù)據(jù)的可靠安全性,依據(jù)分區(qū)數(shù)做好數(shù)據(jù)備份,設(shè)立副本數(shù)等。push數(shù)據(jù)的方式:同步異步推送數(shù)據(jù):權(quán)衡安全性和速度性的要求,選擇相應(yīng)的同步推送還是異步推送方式,當(dāng)發(fā)現(xiàn)數(shù)據(jù)有問(wèn)題時(shí),可以改為同步來(lái)查找問(wèn)題。
flush是kafka的內(nèi)部機(jī)制,kafka優(yōu)先在內(nèi)存中完成數(shù)據(jù)的交換,然后將數(shù)據(jù)持久化到磁盤.kafka首先會(huì)把數(shù)據(jù)緩存(緩存到內(nèi)存中)起來(lái)再批量flush??梢酝ㄟ^(guò)log.flush.interval.messages和log.flush.interval.ms來(lái)配置flush間隔
可以通過(guò)replica機(jī)制保證數(shù)據(jù)不丟。代價(jià)就是需要更多資源,尤其是磁盤資源,kafka當(dāng)前支持GZip和Snappy壓縮,來(lái)緩解這個(gè)問(wèn)題。是否使用replica(副本)取決于在可靠性和資源代價(jià)之間的balance(平衡)
broker到 Consumer kafka的consumer提供兩種接口。
high-level版本已經(jīng)封裝了對(duì)partition和offset的管理,默認(rèn)是會(huì)定期自動(dòng)commit offset,這樣可能會(huì)丟數(shù)據(jù)的
low-level版本自己管理spout線程和partition之間的對(duì)應(yīng)關(guān)系和每個(gè)partition上的已消費(fèi)的offset(定期寫到zk)
并且只有當(dāng)這個(gè)offset被ack后,即成功處理后,才會(huì)被更新到zk,所以基本是可以保證數(shù)據(jù)不丟的即使spout線程crash(崩潰),重啟后還是可以從zk中讀到對(duì)應(yīng)的offset
異步要考慮到partition leader在未完成副本數(shù)follows的備份時(shí)就宕機(jī)的情況,即使選舉出了新的leader但是已經(jīng)push的數(shù)據(jù)因?yàn)槲磦浞菥蛠G失了!
不能讓內(nèi)存的緩沖池太滿,如果滿了內(nèi)存溢出,也就是說(shuō)數(shù)據(jù)寫入過(guò)快,kafka的緩沖池?cái)?shù)據(jù)落盤速度太慢,這時(shí)肯定會(huì)造成數(shù)據(jù)丟失。
盡量保證生產(chǎn)者端數(shù)據(jù)一直處于線程阻塞狀態(tài),這樣一邊寫內(nèi)存一邊落盤。
異步寫入的話還可以設(shè)置類似flume回滾類型的batch數(shù),即按照累計(jì)的消息數(shù)量,累計(jì)的時(shí)間間隔,累計(jì)的數(shù)據(jù)大小設(shè)置batch大小。
設(shè)置合適的方式,增大batch 大小來(lái)減小網(wǎng)絡(luò)IO和磁盤IO的請(qǐng)求,這是對(duì)于kafka效率的思考。
不過(guò)異步寫入丟失數(shù)據(jù)的情況還是難以控制
還是得穩(wěn)定整體集群架構(gòu)的運(yùn)行,特別是zookeeper,當(dāng)然正對(duì)異步數(shù)據(jù)丟失的情況盡量保證broker端的穩(wěn)定運(yùn)作吧
kafka不像hadoop更致力于處理大量級(jí)數(shù)據(jù),kafka的消息隊(duì)列更擅長(zhǎng)于處理小數(shù)據(jù)。針對(duì)具體業(yè)務(wù)而言,若是源源不斷的push大量的數(shù)據(jù)(eg:網(wǎng)絡(luò)爬蟲(chóng)),可以考慮消息壓縮。但是這也一定程度上對(duì)CPU造成了壓力,還是得結(jié)合業(yè)務(wù)數(shù)據(jù)進(jìn)行測(cè)試選擇
topic設(shè)置多分區(qū),分區(qū)自適應(yīng)所在機(jī)器,為了讓各分區(qū)均勻分布在所在的broker中,分區(qū)數(shù)要大于broker數(shù)。分區(qū)是kafka進(jìn)行并行讀寫的單位,是提升kafka速度的關(guān)鍵。
broker能接收消息的最大字節(jié)數(shù)的設(shè)置一定要比消費(fèi)端能消費(fèi)的最大字節(jié)數(shù)要小,否則broker就會(huì)因?yàn)橄M(fèi)端無(wú)法使用這個(gè)消息而掛起。
broker可賦值的消息的最大字節(jié)數(shù)設(shè)置一定要比能接受的最大字節(jié)數(shù)大,否則broker就會(huì)因?yàn)閿?shù)據(jù)量的問(wèn)題無(wú)法復(fù)制副本,導(dǎo)致數(shù)據(jù)丟失。
關(guān)閉自動(dòng)更新offset,等到數(shù)據(jù)被處理后再手動(dòng)跟新offset。
在消費(fèi)前做驗(yàn)證前拿取的數(shù)據(jù)是否是接著上回消費(fèi)的數(shù)據(jù),不正確則return先行處理排錯(cuò)。
一般來(lái)說(shuō)zookeeper只要穩(wěn)定的情況下記錄的offset是沒(méi)有問(wèn)題,除非是多個(gè)consumer group 同時(shí)消費(fèi)一個(gè)分區(qū)的數(shù)據(jù),其中一個(gè)先提交了,另一個(gè)就丟失了。
kafka的數(shù)據(jù)一開(kāi)始就是存儲(chǔ)在PageCache上的,定期flush到磁盤上的,也就是說(shuō),不是每個(gè)消息都被存儲(chǔ)在磁盤了,如果出現(xiàn)斷電或者機(jī)器故障等,PageCache上的數(shù)據(jù)就丟失了。這個(gè)是總結(jié)出的到目前為止沒(méi)有發(fā)生丟失數(shù)據(jù)的情況
//producer用于壓縮數(shù)據(jù)的壓縮類型。默認(rèn)是無(wú)壓縮。正確的選項(xiàng)值是none、gzip、snappy。壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好
props.put("compression.type", "gzip");
//增加延遲
props.put("linger.ms", "50");
//這意味著leader需要等待所有備份都成功寫入日志,這種策略會(huì)保證只要有一個(gè)備份存活就不會(huì)丟失數(shù)據(jù)。這是最強(qiáng)的保證。,
props.put("acks", "all");
//無(wú)限重試,直到你意識(shí)到出現(xiàn)了問(wèn)題,設(shè)置大于0的值將使客戶端重新發(fā)送任何數(shù)據(jù),一旦這些數(shù)據(jù)發(fā)送失敗。注意,這些重試與客戶端接收到發(fā)送錯(cuò)誤時(shí)的重試沒(méi)有什么不同。允許重試將潛在的改變數(shù)據(jù)的順序,如果這兩個(gè)消息記錄都是發(fā)送到同一個(gè)partition,則第一個(gè)消息失敗第二個(gè)發(fā)送成功,則第二條消息會(huì)比第一條消息出現(xiàn)要早。
props.put("retries ", MAX_VALUE);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000);
//關(guān)閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數(shù)據(jù)丟失
props.put("unclean.leader.election.enable", false);
//關(guān)閉自動(dòng)提交offset
props.put("enable.auto.commit", false);
限制客戶端在單個(gè)連接上能夠發(fā)送的未響應(yīng)請(qǐng)求的個(gè)數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請(qǐng)求之前client不能再向同一個(gè)broker發(fā)送請(qǐng)求。注意:設(shè)置此參數(shù)是為了避免消息亂序
props.put("max.in.flight.requests.per.connection", 1);
強(qiáng)行kill線程,導(dǎo)致消費(fèi)后的數(shù)據(jù),offset沒(méi)有提交,partition就斷開(kāi)連接。比如,通常會(huì)遇到消費(fèi)的數(shù)據(jù),處理很耗時(shí),導(dǎo)致超過(guò)了Kafka的session timeout時(shí)間(0.10.x版本默認(rèn)是30秒),那么就會(huì)re-blance重平衡,此時(shí)有一定幾率offset沒(méi)提交,會(huì)導(dǎo)致重平衡后重復(fù)消費(fèi)。
如果在close之前調(diào)用了consumer.unsubscribe()則有可能部分offset沒(méi)提交,下次重啟會(huì)重復(fù)消費(fèi)。
kafka數(shù)據(jù)重復(fù) kafka設(shè)計(jì)的時(shí)候是設(shè)計(jì)了(at-least once)至少一次的邏輯,這樣就決定了數(shù)據(jù)可能是重復(fù)的,kafka采用基于時(shí)間的SLA(服務(wù)水平保證),消息保存一定時(shí)間(通常為7天)后會(huì)被刪除。
kafka的數(shù)據(jù)重復(fù)一般情況下應(yīng)該在消費(fèi)者端,這時(shí)log.cleanup.policy = delete使用定期刪除機(jī)制。
到此,相信大家對(duì)“Kafka丟失數(shù)據(jù)問(wèn)題優(yōu)化分析”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!