這篇文章給大家介紹如何解析Flume與Kafka整合,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
創(chuàng)新互聯(lián)專注于中大型企業(yè)的成都網(wǎng)站建設(shè)、成都做網(wǎng)站和網(wǎng)站改版、網(wǎng)站營銷服務(wù),追求商業(yè)策劃與數(shù)據(jù)分析、創(chuàng)意藝術(shù)與技術(shù)開發(fā)的融合,累計客戶成百上千家,服務(wù)滿意度達(dá)97%。幫助廣大客戶順利對接上互聯(lián)網(wǎng)浪潮,準(zhǔn)確優(yōu)選出符合自己需要的互聯(lián)網(wǎng)運(yùn)用,我們將一直專注成都品牌網(wǎng)站建設(shè)和互聯(lián)網(wǎng)程序開發(fā),在前進(jìn)的路上,與客戶一起成長!
Flume與Kafka整合
一、概念
1、Flume:Cloudera 開發(fā)的分布式日志收集系統(tǒng),是一種分布式,可靠且可用的服務(wù),用于高效地收集,匯總和移動大量日志數(shù)據(jù)。 它具有基于流式數(shù)據(jù)流的簡單而靈活的架構(gòu)。它具有可靠的可靠性機(jī)制和許多故障轉(zhuǎn)移和恢復(fù)機(jī)制,具有強(qiáng)大的容錯性和容錯能力。它使用一個簡單的可擴(kuò)展數(shù)據(jù)模型,允許在線分析應(yīng)用程序。Flume分為OG、NG版本,其中Flume OG 的最后一個發(fā)行版本 0.94.0,之后為NG版本。
2、Kafka:作為一個集群運(yùn)行在一臺或多臺可以跨越多個數(shù)據(jù)中心的服務(wù)器上。在Kafka中,客戶端和服務(wù)器之間的通信是通過一種簡單的,高性能的,語言不可知的TCP協(xié)議完成的。協(xié)議是版本控制的,并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端可以使用多種語言。
3、Kafka通常用于兩大類應(yīng)用,如下:
A、構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時流數(shù)據(jù)管道
B、構(gòu)建實(shí)時流應(yīng)用程序,用于轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流
C、Kafka每個記錄由一個鍵,一個值和一個時間戳組成。
二、產(chǎn)述背景
基于大數(shù)據(jù)領(lǐng)域?qū)崿F(xiàn)日志數(shù)據(jù)時時采集及數(shù)據(jù)傳遞等需要,據(jù)此需求下試著完成flume+kafka扇入、扇出功能整合,其中扇出包括:復(fù)制流、復(fù)用流等功能性測試。后續(xù)根據(jù)實(shí)際需要,將完善kafka與spark streaming進(jìn)行整合整理工作。
注:此文檔僅限于功能性測試,性能優(yōu)化方面請大家根據(jù)實(shí)際情況增加。
三、部署安裝
1、測試環(huán)境說明:
操作系統(tǒng):CentOS 7
Flume版本:flume-ng-1.6.0-cdh6.7.0
Kafka版本:kafka_2.11-0.10.0.1
JDK版本:JDK1.8.0
Scala版本:2.11.8
2、測試步驟:
2.1、flume部署
2.1.1、下載安裝介質(zhì),并解壓:
此處)折疊或打開
此處)折疊或打開
此處)折疊或打開
cd /app/apache-flume-1.6.0-cdh6.7.0-bin
vi netcatOrKafka-memory-logger.conf
netcatagent.sources = netcat_sources
netcatagent.channels = c1 c2
netcatagent.sinks = logger_sinks kafka_sinks
netcatagent.sources.netcat_sources.type = netcat
netcatagent.sources.netcat_sources.bind = 0.0.0.0
netcatagent.sources.netcat_sources.port = 44444
netcatagent.channels.c1.type = memory
netcatagent.channels.c1.capacity = 1000
netcatagent.channels.c1.transactionCapacity = 100
netcatagent.channels.c2.type = memory
netcatagent.channels.c2.capacity = 1000
netcatagent.channels.c2.transactionCapacity = 100
netcatagent.sinks.logger_sinks.type = logger
netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink
netcatagent.sinks.kafka_sinks.topic = test
netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082
netcatagent.sinks.kafka_sinks.requiredAcks = 0
##netcatagent.sinks.kafka_sinks.batchSize = 20
netcatagent.sinks.kafka_sinks.producer.type=sync
netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8
netcatagent.sinks.kafka_sinks.partition.key=0
netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder
netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition
netcatagent.sinks.kafka_sinks.max.message.size=1000000
netcatagent.sources.netcat_sources.selector.type = replicating
netcatagent.sources.netcat_sources.channels = c1 c2
netcatagent.sinks.logger_sinks.channel = c1
netcatagent.sinks.kafka_sinks.channel = c2
2.4.2、啟動各測試命令:
A、啟動flume的agent(于192.168.137.130):
flume-ng agent --name netcatagent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
-Dflume.root.logger=INFO,console
B、啟動kafka消費(fèi)者(于192.168.137.132):
kafka-console-consumer.sh \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--from-beginning --topic test
C、測試發(fā)送(于192.168.137.130與于192.168.137.132)
telnet發(fā)送結(jié)果
kafka消費(fèi)結(jié)果
最終logger接收結(jié)果
至此flume+kafka扇出--復(fù)制流測試(扇入源為:netcat;輸出為:kafka+Flume的Logger)測試與驗證完成。
2.5、flume+kafka扇出--復(fù)用流測試(扇入源為:netcat;輸出為:kafka+Flume的Logger)
暫無,后續(xù)補(bǔ)充
四、部署安裝及驗證過程中出現(xiàn)的問題
1、做flume+kafka扇入測試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時,一直未收到kafka數(shù)據(jù)
主要原因是在做kafka的配置時在配置文件(server.properties)中寫成內(nèi)容:
zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
但在創(chuàng)建topics時,使用的是:
kafka-topics.sh --create \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在創(chuàng)建topics的時增加了/kafka
最終使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令檢查沒有topics信息才發(fā)現(xiàn)此問題
解決辦法:將兩個信息同步即可
2、做flume+kafka扇入測試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時,啟動flume的agent時報錯。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
... 11 more
解決辦法:官網(wǎng)資料存在問題,org.apache.flume.source.kafka,KafkaSource其中不應(yīng)該包括逗號,改為:org.apache.flume.source.kafka.KafkaSource即可。詳細(xì)官網(wǎng)
關(guān)于如何解析Flume與Kafka整合就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。