Flume+Kafka整合
成都創(chuàng)新互聯(lián)是一家專業(yè)提供曲靖企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站建設(shè)、做網(wǎng)站、H5建站、小程序制作等業(yè)務(wù)。10年已為曲靖眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進行中。
準備5臺內(nèi)網(wǎng)服務(wù)器創(chuàng)建Zookeeper和Kafka集群
服務(wù)器地址:
192.168.2.240
192.168.2.241
192.168.2.242
192.168.2.243
192.168.2.244
服務(wù)器系統(tǒng):Centos 6.5 64位
下載安裝包
Zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
Flume:http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Kafka:http://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
Zookeeper,F(xiàn)lume,kafka需要用到Java環(huán)境,所以先安裝JDk
yum install java-1.7.0-openjdk-devel
選擇3臺服務(wù)器作為zookeeper集群,他們的IP分別為:
192.168.2.240
192.168.2.241
192.168.2.242
注:先在第一臺服務(wù)器192.168.2.240上分別執(zhí)行(1)-(3)步。
(1)解壓:將zookeeper-3.4.6.tar.gz放入/opt目錄下
tar zxf zookeeper-3.4.6.tar.gz
(2)創(chuàng)建配置文件:將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下。然后按照如下值修改其中的配置:
tickTime=2000
dataDir=/opt/zookeeper/Data
initLimit=5
syncLimit=2
clientPort=2181
server.1=192.168.2.240:2888:3888
server.2=192.168.2.241:2888:3888
server.3=192.168.2.242:2888:3888
各個參數(shù)的意義:
tickTime:心跳檢測的時間間隔(毫秒),缺?。?000
clientPort:其他應(yīng)用(比如solr)訪問ZooKeeper的端口,缺?。?181
initLimit:初次同步的階段(followers連接到leader的階段),允許的時長(tick數(shù)量),缺?。?0
syncLimit:允許followers同步到ZooKeeper的時長(tick數(shù)量),缺?。?
dataDir:數(shù)據(jù)(比如所管理的配置文件)的存放路徑
server.X:X是集群中一個服務(wù)器的id,與myid文件中的id是一致的。右邊可以配置兩個端口,第一個端口用于Fllower和Leader之間的數(shù)據(jù)同步和其它通信,第二個端口用于Leader選舉過程中投票通信。
(3)創(chuàng)建/opt/zookeeper/Data快照目錄,并創(chuàng)建my id文件,里面寫入1。
mkdir /opt/zookeeper/Data vi /opt/zookeeper/Data/myid 1
(4)將192.168.2.240上已經(jīng)配置好的/opt/zookeeper/目錄分別拷貝至192.168.2.241和192.168.2.242。然后將對應(yīng)的myid的內(nèi)容修改為2和3
(5)啟動zookeeper集群
分別在3臺服務(wù)器上執(zhí)行啟動命令
/opt/zookeeper/bin/zkServer.sh start
一共5臺服務(wù)器,服務(wù)器IP地址:
192.168.2.240 node1
192.168.2.241 node2
192.168.2.242 node3
192.168.2.243 node4
192.168.2.244 node5
1、解壓安裝文件到/opt/目錄
cd /opt tar -zxvf kafka_2.10-0.10.0.0.tar.gz mv kafka_2.10-0.10.0.0 kafka
2、修改server. properties文件
#node1 配置
broker.id=0
port=9092
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9092
advertised.host.name=58.246.xx.xx
#碰到的坑,由于我是從線上把nginx日志拉回公司本地服務(wù)器,所以這兩選項必須配置成路由器外網(wǎng)IP地址,否則線上flume報無法連接kafka節(jié)點,報無法傳送日志消息
advertised.port=9092
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node2 配置
broker.id=1
port=9093
advertised.listeners=PLAINTEXT://58.246.xx.xx:9093
advertised.host.name=58.246.xx.xx
advertised.port=9093
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node3 配置
broker.id=2
port=9094
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9094
advertised.host.name=58.246.xx.xx
advertised.port=9094
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node4 配置
broker.id=2
port=9095
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9095
advertised.host.name=58.246.xx.xx
advertised.port=9095
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node5 配置
broker.id=2
port=9096
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9096
advertised.host.name=58.246.xx.xx
advertised.port=9096
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
啟動卡夫卡集群
分別在所有節(jié)點執(zhí)行以下命令來啟動服務(wù)
/opt/kafka/bin/kafka-server-start.sh/opt/kafka/config/server.properties &
安裝兩臺flume,一臺安裝在線上,把線上的日志傳回本地kafka,另一臺安裝在本地,把kafka集群的日志信息轉(zhuǎn)存到HDFS
收集nginx日志傳給公司內(nèi)部kafka
1、 解壓安裝包
cd /opt
tar –zxvf apache-flume-1.7.0-bin.tar.gz
2、 創(chuàng)建配置文件
Vi flume-conf.properties 添加以下內(nèi)容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F/unilifeData/logs/nginx/access.log
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
#sinks
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = unilife_nginx_production
a1.sinks.k1.kafka.bootstrap.servers = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.brokerList = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 2000
a1.sinks.k1.channel = c1
啟動flume服務(wù)
/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/--conf-file /opt/flume/conf/flume-conf.properties --name a1-Dflume.root.logger=INFO,LOGFILE &
轉(zhuǎn)存日志到HDFS
1、解壓安裝包
cd /opt
tar –zxvf apache-flume-1.7.0-bin.tar.gz
3、 創(chuàng)建配置文件
nginx.sources = source1
nginx.channels = channel1
nginx.sinks = sink1
nginx.sources.source1.type =org.apache.flume.source.kafka.KafkaSource
nginx.sources.source1.zookeeperConnect =master:2181,slave1:2181,slave2:2181
nginx.sources.source1.topic =unilife_nginx_production
nginx.sources.source1.groupId =flume_unilife_nginx_production
nginx.sources.source1.channels = channel1
nginx.sources.source1.interceptors = i1
nginx.sources.source1.interceptors.i1.type =timestamp
nginx.sources.source1.kafka.consumer.timeout.ms = 100
nginx.channels.channel1.type = memory
nginx.channels.channel1.capacity = 10000000
nginx.channels.channel1.transactionCapacity = 1000
nginx.sinks.sink1.type = hdfs
nginx.sinks.sink1.hdfs.path =hdfs://192.168.2.240:8020/user/hive/warehouse/nginx_log
nginx.sinks.sink1.hdfs.writeFormat=Text
nginx.sinks.sink1.hdfs.inUsePrefix=_
nginx.sinks.sink1.hdfs.rollInterval = 3600
nginx.sinks.sink1.hdfs.rollSize = 0
nginx.sinks.sink1.hdfs.rollCount = 0
nginx.sinks.sink1.hdfs.fileType = DataStream
nginx.sinks.sink1.hdfs.minBlockReplicas=1
nginx.sinks.sink1.channel = channel1
啟動服務(wù)
/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/--conf-file /opt/flume/conf/flume-nginx-log.properties --name nginx-Dflume.root.logger=INFO,LOGFILE &