真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Kafka原理及Kafka群集部署-創(chuàng)新互聯(lián)

博文大綱:
一、Kafka概述
1)消息隊列
2)為什么要使用消息隊列?
3)什么是Kafka?
4)Kafka的特性
5)Kafka架構(gòu)
6)Topic和Partition的區(qū)別
7)kafka流程圖
8)Kafka的文件存儲機制
9)數(shù)據(jù)的可靠性和持久性保證
10)leader選舉
二、部署單機Kafka
1)部署Kafka
2)測試Kafka
三、部署Kafka群集
1)環(huán)境準(zhǔn)備
2)部署zookeeper群集
3)部署Kafka群集

創(chuàng)新互聯(lián)公司專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、祿勸網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5場景定制、電子商務(wù)商城網(wǎng)站建設(shè)、集團公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為祿勸等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

一、Kafka概述

1)消息隊列

1)點對點模式(一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除)

點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監(jiān)聽者也是如此;

2)發(fā)布/訂閱模式(一對多,數(shù)據(jù)生產(chǎn)后,推送給所有訂閱者)

發(fā)布訂閱模型則是一個基于推送的消息傳送模型。發(fā)布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監(jiān)聽主題時才接收消息,而持久訂閱者則監(jiān)聽主題的所有消息,即使當(dāng)前訂閱者不可用,處于離線狀態(tài)。

2)為什么要使用消息隊列?

1)解耦

允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2)冗余

消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

3)擴展性

因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。

4)靈活性&&削峰填谷

在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。

5)可恢復(fù)性

系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

6)順序保證

在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。(Kafka 保證一個 Partition 內(nèi)的消息的有序性)

7)緩沖

有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。

8)異步通信

很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

3)什么是Kafka?

kafka是由Apache軟件基金會發(fā)布的一個開源流處理平臺,由Scala和Java編寫。它是一種高吞吐量的分布式發(fā)布的訂閱消息系統(tǒng),它可以處理消費者規(guī)模的網(wǎng)站中的所有動作流數(shù)據(jù)。這種動作(網(wǎng)頁瀏覽,搜索和其他用戶的行動)是在現(xiàn)代網(wǎng)絡(luò)上的許多社會功能的一個關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統(tǒng)一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

4)Kafka的特性

kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),具有以下特性:
1)通過磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對于即使數(shù)以TB的消息存儲也能夠保持長時間的穩(wěn)定性能;
2)持久性:使用文件性存儲,日志文件存儲消息,需要寫入硬盤,采用達到一定閾值才寫入硬盤,從而減少磁盤I/O,如果kafka突然宕機,數(shù)據(jù)會丟失一部分;
3)高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數(shù)百萬的消息;
4)支持通過kafka服務(wù)器和消費機集群來分區(qū)消息;
5)支持Hadoop并行數(shù)據(jù)加載;

5)Kafka架構(gòu)

Kafka原理及Kafka群集部署

架構(gòu)圖中的各個組件作用:
1)Producer :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端;
2)Broker :一臺 kafka 服務(wù)器就是一個 broker。一個集群由多個broker 組成。一個 broker可以容納多個 topic;
3)Consumer :消息消費者,向 kafka broker 取消息的客戶端;
4)Partition:為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)。kafka 只保證按一個 partition 中的順序?qū)⑾l(fā)給consumer,不保證一個 topic 的整體(多個 partition 間)的順序;
5)Topic :Kafka根據(jù)topic對消息進行歸類,發(fā)布到Kafka集群的每條消息都需要指定一個topic;
6)ConsumerGroup:每個Consumer屬于一個特定的Consumer Group,一條消息可以發(fā)送到多個不同的Consumer Group,但是一個Consumer Group中只能有一個Consumer能夠消費該消息;

6)Topic和Partition的區(qū)別

