真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

kafka集群環(huán)境搭建與管理

這一節(jié)主要講kafka 集群環(huán)境部署,

創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括滿城網(wǎng)站建設(shè)、滿城網(wǎng)站制作、滿城網(wǎng)頁(yè)制作以及滿城網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,滿城網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到滿城省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

kafka 基礎(chǔ)概念介紹與強(qiáng)化

1)Producer:消息生產(chǎn)者,就是向kafkabroker發(fā)消息的客戶端;
2)Consumer:消息消費(fèi)者,向kafkabroker取消息的客戶端;
3)Topic:可以理解為一個(gè)隊(duì)列;
4)Consumer Group(CG):這是kafka用來(lái)實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer)
和單播(發(fā)給任意一個(gè)consumer)的手段。一個(gè)topic可以有多個(gè)CG。topic的消息會(huì)復(fù)制
(不是真的復(fù)制,是概念上的)到所有的CG,但每個(gè)partion只會(huì)把消息發(fā)給該CG中的一
個(gè)consumer。如果需要實(shí)現(xiàn)廣播,只要每個(gè)consumer有一個(gè)獨(dú)立的CG就可以了。要實(shí)現(xiàn)
單播只要所有的consumer在同一個(gè)CG。用CG還可以將consumer進(jìn)行自由的分組而不需
要多次發(fā)送消息到不同的topic;

5)Broker:一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker
可以容納多個(gè)topic;

6)Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上,
一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息
都會(huì)被分配一個(gè)有序的id(offset)。kafka只保證按一個(gè)partition中的順序?qū)⑾l(fā)給
consumer,不保證一個(gè)topic的整體(多個(gè)partition間)的順序;

分區(qū)數(shù)越多,在一定程度上會(huì)提升消息處理的吞吐量,因?yàn)閗afka是基于文件進(jìn)行讀寫,因此也需要打開(kāi)更多的文件句柄,也會(huì)增加一定的性能開(kāi)銷。
如果分區(qū)過(guò)多,那么日志分段也會(huì)很多,寫的時(shí)候由于是批量寫,其實(shí)就會(huì)變成隨機(jī)寫了,隨機(jī) I/O 這個(gè)時(shí)候?qū)π阅苡绊懞艽?。所以一般?lái)說(shuō) Kafka 不能有太多的 Partition。

7)replication-factor
用來(lái)設(shè)置主題的副本數(shù)。每個(gè)主題可以有多個(gè)副本,副本位于集群中不同的broker上,也就是說(shuō)副本的數(shù)量不能超過(guò)broker的數(shù)量,否則創(chuàng)建主題時(shí)會(huì)失敗。

在創(chuàng)建Topic的時(shí)候,有兩個(gè)參數(shù)是需要填寫的,那就是partions和replication-factor。

8)Offset:kafka的存儲(chǔ)文件都是按照offset.kafka來(lái)命名,用offset做名字的好處是方便查

找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然thefirstoffset就
是00000000000.kafka。

集群環(huán)境搭建

  • zookeeper 集群環(huán)境搭建
  1. 下載軟件包

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

  1. 解壓縮

  2. 創(chuàng)建配置目錄,修改文件

[root@es7-1-80 conf]# cat zoo.cfg
initLimit=10
syncLimit=5
dataDir=/data/zk/data
clientPort=2181
maxClientCnxns=0
server.1=172.16.0.80:2888:3888
server.2=172.16.0.91:2888:3888
server.3=172.16.0.92:2888:3888

echo 1 >> =/data/zk/data/myid

  1. 啟動(dòng),制作標(biāo)準(zhǔn)啟動(dòng)服務(wù)

    cat /etc/systemd/system/zookeeper.service
    [Unit]
    Description=zookeeper.service
    After=network.target

[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target

其他節(jié)點(diǎn)按照上述方式配置,并依次啟動(dòng)

集群狀態(tài)查看

/usr/local/zookeeper/bin/zkServer.sh status

  • kafka 集群搭建
  1. 下載軟件包

curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz

  1. 解壓縮到所需位置在

  2. 創(chuàng)建配置目錄,修改文件

broker.id=0 #
num.inetwork.threads=3 #broker 處理消息的最大線程數(shù),一般情況下不需要去修改
num.io.threads=8 # # broker處理磁盤IO 的線程數(shù) ,數(shù)值應(yīng)該大于你的硬盤數(shù)
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka_logs

num.partitions=1 # 每個(gè)topic的分區(qū)個(gè)數(shù),若是在topic創(chuàng)建時(shí)候沒(méi)有指定的話 會(huì)被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.0.91:2181,172.16.0.80:2181,172.16.0.92:2181
zookeeper.connection.timeout.ms=6000

  1. 啟動(dòng),制作標(biāo)準(zhǔn)啟動(dòng)服務(wù)

[Unit]
Description=Apache Kafka server (broker)
After=network.target
After=syslog.target
After=zookeeper.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

ExecReload=/bin/kill -HUP $MAINPID
KillMode=none

Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target

5、將kafka 命令加入到環(huán)境變量中

export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

其他節(jié)點(diǎn)按照上述方式部署,修改broker.id

配置Kafka外網(wǎng)IP地址
kafka0.10以下
修改server.properties 配置文件
advertised.host.name=xxxx
advertised.port=9092
kafka0.10以上
listeners=PLAINTEXT://0.0.0.0:9093 //綁定所有ip
advertised.listeners=PLAINTEXT://42.159.7.75:9093

  • kafka 常用操作

    創(chuàng)建主題

    ./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test1 --replication-factor 1 --partitions 3

    在創(chuàng)建Topic的時(shí)候,有兩個(gè)參數(shù)是需要填寫的,那就是partions和replication-factor。

    刪除主題

./kafka-topics.sh --zookeeper 172.16.0.80:2181 --delete --topic test1

查看主題列表

./kafka-topics.sh --zookeeper 172.16.0.80:2181:2181 --list

查看主題狀態(tài)

kafka-topics.sh --zookeeper 172.16.0.80:2181 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test2 Partition: 2 Leader: 1 Replicas: 1 Isr: 1

修改主題分區(qū)

./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test3 --replication-factor 1 --partitions 1

 kafka-topics.sh --zookeeper 172.16.0.80:2181  --alter  --topic test3 --partitions 3

  kafka-topics.sh --zookeeper 172.16.0.80:2181  --describe  --topic test3

Topic:test3 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3 Partition: 2 Leader: 0 Replicas: 0 Isr: 0

生產(chǎn)數(shù)據(jù)

kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4

hi go
[2019-11-26 15:02:32,608] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,713] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,820] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,927] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien

解決方法
port=9092
listeners=PLAINTEXT://172.16.0.92:9092 # 修改為對(duì)應(yīng)的ip

重新生產(chǎn)數(shù)據(jù)

kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4

hello kafaka

kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning

消費(fèi)數(shù)據(jù)

kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning

列出消費(fèi)者主組

消費(fèi)組 即 Consumer Group,應(yīng)該算是 Kafka 比較有亮點(diǎn)的設(shè)計(jì)了。那么何謂 Consumer Group 呢?用一句話概括就是:Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。

/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --list

獲取新版本消費(fèi)者群組testgroup的詳細(xì)信息

/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --group testgroup --describe


本文標(biāo)題:kafka集群環(huán)境搭建與管理
網(wǎng)頁(yè)網(wǎng)址:http://weahome.cn/article/jicoco.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部