真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flume典型應(yīng)用場(chǎng)景

1.flume不同Source、Sink的配置文件編寫(xiě)

(1)Source---spool

 監(jiān)聽(tīng)是一個(gè)目錄,這個(gè)目錄不能有子目錄,監(jiān)控的是這個(gè)目錄下的文件。采集完成,這個(gè)目錄下的文件會(huì)加上后綴(.COMPLETED)
配置文件

創(chuàng)新互聯(lián)建站是一家專(zhuān)注于網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作與策劃設(shè)計(jì),武昌網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專(zhuān)注于網(wǎng)站建設(shè)10年,網(wǎng)設(shè)計(jì)領(lǐng)域的專(zhuān)業(yè)建站公司;建站業(yè)務(wù)涵蓋:武昌等地區(qū)。武昌做網(wǎng)站價(jià)格咨詢(xún):13518219792

#Name the components on this agent
#這里的a1指的是agent的名字,可以自定義,但注意:同一個(gè)節(jié)點(diǎn)下的agent的名字不能相同
#定義的是sources、sinks、channels的別名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#指定source的類(lèi)型和相關(guān)的參數(shù)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumedata

#設(shè)定channel
a1.channels.c1.type = memory

#設(shè)定sink
a1.sinks.k1.type = logger

#Bind the source and sink to the channel
#設(shè)置sources的通道
a1.sources.r1.channels = c1
#設(shè)置sink的通道
a1.sinks.k1.channel = c1

(2)Source---netcat

 一個(gè)NetCat Source用來(lái)監(jiān)聽(tīng)一個(gè)指定端口,并將接收到的數(shù)據(jù)的每一行轉(zhuǎn)換為一個(gè)事件。
數(shù)據(jù)源: netcat(監(jiān)控tcp協(xié)議)
Channel:內(nèi)存
數(shù)據(jù)目的地:控制臺(tái)

配置文件

#指定代理
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#指定sources
a1.sources.r1.channels = c1
#指定source的類(lèi)型
a1.sources.r1.type = netcat
#指定需要監(jiān)控的主機(jī)
a1.sources.r1.bind = 192.168.191.130
#指定需要監(jiān)控的端口
a1.sources.r1.port = 3212

#指定channel
a1.channels.c1.type = memory

#sinks  寫(xiě)出數(shù)據(jù) logger
a1.sinks.k1.channel=c1
a1.sinks.k1.type=logger

(3)Source---avro

 監(jiān)聽(tīng)AVRO端口來(lái)接受來(lái)自外部AVRO客戶(hù)端的事件流。利用Avro Source可以實(shí)現(xiàn)多級(jí)流動(dòng)、扇出流、扇入流等效果。另外也可以接受通過(guò)flume提供的Avro客戶(hù)端發(fā)送的日志信息。
數(shù)據(jù)源: avro
Channel:內(nèi)存
數(shù)據(jù)目的地:控制臺(tái)
配置文件

#指定代理
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#指定sources
a1.sources.r1 channels. = c1
#指定source的類(lèi)型
a1.sources.r1.type = avro
#指定需要監(jiān)控的主機(jī)名
a1.sources.r1.bind = hadoop03
#指定需要監(jiān)控的端口
a1.sources.r1.port = 3212

#指定channel
a1.channels.c1.type = memory

#指定sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

(4)采集日志文件到hdfs

source ====exec (一個(gè)Linux命令: tail -f)
channel====memory
sink====hdfs
注意:如果集群是高可用的集群,需要將core-site.xml 和hdfs-site.xml 放入flume的conf中。
配置文件

a1.sources = r1
a1.channels = c1
a1.sinks = k1

#指定sources
a1.sources.r1.channels = c1
#指定source的類(lèi)型
a1.sources.r1.type = exec
#指定exec的command
a1.sources.r1.command = tail -F /home/hadoop/flumedata/zy.log

#指定channel
a1.channels.c1.type = memory

#指定sink 寫(xiě)入hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
#指定hdfs上生成的文件的路徑年-月-日,時(shí)_分
a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H_%M
#開(kāi)啟滾動(dòng)
a1.sinks.k1.hdfs.round = true
#設(shè)定滾動(dòng)的時(shí)間(設(shè)定目錄的滾動(dòng))
a1.sinks.k1.hdfs.roundValue = 24
#時(shí)間的單位
a1.sinks.k1.hdfs.roundUnit = hour
#設(shè)定文件的滾動(dòng)
#當(dāng)前文件滾動(dòng)的時(shí)間間隔(單位是:秒)
a1.sinks.k1.hdfs.rollInterval = 10
#設(shè)定文件滾動(dòng)的大小(文件多大,滾動(dòng)一次)
a1.sinks.k1.hdfs.rollSize = 1024
#設(shè)定文件滾動(dòng)的條數(shù)(多少條滾動(dòng)一次)
a1.sinks.k1.hdfs.rollCount = 10
#指定時(shí)間來(lái)源(true表示指定使用本地時(shí)間)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#設(shè)定存儲(chǔ)在hdfs上的文件類(lèi)型,(DataStream,文本)
a1.sinks.k1.hdfs.fileType = DataStream
#加文件前綴
a1.sinks.k1.hdfs.filePrefix = zzy
#加文件后綴
a1.sinks.k1.hdfs.fileSuffix = .log

