本篇內(nèi)容介紹了“Kafka的原理和應(yīng)用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
我們提供的服務(wù)有:成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)、外貿(mào)網(wǎng)站建設(shè)、微信公眾號(hào)開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、海晏ssl等。為1000+企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的海晏網(wǎng)站制作公司
Kafka是最初由Linkedin公司開發(fā),是一個(gè)分布式、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當(dāng)做MQ系統(tǒng)),常見可以用于web/nginx日志、訪問日志,消息服務(wù)等等,Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開源項(xiàng)目,Java核心學(xué)習(xí)筆記分享。
主要應(yīng)用場景是:日志收集系統(tǒng)和消息系統(tǒng)。
Kafka主要設(shè)計(jì)目標(biāo)如下:
以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能。
高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸。
支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸。
同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。
一個(gè)典型的kafka集群中包含若干producer,若干broker,若干consumer,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在consumer group發(fā)生變化時(shí)進(jìn)行rebalance。producer使用push模式將消息發(fā)布到broker,consumer使用pull模式從broker訂閱并消費(fèi)消息。
Kafka專用術(shù)語:
Broker:消息中間件處理結(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker,多個(gè)broker可以組成一個(gè)Kafka集群。
Topic:一類消息,Kafka集群能夠同時(shí)負(fù)責(zé)多個(gè)topic的分發(fā)。
Partition:topic物理上的分組,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。
Segment:partition物理上由多個(gè)segment組成。
offset:每個(gè)partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中。partition中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做offset,用于partition唯一標(biāo)識(shí)一條消息。
Producer:負(fù)責(zé)發(fā)布消息到Kafka broker。
Consumer:消息消費(fèi)者,向Kafka broker讀取消息的客戶端。
Consumer Group:每個(gè)Consumer屬于一個(gè)特定的Consumer Group。
at most once:最多一次,這個(gè)和JMS中"非持久化"消息類似,發(fā)送一次,無論成敗,將不會(huì)重發(fā)。消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理。那么此后"未處理"的消息將不能被fetch到,這就是"at most once"。
at least once:消息至少發(fā)送一次,如果消息未能接受成功,可能會(huì)重發(fā),直到接收成功。消費(fèi)者fetch消息,然后處理消息,然后保存offset。如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來再次fetch時(shí)可能獲得上次已經(jīng)處理過的消息,這就是"at least once",原因offset沒有及時(shí)的提交給zookeeper,zookeeper恢復(fù)正常還是之前offset狀態(tài)。
exactly once:消息只會(huì)發(fā)送一次。kafka中并沒有嚴(yán)格的去實(shí)現(xiàn)(基于2階段提交),我們認(rèn)為這種策略在kafka中是沒有必要的。
通常情況下"at-least-once"是我們首選。
Topic & Partition
一個(gè)topic可以認(rèn)為一個(gè)一類消息,每個(gè)topic將被分成多個(gè)partition,每個(gè)partition在存儲(chǔ)層面是append log文件。
在Kafka文件存儲(chǔ)中,同一個(gè)topic下有多個(gè)不同partition,每個(gè)partition為一個(gè)目錄,partiton命名規(guī)則為topic名稱+有序序號(hào),第一個(gè)partiton序號(hào)從0開始,序號(hào)最大值為partitions數(shù)量減1。
每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中。但每個(gè)段segment file消息數(shù)量不一定相等,這種特性方便old segment file快速被刪除。
每個(gè)partiton只需要支持順序讀寫就行了,segment文件生命周期由服務(wù)端配置參數(shù)決定。
這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。
segment file組成:由2大部分組成,分別為index file和data file,此2個(gè)文件一一對(duì)應(yīng),成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件、數(shù)據(jù)文件.
segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值。數(shù)值最大為64位long大小,19位數(shù)字字符長度,沒有數(shù)字用0填充。
segment中index與data file對(duì)應(yīng)關(guān)系物理結(jié)構(gòu)如下:
上圖中索引文件存儲(chǔ)大量元數(shù)據(jù),數(shù)據(jù)文件存儲(chǔ)大量消息,索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。
其中以索引文件中元數(shù)據(jù)3,497為例,依次在數(shù)據(jù)文件中表示第3個(gè)message(在全局partiton表示第368772個(gè)message),以及該消息的物理偏移地址為497。
了解到segment data file由許多message組成,下面詳細(xì)說明message物理結(jié)構(gòu)如下:
參數(shù)說明:
關(guān)鍵字解釋說明8 byte offset在parition(分區(qū))內(nèi)的每條消息都有一個(gè)有序的id號(hào),這個(gè)id號(hào)被稱為偏移(offset),它可以唯一確定每條消息在parition(分區(qū))內(nèi)的位置。即offset表示partiion的第多少message4 byte message sizemessage大小4 byte CRC32用crc32校驗(yàn)message1 byte “magic"表示本次發(fā)布Kafka服務(wù)程序協(xié)議版本號(hào)1 byte “attributes"表示為獨(dú)立版本、或標(biāo)識(shí)壓縮類型、或編碼類型。4 byte key length表示key的長度,當(dāng)key為-1時(shí),K byte key字段不填K byte key可選value bytes payload表示實(shí)際消息數(shù)據(jù)。
Kafka的高可靠性的保障來源于其健壯的副本(replication)策略。
數(shù)據(jù)同步
kafka在0.8版本前沒有提供Partition的Replication機(jī)制,一旦Broker宕機(jī),其上的所有Partition就都無法提供服務(wù),而Partition又沒有備份數(shù)據(jù),數(shù)據(jù)的可用性就大大降低了。所以0.8后提供了Replication機(jī)制來保證Broker的failover。
引入Replication之后,同一個(gè)Partition可能會(huì)有多個(gè)Replica,而這時(shí)需要在這些Replication之間選出一個(gè)Leader,Producer和Consumer只與這個(gè)Leader交互,其它Replica作為Follower從Leader中復(fù)制數(shù)據(jù)。
副本放置策略
為了更好的做負(fù)載均衡,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上。
Kafka分配Replica的算法如下:
將所有存活的N個(gè)Brokers和待分配的Partition排序
將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上,這個(gè)Partition的第一個(gè)Replica存在于這個(gè)分配的Broker上,并且會(huì)作為partition的優(yōu)先副本
將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上
假設(shè)集群一共有4個(gè)brokers,一個(gè)topic有4個(gè)partition,每個(gè)Partition有3個(gè)副本。下圖是每個(gè)Broker上的副本分配情況。
同步策略
Producer在發(fā)布消息到某個(gè)Partition時(shí),先通過ZooKeeper找到該P(yáng)artition的Leader,然后無論該Topic的Replication Factor為多少,Producer只將該消息發(fā)送到該P(yáng)artition的Leader。Leader會(huì)將該消息寫入其本地Log。每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW并且向Producer發(fā)送ACK。
為了提高性能,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK,而非等到數(shù)據(jù)寫入Log中。因此,對(duì)于已經(jīng)commit的消息,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)。
Consumer讀消息也是從Leader讀取,只有被commit過的消息才會(huì)暴露給Consumer。
Kafka Replication的數(shù)據(jù)流如下圖所示:
對(duì)于Kafka而言,定義一個(gè)Broker是否“活著”包含兩個(gè)條件:
一是它必須維護(hù)與ZooKeeper的session(這個(gè)通過ZooKeeper的Heartbeat機(jī)制來實(shí)現(xiàn))。
二是Follower必須能夠及時(shí)將Leader的消息復(fù)制過來,不能“落后太多”。
Leader會(huì)跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個(gè)Follower宕機(jī),或者落后太多,Leader將把它從ISR中移除。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過預(yù)定值或者Follower超過一定時(shí)間未向Leader發(fā)送fetch請(qǐng)求。
Kafka只解決fail/recover,一條消息只有被ISR里的所有Follower都從Leader復(fù)制過去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了Leader,還沒來得及被任何Follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(Consumer無法消費(fèi)這些數(shù)據(jù))。而對(duì)于Producer而言,它可以選擇是否等待消息commit。這種機(jī)制確保了只要ISR有一個(gè)或以上的Follower,一條被commit的消息就不會(huì)丟失。
leader選舉
Leader選舉本質(zhì)上是一個(gè)分布式鎖,有兩種方式實(shí)現(xiàn)基于ZooKeeper的分布式鎖:
節(jié)點(diǎn)名稱唯一性:多個(gè)客戶端創(chuàng)建一個(gè)節(jié)點(diǎn),只有成功創(chuàng)建節(jié)點(diǎn)的客戶端才能獲得鎖
臨時(shí)順序節(jié)點(diǎn):所有客戶端在某個(gè)目錄下創(chuàng)建自己的臨時(shí)順序節(jié)點(diǎn),只有序號(hào)最小的才獲得鎖
Majority Vote的選舉策略和ZooKeeper中的Zab選舉是類似的,實(shí)際上ZooKeeper內(nèi)部本身就實(shí)現(xiàn)了少數(shù)服從多數(shù)的選舉策略。kafka中對(duì)于Partition的leader副本的選舉采用了第一種方法:為Partition分配副本,指定一個(gè)ZNode臨時(shí)節(jié)點(diǎn),第一個(gè)成功創(chuàng)建節(jié)點(diǎn)的副本就是Leader節(jié)點(diǎn),其他副本會(huì)在這個(gè)ZNode節(jié)點(diǎn)上注冊(cè)Watcher監(jiān)聽器,一旦Leader宕機(jī),對(duì)應(yīng)的臨時(shí)節(jié)點(diǎn)就會(huì)被自動(dòng)刪除,這時(shí)注冊(cè)在該節(jié)點(diǎn)上的所有Follower都會(huì)收到監(jiān)聽器事件,它們都會(huì)嘗試創(chuàng)建該節(jié)點(diǎn),只有創(chuàng)建成功的那個(gè)follower才會(huì)成為Leader(ZooKeeper保證對(duì)于一個(gè)節(jié)點(diǎn)只有一個(gè)客戶端能創(chuàng)建成功),其他follower繼續(xù)重新注冊(cè)監(jiān)聽事件。
同一Topic的一條消息只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi),但多個(gè)Consumer Group可同時(shí)消費(fèi)這一消息。
這是Kafka用來實(shí)現(xiàn)一個(gè)Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個(gè)Consumer)的手段。一個(gè)Topic可以對(duì)應(yīng)多個(gè)Consumer Group。如果需要實(shí)現(xiàn)廣播,只要每個(gè)Consumer有一個(gè)獨(dú)立的Group就可以了。要實(shí)現(xiàn)單播只要所有的Consumer在同一個(gè)Group里。用Consumer Group還可以將Consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的Topic。
Push vs. Pull
作為一個(gè)消息系統(tǒng),Kafka遵循了傳統(tǒng)的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息。
push模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由broker決定的。push模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
對(duì)于Kafka而言,pull模式更合適。pull模式可簡化broker的設(shè)計(jì),Consumer可自主控制消費(fèi)消息的速率,同時(shí)Consumer可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義。
生產(chǎn)者(producer)是負(fù)責(zé)向Kafka提交數(shù)據(jù)的,Kafka會(huì)把收到的消息都寫入到硬盤中,它絕對(duì)不會(huì)丟失數(shù)據(jù)。為了優(yōu)化寫入速度Kafak采用了兩個(gè)技術(shù),順序?qū)懭牒蚆MFile。
順序?qū)懭?/strong>
因?yàn)橛脖P是機(jī)械結(jié)構(gòu),每次讀寫都會(huì)尋址,寫入,其中尋址是一個(gè)“機(jī)械動(dòng)作”,它是最耗時(shí)的。所以硬盤最“討厭”隨機(jī)I/O,最喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
每條消息都被append到該P(yáng)artition中,屬于順序?qū)懘疟P,因此效率非常高。
對(duì)于傳統(tǒng)的message queue而言,一般會(huì)刪除已經(jīng)被消費(fèi)的消息,而Kafka是不會(huì)刪除數(shù)據(jù)的,它會(huì)把所有的數(shù)據(jù)都保留下來,每個(gè)消費(fèi)者(Consumer)對(duì)每個(gè)Topic都有一個(gè)offset用來表示讀取到了第幾條數(shù)據(jù)。
即便是順序?qū)懭胗脖P,硬盤的訪問速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲(chǔ)來利用內(nèi)存提高I/O效率。
在Linux Kernal 2.2之后出現(xiàn)了一種叫做“零拷貝(zero-copy)”系統(tǒng)調(diào)用機(jī)制,就是跳過“用戶緩沖區(qū)”的拷貝,建立一個(gè)磁盤空間和內(nèi)存空間的直接映射,數(shù)據(jù)不再復(fù)制到“用戶態(tài)緩沖區(qū)”系統(tǒng)上下文切換減少2次,可以提升一倍性能。
通過mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存)。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷(調(diào)用文件的read會(huì)把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中,然后再復(fù)制到用戶空間的內(nèi)存中。)
試想一下,一個(gè)Web Server傳送一個(gè)靜態(tài)文件,如何優(yōu)化?答案是zero copy。傳統(tǒng)模式下我們從硬盤讀取一個(gè)文件是這樣的。
先復(fù)制到內(nèi)核空間(read是系統(tǒng)調(diào)用,放到了DMA,所以用內(nèi)核空間),然后復(fù)制到用戶空間(1、2);從用戶空間重新復(fù)制到內(nèi)核空間(你用的socket是系統(tǒng)調(diào)用,所以它也有自己的內(nèi)核空間),最后發(fā)送給網(wǎng)卡(3、4)。
Zero Copy中直接從內(nèi)核空間(DMA的)到內(nèi)核空間(Socket的),然后發(fā)送網(wǎng)卡。這個(gè)技術(shù)非常普遍,Nginx也是用的這種技術(shù)。
實(shí)際上,Kafka把所有的消息都存放在一個(gè)一個(gè)的文件中,當(dāng)消費(fèi)者需要數(shù)據(jù)的時(shí)候Kafka直接把“文件”發(fā)送給消費(fèi)者。Java中間件面試題 +學(xué)習(xí)筆記,當(dāng)不需要把整個(gè)文件發(fā)出去的時(shí)候,Kafka通過調(diào)用Zero Copy的sendfile這個(gè)函數(shù),這個(gè)函數(shù)包括:
out_fd作為輸出(一般及時(shí)socket的句柄)
in_fd作為輸入文件句柄
off_t表示in_fd的偏移(從哪里開始讀取)
size_t表示讀取多少個(gè)
“Kafka的原理和應(yīng)用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!