Kafka Usage and Error Troubleshooting
I. Download and extract Kafka; configure the environment variables
vim /etc/profile
export KAFKA_HOME=/root/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
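A quick sanity check that the exports took effect (a sketch; the path matches the install directory above, so adjust KAFKA_HOME if yours differs):

```shell
# Re-create the two exports from /etc/profile
export KAFKA_HOME=/root/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

echo "$KAFKA_HOME"
case ":$PATH:" in
  *":$KAFKA_HOME/bin:"*) PATH_OK=yes ;;   # bin dir is on PATH
  *)                     PATH_OK=no ;;
esac
echo "bin dir on PATH: $PATH_OK"
```

After this, `which kafka-topics.sh` should resolve to the script inside $KAFKA_HOME/bin.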
II. Kafka requires ZooKeeper
(a) Using the ZooKeeper bundled with Kafka
Start ZooKeeper first. In a pseudo-distributed setup there is no separate install: Kafka already bundles ZooKeeper, and its config lives under Kafka's config directory.
You can edit config/zookeeper.properties to change the ZooKeeper port.
1. Start ZooKeeper in the background:
[root@mail bin]# nohup zookeeper-server-start.sh ../config/zookeeper.properties &
2. Start Kafka in the background:
[root@mail bin]# nohup kafka-server-start.sh ../config/server.properties &
3. Test: simulate message production and consumption
(1) Create a topic
[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Created topic "KafkaTestTopic".
(2) Start a console producer
[root@mail bin]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092
Check the #listeners=PLAINTEXT://:9092 line in server.properties to find Kafka's port.
(3) Start a console consumer
[root@mail bin]# kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2281
(b) Using an external (non-bundled) ZooKeeper
[root@mail zookeeper-3.4.10]# bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
(1) Create a topic
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic secondTopic --partitions 1 --replication-factor 1
Created topic "secondTopic".
(2) Start Kafka
[root@mail kafka_2.11-1.0.0]# nohup bin/kafka-server-start.sh config/server.properties &
(3) Kafka producer
[root@mail kafka_2.11-1.0.0]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092
(4) Kafka consumer
[root@mail kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
(5) Inspect the data stored in Kafka
[root@mail kafka_2.11-1.0.0]# ls
bin config libs LICENSE logs logs-kafka nohup.out NOTICE site-docs
[root@mail kafka_2.11-1.0.0]# cd logs-kafka/ # Kafka's data storage directory
## This directory is set in Kafka's config/server.properties:
log.dirs=/root/kafka/kafka_2.11-1.0.0/logs-kafka
[root@mail logs-kafka]# ls # list the topics stored in Kafka
cleaner-offset-checkpoint __consumer_offsets-20 __consumer_offsets-33 __consumer_offsets-46 kafka_test-0
__consumer_offsets-0 __consumer_offsets-21 __consumer_offsets-34 __consumer_offsets-47 KafkaTestTopic-0
__consumer_offsets-1 __consumer_offsets-22 __consumer_offsets-35 __consumer_offsets-48 log-start-offset-checkpoint
__consumer_offsets-10 __consumer_offsets-23 __consumer_offsets-36 __consumer_offsets-49 meta.properties
__consumer_offsets-11 __consumer_offsets-24 __consumer_offsets-37 __consumer_offsets-5 My_LOVE_TOPIC-0
__consumer_offsets-12 __consumer_offsets-25 __consumer_offsets-38 __consumer_offsets-6 mytopic-0
__consumer_offsets-13 __consumer_offsets-26 __consumer_offsets-39 __consumer_offsets-7 recovery-point-offset-checkpoint
__consumer_offsets-14 __consumer_offsets-27 __consumer_offsets-4 __consumer_offsets-8 replication-offset-checkpoint
__consumer_offsets-15 __consumer_offsets-28 __consumer_offsets-40 __consumer_offsets-9 stock-quotation-0
__consumer_offsets-16 __consumer_offsets-29 __consumer_offsets-41 hello-0 stock-quotation-avro-0
__consumer_offsets-17 __consumer_offsets-3 __consumer_offsets-42 hello-1 stock-quotation-partition-0
__consumer_offsets-18 __consumer_offsets-30 __consumer_offsets-43 hello-2 TEST-TOPIC-0
__consumer_offsets-19 __consumer_offsets-31 __consumer_offsets-44 hello-3
__consumer_offsets-2 __consumer_offsets-32 __consumer_offsets-45 hello-4
[root@mail logs-kafka]# cd KafkaTestTopic-0/ # inspect partition 0 of topic KafkaTestTopic
[root@mail KafkaTestTopic-0]# ls
00000000000000000000.index 00000000000000000000.timeindex leader-epoch-checkpoint
00000000000000000000.log 00000000000000000063.snapshot
[root@mail KafkaTestTopic-0]# tail -f 00000000000000000000.log # Kafka's data storage file
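The directory layout above follows directly from log.dirs, and the configured path can be read straight out of the config file. A minimal sketch, run here against a throwaway properties file so the commands work anywhere; point CONFIG at your real config/server.properties instead:

```shell
# Simulated server.properties (replace with the real config path)
CONFIG=$(mktemp)
echo 'log.dirs=/root/kafka/kafka_2.11-1.0.0/logs-kafka' > "$CONFIG"

# Extract the configured data directory
DATA_DIR=$(grep '^log.dirs=' "$CONFIG" | cut -d= -f2)
echo "Kafka stores its data under: $DATA_DIR"
rm -f "$CONFIG"
```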
(6) Increase the number of partitions and observe how Kafka changes
## Increase the partition count
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@mail kafka_2.11-1.0.0]# ls
bin config libs LICENSE logs logs-kafka nohup.out NOTICE site-docs
[root@mail kafka_2.11-1.0.0]# cd logs-kafka/
# Topic KafkaTestTopic now has partition directories 0, 1 and 2, i.e. three partitions in total
[root@mail logs-kafka]# ls
cleaner-offset-checkpoint __consumer_offsets-20 __consumer_offsets-33 __consumer_offsets-46 kafka_test-0
__consumer_offsets-0 __consumer_offsets-21 __consumer_offsets-34 __consumer_offsets-47 KafkaTestTopic-0
__consumer_offsets-1 __consumer_offsets-22 __consumer_offsets-35 __consumer_offsets-48 KafkaTestTopic-1
__consumer_offsets-10 __consumer_offsets-23 __consumer_offsets-36 __consumer_offsets-49 KafkaTestTopic-2
__consumer_offsets-11 __consumer_offsets-24 __consumer_offsets-37 __consumer_offsets-5 log-start-offset-checkpoint
__consumer_offsets-12 __consumer_offsets-25 __consumer_offsets-38 __consumer_offsets-6 meta.properties
__consumer_offsets-13 __consumer_offsets-26 __consumer_offsets-39 __consumer_offsets-7 My_LOVE_TOPIC-0
__consumer_offsets-14 __consumer_offsets-27 __consumer_offsets-4 __consumer_offsets-8 mytopic-0
__consumer_offsets-15 __consumer_offsets-28 __consumer_offsets-40 __consumer_offsets-9 recovery-point-offset-checkpoint
__consumer_offsets-16 __consumer_offsets-29 __consumer_offsets-41 hello-0 replication-offset-checkpoint
__consumer_offsets-17 __consumer_offsets-3 __consumer_offsets-42 hello-1 stock-quotation-0
__consumer_offsets-18 __consumer_offsets-30 __consumer_offsets-43 hello-2 stock-quotation-avro-0
__consumer_offsets-19 __consumer_offsets-31 __consumer_offsets-44 hello-3 stock-quotation-partition-0
__consumer_offsets-2 __consumer_offsets-32 __consumer_offsets-45 hello-4 TEST-TOPIC-0
[root@mail KafkaTestTopic-1]# ls # inspect partition 1 of topic KafkaTestTopic
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint
[root@mail KafkaTestTopic-1]# tail -f 00000000000000000000.log
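The effect of the alter command can be confirmed straight from the filesystem by counting the topic's partition directories. A sketch against a simulated layout (on a live broker, point LOG_DIR at the real log.dirs path instead of creating dummy directories):

```shell
TOPIC=KafkaTestTopic
LOG_DIR=$(mktemp -d)   # stand-in for the real log.dirs directory
# Simulate the three partition directories the broker would create
mkdir -p "$LOG_DIR/$TOPIC-0" "$LOG_DIR/$TOPIC-1" "$LOG_DIR/$TOPIC-2"

# Count the partition directories belonging to the topic
COUNT=$(ls -d "$LOG_DIR/$TOPIC"-* | wc -l)
echo "$TOPIC has $COUNT partition directories"
rm -rf "$LOG_DIR"
```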
III. Possible errors
(1)
[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
[2018-11-20 16:44:16,269] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
(kafka.admin.TopicCommand$)
Fix: set zookeeper.connect=localhost:2281 in server.properties so that the port matches the ZooKeeper port in zookeeper.properties, then restart Kafka.
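The root cause is simply a port mismatch between the two files, which a quick comparison can catch. A sketch using simulated files; run the two grep lines against your real config directory instead:

```shell
CONF=$(mktemp -d)   # stand-in for the real config/ directory
echo 'clientPort=2281' > "$CONF/zookeeper.properties"               # simulated
echo 'zookeeper.connect=localhost:2281' > "$CONF/server.properties" # simulated

# Pull the ZooKeeper port out of each file and compare
ZK_PORT=$(grep '^clientPort=' "$CONF/zookeeper.properties" | cut -d= -f2)
KAFKA_ZK_PORT=$(grep '^zookeeper.connect=' "$CONF/server.properties" | sed 's/.*://')
if [ "$ZK_PORT" = "$KAFKA_ZK_PORT" ]; then
  echo "ports match: $ZK_PORT"
else
  echo "port mismatch: zookeeper=$ZK_PORT kafka=$KAFKA_ZK_PORT"
fi
rm -rf "$CONF"
```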
(2)
kafka.common.KafkaException: fetching topic metadata for topics [Set(KafkaTestTopic)] from broker [ArrayBuffer(BrokerEndPoint(0,123.125.50.7,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
(3)
[2018-11-20 17:28:53,411] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 52 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,513] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 53 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,617] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 54 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,721] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 55 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Fix for errors (2) and (3): edit server.properties and set
I、listeners=PLAINTEXT://localhost:9092
II、 advertised.listeners=PLAINTEXT://localhost:9092
(4) [2018-11-29 09:44:35,275] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Fix: a likely cause is that Kafka is not running; restart Kafka.
To check the broker's ZooKeeper registration from Kafka:
bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"
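Before touching any configs, it can also help to confirm that the broker port is reachable at all. A quick sketch using bash's /dev/tcp redirection (HOST and PORT are assumptions; adjust them to your broker):

```shell
HOST=localhost   # assumption: broker host
PORT=9092        # assumption: broker port
# bash opens a TCP connection for /dev/tcp/<host>/<port> redirections
if (exec 3<>"/dev/tcp/$HOST/$PORT") 2>/dev/null; then
  STATUS=reachable
else
  STATUS=unreachable
fi
echo "broker $STATUS on $HOST:$PORT"
```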
(5)Failed to find leader for Set(KafkaTestTopic-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
Fix: edit the Kafka config file
[root@mail config]# vim server.properties
and set advertised.host.name to the correct IP address.
IV. Common Kafka operations
(1) Describe all topics
[root@mail ~]# kafka-topics.sh --describe --zookeeper localhost:2281
Topic:KafkaTestTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Every partition in Kafka has a number, starting from 0.
When a topic has multiple replicas there is a leader/follower relationship between them; Leader: 0 means the broker hosting the leader replica for that partition is broker 0.
(2) List topic names only
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281
KafkaTestTopic
(3) Describe a specific topic
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
(4) Check whether a topic exists
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 --topic KafkaTestTopic
or
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 | grep KafkaTestTopic
(5) Increase a topic's partition count
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
(6) Set a topic-level config
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --config flush.messages=1
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs:flush.messages=1
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
(7) Remove a topic-level config
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --delete-config flush.messages
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: KafkaTestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: KafkaTestTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
(8) Delete a topic
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --delete --topic KafkaTestTopic
Note: this only marks the topic for deletion; the marked topic is actually removed when the broker restarts, depending on the Kafka version.
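Whether the delete actually happens is governed by the broker setting delete.topic.enable; it defaults to true from Kafka 1.0, while older brokers default to false and only mark the topic. A sketch that appends the setting to a config file, shown here against a temp file; edit your real config/server.properties instead:

```shell
CONFIG=$(mktemp)
echo 'broker.id=0' > "$CONFIG"   # simulated server.properties

# Append the setting only if it is not already present
grep -q '^delete.topic.enable=' "$CONFIG" || echo 'delete.topic.enable=true' >> "$CONFIG"
ENABLED=$(grep '^delete.topic.enable=' "$CONFIG" | cut -d= -f2)
echo "delete.topic.enable=$ENABLED"
rm -f "$CONFIG"
```

Restart the broker after changing this setting for it to take effect.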
V. Kafka configuration in Java
(1) Producer-side configuration in Java
Properties props = new Properties();
// Kafka broker host and port; separate multiple entries with commas
props.put("bootstrap.servers", "ip:9092");
// wait for acknowledgement from all replicas
props.put("acks", "all");
// maximum number of retries for a failed send
props.put("retries", 0);
// batch size for a group of messages, in bytes
props.put("batch.size", "16384");
// time to linger before sending a batch, in milliseconds
props.put("linger.ms", 1);
// size of the send buffer, in bytes
props.put("buffer.memory", 33554430);
// key serializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value serializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
(2) Consumer-side configuration in Java
Properties props = new Properties();
// Kafka broker address; there is no need to list every broker
props.put("bootstrap.servers", "ip:9092");
// consumer group id
props.put("group.id", "test1");
// whether to auto-commit offsets
props.put("enable.auto.commit", "true");
// auto-commit interval, in milliseconds
props.put("auto.commit.interval.ms", "1000");
// key deserializer class
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value deserializer class
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");