2.flume典型的使用場(chǎng)景

(1)多代理流

flume典型應(yīng)用場(chǎng)景
 從第一臺(tái)機(jī)器的flume agent傳送到第二臺(tái)機(jī)器的flume agent。
例:
規(guī)劃
hadoop02:tail-avro.properties
   使用 exec “tail -F /home/hadoop/testlog/welog.log”獲取采集數(shù)據(jù)
   使用 avro sink 數(shù)據(jù)都下一個(gè) agent
hadoop03:avro-hdfs.properties
   使用 avro 接收采集數(shù)據(jù)
   使用 hdfs sink 數(shù)據(jù)到目的地
配置文件

#tail-avro.properties
a1.sources = r1 
a1.sinks = k1
a1.channels = c1
#Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log 
a1.sources.r1.channels = c1 
#Describe the sink
a1.sinks.k1.type = avro 
a1.sinks.k1.channel = c1 
a1.sinks.k1.hostname = hadoop02 
a1.sinks.k1.port = 4141 
a1.sinks.k1.batch-size = 2
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#avro-hdfs.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
#Describe k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)多路復(fù)用采集

flume典型應(yīng)用場(chǎng)景
 在一份agent中有多個(gè)channel和多個(gè)sink,然后多個(gè)sink輸出到不同的文件或者文件系統(tǒng)中。
規(guī)劃:
Hadoop02:(tail-hdfsandlogger.properties)
   使用 exec “tail -F /home/hadoop/testlog/datalog.log”獲取采集數(shù)據(jù)
   使用 sink1 將數(shù)據(jù) 存儲(chǔ)hdfs
   使用 sink2 將數(shù)據(jù)都存儲(chǔ) 控制臺(tái)

配置文件

#tail-hdfsandlogger.properties
#2個(gè)channel和2個(gè)sink的配置文件
#Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2

#Describe/configure tail -F source1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/hadoop/logs/catalina.out
#指定source進(jìn)行扇出到多個(gè)channnel的規(guī)則
a1.sources.s1.selector.type = replicating
a1.sources.s1.channels = c1 c2

#Use a channel which buffers events in memory
#指定channel c1
a1.channels.c1.type = memory
#指定channel c2
a1.channels.c2.type = memory

#Describe the sink
#指定k1的設(shè)置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://myha01/flume_log/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.channel = c1
#指定k2的
a1.sinks.k2.type = logger
a1.sinks.k2.channel = c2

(3)高可用部署采集

flume典型應(yīng)用場(chǎng)景
 首先在三個(gè)web服務(wù)器中收集數(shù)據(jù),然后交給collect,此處的collect是高可用的,首先collect01是主,所有收集到的數(shù)據(jù)發(fā)送給他,collect02只是出于熱備狀態(tài)不接受數(shù)據(jù),當(dāng)collect01宕機(jī)的時(shí)候,collect02頂替,然后接受數(shù)據(jù),最終將數(shù)據(jù)發(fā)送給hdfs或者kafka。
agent和collecotr的部署
flume典型應(yīng)用場(chǎng)景
 Agent1、Agent2數(shù)據(jù)分別流入到Collector1和Collector2中,F(xiàn)lume NG 本 身提供了 Failover 機(jī)制,可以自動(dòng)切換和恢復(fù)。再由Collector1和Collector2將數(shù)據(jù)輸出到hdfs中。
示意圖
flume典型應(yīng)用場(chǎng)景
配置文件

#ha_agent.properties
#agent name: agent1
agent1.channels = c1
agent1.sources = r1

agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp

#set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 52020
#set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
#ha_collector.properties
#set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#other node,nna to nns
a1.sources.r1.type = avro
##當(dāng)前主機(jī)為什么,就修改成什么主機(jī)名
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
##當(dāng)前主機(jī)為什么,就修改成什么主機(jī)名
a1.sources.r1.interceptors.i1.value = hadoop03
a1.sources.r1.channels = c1

#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://myha01/flume_ha/loghdfs
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

最后啟動(dòng)

#先啟動(dòng) hadoop02 和 hadoop03 上的 collector 角色:
bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 - Dflume.root.logger=INFO,console
#然后啟動(dòng) hadoop01,hadoop02 上的 agent 角色:
bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 - Dflume.root.logger=INFO,console

分享名稱(chēng):flume典型應(yīng)用場(chǎng)景
分享地址:http://weahome.cn/article/pcegcs.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部