kafka集群是把狀態(tài)保存在zookeeper中的,首先要搭建zookeeper集群。
成都創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括潁泉網(wǎng)站建設(shè)、潁泉網(wǎng)站制作、潁泉網(wǎng)頁制作以及潁泉網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,潁泉網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到潁泉省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!wget http://xxxxx.oss-cn-xxxx.aliyuncs.com/xxxx/jdk-8u171-linux-x64.rpm
yum localinstall jdk-8u171-linux-x64.rpm -y
wget http://xxx-xx.oss-cn-xxx.aliyuncs.com/xxx/kafka_2.12-1.1.0.tgz
官網(wǎng)下載鏈接:http://kafka.apache.org/downloads
解壓kafka
tar -zxvf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 kafka
直接使用kafka自帶的zookeeper建立zk集群
cd /data/kafka
vim conf/zookeeper.properties
#tickTime:
這個(gè)時(shí)間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔,也就是每個(gè) tickTime 時(shí)間就會(huì)發(fā)送一個(gè)心跳。
#initLimit:
這個(gè)配置項(xiàng)是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務(wù)器的客戶端,而是 Zookeeper 服務(wù)器集群中連接到 Leader 的 Follower 服務(wù)器)初始化連接時(shí)最長能忍受多少個(gè)心跳時(shí)間間隔數(shù)。當(dāng)已經(jīng)超過 5個(gè)心跳的時(shí)間(也就是 tickTime)長度后 Zookeeper 服務(wù)器還沒有收到客戶端的返回信息,那么表明這個(gè)客戶端連接失敗??偟臅r(shí)間長度就是 5*2000=10 秒
#syncLimit:
這個(gè)配置項(xiàng)標(biāo)識(shí) Leader 與Follower 之間發(fā)送消息,請(qǐng)求和應(yīng)答時(shí)間長度,最長不能超過多少個(gè) tickTime 的時(shí)間長度,總的時(shí)間長度就是5*2000=10秒
#dataDir:
快照日志的存儲(chǔ)路徑
#dataLogDir:需手動(dòng)創(chuàng)建
事物日志的存儲(chǔ)路徑,如果不配置這個(gè)那么事物日志會(huì)默認(rèn)存儲(chǔ)到dataDir制定的目錄,這樣會(huì)嚴(yán)重影響zk的性能,當(dāng)zk吞吐量較大的時(shí)候,產(chǎn)生的事物日志、快照日志太多
#clientPort:
這個(gè)端口就是客戶端連接 Zookeeper 服務(wù)器的端口,Zookeeper 會(huì)監(jiān)聽這個(gè)端口,接受客戶端的訪問請(qǐng)求。
進(jìn)入dataDir目錄,將三臺(tái)服務(wù)器上的myid文件分別寫入1、2、3。
myid是zk集群用來發(fā)現(xiàn)彼此的標(biāo)識(shí),必須創(chuàng)建,且不能相同。
echo "1" > /data/kafka/zk/myid
echo "2" > /data/kafka/zk/myid
echo "3" > /data/kafka/zk/myid
zookeeper不會(huì)主動(dòng)的清除舊的快照和日志文件,需要定期清理。
#!/bin/bash
#snapshot file dir
dataDir=/data/kafka/zk/version-2
#tran log dir
dataLogDir=/data/kafka/log/zk/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上這個(gè)腳本定義了刪除對(duì)應(yīng)兩個(gè)目錄中的文件,保留最新的66個(gè)文件,可以將他寫到crontab中,設(shè)置為每天凌晨2點(diǎn)執(zhí)行一次就可以了。
進(jìn)入kafka目錄,執(zhí)行zookeeper命令
cd /data/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &
沒有報(bào)錯(cuò),而且jps查看有zk進(jìn)程就說明啟動(dòng)成功了。
vim conf/server.properties
部分參數(shù)含義:
先每臺(tái)設(shè)置host,listeners里要設(shè)置,否則后面消費(fèi)消息會(huì)報(bào)錯(cuò)。
broker.id 每臺(tái)都不能相同
num.network.threads 設(shè)置為cpu核數(shù)
num.partitions 分區(qū)數(shù)設(shè)置視情況而定,上面有講分區(qū)數(shù)設(shè)置
default.replication.factor kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)
nohup ./bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
執(zhí)行jps檢查
./bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 2 --partitions 1 --topic test1
--replication-factor 2 #復(fù)制兩份
--partitions 1 #創(chuàng)建1個(gè)分區(qū)
--topic #主題為test1
#模擬客戶端去發(fā)送消息,生產(chǎn)者
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test1
#模擬客戶端去接受消息,消費(fèi)者
./bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --from-beginning --topic test1
#然后在生產(chǎn)者處輸入任意內(nèi)容,在消費(fèi)端查看內(nèi)容。
./bin/kafka-topics.sh --list --zookeeper xxxx:2181
#顯示創(chuàng)建的所有topic
./bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic test1
#Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:
# Topic: test1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
#分區(qū)為為1 復(fù)制因子為2 他的 test1的分區(qū)為0
#Replicas: 0,1 復(fù)制的為0,1
修改配置文件server.properties添加如下配置:
delete.topic.enable=true
配置完重啟kafka、zookeeper。
如果不想修改配置文件可刪除topc及相關(guān)數(shù)據(jù)目錄
#刪除kafka topic
./bin/kafka-topics.sh --delete --zookeeper xxxx:2181,xxxx:2181 --topic test1
#刪除kafka相關(guān)數(shù)據(jù)目錄
rm -rf /data/kafka/log/kafka/test*
#刪除zookeeper相關(guān)路徑
rm -rf /data/kafka/zk/test*
rm -rf /data/kafka/log/zk/test*
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。