一個topic可以認為一個一類消息,每個topic將被分成多個partition,每個partition在存儲層面是append log文件。任何發(fā)布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為long型的數(shù)字,它唯一標(biāo)記一條消息。每條消息都被append到partition中,是順序?qū)懘疟P,因此效率非常高(順序?qū)懘疟P比隨機寫內(nèi)存的速度還要高,這是kafka高吞吐率的一個很重要的保證)。
Kafka原理及Kafka群集部署
每一條消息被發(fā)送到broker中,會根據(jù)partition規(guī)則選擇被存儲到哪一個partition(默認采用輪詢的方式進行寫入數(shù)據(jù))。如果partition規(guī)則設(shè)置合理,所有消息可以均勻分布到不同的partition里,這樣就實現(xiàn)了水平擴展。(如果一個topic對應(yīng)一個文件,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸,而partition解決了這個問題),如果消息被消費則保留append.log兩天。

7)kafka流程圖

Kafka原理及Kafka群集部署
如上圖所示,一個典型的kafka體系架構(gòu)包括若干Producer(可以是服務(wù)器日志,業(yè)務(wù)數(shù)據(jù),頁面前端產(chǎn)生的page view等),若干個broker(kafka支持水平擴展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer(Group),以及一個Zookeeper集群。kafka通過Zookeeper管理集群配置,選舉出leader,以及在consumer group發(fā)生變化時進行重新調(diào)整。Producer使用push(推)模式將消息發(fā)布到broker,consumer使用pull(拉)模式從broker訂閱并消費消息。

zookeeper群集中有兩個角色:leader和follower,leader對外提供服務(wù),follower負責(zé)leader里面所產(chǎn)生內(nèi)容同步消息寫入生成時產(chǎn)生replicas(副本);

kafka的高可靠性的保證來源于其健壯的副本(replicas)策略。通過調(diào)節(jié)其副本相關(guān)參數(shù),可以使得kafka在性能和可靠性之間運轉(zhuǎn)之間的游刃有余。kafka從0.8.x版本開始提供partition級別的復(fù)制的。

8)Kafka的文件存儲機制

kafka中消息是以topic進行分類的,生產(chǎn)者通過topic向kafka broker發(fā)送消息,消費者通過topic讀取數(shù)據(jù)。然而topic在物理層面又能以partition為分組,一個topic可以分為若干個partition,partition還可以細分為segment,一個partition物理上由多個segment組成。

為了便于說明問題,假設(shè)這里只有一個kafka集群,且這個集群只有一個kafka broker,也就是只有一臺物理機。在這個kafka broker的server.properties配置文件中定義kafka的日志文件存放路徑以此來設(shè)置kafka消息文件存儲目錄,與此同時創(chuàng)建一個topic:test,partition的數(shù)量為4,啟動kafka就可以在日志存放路徑中看到生成4個目錄,在kafka文件存儲中,同一個topic下有多個不同的partition,每個partition為一個目錄,partition的名稱規(guī)則為:topic名稱+有序序號,第一個序號從0開始。

segment是什么?

如果就以partition為最小存儲單位,我們可以想象當(dāng)Kafka producer不斷發(fā)送消息,必然會引起partition文件的無限擴張,這樣對于消息文件的維護以及已經(jīng)被消費的消息的清理帶來嚴(yán)重的影響,所以這里以segment為單位又將partition細分。每個partition(目錄)相當(dāng)于一個巨型文件被平均分配到多個大小相等的segment(段)數(shù)據(jù)文件中(每個segment 文件中消息數(shù)量不一定相等)這種特性也方便old segment的刪除,即方便已被消費的消息的清理,提高磁盤的利用率。每個partition只需要支持順序讀寫就行。

segment文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為segment索引文件和數(shù)據(jù)文件。這兩個文件的命令規(guī)則為:partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值(偏移量),數(shù)值大小為64位,20位數(shù)字字符長度,沒有數(shù)字用0填充。

9)數(shù)據(jù)的可靠性和持久性保證

