本篇內(nèi)容介紹了“如何使用Scala開發(fā)Apache Kafka”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)專注于貢井企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,電子商務(wù)商城網(wǎng)站建設(shè)。貢井網(wǎng)站建設(shè)公司,為貢井等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)Apache Kafka是一個(gè)廣受歡迎的分布式流媒體平臺(tái),New Relic、Uber以及Square等數(shù)千家公司都在使用它構(gòu)建可擴(kuò)展、高吞吐量、可靠的實(shí)時(shí)流媒體系統(tǒng)。例如,New Relic的Kafka集群每秒處理超過1500萬條消息,總數(shù)據(jù)速率接近1 Tbps。
Kafka在應(yīng)用程序開發(fā)人員和數(shù)據(jù)科學(xué)家中非常受歡迎,因?yàn)樗鼧O大簡(jiǎn)化了數(shù)據(jù)流的處理過程。但是,Kafka在Scala上實(shí)踐會(huì)比較復(fù)雜。如果消費(fèi)者無法跟上數(shù)據(jù)流,并且消息在他們看到之前就消失了,那么具有自動(dòng)數(shù)據(jù)保留限制的高吞吐量發(fā)布/訂閱模式并沒有多大用。同樣,如果托管數(shù)據(jù)流的系統(tǒng)無法擴(kuò)展以滿足需求或者不可靠,也沒有什么用。
為了降低這種復(fù)雜性,作者將可能的問題分為4大類共20條,以方便用戶理解:
Partitions(分區(qū))
Consumers(消費(fèi)者)
Producers(生產(chǎn)者)
Brokers
Kafka是一種高效分布式消息傳遞系統(tǒng),可提供內(nèi)置數(shù)據(jù)冗余和彈性,同時(shí)保留高吞吐量和可擴(kuò)展性。它包括自動(dòng)數(shù)據(jù)保留限制,使其非常適合將數(shù)據(jù)視為流的應(yīng)用程序,并且還支持對(duì)鍵值對(duì)映射建模的“壓縮”流。
了解最佳實(shí)踐之前,你需要熟悉一些關(guān)鍵術(shù)語:
Message消息:Kafka中的記錄或數(shù)據(jù)單元。每條消息都有一個(gè)鍵(key)和一個(gè)值(value),以及可選標(biāo)題。
生產(chǎn)者:生產(chǎn)者向Kafka的topic發(fā)布消息。生產(chǎn)者決定要發(fā)布哪個(gè)topic分區(qū),可以隨機(jī)(循環(huán))或使用基于消息密鑰的分區(qū)算法。
Broker:Kafka在分布式系統(tǒng)或集群中運(yùn)行,集群中的每個(gè)節(jié)點(diǎn)都稱為broker。
Topic:Topic是發(fā)布數(shù)據(jù)記錄或消息的類別。消費(fèi)者訂閱topic以讀取寫入其中的數(shù)據(jù)。
Topic partition:topic分為多個(gè)分區(qū),每個(gè)消息都有一個(gè)偏移量。每個(gè)分區(qū)通常至少?gòu)?fù)制一或兩次。每個(gè)分區(qū)都有一個(gè)leader和至少一個(gè)副本(數(shù)據(jù)副本),這些副本存在于follower身上,可以防止broker失敗。集群中的所有broker都是leader和follower,但是代理最多只有一個(gè)topic partition副本,leader用于所有讀寫操作。
偏移:為分區(qū)內(nèi)的每條消息分配一個(gè)偏移量,這是一個(gè)單調(diào)遞增整數(shù),用作分區(qū)內(nèi)消息的唯一標(biāo)識(shí)符。
消費(fèi)者:消費(fèi)者通過訂閱 topic partition讀取Kafka主題的消息,消費(fèi)應(yīng)用程序,并處理消息以完成所需工作。
Consumer group:消費(fèi)者可以組織成消費(fèi)者群組,分配topic partition以平衡組中所有使用者。在消費(fèi)者群組中,所有消費(fèi)者都在負(fù)載均衡模式下工作。換句話說,組中每個(gè)消費(fèi)者都將看到每條消息。如果一個(gè)消費(fèi)者離開,則將該分區(qū)分配給該組中的其他消費(fèi)者,這個(gè)過程稱為再平衡。如果組中的消費(fèi)者多于分區(qū),則一些消費(fèi)者將閑置。如果組中的消費(fèi)者少于分區(qū),則某些消費(fèi)者將使用來自多個(gè)分區(qū)的消息。
Lag:當(dāng)消費(fèi)者無法從分區(qū)中讀取消息,消費(fèi)者就會(huì)出現(xiàn)Lag,表示為分區(qū)頂部后的偏移數(shù)。從Lag狀態(tài)恢復(fù)所需的時(shí)間取決于消費(fèi)者每秒消耗消息的速度:
time = messages / (consume rate per second - produce rate per second)
第一部分:使用分區(qū)的最佳實(shí)踐!
在分區(qū)部分,我們需要了解分區(qū)的數(shù)據(jù)速率,以確保擁有正確的保留空間。分區(qū)的數(shù)據(jù)速率是生成數(shù)據(jù)的速率。換句話說,它是平均消息大小乘以每秒消息數(shù)。數(shù)據(jù)速率決定了給定時(shí)間內(nèi)所需的保留空間(以字節(jié)為單位)。如果不知道數(shù)據(jù)速率,則無法正確計(jì)算滿足基本保留目標(biāo)所需的空間大小。數(shù)據(jù)速率指定了單個(gè)消費(fèi)者需要支持的最低性能而保證不會(huì)出現(xiàn)Lag。
除非有其他架構(gòu)需求,否則在寫入topic時(shí)使用隨機(jī)分區(qū)。當(dāng)進(jìn)行大規(guī)模操作時(shí),分區(qū)之間的數(shù)據(jù)速率不均可能難以管理。需要注意以下三方面:
1、首先,“熱點(diǎn)”(更高吞吐量)分區(qū)的消費(fèi)者必須處理比消費(fèi)者群組中其他消費(fèi)者更多的消息,這可能導(dǎo)致處理和網(wǎng)絡(luò)瓶頸。
2、其次,必須為具有最高數(shù)據(jù)速率的分區(qū)調(diào)整topic保留空間大小,這可能會(huì)導(dǎo)致topic中其他分區(qū)的磁盤使用量增加。
3、最后,在分區(qū)領(lǐng)導(dǎo)方面實(shí)現(xiàn)最佳平衡比簡(jiǎn)單地?cái)U(kuò)展到所有 brokers更復(fù)雜?!盁狳c(diǎn)”分區(qū)的份量可能是同一topic中另一分區(qū)的10倍。
第二部分:使用消費(fèi)者最佳實(shí)踐!
如果消費(fèi)者運(yùn)行的Kafka版本低于0.10,請(qǐng)升級(jí)。在0.8.x版本中,消費(fèi)者使用Apache ZooKeeper進(jìn)行消費(fèi)者群組協(xié)調(diào),并且許多已知錯(cuò)誤可能導(dǎo)致長(zhǎng)期運(yùn)行的平衡甚至是重新平衡算法的失敗(我們稱之為“重新平衡風(fēng)暴”)。在重新平衡期間,將一個(gè)或多個(gè)分區(qū)分配給使用者群組中的每個(gè)使用者。在再平衡中,分區(qū)所有權(quán)在消費(fèi)者中不斷變通,阻止任何消費(fèi)者在消費(fèi)方面取得實(shí)際進(jìn)展。
4、調(diào)整消費(fèi)者套接字緩沖區(qū)以進(jìn)行高速獲取。在Kafka 0.10.x中,參數(shù)為isreceive.buffer.bytes,默認(rèn)為64kB。在Kafka 0.8.x中,參數(shù)是socket.receive.buffer.bytes,默認(rèn)為100kB。對(duì)于高吞吐量環(huán)境,這兩個(gè)默認(rèn)值都太小,特別是如果brocker和消費(fèi)者之間的網(wǎng)絡(luò)帶寬延遲大于局域網(wǎng)(LAN)。對(duì)于延遲為1毫秒或更長(zhǎng)的高帶寬網(wǎng)絡(luò)(10 Gbps或更高),請(qǐng)考慮將套接字緩沖區(qū)設(shè)置為8或16 MB。如果內(nèi)存不足,請(qǐng)考慮1 MB,也可以使用值-1,這樣底層操作系統(tǒng)可以根據(jù)網(wǎng)絡(luò)條件調(diào)整緩沖區(qū)大小。但是,對(duì)于需要啟動(dòng)“熱點(diǎn)”消費(fèi)者的系統(tǒng)而言,自動(dòng)調(diào)整的速度可能或比較慢。
5、設(shè)計(jì)高吞吐量消費(fèi)者,以便在有保證的情況下實(shí)施背壓,最好只消耗可以有效處理的東西,而不是消耗太多,以至于過程停止,退出消費(fèi)者群組。 消費(fèi)者應(yīng)該使用固定大小的緩沖區(qū)(參見Disruptor模式),如果在Java虛擬機(jī)(JVM)中運(yùn)行,最好是在堆外使用。固定大小的緩沖區(qū)將阻止消費(fèi)者將大量數(shù)據(jù)拖到堆上,JVM花費(fèi)所有時(shí)間來執(zhí)行垃圾收集而不是做你想讓它處理的工作——處理消息。
6、在JVM上運(yùn)行消費(fèi)者時(shí),請(qǐng)注意垃圾回收可能對(duì)消費(fèi)者產(chǎn)生的影響。例如,垃圾收集較長(zhǎng)時(shí)間暫停可能導(dǎo)致ZooKeeper會(huì)話或者消費(fèi)者組失去平衡。對(duì)于brocker來說也是如此,如果垃圾收集暫停時(shí)間過長(zhǎng),則可能會(huì)從集群中退出。
第三部分:使用生產(chǎn)者最佳實(shí)踐!
7、配置生產(chǎn)者等待確認(rèn)。 這就是生產(chǎn)者如何知道消息實(shí)際已經(jīng)發(fā)送到brocker上的分區(qū)。在Kafka 0.10.x中,設(shè)置為acks; 在0.8.x中,它是request.required.acks。Kafka通過復(fù)制提供容錯(cuò)功能,因此單個(gè)節(jié)點(diǎn)的故障或分區(qū)leader的更改不會(huì)影響可用性。如果將生產(chǎn)者配置為沒有ack(也稱為“fire and forget”),則消息可能會(huì)無聲地丟失。
8、配置生產(chǎn)者重試次數(shù)。默認(rèn)值為3,通常太低。正確的值取決于需求,對(duì)于無法容忍數(shù)據(jù)丟失的應(yīng)用程序,請(qǐng)考慮Integer.MAX_VALUE(實(shí)際上是無窮大),這可以防止leader分區(qū)的brocker無法立即響應(yīng)生產(chǎn)請(qǐng)求。
9、對(duì)于高吞吐量生產(chǎn)者,調(diào)整緩沖區(qū)大小,特別是buffer.memory和batch.size(以字節(jié)為單位)。由于batch.size是按分區(qū)設(shè)置的,因此生產(chǎn)者性能和內(nèi)存使用量可與topic中的分區(qū)數(shù)相關(guān)聯(lián)。這里的值取決于幾個(gè)因素:生產(chǎn)者數(shù)據(jù)速率(消息的大小和數(shù)量),生成的分區(qū)數(shù)以及可用的內(nèi)存量。請(qǐng)記住,較大的緩沖區(qū)并不總是好的,如果生產(chǎn)者由于某種原因而停頓(例如,一個(gè)領(lǐng)導(dǎo)者通過確認(rèn)響應(yīng)較慢),在堆上緩存更多數(shù)據(jù)可能會(huì)導(dǎo)致更多垃圾收集。
10、制定應(yīng)用程序跟蹤指標(biāo),例如生成的消息數(shù),平均生成的消息大小和消耗的消息數(shù)。
第四部分:brocker最佳實(shí)踐!
11、Topic需要brocker的內(nèi)存和CPU資源,日志壓縮需要brocker上的堆(內(nèi)存)和CPU周期才能成功完成,并且失敗的日志壓縮會(huì)使brocker處于無限增長(zhǎng)的分區(qū)風(fēng)險(xiǎn)中。你可以在brocker上使用tunelog.cleaner.dedupe.buffer.size和log.cleaner.threads,但請(qǐng)記住,這些值會(huì)影響brocker上的堆使用情況。如果brocker拋出OutOfMemoryError異常,它將關(guān)閉并可能丟失數(shù)據(jù)。緩沖區(qū)大小和線程數(shù)將取決于要清理的主題分區(qū)數(shù)量以及這些分區(qū)中消息的數(shù)據(jù)速率和密鑰大小。從Kafka 0.10.2.1版本開始,監(jiān)視日志清理程序日志文件以查找ERROR條目是檢測(cè)日志清理程序線程問題的最可靠方法。
12、監(jiān)控brocker的網(wǎng)絡(luò)吞吐量。確保使用發(fā)送(TX)和接收(RX),磁盤I/O,磁盤空間和CPU使用率來執(zhí)行此操作。容量規(guī)劃是維護(hù)集群性能的關(guān)鍵部分。
13、在集群中的brocker之間分配分區(qū)leader,其需要大量的網(wǎng)絡(luò)I/O資源。例如,當(dāng)使用復(fù)制因子3運(yùn)行時(shí),leader必須接收分區(qū)數(shù)據(jù),并同步傳遞給所有副本,再傳輸給想要使用該數(shù)據(jù)的消費(fèi)者。因此,在這個(gè)例子中,作為領(lǐng)導(dǎo)者,在使用網(wǎng)絡(luò)I/O方面至少是follower的四倍,leader必須從磁盤讀取,follower只需要寫。
14、不要忽略監(jiān)視brocker的同步副本(ISR)縮減,重復(fù)不足的分區(qū)和不受歡迎的lesder。這些是集群中潛在問題的跡象。例如,單個(gè)分區(qū)的頻繁ISR收縮可能表明該分區(qū)的數(shù)據(jù)速率超過了leader為消費(fèi)者和副本線程提供服務(wù)的能力。
15、根據(jù)需要修改Apache Log4j屬性。Kafka代理日志記錄可能會(huì)占用過多磁盤空間。但是,不要完全放棄日志記錄,brocker日志可能是在事件發(fā)生后重建事件序列的最佳方式,有時(shí)也是唯一方式。
16、禁用topic自動(dòng)創(chuàng)建有關(guān)的明確策略,定期清理未使用的topic。例如,如果x天沒有看到任何消息,請(qǐng)考慮topic失效并將其從集群中刪除,這樣可以避免在集群中創(chuàng)建必須管理的其他元數(shù)據(jù)。
17、對(duì)于持續(xù)的高吞吐量代理,請(qǐng)?zhí)峁┳銐虻膬?nèi)存以避免從磁盤系統(tǒng)讀取,應(yīng)盡可能直接從操作系統(tǒng)的文件系統(tǒng)緩存中提供分區(qū)數(shù)據(jù)。但是,這意味著必須確保消費(fèi)者能夠跟上,滯后的消費(fèi)者將迫使brocker從磁盤讀取。
18、對(duì)于具有高吞吐量服務(wù)級(jí)別目標(biāo)(SLO)的大型集群,請(qǐng)考慮將topic隔離到brocker子集。如何確定要隔離的topic取決于業(yè)務(wù)需求,例如,如果有多個(gè)使用相同集群的聯(lián)機(jī)事務(wù)處理(OLTP)系統(tǒng),則將每個(gè)系統(tǒng)的topic隔離到brocker的不同子集以幫助限制事件的潛在爆炸半徑。
19、使用較新topic消息格式的舊客戶端(反之亦然)會(huì)在brocker客戶端轉(zhuǎn)換格式時(shí)對(duì)brocker程序施加額外負(fù)擔(dān),盡可能避免這種情況。
20、不要認(rèn)為在本地臺(tái)式機(jī)上測(cè)試brocker代表在實(shí)際生產(chǎn)環(huán)境中的性能。使用復(fù)制因子1對(duì)分區(qū)的環(huán)回接口進(jìn)行測(cè)試是與大多數(shù)生產(chǎn)環(huán)境完全不同的拓?fù)洹Mㄟ^環(huán)回可以忽略網(wǎng)絡(luò)延遲,并且在不涉及復(fù)制時(shí),接收leader確認(rèn)所需的時(shí)間可能會(huì)有很大差異。
“如何使用Scala開發(fā)Apache Kafka”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!