一、簡介
站在用戶的角度思考問題,與客戶深入溝通,找到燈塔網(wǎng)站設(shè)計與燈塔網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、域名注冊、網(wǎng)頁空間、企業(yè)郵箱。業(yè)務(wù)覆蓋燈塔地區(qū)。1、消息傳輸流程
Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設(shè)計實現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實現(xiàn)。kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息。
Producer即生產(chǎn)者,向Kafka集群發(fā)送消息,在發(fā)送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發(fā)送了分類為topic1的消息,另外一個發(fā)送了topic2的消息。
Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關(guān)注自己需要的Topic中的消息
Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。
2、Topics/logs
一個Topic可以認為是一類消息,每個topic將被分成多個partition(區(qū)),每個partition在存儲層面是append log文件。任何發(fā)布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數(shù)字,它是唯一標記一條消息。它唯一的標記一條消息。kafka并沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行“隨機讀寫”。
談到kafka的存儲,就不得不提到分區(qū),即partitions,創(chuàng)建一個topic時,同時可以指定分區(qū)數(shù)目,分區(qū)數(shù)越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產(chǎn)者發(fā)送的消息之后,會根據(jù)均衡策略將消息存儲到不同的分區(qū)中。
kafka服務(wù)器消息存儲策略如圖
kafka和JMS(Java Message Service)實現(xiàn)(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除.日志文件將會根據(jù)broker中的配置要求,保留一定的時間之后刪除;比如log文件保留2天,那么兩天后,文件會被清除,無論其中的消息是否被消費.kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費之后對文件內(nèi)容改動的磁盤IO開支.
對于consumer而言,它需要保存消費消息的offset,對于offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向前驅(qū)動,即消息將依次順序被消費.事實上consumer可以使用任意順序消費消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實現(xiàn)非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響.
partitions的設(shè)計目的有多個.最根本原因是kafka基于文件存儲.通過分區(qū),可以將日志內(nèi)容分散到多個server上,來避免文件尺寸達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費的能力.(具體原理參見下文).
3、Distribution(分布)
一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(shù)(replicas),每個partition將會被備份到多臺機器上,以提高可用性.
基于replicated方案,那么就意味著需要對多個備份進行調(diào)度;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader);follower只是單調(diào)的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩(wěn)定.
Producers
Producer將消息發(fā)布到指定的Topic中,同時Producer也能決定將此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等.
Consumers
本質(zhì)上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費.
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡.
如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會廣播給所有的消費者.
在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來說,消息仍不是有序的.
kafka的設(shè)計原理決定,對于一個topic,同一個group中不能有多于partitions個數(shù)的consumer同時消費,否則將意味著某些consumer將無法得到消息.
Guarantees
1) 發(fā)送到partitions中的消息將會按照它接收的順序追加到日志中
2) 對于消費者而言,它們消費消息的順序和日志中消息順序一致.
3) 如果Topic的"replicationfactor"為N,那么允許N-1個kafka實例失效。
與生產(chǎn)者的交互
生產(chǎn)者在向kafka集群發(fā)送消息的時候,可以通過指定分區(qū)來發(fā)送到指定的分區(qū)中,也可以通過指定均衡策略來將消息發(fā)送到不同的分區(qū)中,如果不指定,就會采用默認的隨機均衡策略,將消息隨機的存儲到不同的分區(qū)中
與消費者的交互
在消費者消費消息時,kafka使用offset來記錄當前消費的位置,在kafka的設(shè)計中,可以有多個不同的group來同時消費同一個topic下的消息,如圖,我們有兩個不同的group同時消費,他們的的消費的記錄位置offset各不項目,不互相干擾。
對于一個group而言,消費者的數(shù)量不應該多余分區(qū)的數(shù)量,因為在一個group中,每個分區(qū)至多只能綁定到一個消費者上,即一個消費者可以消費多個分區(qū),一個分區(qū)只能給一個消費者消費
因此,若一個group中的消費者數(shù)量大于分區(qū)數(shù)量的話,多余的消費者將不會收到任何消息。
二、使用場景
1、Messaging
對于一些常規(guī)的消息系統(tǒng),kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優(yōu)勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的"事務(wù)性""消息傳輸擔保(消息確認機制)""消息分組"等企業(yè)級特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如,消息重發(fā),消息發(fā)送丟失等)
2、Websit activity tracking
kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中.并實時監(jiān)控,或者離線統(tǒng)計分析等
3、Log Aggregation
kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統(tǒng)化的存儲和分析系統(tǒng).
三、設(shè)計原理
kafka的設(shè)計初衷是希望作為一個統(tǒng)一的信息收集平臺,能夠?qū)崟r的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯能力。
1、持久性
2、性能
3、生產(chǎn)者
4、消費者
5、消息傳送機制
6、復制備份
7、日志
8、分配
四、主要配置
1、Broker配置
2、Consumer主要配置
3、Producer主要配置
五、kafka集群搭建步驟
1、系統(tǒng)環(huán)境
主機名 | 系統(tǒng) | zookeeper版本 | IP |
master | CentOS7.4 | 3.4.12 | 192.168.56.129 |
slave1 | CentOS7.4 | 3.4.12 | 192.168.56.130 |
slave2 | CentOS7.4 | 3.4.12 | 192.168.56.131 |
2、暫時關(guān)閉防火墻和selinux
3、軟件下載
下載地址:http://kafka.apache.org/downloads.html
備注:下載最新的二進制tgz包
4、搭建zookeeper集群
備注:小伙伴可以參考上一篇文章即可
5、kafka集群
5.1、根據(jù)上面的zookeeper集群服務(wù)器,把kafka上傳到/home下
5.2、解壓
[root@master home]# tar -zxvf kafka_2.12-2.0.0.tgz
[root@master home]# mv kafka_2.12-2.0.0 kafka01
5.3、配置文件
[root@master home]# cd /home/kafka01/config/
備注:server.properties文件里的broker.id,log.dirs,zookeeper.connect必須根據(jù)實際情況進行修改,其他項根據(jù)需要自行斟酌,master配置如下:
broker.id=1
port=9091
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka/kafka-logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
5.4、啟動服務(wù)(master)----前提是三個節(jié)點的zookeeper已啟動
[root@master kafka01]# ./bin/kafka-server-start.sh config/server.properties &
補充:
問題:&可以使程序在后臺運行,但一旦斷開ssh終端,后臺Java程序也會終止。
解決辦法:使用shell腳本啟動
[root@master kafka01]# cat start.sh
#!/bin/bash
cd /home/kafka01/
./bin/kafka-server-start.sh config/server.properties &
exit
授權(quán),運行即可
[root@master kafka01]#chmod +x start.sh
5.5、配置slave1和slave2
slave1配置如下:
broker.id=2
port=9092
log.dirs=/var/log/kafka
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
啟動即可
slave2配置如下:
broker.id=3
port=9093
log.dirs=/var/log/kafka
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
啟動即可
6、測試
Kafka通過topic對同一類的數(shù)據(jù)進行管理,同一類的數(shù)據(jù)使用同一個topic可以在處理數(shù)據(jù)時更加的便捷
6.1、創(chuàng)建一個Topic
[root@master kafka01]# bin/kafka-topics.sh --create --zookeeper 192.168.56.129:2181 --replication-factor 1 --partitions 1 --topic test
查看
[root@master kafka01]# bin/kafka-topics.sh --list --zookeeper 192.168.56.129:2181
6.2、創(chuàng)建一個消息消費者
[root@master kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.129:9091 --topic test --from-beginning
消費者創(chuàng)建完成之后,因為還沒有發(fā)送任何數(shù)據(jù),因此這里在執(zhí)行后沒有打印出任何數(shù)據(jù)
不過別著急,不要關(guān)閉這個終端,打開一個新的終端,接下來我們創(chuàng)建第一個消息生產(chǎn)者
6.3、創(chuàng)建一個消息生產(chǎn)者
在kafka解壓目錄打開一個新的終端,輸入
[root@master kafka01]# bin/kafka-console-producer.sh --broker-list 192.168.56.129:9091 --topic test
在發(fā)送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經(jīng)打印出了我們剛才發(fā)送的消息
zookeeper查看topic
到此即可,共同進步之路?。。。?!
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。