當(dāng)producer向leader發(fā)送數(shù)據(jù)時,可以通request.required.acks參數(shù)來設(shè)置數(shù)據(jù)可靠性的級別:

  • 1(默認):producer的leader已成功收到數(shù)據(jù)并得到確認。如果leader宕機了,則會丟失數(shù)據(jù);
  • 0 :producer無需等待來自broker的確認而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率高,但是數(shù)據(jù)可靠性確是最低的;
  • -1:producer需要等待所有follower都確認接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性高;

10)leader選舉

一條消息只有被所有follower都從leader復(fù)制過去才會被認為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進了leader,還沒來得及被任何follower復(fù)制就宕機了,而造成數(shù)據(jù)丟失。而對于producer而言,它可以選擇是否等待消息commit。

一種非常常用的選舉leader的方式是“少數(shù)服從多數(shù)”,在進行數(shù)據(jù)的復(fù)制過程中,存在多個follower,并且每個follower的數(shù)據(jù)速度都不相同,當(dāng)leader宕機后,當(dāng)前的follower上誰的數(shù)據(jù)最多誰就是leader。

二、部署單機Kafka

1)部署Kafka

Kafka服務(wù)默認需要依賴Java環(huán)境,本次使用Centos 7系統(tǒng),默認已經(jīng)具備Java的環(huán)境。

Kafka群集所需軟件下載鏈接:https://pan.baidu.com/s/1VHUH-WsptDB2wOugxvfKVg
提取碼:47x7

[root@kafka ~]# tar zxf kafka_2.11-2.2.1.tgz 
[root@kafka ~]# mv kafka_2.11-2.2.1 /usr/local/kafka
[root@kafka ~]# cd /usr/local/kafka/bin/
[root@kafka bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties &
#啟動zookeeper,啟動時需指定zookeeper的配置文件
#需添加“&”符號,表示后臺運行,否則會占用前臺的終端
[root@kafka bin]#  ./kafka-server-start.sh ../config/server.properties &
#啟動kafka,啟動方式與啟動zookeeper一樣
[root@kafka bin]# netstat -anpt | grep 9092
#kafka的監(jiān)聽端口為9092,確認端口在監(jiān)聽的狀態(tài)

由于kafka是通過zookeeper來調(diào)度的,所以,即使是單機kafka也需要啟動zookeeper服務(wù),kafka的安裝目錄下是默認集成了zookeeper的,直接啟動即可。

2)測試Kafka

[root@kafka bin]# ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
#在本機創(chuàng)建一個topic,名稱為test,副本數(shù)量為1,分區(qū)數(shù)量為1
[root@kafka bin]# ./kafka-topics.sh --list --bootstrap-server localhost:9092
#查看本機的topic
[root@kafka bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
#向創(chuàng)建的topic中添加消息,消息內(nèi)容自定義
>aaaa
>bbbb
>cccc
[root@kafka bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
aaaa
bbbb
cccc
#開啟新的終端,進行讀取消息測試,“--from-beginning”表示從開頭讀取

測試成功!

三、部署Kafka群集

1)環(huán)境準(zhǔn)備

系統(tǒng)版本 主機名 IP地址 運行的服務(wù)
Centos 7kafka01192.168.1.6Kafka+zookeeper
Centos 7kafka02192.168.1.7Kafka+zookeeper
Centos 7kafka03192.168.1.8Kafka+zookeeper

2)部署zookeeper群集

