1) Flume提供一個分布式的,可靠的,對大數(shù)據(jù)量的日志進(jìn)行高效收集、聚集、移動的服務(wù),F(xiàn)lume只能在Linux環(huán)境下運行。
2) Flume基于流式架構(gòu),容錯性強(qiáng),也很靈活簡單,架構(gòu)簡單。
3) Flume、Kafka用來實時進(jìn)行數(shù)據(jù)收集,Spark、Storm用來實時處理數(shù)據(jù),impala用來實時查詢。
專注于為中小企業(yè)提供做網(wǎng)站、成都做網(wǎng)站服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)永康免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
圖1.1 flume架構(gòu)
說到flume的架構(gòu),直接拿官網(wǎng)的圖來說就足夠了。
首先在每個數(shù)據(jù)源上都會部署一個 flume agent ,這個agent就是用來采取數(shù)據(jù)的。
這個agent由3個組件組成:source,channel,sink。而在flume中,數(shù)據(jù)傳輸?shù)幕締挝皇莈vent。下面講講這幾個概念
用于從數(shù)據(jù)源采集數(shù)據(jù),并將數(shù)據(jù)傳輸在channel中。source支持多種數(shù)據(jù)源采集方式。比如監(jiān)聽端口采集數(shù)據(jù),從文件中采集,從目錄中采集,從http服務(wù)中采集等。
位于source和sink之間,是數(shù)據(jù)的一個暫存區(qū)域。一般情況下,從source流出數(shù)據(jù)的速率和sink流出的數(shù)據(jù)的速率會有所差異。所以需要一個空間暫存那些還沒辦法傳輸?shù)絪ink進(jìn)行處理的數(shù)據(jù)。所以channel類似于一個緩沖區(qū),一個隊列。
從channel獲取數(shù)據(jù),并將數(shù)據(jù)寫到目標(biāo)源。目標(biāo)源支持多種,比如本地文件、hdfs、kafka、下一個flume agent的source等均可。
傳輸單元,flume傳輸?shù)幕締挝唬?headers和body兩部分,header可以添加一些頭部信息,body則是數(shù)據(jù)。
基于上面的概念,流程基本很清晰,source監(jiān)控數(shù)據(jù)源,如果產(chǎn)生新的數(shù)據(jù),則獲取數(shù)據(jù),并封裝成一個event,然后將event傳輸?shù)絚hannel,接著sink從channel拉取數(shù)據(jù)寫入到目標(biāo)源中。
?
flume的程序本身的部署非常簡單,
(1)部署jdk1.8
(2)解壓flume的程序壓縮包到指定目錄,然后添加環(huán)境變量即可
(3)修改配置文件
cd /opt/modules/apache-flume-1.8.0-bin
將模板配置文件復(fù)制重命名為正式配置文件
cp conf/flume-env.sh.template conf/flume-env.sh
添加jdk家目錄變量
vim conf/flume-env.sh
加上這句
export JAVA_HOME=/opt/modules/jdk1.8.0_144
這就完成配置了,基本沒啥難度。flume的使用重點在于agent的配置文件的編寫,根據(jù)業(yè)務(wù)場景不同,配置也不同。簡單來說其實就是對source,channel,sink三大組件的工作屬性的配置。
?
agent的配置其實就是對source、channel、sink的配置。主要有5個步驟,下面看看這個流程是怎樣的。
# 1、定義的agent名稱,指定使用的source sinks channels的名稱
# 可以有多個source sinks channels。
.sources =
這就是agent定義的完整流程,source、channel、sink每個都有不同的類型,每個類型定義的參數(shù)會有差異。下面看看source、channel、sink中常用的類型(想看完整的全部的類型就看官網(wǎng)吧)
常用屬性:
type:需指定為 netcat
bind:監(jiān)聽的主機(jī)名或者ip
port:監(jiān)聽的端口
例子:監(jiān)聽在 0.0.0.0:6666端口
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
常用屬性:
type:需指定為 exec
command:運行的命令
shell:運行名為所需的shell,如 /bin/bash -c
例子:監(jiān)控文件的新增內(nèi)容
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sourcesr.r1.shell = /bin/bash -c
常用的屬性:
type:設(shè)置為 spooldir
spoolDir:監(jiān)控的目錄路徑
fileSuffix:上傳完成的文件加上指定的后綴,默認(rèn)是 .COMPLETED
fileHeader:是否在event的header添加一個key標(biāo)明該文件的絕對路徑,默認(rèn)為false
ignorePattern:正則匹配,忽略的文件
還有其他很多參數(shù),具體到官網(wǎng)上看吧
例子:
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp結(jié)尾的文件,不上傳
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
這個源比較特別,通常用在上一個flume的sink 輸出,然后作為下一個flume的輸入的格式。
常用的屬性:
type:需指定為 avro
bind:監(jiān)聽的主機(jī)名或者ip,只能是agent所在主機(jī)的ip或者h(yuǎn)ostname
port:監(jiān)聽的端口
例子:
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
? spoolDir有一個bug,就是已經(jīng)上傳完成的文件,不能再追加內(nèi)容,否則會報錯,而且也無法讀取到新的文件內(nèi)容。所以spooldir只能用來監(jiān)控目錄下新文件的變化,沒辦法監(jiān)控已有文件的內(nèi)容變化。以往這種情況只能使用 exec源,然后使用tail -f xxxlog 的方式來監(jiān)聽文件內(nèi)容變化,但是這種方式有缺陷,就是容易丟失數(shù)據(jù)。而在flume1.7之后有一個新的source,叫TAILDIR,可以直接監(jiān)聽文件變化的內(nèi)容。看看用法:
常用屬性:
type:TAILDIR ,記住,要全部大寫
filegroups:要監(jiān)聽的文件組的名字,可以有多個文件組
filegroups.:指定文件組的包含哪些文件,可以使用擴(kuò)展正則表達(dá)式,這里可以有的小技巧 /path/.* 這樣就可以監(jiān)聽目錄下的所有文件內(nèi)容的變化
positionFile:這個文件json格式記錄了目錄下每個文件的inode,以及pos偏移量
fileHeader:是否添加header
屬性過多,可以當(dāng)官網(wǎng)看:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source
例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2 有兩個文件組
# 文件組1內(nèi)容
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
# 使用正則表達(dá)式指定文件組
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
下面再說說上面說到的 positionFile 這個東東,看看它的格式:
[{"inode":408241856,"pos":27550,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/flume.log.COMPLETED"},
{"inode":406278032,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bi
n/logs/words.txt.COMPLETED"},{"inode":406278035,"pos":0,"file":"/opt/modules/apache-flume-1.8.0-bin/logs/words.txt"},
{"inode":406278036,"pos":34,"file":"/opt/modules/apache
-flume-1.8.0-bin/logs/test.txt"}]
分析:
1、每個文件都是一個json串,由多個json串組成一個類似于數(shù)組的東西。
2、每個json包含內(nèi)容有:
inode:這個什么意思就自己具體看看文件系統(tǒng)的基本知識吧
pos:開始監(jiān)聽文件內(nèi)容的起始偏移量
file:文件絕對路徑名
3、小技巧:
(1)如果監(jiān)聽目錄時,某些文件已存在,那么flume默認(rèn)是從文件最后作為監(jiān)聽起始點進(jìn)行監(jiān)聽。當(dāng)文件內(nèi)容更新時,flume會獲取,然后sink。接著就會更新pos值。所以因為這個特點,就算flume agent突然崩了,下一次啟動時,自動從上次崩潰的pos開始監(jiān)聽,而不是從最新的文件末尾開始監(jiān)聽。這樣就不會丟失數(shù)據(jù)了,而且不會重復(fù)讀取舊數(shù)據(jù)。
(2)從(1)可知,pos就是實時更新的一個文件內(nèi)容監(jiān)聽點,如果我們想文件從頭開始監(jiān)聽,有時候有需求,需要將監(jiān)聽目錄下的文件全部傳輸一邊。這時候很簡單,將json文件中的pos改為0就好了。
4、如果沒有指定positionFile路徑,默認(rèn)為/USER_HOME/.flume/taildir_position.json
常用的屬性:
type:需指定為 memory
capacity:存儲在channel中event數(shù)量的最大值
transactionCapacity:一次傳輸?shù)膃vent的最大數(shù)量
例子:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
常用的屬性:
type:需指定為 file
checkpointDir:存儲checkpoint文件的目錄
dataDirs:存儲數(shù)據(jù)的目錄
例子:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
這個類型是將內(nèi)存+文件作為channel,當(dāng)容量空間超過內(nèi)存時就寫到文件中
常用的屬性:
type:指定為 SPILLABLEMEMORY
memoryCapacity:使用內(nèi)存存儲的event的最大數(shù)量
overflowCapacity:存儲到文件event的最大數(shù)量
byteCapacity:使用內(nèi)存存儲的event的最大容量,單位是 bytes
checkpointDir:存儲checkpoint文件的目錄
dataDirs:存儲數(shù)據(jù)的目錄
例子:
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
生產(chǎn)環(huán)境中,flume+kafka也是常用的技術(shù)棧,但是一般是將kafka作為sink目標(biāo)
常用屬性:
type:設(shè)置為 org.apache.flume.channel.kafka.KafkaChannel
bootstrap.servers:kafka集群的服務(wù)器, ip:port,ip2:port,....
topic:kafka中的topic
consumer.group.id:消費者的groupid
例子:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
常用屬性:
type:logger
例子:
a1.sinks.k1.type = logger
這個類型比較簡單,一般用于調(diào)試時使用
這個類型主要就是用來給下一個flume作為輸入的格式,是字節(jié)流的方式,而且是序列化的序列。
常用屬性:
type:avro
hostname:輸出目標(biāo)的主機(jī)名或者ip,可以任意主機(jī),不局限于本機(jī)
ip:輸出到的端口
例子:
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
常用屬性:
type:hdfs
hdfs.path:存儲路徑 , hdfs://namenode:port/PATH
hdfs.filePrefix:上傳的文件的前綴(額外加上的)
hdfs.round:是否按時間滾動文件夾
hdfs.roundValue:滾動的時間值
hdfs.roundUnit:滾動的時間的單位
hdfs.userLocalTimeStamp:是否使用本地時間戳,true還是false
hdfs.batchSize:積攢多少個event才flush到hdfs 一次
hdfs.fileType:文件類型,DataStream(普通文件),SequenceFile(二進(jìn)制格式,默認(rèn)),CompressedStream(壓縮格式)
hdfs.rollInterval:多久生成一個新的文件,單位是秒
hdfs.rollSize:文件滾動大小,單位是 bytes
hdfs.rollCount:文件滾動是否與event數(shù)量有關(guān),true 還是false
hdfs.minBlockReplicas:最小副本數(shù)
例子:
#指定sink的類型為存儲在hdfs中
a2.sinks.k2.type = hdfs
# 路徑命名為按小時
a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = king-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設(shè)置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件,單位是秒
a2.sinks.k2.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小,單位是bytes
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a2.sinks.k2.hdfs.rollCount = 0
#最小副本數(shù)
a2.sinks.k2.hdfs.minBlockReplicas = 1
常用屬性:
type:file_roll
sink.directory:存儲路徑
例子:
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /var/log/flum
常用屬性:
tpye:org.apache.flume.sink.kafka.KafkaSink
kafka.topic:kafka話題名
kafka.bootstrap.servers:集群服務(wù)器列表,以逗號分隔
kafka.flumeBatchSize:刷寫到kafka的event數(shù)量
kafka.producer.acks:接收到時返回ack信息時,寫入的最少的副本數(shù)
kafka.producer.compression.type:壓縮類型
例子:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
攔截器interceptors并不是必須的,它是工作在source和channel之間的一個組件,用于過濾source來的數(shù)據(jù),并輸出到channel。
使用格式:
先指定攔截器的名字,然后對每個攔截器進(jìn)行工作屬性配置
.sources..interceptors =
.sources..interceptors.. = xxxx
在event 的header中添加一個字段,用于標(biāo)明時間戳如:headers:{timestamp:111111}。
常用屬性:
type:timestamp
headerName:在header中的key名字,默認(rèn)是 timestamp
例子:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
在event 的header中添加一個字段,用于標(biāo)明host戳,如:headers:{host:bigdata121}。
常用屬性:
type:host
hostHeader:在header中的key名字,默認(rèn)是 host
useIP:用ip還是主機(jī)名
例子:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
在event 的header中添加一個字段,用于標(biāo)明uuid如:headers:{id:111111}。
常用屬性:
type:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headName:在header中的key名字,默認(rèn)是 id
prefix:給每個UUID添加前綴
使用正則匹配,然后替換指定字符
常用屬性:
type:search_replace
searchPattern:匹配的正則
replaceString:替換的字符串
charset:字符集,默認(rèn)UTF-8
例子:刪除特定字符開頭的字符串
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =
正則匹配,匹配到的丟棄或者留下
常用屬性:
type:regex_filter
regex:正則
excludeEvents:true為過濾掉匹配的,false為留下匹配的
例子:
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
#如果excludeEvents設(shè)為false,表示過濾掉不是以A開頭的events。如果excludeEvents設(shè)為true,則表示過濾掉以A開頭的events。
a1.sources.r1.interceptors.i1.excludeEvents = true
這里其實是利用正則的分組匹配來獲取多個匹配組,然后將每個組的匹配值存儲到header中,key可以自定義。
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# 指定類型為 regex_extractor
a1.sources.r1.interceptors.i1.type = regex_extractor
# 分組匹配的正則
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
# 兩個分組各自的key別名
a1.sources.r1.interceptors.i1.serializers = s1 s2
# 分別設(shè)置key的名字
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
繼承接口 org.apache.flume.interceptor.Interceptor,實現(xiàn)里面的特定方法,如:
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public void close() {
}
/**
* 攔截source發(fā)送到通道channel中的消息
* 處理單個event
* @param event 接收過濾的event
* @return event 根據(jù)業(yè)務(wù)處理后的event
*/
@Override
public Event intercept(Event event) {
// 獲取事件對象中的字節(jié)數(shù)據(jù)
byte[] arr = event.getBody();
// 將獲取的數(shù)據(jù)轉(zhuǎn)換成大寫
event.setBody(new String(arr).toUpperCase().getBytes());
// 返回到消息中
return event;
}
// 處理event集合
@Override
public List intercept(List events) {
List list = new ArrayList<>();
for (Event event : events) {
list.add(intercept(event));
}
return list;
}
//用來返回攔截器對象
public static class Builder implements Interceptor.Builder {
// 獲取配置文件的屬性
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
pom.xml依賴
org.apache.flume
flume-ng-core
1.8.0
在 agent的配置文件中指定攔截器
a1.sources.r1.interceptors = i1
#全類名$Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder
運行命令:
bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume_Andy-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console
-C 指定額外的jar包的路徑,就是我們自己寫的攔截器的jar包
也可以將jar包放到flume程序目錄的lib目錄下
# 1.定義agent的名字a2.以及定義這個agent中的source,sink,channel的名字
a2.sources = r2
a2.sinks = k2
a2.channels = c2
#2.定義Source,定義數(shù)據(jù)來源
# 定義source類型是exec,執(zhí)行命令的方式
a2.sources.r2.type = exec
# 命令
a2.sources.r2.command = tail -F /tmp/access.log
# 使用的shell
a2.sources.r2.shell = /bin/bash -c
#3.定義sink
#指定sink的類型為存儲在hdfs中
a2.sinks.k2.type = hdfs
# 路徑命名為按小時
a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = king-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設(shè)置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件,單位是秒
a2.sinks.k2.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小,單位是bytes
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a2.sinks.k2.hdfs.rollCount = 0
#最小副本數(shù)
a2.sinks.k2.hdfs.minBlockReplicas = 1
# 4.定義Channel,類型、容量限制、傳輸容量限制
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# 5.鏈接,通過channel將source和sink連接起來
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
啟動flume-agent:
/opt/module/flume1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \ flume配置目錄
--name a2 \ agent名字
--conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf agent配置
-Dflume.root.logger=INFO,console 打印日志到終端
flume1:輸出到flume2和flume3
flume2:輸出到本地文件
flume3:輸出到hdfs
flume1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 將數(shù)據(jù)流復(fù)制給多個channel。啟動復(fù)制模式
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test
a1.sources.r1.shell = /bin/bash -c
# 這是k1 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141
# 這是k2 sink
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata111
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# 給source接入連接兩個channel.每個channel對應(yīng)一個sink
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume2.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a2.sinks.k1.hdfs.rollCount = 0
#最小副本數(shù)
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
#備注:此處的文件夾需要先創(chuàng)建好
a3.sinks.k1.sink.directory = /opt/flume3
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
啟動時,先啟動flume2和flume3,最后啟動flume1。啟動命令不重復(fù)了。
多臺server產(chǎn)生的日志,需要各自監(jiān)控,然后匯總起來存儲,這種場景很多。
flume1(監(jiān)聽文件)和flume2(監(jiān)聽端口)各自收集數(shù)據(jù),然后分別sink到flume3,flume3負(fù)責(zé)匯總寫入hdfs
flume1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata111
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
#上傳文件的前綴
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照時間滾動文件夾
a3.sinks.k1.hdfs.round = true
#多少時間單位創(chuàng)建一個新的文件夾
a3.sinks.k1.hdfs.roundValue = 1
#重新定義時間單位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a3.sinks.k1.hdfs.batchSize = 100
#設(shè)置文件類型,可支持壓縮
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#設(shè)置每個文件的滾動大小大概是128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數(shù)量無關(guān)
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余數(shù)
a3.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
啟動時先啟動flume3,然后啟動flume1和flume2
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
測試可以通過 telnet bigdata111 44444 端口來發(fā)送數(shù)據(jù)
可以在/opt/Andy文件中追加數(shù)據(jù)