Kafka是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)。具有:高吞吐量、低延遲、可擴(kuò)展性、持久性、可靠性、容錯性、高并發(fā)等特性。常見的應(yīng)用場景有:日志收集、消息系統(tǒng)、流式處理等。
創(chuàng)新互聯(lián)堅持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的靈川網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!二. Kafka的基本架構(gòu)Producer
:生產(chǎn)者,也就是發(fā)送消息的一方。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息,然后將其發(fā)送到 Kafka。Consumer
:消費(fèi)者,也就是接受消息的一方。消費(fèi)者連接到 Kafka 上并接收消息,進(jìn)而進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理。Consumer Group
:一個消費(fèi)者組可以包含一個或多個消費(fèi)者。使用多分區(qū) + 多消費(fèi)者方式可以極大提高數(shù)據(jù)下游的處理速度,同一消費(fèi)組中的消費(fèi)者不會重復(fù)消費(fèi)消息,同樣的,不同消費(fèi)組中的消費(fèi)者消息消息時互不影響。Kafka 就是通過消費(fèi)組的方式來實現(xiàn)消息 P2P 模式和廣播模式。Broker
:服務(wù)代理節(jié)點(diǎn)。Broker 是 Kafka 的服務(wù)節(jié)點(diǎn),即 Kafka 的服務(wù)器。Topic
:Kafka 中的消息以 Topic 為單位進(jìn)行劃分,生產(chǎn)者將消息發(fā)送到特定的 Topic,而消費(fèi)者負(fù)責(zé)訂閱 Topic 的消息并進(jìn)行消費(fèi)。Partition
:Topic 是一個邏輯的概念,它可以細(xì)分為多個分區(qū),每個分區(qū)只屬于單個主題。同一個主題下不同分區(qū)包含的消息是不同的,分區(qū)在存儲層面可以看作一個可追加的日志(Log)文件,消息在被追加到分區(qū)日志文件的時候都會分配一個特定的偏移量(offset)。Offset
:offset 是消息在分區(qū)中的唯一標(biāo)識,Kafka 通過它來保證消息在分區(qū)內(nèi)的順序性,不過 offset 并不跨越分區(qū),也就是說,Kafka 保證的是分區(qū)有序性而不是主題有序性。Replication
:副本,是 Kafka 保證數(shù)據(jù)高可用的方式,Kafka 同一 Partition 的數(shù)據(jù)可以在多 Broker 上存在多個副本,通常只有主副本對外提供讀寫服務(wù),當(dāng)主副本所在 broker 崩潰或發(fā)生網(wǎng)絡(luò)一場,Kafka 會在 Controller 的管理下會重新選擇新的 Leader 副本對外提供讀寫服務(wù)。Record
:實際寫入 Kafka 中并可以被讀取的消息記錄。每個 record 包含了 key、value 和 timestamp。Kafka 在Topic
級別本身是無序的,只有partition
上才有序,所以為了保證處理順序,可以自定義分區(qū)器,將需順序處理的數(shù)據(jù)發(fā)送到同一個partition
。自定義分區(qū)器需要實現(xiàn)接口Partitioner
接口并實現(xiàn) 3 個方法:partition
,close
,configure
,在partition
方法中返回分區(qū)號即可。
Kafka 中發(fā)送 1 條消息的時候,可以指定(topic, partition, key)
3 個參數(shù),partiton
和key
是可選的。
Kafka 分布式的單位是partition
,同一個partition
用一個write ahead log
組織,所以可以保證FIFO
的順序。不同partition
之間不能保證順序。因此你可以指定partition
,將相應(yīng)的消息發(fā)往同 1個partition
,并且在消費(fèi)端,Kafka 保證1 個partition
只能被1 個consumer
消費(fèi),就可以實現(xiàn)這些消息的順序消費(fèi)。
另外,也可以指定 key(比如 order id),具有同 1 個 key 的所有消息,會發(fā)往同 1 個partition
,那這樣也實現(xiàn)了消息的順序消息。
Kafka在數(shù)據(jù)生產(chǎn)的時候,有一個數(shù)據(jù)分發(fā)策略。默認(rèn)的情況使用org.apache.kafka.clients.producer.internals.DefaultPartitioner
類,這個類中就是定義數(shù)據(jù)分發(fā)的策略。默認(rèn)策略為:
Kafka的消息避免丟失可以從三個方面考慮處理:Producer發(fā)送消息避免失敗、Broker能成功保存接收到的消息、Consumer確認(rèn)消費(fèi)消息。
Producer發(fā)送消息避免失敗
想要Produce發(fā)送消息不失敗,那就得知道發(fā)送結(jié)果,網(wǎng)絡(luò)抖動這些情況是無法避免的,只能是發(fā)送后獲取發(fā)送結(jié)果,那么最直接的方式就是把Kafka默認(rèn)的異步發(fā)送改為同步發(fā)送(Broker收到消息后ack
回復(fù)確認(rèn)),這樣就能實時知道消息發(fā)送的結(jié)果,但是這樣會讓Kafka的發(fā)送效率大大降低,因為Kafka在默認(rèn)的異步發(fā)送消息的時候可以批量發(fā)送,以此大幅度提高發(fā)送效率,因此一般很少使用同步發(fā)送的方式,除非消息很重要絕不允許丟失。
但是我們可以采用添加異步或調(diào)函數(shù),監(jiān)聽消息發(fā)送的結(jié)果,如果失敗可以在回調(diào)中重試,以此來達(dá)到盡可能的發(fā)送成功。同時Producer
本身提供了一個retries
的機(jī)制,如果因為網(wǎng)絡(luò)問題,或者Broker故障 導(dǎo)致發(fā)送失敗,就是重試。一般這個retries
設(shè)置3-5次或者更高,同時重試間隔時間也隨著次數(shù)增長。
Broker能成功保存接收到的消息
Broker要成功的保存接收到的消息并且不丟失,就需要把接收到的消息保存到磁盤。Kafka為了提高性能采用的是異步批量,存儲到磁盤的機(jī)制,就是有一定的消息量和時間間隔要求的,刷磁盤的這個動作是操作系統(tǒng)來調(diào)度的,如果在刷盤之前系統(tǒng)就崩潰了,就會數(shù)據(jù)丟失。
針對這個情況,Kafka采用Partition
分區(qū)ack
機(jī)制,Partition
分區(qū)是指一個Topic下的多個分區(qū),有一個Leader
分區(qū),其他的都是Follower
分區(qū),Leader
分區(qū)負(fù)責(zé)接收和被讀取消息,Follower
分區(qū)會通過Replication
機(jī)制同步Leader
的數(shù)據(jù),負(fù)責(zé)高可用(Kafka在2.4之后,Kafka提供了讀寫分離,Follower
也可以提供讀取),當(dāng)Leader
出現(xiàn)故障時會從Follower
中選取一個成為新的Leader
。那么當(dāng)一個消息發(fā)送到Leader
分區(qū)之后,Kafka提供了一個acks
的參數(shù),Producer
可以設(shè)置這個參數(shù),去結(jié)合broker
的Partition
機(jī)制來共同保障數(shù)據(jù)的可靠性,這個參數(shù)的值有三個
0
,表示Producer
不需要等待broker
的響應(yīng),就認(rèn)為消息發(fā)送成功了(可能存在數(shù)據(jù)丟失)1
,表示Leader
收到消息之后,不等待其他的Follower
的同步就給Producer
發(fā)一個確認(rèn),如果Leader
和Partition
掛了就可能存在數(shù)據(jù)丟失-1
,表示Leader
收到消息之后還會等待ISR
列表(與Leader
保持正常連接的Follwer
節(jié)點(diǎn)列表)中的Follower
同步完成,再給Producer
返回一個確認(rèn),也就是所有分區(qū)節(jié)點(diǎn)都確認(rèn)收到消息,保證數(shù)據(jù)不丟失Consumer確認(rèn)消費(fèi)消息
當(dāng)Producer
確定發(fā)送消息成功并且Broker
成功保存消息之后,基本上Consumer
就肯定能消費(fèi)到消息。Kafka在消費(fèi)者消費(fèi)時有一個offset
機(jī)制,代表了當(dāng)前消費(fèi)者消費(fèi)到了Partition
的哪一條消息。kafka的Consumer
的配置中,默認(rèn)的enable.auto.commit = true
,表示在Consumer
通過poll
方法 獲取到消息以后,每過5秒(通過配置項可修改)會自動獲取poll
中得到的大的offset
, 提交給Partition
中的offset_consumer
(存儲 offset 的特定topic)。如果enable.auto.commit = false
時,則關(guān)閉了自動提交,需要手動的通過應(yīng)用程序代碼進(jìn)行提交。
所以在Consumer
消費(fèi)消息時,丟失消息的可能會有兩種,比如開啟了offset
自動提交,但是消息消費(fèi)失??;或者沒有開啟自動提交offset
,但是在消費(fèi)消息之前提交了offset
。針對這兩種情況,可以設(shè)置在消息消費(fèi)完成后手動提交offset
??傊?code>Consumer端確認(rèn)消息消費(fèi)成功后再提交offset
即可保證消息正常消費(fèi)。
?Kafka中的每個Partition
都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到Partition
中。Partition
中的每個消息都有一個連續(xù)的序號,用于Partition
唯一標(biāo)識一條消息,這個唯一標(biāo)識就是offset
。Offset
從語義上來看擁有兩種:Current Offset
和Committed Offset
。
Current Offset
保存在Consumer
客戶端中,它表示Consumer
希望收到的下一條消息的序號。它僅僅在poll()
方法中使用。例如,Consumer
第一次調(diào)用poll()
方法后收到了20條消息,那么Current Offset
就被設(shè)置為20
。這樣Consumer
下一次調(diào)用poll()
方法時,Kafka就知道應(yīng)該從序號為21的消息開始讀取。這樣就能夠保證每次Consumer poll
消息時,都能夠收到不重復(fù)的消息。
Committed Offset
保存在Broker
上,它表示Consumer
已經(jīng)確認(rèn)消費(fèi)過的消息的序號。主要通過commitSync
和commitAsync
API來操作。舉個例子,Consumer
通過poll()
方法收到20條消息后,此時Current Offset
就是20,經(jīng)過一系列的邏輯處理后,并沒有調(diào)用consumer.commitAsync()
或consumer.commitSync()
來提交Committed Offset
,那么此時Committed Offset
依舊是0。Committed Offset
主要用于Consumer Rebalance
。在Consumer Rebalance
的過程中,一個partition
被分配給了一個Consumer
,那么這個Consumer
該從什么位置開始消費(fèi)消息呢?答案就是Committed Offset
。另外,如果一個Consumer
消費(fèi)了5條消息(poll并且成功commitSync
)之后宕機(jī)了,重新啟動之后它仍然能夠從第6條消息開始消費(fèi),因為Committed Offset
已經(jīng)被Kafka記錄為5。
在Kafka 0.9前,Committed Offset
信息保存在zookeeper
的consumers/{group}/offsets/{topic}/{partition}
目錄中(zookeeper
并不適合進(jìn)行大批量的讀寫操作,尤其是寫操作)。而在0.9之后,所有的offset
信息都保存在了Broker
上的一個名為_consumer_offsets
的topic
中。
順序讀寫
Kafka的Partition
中寫入數(shù)據(jù),是通過分段、追加日志的方式,這在很大程度上將讀寫限制為順序 I/O(sequential I/O),這在大多數(shù)的存儲介質(zhì)上都很快。實際上不管是內(nèi)存還是磁盤,快或慢關(guān)鍵在于尋址的方式,磁盤分為順序讀寫與隨機(jī)讀寫,內(nèi)存也一樣分為順序讀寫與隨機(jī)讀寫?;诖疟P的隨機(jī)讀寫確實很慢,但磁盤的順序讀寫性能卻很高,一般而言要高出磁盤隨機(jī)讀寫三個數(shù)量級,一些情況下磁盤順序讀寫性能甚至要高于內(nèi)存隨機(jī)讀寫。
Page Cache
為了優(yōu)化讀寫性能,Kafka利用了操作系統(tǒng)本身的Page Cache
,就是利用操作系統(tǒng)自身的內(nèi)存而不是JVM空間內(nèi)存。這樣做可以避免Object消耗,如果是使用 Java 堆,Java對象的內(nèi)存消耗比較大,通常是所存儲數(shù)據(jù)的兩倍甚至更多;還能避免GC問題,隨著JVM中數(shù)據(jù)不斷增多,垃圾回收將會變得復(fù)雜與緩慢,使用系統(tǒng)緩存就不會存在GC問題。
相比于使用JVM
或in-memory cache
等數(shù)據(jù)結(jié)構(gòu),利用操作系統(tǒng)的Page Cache
更加簡單可靠。首先,操作系統(tǒng)層面的緩存利用率會更高,因為存儲的都是緊湊的字節(jié)結(jié)構(gòu)而不是獨(dú)立的對象。其次,操作系統(tǒng)本身也對于Page Cache
做了大量優(yōu)化,提供了write-behind
、read-ahead
以及flush
等多種機(jī)制。再者,即使服務(wù)進(jìn)程重啟,系統(tǒng)緩存依然不會消失,避免了in-process cache
重建緩存的過程。
通過操作系統(tǒng)的Page Cache
,Kafka的讀寫操作基本上是基于內(nèi)存的,讀寫速度得到了極大的提升。
零拷貝
Linux操作系統(tǒng) 零拷貝 機(jī)制使用了sendfile
方法, 允許操作系統(tǒng)將數(shù)據(jù)從Page Cache
直接發(fā)送到網(wǎng)絡(luò),只需要最后一步的copy操作將數(shù)據(jù)復(fù)制到 NIC 緩沖區(qū), 這樣避免重新復(fù)制數(shù)據(jù)。零拷貝的技術(shù)基礎(chǔ)是DMA,又稱之為直接內(nèi)存訪問。DMA 傳輸將數(shù)據(jù)從一個地址空間復(fù)制到另外一個地址空間。當(dāng)CPU 初始化這個傳輸動作,傳輸動作本身是由 DMA 控制器來實行和完成。因此通過DMA,硬件則可以繞過CPU,自己去直接訪問系統(tǒng)主內(nèi)存。很多硬件都支持DMA,其中就包括網(wǎng)卡、聲卡、磁盤驅(qū)動控制器等。通過這種 “零拷貝” 的機(jī)制,Page Cache
結(jié)合sendfile
方法,Kafka消費(fèi)端的性能也大幅提升。這也是為什么有時候消費(fèi)端在不斷消費(fèi)數(shù)據(jù)時,我們并沒有看到磁盤io比較高,此刻正是操作系統(tǒng)緩存在提供數(shù)據(jù)。
當(dāng)Kafka客戶端從服務(wù)器讀取數(shù)據(jù)時,如果不使用零拷貝技術(shù),那么大致需要經(jīng)歷這樣的一個過程:
如果使用零拷貝技術(shù),那么只需要:
批量讀寫
Kafka數(shù)據(jù)讀寫也是批量的而不是單條的。除了利用底層的技術(shù)外,Kafka還在應(yīng)用程序?qū)用嫣峁┝艘恍┦侄蝸硖嵘阅?,最明顯的就是使用批次,在向Kafka寫入數(shù)據(jù)時,可以啟用批次寫入,這樣可以避免在網(wǎng)絡(luò)上頻繁傳輸單個消息帶來的延遲和帶寬開銷。假設(shè)網(wǎng)絡(luò)帶寬為10MB/S,一次性傳輸10MB的消息比傳輸1KB的消息10000萬次顯然要快得多。
批量壓縮
Kafka可以把所有的消息都變成一個批量的文件,并且進(jìn)行合理的批量壓縮,減少網(wǎng)絡(luò)IO損耗,通過mmap提高I/O速度,寫入數(shù)據(jù)的時候由于單個Partion是末尾添加所以速度最優(yōu),讀取數(shù)據(jù)的時候配合sendfile直接輸出。并且Kafka支持多種壓縮協(xié)議,包括Gzip和Snappy壓縮協(xié)議。
Kafka的topic
數(shù)量多性能會下降的主要原因是topic
在物理層面以partition
為分組,一個topic
可以分成若干個partition
,partition
還可以細(xì)分為logSegment
,一個partition
物理上由多個logSegment
組成,logSegment
文件由兩部分組成,分別為.index
文件和.log
文件,分別表示為Segment
索引文件和數(shù)據(jù)文件。Kafka在Broker
接受并存儲消息的時候,是將消息數(shù)據(jù)使用分段、追加日志的方式寫入log文件,在很大程度上將讀寫限制為順序 I/O(sequential I/O),那么如果topic
數(shù)量很多,即使每個topic
只有1個partition
,也會導(dǎo)致總分區(qū)數(shù)很多,磁盤讀寫退化為隨機(jī),影響性能。同時Kafka中topic
的元數(shù)據(jù)是在zookeeper
中的,大量topic
確實會造成性能瓶頸(zk不適合做高并發(fā)的讀寫操作),不僅在磁盤讀寫上。而且topic
太多造成partition
過多。partition
是kafka的最小并行單元,每個partition
都會在對應(yīng)的broker
上有日志文件。當(dāng)topic
過多,partition
增加,日志文件數(shù)也隨之增加,就需要允許打開更多的文件數(shù)。partition
過多在controller
選舉和controller
重新選舉partition leader
的耗時會大大增加,造成kafka不可用的時間延長。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