1)主機kafka01配置
[root@kafka01 ~]# tar zxf zookeeper-3.4.9.tar.gz 
[root@kafka01 ~]# mv zookeeper-3.4.9 /usr/local/zookeeper
#安裝zookeeper
[root@kafka01 ~]# cd /usr/local/zookeeper/conf
[root@kafka01 conf]# cp -p zoo_sample.cfg zoo.cfg 
[root@kafka01 conf]#  sed -i 's/dataDir=\/tmp\/zookeeper/dataDir=\/usr\/local\/zookeeper\/data/g' zoo.cfg
[root@kafka01 conf]#  echo "server.1 192.168.1.6:2888:3888" >> zoo.cfg 
[root@kafka01 conf]#  echo "server.2 192.168.1.7:2888:3888" >> zoo.cfg 
[root@kafka01 conf]#  echo "server.3 192.168.1.8:2888:3888" >> zoo.cfg  
[root@kafka01 conf]# egrep -v '^$|^#' zoo.cfg        #更改后的配置文件
tickTime=2000            #節(jié)點之間的心跳檢測時間單位為毫秒 
initLimit=10      #節(jié)點之間檢查失敗次數(shù)超過后斷開相應(yīng)的節(jié)點
syncLimit=5      #達到5個訪問進行同步數(shù)據(jù)
dataDir=/usr/local/zookeeper/data    #日志文件存放路徑
clientPort=2181              #監(jiān)聽端口
server.1 192.168.1.6:2888:3888
server.2 192.168.1.7:2888:3888
server.3 192.168.1.8:2888:3888
#聲明參與集群的主機,2888和3888端口用于群集內(nèi)部通信
[root@kafka01 conf]# mkdir /usr/local/zookeeper/data
[root@kafka01 conf]# echo 1 > /usr/local/zookeeper/data/myid
#創(chuàng)建所需目錄及設(shè)置節(jié)點的ID號
[root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.1.7:/usr/local/
[root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.1.8:/usr/local/
#將配置好的zookeeper目錄復(fù)制到群集內(nèi)的其他節(jié)點
[root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh start
[root@kafka01 conf]# netstat -anpt | grep 2181
#確認zookeeper服務(wù)端口正在監(jiān)聽
2)主機kafka02配置
[root@kafka02 ~]# echo 2 > /usr/local/zookeeper/data/myid 
#修改ID號為2
[root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh start
#啟動zookeeper
3)主機kafka03配置
[root@kafka03 ~]#  echo 3 > /usr/local/zookeeper/data/myid
#修改ID號為3
[root@kafka03 ~]#  /usr/local/zookeeper/bin/zkServer.sh start
#啟動zookeeper
4)查看zookeeper群集內(nèi)節(jié)點的角色
[root@kafka01 conf]#  /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower         #角色為follower
[root@kafka02 ~]#  /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower         #角色為follower
[root@kafka03 ~]#  /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader      #角色為leader

3)部署Kafka群集

