This article walks through how to install Flume and integrate it with Kafka. It is shared here as a practical reference; follow the steps below.
Communication between Flume agents (based on reference books)
Flume ships with a dedicated RPC sink-source pair for moving data between agents. A source is the component that receives data into a Flume agent; built-in sources include the Avro Source, Thrift Source, HTTP Source, Spooling Directory Source, Syslog Source, Exec Source, JMS Source, and others. A channel is the buffer that sits between the source and the sink, and it is the key to not losing data. A sink reads events from a channel; each sink can read from only one channel, and every sink must be bound to a channel, otherwise it is removed from the agent.
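As a minimal sketch of that RPC pairing (the agent names, host name, port, and channel name below are hypothetical and not part of this walk-through), the upstream agent's Avro sink points at the port on which the downstream agent's Avro source listens:

# upstream agent (a1): Avro sink sends events to the downstream agent over RPC
a1.sinks.avroOut.type = avro
a1.sinks.avroOut.hostname = downstream-host
a1.sinks.avroOut.port = 4545
a1.sinks.avroOut.channel = ch1

# downstream agent (a2): Avro source receives events sent by upstream agents
a2.sources.avroIn.type = avro
a2.sources.avroIn.bind = 0.0.0.0
a2.sources.avroIn.port = 4545
a2.sources.avroIn.channels = ch1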
cd /data/
wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar axf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
vim /etc/profile
#FLUME
export FLUME_HOME=/data/apache-flume-1.8.0-bin
export PATH=$PATH:${FLUME_HOME}/bin
export HADOOP_HOME=/data/hadoop
source /etc/profile
cd ${FLUME_HOME}/conf/
cp flume-env.sh.template flume-env.sh
Edit flume-env.sh:
export JAVA_HOME=/usr/local/jdk
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_HOME=/data/hadoop
Verify the installation:
flume-ng version
cd ${FLUME_HOME}/conf/
Add the configuration file:
vim avro.conf
#Name the components on this agent
agent.sources = avroSrc
agent.channels = avroChannel
agent.sinks = avroSink

#Describe/configure the source
agent.sources.avroSrc.type = netcat
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 62000

#Describe the sink
agent.sinks.avroSink.type = logger

#Use a channel that buffers events in memory
agent.channels.avroChannel.type = memory
agent.channels.avroChannel.capacity = 1000
agent.channels.avroChannel.transactionCapacity = 100

#Bind the source and sink to the channel
agent.sources.avroSrc.channels = avroChannel
agent.sinks.avroSink.channel = avroChannel
# Note: when agent.sources.avroSrc.type was set to avro for testing, the following error was thrown:
# org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 1863125517 items! Connection closed
Run the Flume agent:
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/avro.conf -n agent -Dflume.root.logger=INFO,console
Test with a telnet connection:
telnet localhost 62000
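If telnet is not installed on the host, nc (netcat) can be used the same way, assuming the netcat package is available:

nc localhost 62000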
Check the logger output:
cd ${FLUME_HOME}/conf/
Add the configuration file:
vim exec.conf
#example.conf: A single-node Flume configuration
#Name the components on this agent
agentexec.sources = avroexec
agentexec.sinks = sinkexec
agentexec.channels = channelexec

#Describe/configure the source
#(bind/port are left over from the Avro example; an exec source ignores them)
agentexec.sources.avroexec.bind = localhost
agentexec.sources.avroexec.port = 630000
agentexec.sources.avroexec.type = exec
agentexec.sources.avroexec.command = tail -F /tmp/testexec.log

#Describe the sink
agentexec.sinks.sinkexec.type = logger

#Use a channel which buffers events in memory
agentexec.channels.channelexec.type = memory
agentexec.channels.channelexec.capacity = 100000
agentexec.channels.channelexec.transactionCapacity = 10000

#Bind the source and sink to the channel
agentexec.sources.avroexec.channels = channelexec
agentexec.sinks.sinkexec.channel = channelexec
Run the Flume agent:
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/exec.conf --name agentexec -Dflume.root.logger=INFO,console
Test:
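To generate data for the exec source to pick up, append some lines to the tailed file (the sample text is only illustrative):

echo "hello, test exec source" >> /tmp/testexec.log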
Awkwardly, only part of the appended data showed up in the logger output (no solution has been found for this yet).
Prerequisite: a Kafka cluster is installed.
cd ${FLUME_HOME}/conf/
Add the configuration file:
vim single_agent.conf
#agent name a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1

#set source
#(for testing, the data is kept under /tmp; adjust the path as needed)
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir = /tmp/spooldir
a1.sources.source1.fileHeader = false

#set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic = spooldir

#set channel
#(for testing, the channel data is kept under /tmp; adjust the paths as needed)
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /tmp/flume_data/checkpoint
a1.channels.channel1.dataDirs = /tmp/flume_data/data

#bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
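Note that since Flume 1.7 the Kafka sink documents kafka.topic as the preferred property name; the bare topic name used above still works but is listed as deprecated, so the equivalent setting would be:

a1.sinks.sink1.kafka.topic = spooldir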
Create the directories used for the files:
mkdir -pv /tmp/spooldir
mkdir -pv /tmp/flume_data/checkpoint
mkdir -pv /tmp/flume_data/data
(On all nodes) start the Kafka cluster:
kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties
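If you prefer not to keep a terminal attached to each broker, kafka-server-start.sh also accepts a -daemon flag to run the broker in the background:

kafka-server-start.sh -daemon /data/kafka_2.11-1.0.0/config/server.properties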
Create the Kafka topic:
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic spooldir --replication-factor 1 --partitions 3
List the topics:
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
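To also check the partition and replica assignment of the new topic, the same script can describe it:

kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir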
Start a Kafka console consumer:
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir --from-beginning
(In a new terminal) start the Flume agent:
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
Test a write:
[root@master conf]# echo "hello ,test flume spooldir source" >> /tmp/spooldir/spool.txt
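Once the spooling directory source has ingested the file, it renames it with the default .COMPLETED suffix, so listing the directory is a quick way to confirm the file was picked up (the renamed name assumes the spool.txt created above):

ls /tmp/spooldir/
# expected: spool.txt.COMPLETED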
Flume agent output:
Kafka consumer output:
Prerequisite: an HBase cluster is installed.
cd ${FLUME_HOME}/conf/
mkdir hbase && cd hbase
Add the configuration files; two agents are needed here.
hbase-back.conf collects the local data, and hbase-front.conf writes the data into HBase.
vim hbase-back.conf
agent.sources = backsrc
agent.channels = memoryChannel
agent.sinks = remotesink

#Describe the sources
agent.sources.backsrc.type = exec
agent.sources.backsrc.command = tail -F /tmp/test/data/data.txt
agent.sources.backsrc.checkperiodic = 1000
agent.sources.backsrc.channels = memoryChannel

#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000

#Describe the sinks
agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = master
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel = memoryChannel
vim hbase-front.conf
agent.sources = frontsrc
agent.channels = memoryChannel
agent.sinks = fileSink

#Describe the sources
agent.sources.frontsrc.type = avro
agent.sources.frontsrc.bind = master
agent.sources.frontsrc.port = 9999
agent.sources.frontsrc.channels = memoryChannel

#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000

#Describe the sinks
agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel = memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = t
agent.sinks.fileSink.batchSize = 50
agent.sinks.fileSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.fileSink.zookeeperQuorum = master:2181,slave1:2181,slave2:2181
agent.sinks.fileSink.znodeParent = /hbase
agent.sinks.fileSink.timeout = 90000
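By default RegexHbaseEventSerializer matches the whole event body with the regex (.*) and writes it into a single column named payload. If the log lines have structure, the serializer can split them into columns; the regex and column names below are only an illustration, not part of this setup:

agent.sinks.fileSink.serializer.regex = ^([^ ]+) ([^ ]+) (.*)$
agent.sinks.fileSink.serializer.colNames = host,time,msg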
Create the local file and directory:
mkdir -pv /tmp/test/data && touch /tmp/test/data/data.txt
Create the table in HBase:
hbase shell
Create the table:
create 'access_log','t'
List the tables:
list
Start the back agent:
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent -Dflume.root.logger=INFO,console
An error is reported after startup:
18/01/22 22:29:28 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: 192.168.3.58, port: 9999
org.apache.flume.FlumeException: NettyAvroRpcClient { host: master, port: 9999 }: RPC connection error
This happens because the Avro connection has not been established yet: only the sink side is running and there is no source side. Once the front agent is started, the connection is reported as established.
Start the front agent:
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-front.conf --name agent -Dflume.root.logger=INFO,console
Append content to the local file, then check it in HBase:
echo "hello ,test flush to hbase">>/tmp/test/data/data.txt
Neither agent prints log output while the data is being written.
View the data in HBase:
hbase shell
scan "access_log"
There is some delay before data written by Flume shows up in HBase.
Writing to other services follows the same principle as writing to HBase; once the HBase flow is understood, the rest is easy to follow. See the official documentation for detailed configuration.
Prerequisite: a Hadoop cluster is installed.
cd ${FLUME_HOME}/conf/
mkdir hdfs && cd hdfs
Add the configuration files; two agents are needed here.
hadoop-back.conf collects the local data, and hadoop-front.conf writes the data into Hadoop.
vim hadoop-back.conf
#Name the components
hadoop.sources = backsrc
hadoop.sinks = fileSink
hadoop.channels = memoryChannel

#Source
hadoop.sources.backsrc.type = spooldir
hadoop.sources.backsrc.spoolDir = /tmp/data/hadoop
hadoop.sources.backsrc.channels = memoryChannel
hadoop.sources.backsrc.fileHeader = true

#Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000

#Sink
hadoop.sinks.fileSink.type = avro
hadoop.sinks.fileSink.hostname = master
hadoop.sinks.fileSink.port = 10000
hadoop.sinks.fileSink.channel = memoryChannel
vim hadoop-front.conf
#Name the components
hadoop.sources = frontsrc
hadoop.channels = memoryChannel
hadoop.sinks = remotesink

#Source
hadoop.sources.frontsrc.type = avro
hadoop.sources.frontsrc.bind = master
hadoop.sources.frontsrc.port = 10000
hadoop.sources.frontsrc.channels = memoryChannel

#Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000

#Sink
hadoop.sinks.remotesink.type = hdfs
hadoop.sinks.remotesink.hdfs.path = hdfs://master/flume
hadoop.sinks.remotesink.hdfs.rollInterval = 0
hadoop.sinks.remotesink.hdfs.idleTimeout = 10000
hadoop.sinks.remotesink.hdfs.fileType = DataStream
hadoop.sinks.remotesink.hdfs.writeFormat = Text
hadoop.sinks.remotesink.hdfs.threadsPoolSize = 20
hadoop.sinks.remotesink.channel = memoryChannel
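Setting rollInterval to 0 disables time-based rolling, but the HDFS sink still rolls files based on its default rollSize (1024 bytes) and rollCount (10 events). If larger files are wanted, those limits can be raised or disabled as well, for example:

hadoop.sinks.remotesink.hdfs.rollSize = 0
hadoop.sinks.remotesink.hdfs.rollCount = 0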
Create the local directory and adjust its permissions:
mkdir -pv /tmp/data/hadoop
chmod -R 777 /tmp/data/
Create the directory in HDFS and adjust its permissions:
hadoop fs -mkdir /flume
hadoop fs -chmod 777 /flume
hadoop fs -ls /
Write files into the local directory:
echo "hello, test hadoop" >> /tmp/data/hadoop/hadoop.log
echo "hello, test flume" >> /tmp/data/hadoop/flume.log
echo "hello, test helloworld" >> /tmp/data/hadoop/helloworld.log
Check the files and their contents in HDFS:
hadoop fs -ls /flume
hadoop fs -cat /flume/FlumeData.1516634328510.tmp
Thanks for reading! That concludes this walk-through of installing Flume and integrating it with Kafka; hopefully the content above is of some help.