真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網站制作重慶分公司

消息隊列之kafka(集群搭建及shell操作)

1.kafka集群搭建

kafka安裝包下載地址:
官網網址:http://kafka.apache.org/quickstart
中文官網:http://kafka.apachecn.org/quickstart.html
在 windows 平臺,從官網下載:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/
在 centos 平臺:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

創(chuàng)新互聯(lián)建站長期為1000多家客戶提供的網站建設服務,團隊從業(yè)經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網生態(tài)環(huán)境。為大理州企業(yè)提供專業(yè)的成都做網站、成都網站制作、成都外貿網站建設,大理州網站改版等技術服務。擁有十載豐富建站經驗和眾多成功案例,為您定制開發(fā)。

(1)集群部署的基礎環(huán)境準備:

?A: 安裝 JDK 1.8
A: 安裝 zookeeper 集群(也可以使用自帶 ZooKeeper,但是不推薦)

(2)集體搭建:

版本:kafka_2.11-1.1.0
集群規(guī)劃:hadoop01、hadoop02、hadoop03 (三個節(jié)點)
① 解壓安裝包到對應的目錄
tar zxvfkafka_2.11-1.1.0.tgz -C /application/
② 修改配置文件
[hadoop@hadoop01 ~]$ cd /application/kafka_2.11-1.1.0/config/
[hadoop@hadoop01 ~]$ vim server.properties
?broker.id=5## 當前集群中的每個 broker 節(jié)點的一個唯一編號,每個節(jié)點都不一樣
消息隊列之kafka(集群搭建及shell操作)

?listeners=PLAINTEXT://:9092
?listeners=PLAINTEXT://hadoop01:9092
?host.name=hadoop01## 每個節(jié)點指定為當前主機名,上面也是
消息隊列之kafka(集群搭建及shell操作)

?log.dirs=/home/hadoop/data/kafka-logs## kafkabroke工作節(jié)點數據存儲目錄
?num.partitions=1## kafka 的 topic 的默認分區(qū)數
消息隊列之kafka(集群搭建及shell操作)

?log.retention.hours=168## 日志的最長保存時間
消息隊列之kafka(集群搭建及shell操作)

?zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181## zookeeper 地址
消息隊列之kafka(集群搭建及shell操作)
③ 批量發(fā)送
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop02:$PWD
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop03:$PWD
千萬注意:要修改$KAFKA_HOME/config/server.properties 文件中的對應 broker 節(jié)點的信息

broker.id=your broker id 
host.name=your broker hostname 
advertised.listeners=PLAINTEXT:// your broker hostname:9092

④ 配置環(huán)境變量
[hadoop@hadoop01 application]$ sudo etc/profile
export KAFKA_HOME=/application/kafka_2.11-1.1.0
[hadoop@hadoop01 application]$source/etc/profile

⑤ 啟動集群,進行驗證(每一個節(jié)點都要啟動)

nohup kafka-server-start.sh \
/application/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \
2>~/logs/kafka_err.log &

2.kafka相關shell

(1)啟動集群每個節(jié)點的進程

nohup kafka-server-start.sh \
 /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \ 2>~/logs/kafka_err.log &

(2) 創(chuàng)建 topic

kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafka_test
**參數介紹**
--create 創(chuàng)建 kafka topic
--zookeeper 指定 kafka 的 zookeeper 地址
--partitions 指定分區(qū)的個數
--replication-factor 指定每個分區(qū)的副本個數

(3) 查看已經創(chuàng)建的所有 kafka topic

kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--list \

(4) 查看某個指定的 kafka topic 的詳細信息

kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--describe \   #查看詳信息
--topic kafka_test  #指定需要查看的topic

消息隊列之kafka(集群搭建及shell操作)
Topic:topic的名稱
Partition:topic的分區(qū)編號
Leader:負責處理消息和讀寫,leader是從所有節(jié)點中隨機選出
Replicas:列出了所有的副本節(jié)點,不管節(jié)點是否在服務中。
isr:正在服務中的節(jié)點。
(5) 開啟生產者模擬生成數據:

kafka-console-producer.sh \
--broker-list hadoop01:9092 \  # broker的節(jié)點列表
--topic kafka_test

(6) 開啟消費者模擬消費數據:

kafka-console-consumer.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--from-beginning \  #從哪里開始消費
--topic kafka_test

(7) 查看某 topic 某個分區(qū)的偏移量最大值和最小值

kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--topic kafka_test \
--time -1 \
--broker-list hadoop01:9092 \
--partitions 1

(8) 增加 topic 分區(qū)數(這個操作是不被允許的)

kafka-topics.sh \
--alter \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \
--partitions 5 /
--replication-factor 2

(9) 刪除 Topic

kafka-topics.sh \
--delete \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \

3.kafka的consumer group 位移offset重設

   這一節(jié),說來也巧,最近接收的項目是準實時(OOG+kafka+stream),并同事也跟我聊了聊小編博客的建議,大體上說,就是博客寫的太過簡單,雖然通俗易懂,但是真正實際應用的地方并不多,建議我能加上一些復雜并且經常用到的東西;說白了就是懶的找資料,想看小編現(xiàn)成的;沒辦法,那就加上一些比較常用的。
  首先小編就介紹下,針對kafka通過group 消費了topic的數據后,如何自定義kafka數據消費的位置,之前的操作都是“--from-beginning”,每次都是從頭開始消費,如果消費語句為精準一次,那么該如何操作呢?
  這里通過如何使用Kafka自帶的kafka-consumer-groups.sh腳本隨意設置消費者組(consumer group)的位移。需要特別強調的是,這是2.11-0.11.0.0版本提供的新功能且只適用于新版本consumer。在新版本之前,如果要為已有的consumer group調整位移必須要手動編寫Java程序調用KafkaConsumer#seek方法,費時費力不說還容易出錯。0.11.0.0版本豐富了kafka-consumer-groups腳本的功能,用戶可以直接使用該腳本很方便地為已有的consumer group重新設置位移,但前提必須是consumer group必須是inactive的,即不能是處于正在工作中的狀態(tài)
  這里首先介紹一下重設位移的三個步驟:
    - 確定consumer group在topic下的作用域
      → --all-topics:為consumer group下所有topic的所有分區(qū)調整位移
      → --topic t1 --topic t2:為指定的若干個topic的所有分區(qū)調整位移
      → --topic t1:0,1,2:為指定的topic分區(qū)調整位移

    - 確定位移重設策略
      → --to-earliest:把位移調整到分區(qū)當前最小位移
      → --to-latest: 把位移調整到分區(qū)當前最新位移
      → --to-current:把位移調整到分區(qū)當前位移
      → --to-offset :把位移調整到指定位移處
      → --shift-by N:把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
      → --to-datetime :把位移調整到大于給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
      → --by-duration :把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,比如PT0H5M0S
    - 確定執(zhí)行方案
      → 什么參數都不加:只是打印出位移調整方案,不具體執(zhí)行
      → --execute:執(zhí)行真正的位移調整
      → -export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續(xù)直接使用

   具體案例演示:
#創(chuàng)建topic,并向其中生產數據

①   創(chuàng)建topic
kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 3 \
--topic 'test-group'

②   創(chuàng)建生產者,生產數據
kafka-producer-perf-test.sh \
--topic 'test-group' \
--num-records 500 \
--throughput -1 \
--record-size 100 \
--producer-props bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092 acks=-1
③   啟動一個console consumer程序,組名設置為test-group:
kafka-console-consumer.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--topic 'test-group' \
--from-beginning \
--consumer-property group.id=test-group

④   查看當前消費者組消費topic 的細節(jié)
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--describe

由圖可知,當前消費者中將topic中的數據完全消費。 LAG表示剩余未消費的message。

#案例演示

## --to-earliest:將偏移量設置為partition開頭 
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-earliest \
--execute
## --to-latest:把位移調整到分區(qū)當前最新位移
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-latest \
--execute
## --to-offset:把位移調整到指定位移處
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-offset 100 \
--execute
    ## --to-current:把位移調整到分區(qū)當前位移
    kafka-consumer-groups.sh \
    --bootstrap-server  hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --to-current \
    --execute

    ## --shift-by N  把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
        kafka-consumer-groups.sh \
        --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
        --group test-group \
        --reset-offsets \
        --all-topics \
        --shift-by -100 \
        --execute
    ##  --to-datetime :將offset調整到大于XX日期最早的位移出
    kafka-consumer-groups.sh \
    --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --to-datetime 2019-07-31T03:40:33.000
    ## --by-duration  把位移調整到距離當前時間指定間隔的位移處
    kafka-consumer-groups.sh \
    --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --by-duration PT0H20M0S

標題名稱:消息隊列之kafka(集群搭建及shell操作)
轉載注明:http://weahome.cn/article/isgehi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部