真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Kafka集群配置SASL+ACL

                                 **  Kafka 集群配置SASL+ACL

測試環(huán)境:**

創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供興城網(wǎng)站建設(shè)、興城做網(wǎng)站、興城網(wǎng)站設(shè)計、興城網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、興城企業(yè)網(wǎng)站模板建站服務(wù),10年興城做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。

                    系統(tǒng): CentOS 6.5 x86_64
                    JDK : java version 1.8.0_121
                    kafka: kafka_2.11-1.0.0.tgz
                    zookeeper: 3.4.5
                    ip: 192.168.49.161 (我們這里在一臺機上部署整套環(huán)境)

kafka 名詞解析:

    Broker: Kafka 集群包含一個或多個服務(wù)器,這種服務(wù)器被稱為broker
    Topic: 每條發(fā)布到Kafka 集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic 的消息
    分開存儲,邏輯上一個Topic 的消息雖然保存于一個或多個broker 上但用戶只需指定消息的Topic 即可生產(chǎn)或
    消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
    Partition: Parition 是物理上的概念,每個Topic 包含一個或多個Partition
    Producer: 負責發(fā)布消息到Kafka broker 即生產(chǎn)者
    Consumer: 消息消費者,向Kafka broker 讀取消息的客戶端即消費者
    Consumer Group:每個Consumer 屬于一個特定的Consumer Group(可為每個Consumer 指定group
    name,若不指定group name 則屬于默認的group)

Kafka 拓撲結(jié)構(gòu)(借用別人的圖)
Kafka 集群配置SASL+ACL
一個典型的Kafka 集群中包含若干Producer(可以是web 前端產(chǎn)生的Page View,或者是服務(wù)器日志,系統(tǒng) CPU、Memory 等),若干broker(Kafka 支持水平擴展,一般broker 數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper 集群。Kafka 通過Zookeeper 管理集群配置,選舉leader,以及在Consumer Group 發(fā)生變化時進行rebalance。Producer 使用push 模式將消息發(fā)布到broker,Consumer 使用pull 模式從broker 訂閱并消費消息

kafka 集群部署
一 Zookeeper 集群部署(這里部署的偽集群)

  1. zookeeper 部署比較簡單,解壓修改配置文件后即可使用, 默認的配置文件為zoo_sample.cfg 這里我們直接新建一個zoo.cfg 的文件內(nèi)容如下:

各參數(shù)意義這里不作詳細解釋,想了解可以自行參考官網(wǎng)

                                        tickTime=2000
                                        initLimit=5
                                        syncLimit=2
                                        dataDir=/opt/zook/zookeeper1/data //指定數(shù)據(jù)路徑
                                        dataLogDir=/opt/zook/zookeeper1/logs //指定日志路徑
                                        clientPort=2181 //監(jiān)聽端口
                                        server.1=127.0.0.1:7771:7772
                                        server.2=127.0.0.1:7771:7772
                                        server.3=127.0.0.1:7771:7772
  2. 在zookeeper/data 下創(chuàng)建一個名為myid 的文件,并在文件內(nèi)輸入數(shù)字1
 3. 集群部署則將上述配置的單節(jié)點再復(fù)制兩份,并將配置文件中
                            ```
                            dataDir , dataLogDir 兩個參數(shù)根據(jù)實際路徑修改
                            clientPort 分別配為2182,2183
                            myid 分別配置為2 3
                            ```
  4. 啟動集群

二 Kafka 集群部署

 由于集群最終是對外部網(wǎng)絡(luò)開放的,出于安全考慮本文采用的是SASL+ACL 控制和授權(quán)

1. broker 配置
1.1. 將配置server.properties 復(fù)制2 份并分別重命名成server-1.properties server-2.properties server-3.properties 修改server.properties 配置:(請根據(jù)實際環(huán)境填寫)

broker.id = 1 //另外兩個節(jié)點分別為2 3
host.name=192.168.49.161
log.dirs=/tmp/kafka-logs-1 //另外兩個節(jié)點分別為kafka-logs-2 kafka-logs-3 可以不
放在/tmp 下,如果放在其他地方必須注意目錄權(quán)限
zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183
port=9092 //另外兩個節(jié)點分別為9093 9094 也可以自定義
listeners=SASL_PLAINTEXT://192.168.49.161:9092 //另外兩個節(jié)點分別為9093 9094

1.2 要配置SASL 和ACL,需要在broker 端進行兩個方面的設(shè)置。
首先是創(chuàng)建包含所有認證用戶信息的JAAS 文件,本例中我們有admin qjld 兩個用戶,其中admin 為管理員用于broker 集群間的認證, qjld 為遠程應(yīng)用的認證用戶, 新增認證信息文件

    kafka_server_jaas.conf 內(nèi)容為:
    KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-mgr998778123"
    user_admin="admin-mgr998778123"
    user_qjld="123456";
    };

本例中路徑為: /opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf , 我們需要將配置文件中的內(nèi)容傳遞給JVM,因此需要修改/opt/kafka_2.11-1.0.0/bin/kafka-server-start.sh 腳本文件如下:

在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" 之前一行添加export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf" 然后保存退出。

其次我們要在所有broker 配置文件(server-x.properties)中增加:

#ACL 入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
#設(shè)置admin 為超級用戶
super.users=User:admin

現(xiàn)在我們可以啟動broker 測試了

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &

啟動成功,此時broker 可以接收已認證的客戶端連接了,下面我們進行客戶端配置

//創(chuàng)建一個topic 名為test
bin/kafka-topics.sh--create --zookeeper 192.168.49.161:2181,192.168.49.161:2182, 192.168.49.161:2183 --replication-factor 1 --partitions 1 --topic test    
//查看topic 是否創(chuàng)建成功
bin/kafka-topics.sh --list --zookeeper 192.168.49.161:2181 

2. Client 配置(用于使用kafka命令連接)
2.1 新增一個配置文件如:/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf,內(nèi)容為:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="qjld"
password="123456";
};

同理我們也要將配置文件內(nèi)容傳遞給JVM, 因此需要修改

/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh 腳本文件如下:在exec $base_dir/kafka-run-class.sh
$EXTRA_ARGS kafka.Kafka "$@" 之前一行添加export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
 同樣的方法修改kafka-console-consumer.sh

2.2 需要在config/consumer.properties 和config/producer.properties 配置文件中追加:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

這兩行配置, 完成后再試運行:

bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test \
--producer.config config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test
--from-beginning  --consumer.config config/consumer.properties

接下來測試消息的寫入和讀取, 開啟兩個終端分別輸入下列

bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test //消息寫入
bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test  --from-beginning 消息讀取

如果報錯信息如下:
WARN Bootstrap broker 192.168.49.161:9092 disconnected (org.apache.kafka.clients.NetworkClient)
則說明配置的security 已生效, 要想普通用戶能讀寫消息,需要配置ACL
2.3 配置ACL


bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties
zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183 --add
--allow-principal User:qjld --operation All --topic test
```這是為qjld 這個用戶配置所有權(quán)限,如果有更細的要求則可參考網(wǎng)上的,如:Read Write 等只需要將--operation all 改為--operation Read 多種則在其后面加一個-- operation write 即可

**三    測試**
這里我選用python2.7 編寫的腳本進行測試
先安裝pip27 install kafka-python
![](/upload/otherpic50/5b755d258b838b83d79753ad5aad3d44.png)

當前名稱:Kafka集群配置SASL+ACL
URL地址:http://weahome.cn/article/gcoeij.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部