kafka安裝包下載地址:
官網網址:http://kafka.apache.org/quickstart
中文官網:http://kafka.apachecn.org/quickstart.html
在 windows 平臺,從官網下載:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/
在 centos 平臺:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
創(chuàng)新互聯(lián)建站長期為1000多家客戶提供的網站建設服務,團隊從業(yè)經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網生態(tài)環(huán)境。為大理州企業(yè)提供專業(yè)的成都做網站、成都網站制作、成都外貿網站建設,大理州網站改版等技術服務。擁有十載豐富建站經驗和眾多成功案例,為您定制開發(fā)。
?A: 安裝 JDK 1.8
A: 安裝 zookeeper 集群(也可以使用自帶 ZooKeeper,但是不推薦)
版本:kafka_2.11-1.1.0
集群規(guī)劃:hadoop01、hadoop02、hadoop03 (三個節(jié)點)
① 解壓安裝包到對應的目錄
tar zxvfkafka_2.11-1.1.0.tgz -C /application/
② 修改配置文件
[hadoop@hadoop01 ~]$ cd /application/kafka_2.11-1.1.0/config/
[hadoop@hadoop01 ~]$ vim server.properties
?broker.id=5## 當前集群中的每個 broker 節(jié)點的一個唯一編號,每個節(jié)點都不一樣
?listeners=PLAINTEXT://:9092
?listeners=PLAINTEXT://hadoop01:9092
?host.name=hadoop01## 每個節(jié)點指定為當前主機名,上面也是
?log.dirs=/home/hadoop/data/kafka-logs## kafkabroke工作節(jié)點數據存儲目錄
?num.partitions=1## kafka 的 topic 的默認分區(qū)數
?log.retention.hours=168## 日志的最長保存時間
?zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181## zookeeper 地址
③ 批量發(fā)送
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop02:$PWD
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop03:$PWD
千萬注意:要修改$KAFKA_HOME/config/server.properties 文件中的對應 broker 節(jié)點的信息
broker.id=your broker id
host.name=your broker hostname
advertised.listeners=PLAINTEXT:// your broker hostname:9092
④ 配置環(huán)境變量
[hadoop@hadoop01 application]$ sudo etc/profile
export KAFKA_HOME=/application/kafka_2.11-1.1.0
[hadoop@hadoop01 application]$source/etc/profile
⑤ 啟動集群,進行驗證(每一個節(jié)點都要啟動)
nohup kafka-server-start.sh \
/application/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \
2>~/logs/kafka_err.log &
(1)啟動集群每個節(jié)點的進程
nohup kafka-server-start.sh \
/home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \ 2>~/logs/kafka_err.log &
(2) 創(chuàng)建 topic
kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafka_test
**參數介紹**
--create 創(chuàng)建 kafka topic
--zookeeper 指定 kafka 的 zookeeper 地址
--partitions 指定分區(qū)的個數
--replication-factor 指定每個分區(qū)的副本個數
(3) 查看已經創(chuàng)建的所有 kafka topic
kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--list \
(4) 查看某個指定的 kafka topic 的詳細信息
kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--describe \ #查看詳信息
--topic kafka_test #指定需要查看的topic
Topic:topic的名稱
Partition:topic的分區(qū)編號
Leader:負責處理消息和讀寫,leader是從所有節(jié)點中隨機選出
Replicas:列出了所有的副本節(jié)點,不管節(jié)點是否在服務中。
isr:正在服務中的節(jié)點。
(5) 開啟生產者模擬生成數據:
kafka-console-producer.sh \
--broker-list hadoop01:9092 \ # broker的節(jié)點列表
--topic kafka_test
(6) 開啟消費者模擬消費數據:
kafka-console-consumer.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--from-beginning \ #從哪里開始消費
--topic kafka_test
(7) 查看某 topic 某個分區(qū)的偏移量最大值和最小值
kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--topic kafka_test \
--time -1 \
--broker-list hadoop01:9092 \
--partitions 1
(8) 增加 topic 分區(qū)數(這個操作是不被允許的)
kafka-topics.sh \
--alter \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \
--partitions 5 /
--replication-factor 2
(9) 刪除 Topic
kafka-topics.sh \
--delete \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \
這一節(jié),說來也巧,最近接收的項目是準實時(OOG+kafka+stream),并同事也跟我聊了聊小編博客的建議,大體上說,就是博客寫的太過簡單,雖然通俗易懂,但是真正實際應用的地方并不多,建議我能加上一些復雜并且經常用到的東西;說白了就是懶的找資料,想看小編現(xiàn)成的;沒辦法,那就加上一些比較常用的。
首先小編就介紹下,針對kafka通過group 消費了topic的數據后,如何自定義kafka數據消費的位置,之前的操作都是“--from-beginning”,每次都是從頭開始消費,如果消費語句為精準一次,那么該如何操作呢?
這里通過如何使用Kafka自帶的kafka-consumer-groups.sh腳本隨意設置消費者組(consumer group)的位移。需要特別強調的是,這是2.11-0.11.0.0版本提供的新功能且只適用于新版本consumer。在新版本之前,如果要為已有的consumer group調整位移必須要手動編寫Java程序調用KafkaConsumer#seek方法,費時費力不說還容易出錯。0.11.0.0版本豐富了kafka-consumer-groups腳本的功能,用戶可以直接使用該腳本很方便地為已有的consumer group重新設置位移,但前提必須是consumer group必須是inactive的,即不能是處于正在工作中的狀態(tài)。
這里首先介紹一下重設位移的三個步驟:
- 確定consumer group在topic下的作用域
→ --all-topics:為consumer group下所有topic的所有分區(qū)調整位移
→ --topic t1 --topic t2:為指定的若干個topic的所有分區(qū)調整位移
→ --topic t1:0,1,2:為指定的topic分區(qū)調整位移
- 確定位移重設策略
→ --to-earliest:把位移調整到分區(qū)當前最小位移
→ --to-latest: 把位移調整到分區(qū)當前最新位移
→ --to-current:把位移調整到分區(qū)當前位移
→ --to-offset
→ --shift-by N:把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
→ --to-datetime
→ --by-duration
- 確定執(zhí)行方案
→ 什么參數都不加:只是打印出位移調整方案,不具體執(zhí)行
→ --execute:執(zhí)行真正的位移調整
→ -export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續(xù)直接使用
具體案例演示:
#創(chuàng)建topic,并向其中生產數據
① 創(chuàng)建topic
kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 3 \
--topic 'test-group'
② 創(chuàng)建生產者,生產數據
kafka-producer-perf-test.sh \
--topic 'test-group' \
--num-records 500 \
--throughput -1 \
--record-size 100 \
--producer-props bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092 acks=-1
③ 啟動一個console consumer程序,組名設置為test-group:
kafka-console-consumer.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--topic 'test-group' \
--from-beginning \
--consumer-property group.id=test-group
④ 查看當前消費者組消費topic 的細節(jié)
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--describe
由圖可知,當前消費者中將topic中的數據完全消費。 LAG表示剩余未消費的message。
#案例演示
## --to-earliest:將偏移量設置為partition開頭
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-earliest \
--execute
## --to-latest:把位移調整到分區(qū)當前最新位移
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-latest \
--execute
## --to-offset:把位移調整到指定位移處
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-offset 100 \
--execute
## --to-current:把位移調整到分區(qū)當前位移
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-current \
--execute
## --shift-by N 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--shift-by -100 \
--execute
## --to-datetime :將offset調整到大于XX日期最早的位移出
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-datetime 2019-07-31T03:40:33.000
## --by-duration 把位移調整到距離當前時間指定間隔的位移處
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--by-duration PT0H20M0S