1)主機kafka01配置
[root@kafka01 ~]# tar zxf kafka_2.11-2.2.1.tgz 
[root@kafka01 ~]# mv kafka_2.11-2.2.1 /usr/local/kafka
#安裝kafka
[root@kafka01 ~]# cd /usr/local/kafka/config/
[root@kafka01 config]# sed -i 's/broker.id=0/broker.id=1/g' server.properties 
[root@kafka01 config]# sed -i 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/192.168.1.6:9092/g' server.properties
[root@kafka01 config]# sed -i 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/192.168.1.6:9092/g' server.properties
[root@kafka01 config]# sed -i 's/log.dirs=\/tmp\/kafka-logs/log.dirs=\/usr\/local\/zookeeper\/data/g' server.properties
[root@kafka01 config]#  sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181/g' server.properties
[root@kafka01 config]#  sed -i 's/zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=600000/g' server.properties
#修改kafka的配置文件
[root@kafka01 config]# egrep -v '^$|^#' server.properties
#修改后的配置文件
broker.id=1     #kafka的ID號,這里為1,其他節(jié)點依次是2、3
listeners=PLAINTEXT://192.168.1.6:9092   #節(jié)點監(jiān)聽地址,填寫每個節(jié)點自己的IP地址
advertised.listeners=PLAINTEXT://192.168.1.6:9092    #集群中節(jié)點內(nèi)部交流使用的端口,填寫每個節(jié)點自己的IP地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/zookeeper/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.62:2181,192.168.1.7:2181,192.168.1.8:2181
#聲明鏈接zookeeper節(jié)點的地址
zookeeper.connection.timeout.ms=600000
#修改這的時間,單位是毫秒,為了防止連接zookeeper超時
group.initial.rebalance.delay.ms=0
[root@kafka01 config]# scp -r /usr/local/kafka/ root@192.168.1.7:/usr/local/
[root@kafka01 config]# scp -r /usr/local/kafka/ root@192.168.1.8:/usr/local/
#將修改后的kafka目錄發(fā)送至其他節(jié)點
[root@kafka01 bin]# ./kafka-server-start.sh ../config/server.properties &
#啟動kafka
[root@kafka01 bin]# netstat -anpt | grep 9092
確認端口在監(jiān)聽
2)主機kafka02配置
[root@kafka02 ~]# cd /usr/local/kafka/
[root@kafka02 kafka]#  sed -i 's/broker.id=1/broker.id=2/g' config/server.properties
[root@kafka02 kafka]# sed -i 's/192.168.1.6:9092/192.168.1.7:9092/g' config/server.properties 
#修改與kafka01沖突之處
[root@kafka02 kafka]# cd bin/
[root@kafka02 bin]#  ./kafka-server-start.sh ../config/server.properties &
#啟動kafka
[root@kafka02 bin]# netstat -anpt | grep 9092
#確認端口在監(jiān)聽
3)主機kafka03配置
[root@kafka03 ~]# cd /usr/local/kafka/
[root@kafka03 kafka]# sed -i 's/broker.id=1/broker.id=3/g' config/server.properties
[root@kafka03 kafka]# sed -i 's/192.168.1.6:9092/192.168.1.8:9092/g' config/server.properties
#修改與kafka01沖突之處
[root@kafka03 kafka]# cd bin/
[root@kafka03 bin]#  ./kafka-server-start.sh ../config/server.properties &
#啟動kafka
[root@kafka03 bin]# netstat -anpt | grep 9092
#確認端口在監(jiān)聽
4)發(fā)布和訂閱消息測試
[root@kafka01 bin]#  ./kafka-topics.sh --create --bootstrap-server 192.168.1.6:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
#創(chuàng)建名為my-replicated-topic的topic
[root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.1.6:9092 --topic my-replicated-topic
#查看topic的狀態(tài)和leader
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
#返回的信息表示partition數(shù)量為1,副本數(shù)量為3,segment字節(jié)數(shù)為1073741824
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,3,Isr: 1,3,2
#名稱為“my-replicated-topic”,ID為1的節(jié)點為leader
[root@kafka01 bin]#  ./kafka-console-producer.sh --broker-list 192.168.1.6:9092 --topic my-replicated-topic
#向名為my-replicated-topic的topic 中插入數(shù)據(jù)進行測試
>aaaaaaaa
>bbbbbbbbbbbbbbb
>ccccccccccccccccccccccc
[root@kafka02 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --from-beginning --topic my-replicated-topic
#在其他節(jié)點上查看插入的數(shù)據(jù)
aaaaaaaa
bbbbbbbbbbbbbbb
ccccccccccccccccccccccc
5)模擬leader宕機,查看topic的狀態(tài)及新的leader
[root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.1.6:9092 --topic my-replicated-topic
#查看名為my-replicated-topic的topic狀態(tài)
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,3,Isr: 1,3,2
#可以看出ID號為1的節(jié)點是leader
[root@kafka01 bin]# ./kafka-server-stop.sh
#ID號為1的節(jié)點停止kafka服務(wù)
[root@kafka02 bin]#  ./kafka-topics.sh --describe --bootstrap-server 192.168.1.7:9092 --topic my-replicated-topic
#在第二個節(jié)點上查看topic狀態(tài)
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
    Topic: my-replicated-topic  Partition: 0    Leader: 3   Replicas: 1,3,Isr: 3,2
#可以看出leader換成了ID為3的節(jié)點

———————— 本文至此結(jié)束,感謝閱讀 ————————

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


網(wǎng)頁標(biāo)題:Kafka原理及Kafka群集部署-創(chuàng)新互聯(lián)
本文地址:http://weahome.cn/article/ccdghh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部