kafkaflume.conf
偏關(guān)網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),偏關(guān)網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為偏關(guān)成百上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個(gè)售后服務(wù)好的偏關(guān)做網(wǎng)站的公司定做!
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /usr/local/src/flume/testflume2.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#設(shè)置Kafka接收器
#
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
##設(shè)置Kafka的broker地址和端口號(hào)
agent.sinks.k1.kafka.bootstrap.servers = master:9092,node01:9092,node02:9092
##設(shè)置Kafka的Topic
agent.sinks.k1.kafka.topic = mydemo5
##設(shè)置序列化方式
#
#
agent.sinks.k1.kafka.flumeBatchSize = 20
agent.sinks.k1.kafka.producer.acks = 1
agent.sinks.k1.kafka.producer.linger.ms = 1
agent.sinks.ki.kafka.producer.compression.type = snappy
agent.sinks.k1.channel=c1
創(chuàng)建一個(gè)topic
kafka-topics --create --zookeeper master:2181/kafka --replication-factor 2 --partitions 3 --topic mydemo5
啟動(dòng)flume作為消息生產(chǎn)者寫道kafka上
flume-ng agent -c conf -f kafkaflume.conf -n agent -Dflume.root.logger=INFO,console
消費(fèi)kafka的數(shù)據(jù)
kafka-console-consumer --bootstrap-server master:9092 --topic mydemo5 --from-beginning
讀取一個(gè)文件的內(nèi)容寫入追加到新的文件
#!/bin/bash
while read line
do
echo $line
sleep 0.01
echo -e $line >> /usr/local/src/flume/testflume2.log
done < /etc/sudo-ldap.conf