【kafka cluster機器】:
機器名稱 用戶名稱
sht-sgmhadoopdn-01/02/03 root
成都創(chuàng)新互聯(lián)主營瓊結(jié)網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都App制作,瓊結(jié)h5微信小程序搭建,瓊結(jié)網(wǎng)站營銷推廣歡迎瓊結(jié)等地區(qū)企業(yè)咨詢
【安裝目錄】: /root/learnproject/app
[root@sht-sgmhadoopnn-01 app]# scp -r scala root@sht-sgmhadoopdn-01:/root/learnproject/app/
[root@sht-sgmhadoopnn-01 app]# scp -r scala root@sht-sgmhadoopdn-02:/root/learnproject/app/
[root@sht-sgmhadoopnn-01 app]# scp -r scala root@sht-sgmhadoopdn-03:/root/learnproject/app/
#環(huán)境變量
[root@sht-sgmhadoopdn-01 app]# vi /etc/profile
export SCALA_HOME=/root/learnproject/app/scala
export PATH=$SCALA_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$JAVA_HOME/bin:$PATH
[root@sht-sgmhadoopdn-02 app]# vi /etc/profile
export SCALA_HOME=/root/learnproject/app/scala
export PATH=$SCALA_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$JAVA_HOME/bin:$PATH
[root@sht-sgmhadoopdn-02 app]# vi /etc/profile
export SCALA_HOME=/root/learnproject/app/scala
export PATH=$SCALA_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$JAVA_HOME/bin:$PATH
[root@sht-sgmhadoopdn-01 app]# source /etc/profile
[root@sht-sgmhadoopdn-02 app]# source /etc/profile
[root@sht-sgmhadoopdn-03 app]# source /etc/profile
[root@sht-sgmhadoopdn-01 app]# pwd
/root/learnproject/app
[root@sht-sgmhadoopdn-01 app]# wget http://www-eu.apache.org/dist/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz
[root@sht-sgmhadoopdn-01 app]# tar xzvf kafka_2.11-0.10.1.0.tgz
[root@sht-sgmhadoopdn-01 app]# mv kafka_2.11-0.10.1.0 kafka
[root@sht-sgmhadoopdn-01 app]# cd kafka
[root@sht-sgmhadoopdn-01 kafka]# mkdir logs
[root@sht-sgmhadoopdn-01 kafka]# cd config/
[root@sht-sgmhadoopdn-01 config]# vi server.properties
broker.id=1
port=9092
host.name=172.16.101.58
log.dirs=/root/learnproject/app/kafka/logs
zookeeper.connect=172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka
[root@sht-sgmhadoopdn-01 app]# scp -r kafka sht-sgmhadoopdn-03:/root/learnproject/app/
[root@sht-sgmhadoopdn-01 app]# scp -r kafka sht-sgmhadoopdn-03:/root/learnproject/app/
[root@sht-sgmhadoopdn-02 config]# vi server.properties
broker.id=2
port=9092
host.name=172.16.101.59
[root@sht-sgmhadoopdn-03 config]# vi server.properties
broker.id=3
port=9092
host.name=172.16.101.60
[root@sht-sgmhadoopdn-01 kafka]# vi /etc/profile
export KAFKA_HOME=/root/learnproject/app/kafka
export PATH=$KAFKA_HOME/bin:$SCALA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH
[root@sht-sgmhadoopdn-01 kafka]# scp /etc/profile sht-sgmhadoopdn-02:/etc/profile
[root@sht-sgmhadoopdn-01 kafka]# scp /etc/profile sht-sgmhadoopdn-03:/etc/profile
[root@sht-sgmhadoopdn-01 kafka]#
[root@sht-sgmhadoopdn-01 kafka]# source /etc/profile
[root@sht-sgmhadoopdn-02 kafka]# source /etc/profile
[root@sht-sgmhadoopdn-03 kafka]# source /etc/profile
[root@sht-sgmhadoopdn-01 kafka]# nohup kafka-server-start.sh config/server.properties &
[root@sht-sgmhadoopdn-02 kafka]# nohup kafka-server-start.sh config/server.properties &
[root@sht-sgmhadoopdn-03 kafka]# nohup kafka-server-start.sh config/server.properties &
###停止
bin/kafka-server-stop.sh
a.創(chuàng)建topic,如能成功創(chuàng)建topic則表示集群安裝完成,也可以用jps命令查看kafka進程是否存在。
[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-topics.sh --create --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --replication-factor 3 --partitions 1 --topic test
b.通過list命令查看創(chuàng)建的topic:
[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-topics.sh --list --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka
c.查看創(chuàng)建的Topic
[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-topics.sh --describe --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --topic test
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
[root@sht-sgmhadoopdn-01 kafka]#
第一行列出了這個topic的總體情況,如topic名稱,分區(qū)數(shù)量,副本數(shù)量等。
第二行開始,每一行列出了一個分區(qū)的信息,如它是第幾個分區(qū),這個分區(qū)的leader是哪個broker,副本位于哪些broker,有哪些副本處理同步狀態(tài)。
Partition: 分區(qū)
Leader : 負(fù)責(zé)讀寫指定分區(qū)的節(jié)點
Replicas : 復(fù)制該分區(qū)log的節(jié)點列表
Isr : “in-sync” replicas,當(dāng)前活躍的副本列表(是一個子集),并且可能成為Leader
我們可以通過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示如果發(fā)布消息、消費消息。
d.刪除topic
[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-topics.sh --delete --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --topic test
e.修改topic
使用—-alert原則上可以修改任何配置,以下列出了一些常用的修改選項:
(1)改變分區(qū)數(shù)量
[root@sht-sgmhadoopdn-02 kafka]#bin/kafka-topics.sh --alter --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --topic test --partitions 3
[root@sht-sgmhadoopdn-02 kafka]# bin/kafka-topics.sh --describe --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
[root@sht-sgmhadoopdn-02 kafka]#
(2)增加、修改或者刪除一個配置參數(shù)
bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --config key=value
bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --deleteConfig key
在一個終端,啟動Producer,并向我們上面創(chuàng)建的名稱為my-replicated-topic5的Topic中生產(chǎn)消息,執(zhí)行如下腳本:
[root@sht-sgmhadoopdn-01 kafka]# bin/kafka-console-producer.sh --broker-list 172.16.101.58:9092,172.16.101.59:9092,172.16.101.60:9092 --topic test
在另一個終端,啟動Consumer,并訂閱我們上面創(chuàng)建的名稱為my-replicated-topic5的Topic中生產(chǎn)的消息,執(zhí)行如下腳本:
[root@sht-sgmhadoopdn-02 kafka]# bin/kafka-console-consumer.sh --zookeeper 172.16.101.58:2181,172.16.101.59:2181,172.16.101.60:2181/kafka --from-beginning --topic test
可以在Producer終端上輸入字符串消息行,就可以在Consumer終端上看到消費者消費的消息內(nèi)容。