關(guān)于kafka的工作機(jī)制,已經(jīng)在上篇博文:Kafka原理及單機(jī)部署中詳細(xì)寫出來,這里只是將kafka的一個(gè)群集部署寫了出來。
淄博ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書合作)期待與您的合作!博文大綱:
一、環(huán)境準(zhǔn)備
二、部署zookeeper服務(wù)
三、部署kafka集群
部署kafka群集所需的安裝包,可以從我的網(wǎng)盤鏈接中下載。
#部署zookeeper
[root@kafka01 src]# tar zxf zookeeper-3.4.9.tar.gz
[root@kafka01 src]# mv zookeeper-3.4.9 /usr/local/zookeeper
#修改配置文件
[root@kafka01 src]# cd /usr/local/zookeeper/conf/
[root@kafka01 conf]# cp -p zoo_sample.cfg zoo.cfg
[root@kafka01 conf]# sed -i 's/dataDir=\/tmp\/zookeeper/dataDir=\/usr\/local\/zookeeper\/data/g' zoo.cfg
#直接群集節(jié)點(diǎn)信息,2888和3888端口用于群集內(nèi)部通信
[root@kafka01 conf]# echo "server.1 192.168.20.2:2888:3888" >> zoo.cfg
[root@kafka01 conf]# echo "server.2 192.168.20.3:2888:3888" >> zoo.cfg
[root@kafka01 conf]# echo "server.3 192.168.20.4:2888:3888" >> zoo.cfg
[root@kafka01 conf]# egrep -v '^$|^#' zoo.cfg #更改后的配置文件如下
tickTime=2000 #節(jié)點(diǎn)之間的心跳檢測(cè)時(shí)間單位為毫秒
initLimit=10 #達(dá)到5個(gè)訪問進(jìn)行同步數(shù)據(jù)
syncLimit=5 #節(jié)點(diǎn)之間檢查失敗次數(shù)超過后斷開相應(yīng)的節(jié)點(diǎn)
dataDir=/usr/local/zookeeper/data #日志文件存放路徑
clientPort=2181
#聲明參與集群的主機(jī)
server.1 192.168.20.2:2888:3888
server.2 192.168.20.3:2888:3888
server.3 192.168.20.4:2888:3888
#創(chuàng)建所需目錄及設(shè)置節(jié)點(diǎn)的ID號(hào)
[root@kafka01 conf]# mkdir /usr/local/zookeeper/data
[root@kafka01 conf]# echo 1 > /usr/local/zookeeper/data/myid
#將配置好的zookeeper目錄復(fù)制到群集內(nèi)的其他節(jié)點(diǎn)
[root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.20.3:/usr/local/
[root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.20.4:/usr/local/
#啟動(dòng)zookeeper服務(wù)
[root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh start
[root@kafka01 bin]# netstat -antp | egrep '2181|2888|3888' #確認(rèn)群集端口在監(jiān)聽
#修改ID號(hào)為2
[root@kafka02 ~]# echo 2 > /usr/local/zookeeper/data/myid
#啟動(dòng)zookeeper
[root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh start
#修改ID號(hào)為3
[root@kafka03 ~]# echo 3 > /usr/local/zookeeper/data/myid
#啟動(dòng)zookeeper
[root@kafka03 ~]# /usr/local/zookeeper/bin/zkServer.sh start
#kafka01上如下:
[root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower #角色為follower
#kafka02上如下:
[root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader #角色為leader
#kafka03上如下:
[root@kafka03 ~]# /usr/local/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower #角色為follower
#解壓至指定目錄
[root@kafka01 src]# tar zxf kafka_2.11-2.2.1.tgz
[root@kafka01 src]# mv kafka_2.11-2.2.1 /usr/local/kafka
#修改配置文件
[root@kafka01 src]# cd /usr/local/kafka/config/
[root@kafka01 config]# sed -i 's/broker.id=0/broker.id=1/g' server.properties
[root@kafka01 config]# sed -i 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/192.168.20.2:9092/g' server.properties
[root@kafka01 config]# sed -i 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/192.168.20.2:9092/g' server.properties
[root@kafka01 config]# sed -i 's/log.dirs=\/tmp\/kafka-logs/log.dirs=\/usr\/local\/zookeeper\/data/g' server.properties
[root@kafka01 config]# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181/g' server.properties
[root@kafka01 config]# sed -i 's/zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=600000/g' server.properties
[root@kafka01 config]# egrep -v '^$|^#' server.properties #修改后的配置文件如下
broker.id=1 #kafka的ID號(hào),這里為1,其他節(jié)點(diǎn)依次是2、3
listeners=PLAINTEXT://192.168.20.2:9092 #節(jié)點(diǎn)監(jiān)聽地址,填寫每個(gè)節(jié)點(diǎn)自己的IP地址
advertised.listeners=PLAINTEXT://192.168.20.2:9092
#集群中節(jié)點(diǎn)內(nèi)部交流使用的端口,填寫每個(gè)節(jié)點(diǎn)自己的IP地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/zookeeper/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#聲明鏈接zookeeper節(jié)點(diǎn)的地址
zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181
zookeeper.connection.timeout.ms=600000 #修改這的時(shí)間,單位是毫秒,為了防止連接zookeeper超時(shí)
group.initial.rebalance.delay.ms=0
#將修改后的kafka目錄發(fā)送至其他節(jié)點(diǎn)
[root@kafka01 config]# scp -r /usr/local/kafka root@192.168.20.3:/usr/local/
[root@kafka01 config]# scp -r /usr/local/kafka root@192.168.20.4:/usr/local/
#啟動(dòng)kafka
[root@kafka01 config]# cd ../bin/
[root@kafka01 bin]# ./kafka-server-start.sh ../config/server.properties &
#修改與kafka01沖突之處
[root@kafka02 ~]# cd /usr/local/kafka/
[root@kafka02 kafka]# sed -i 's/192.168.20.2/192.168.20.3/g' config/server.properties
[root@kafka02 kafka]# sed -i 's/broker.id=1/broker.id=2/g' config/server.properties
#啟動(dòng)kafka服務(wù)
[root@kafka02 kafka]# cd bin/
[root@kafka02 bin]# ./kafka-server-start.sh ../config/server.properties &
[root@kafka02 bin]# netstat -anpt | grep 9092 #確定端口在監(jiān)聽
#修改kafka配置文件中沖突之處
[root@kafka03 ~]# cd /usr/local/kafka/
[root@kafka03 kafka]# sed -i 's/192.168.20.2/192.168.20.4/g' config/server.properties
[root@kafka03 kafka]# sed -i 's/broker.id=1/broker.id=3/g' config/server.properties
#啟動(dòng)kafka服務(wù)
[root@kafka03 kafka]# cd bin/
[root@kafka03 bin]# ./kafka-server-start.sh ../config/server.properties &
[root@kafka03 bin]# netstat -anpt | grep 9092 #確定端口在監(jiān)聽
#創(chuàng)建名為my-replicated-topic的topic
[root@kafka01 bin]# ./kafka-topics.sh --create --bootstrap-server 192.168.20.2:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
#查看topic的狀態(tài)和leader
[root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
#返回的信息表示partition數(shù)量為1,副本數(shù)量為3,segment字節(jié)數(shù)為1073741824
#名稱為“my-replicated-topic”,ID為2的節(jié)點(diǎn)為leader
[root@kafka01 bin]# ./kafka-console-producer.sh --broker-list 192.168.20.2:9092 --topic my-replicated-topic
#隨便寫入幾行數(shù)據(jù)
>aaaaa
>bbbbb
>ccccc
>ddddd
#在其他節(jié)點(diǎn)上訂閱消息
[root@kafka02 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.20.3:9092 --from-beginning --topic my-replicated-topic
................#省略部分內(nèi)容
aaaaa
bbbbb
ccccc
ddddd
#可以看到當(dāng)前l(fā)eader是ID為2的節(jié)點(diǎn)
[root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
#到kafka02主機(jī)上停止kafka服務(wù)
[root@kafka02 bin]# ./kafka-server-stop.sh
#再次查看leader是哪個(gè)節(jié)點(diǎn)?(可以發(fā)現(xiàn)leader換成了ID為1的節(jié)點(diǎn))
[root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3
———————— 本文至此結(jié)束,感謝閱讀 ————————
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。