真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何解析Flume與Kafka整合

這篇文章給大家介紹如何解析Flume與Kafka整合,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

創(chuàng)新互聯(lián)專注于中大型企業(yè)的成都網(wǎng)站建設(shè)、成都做網(wǎng)站和網(wǎng)站改版、網(wǎng)站營銷服務(wù),追求商業(yè)策劃與數(shù)據(jù)分析、創(chuàng)意藝術(shù)與技術(shù)開發(fā)的融合,累計客戶成百上千家,服務(wù)滿意度達(dá)97%。幫助廣大客戶順利對接上互聯(lián)網(wǎng)浪潮,準(zhǔn)確優(yōu)選出符合自己需要的互聯(lián)網(wǎng)運(yùn)用,我們將一直專注成都品牌網(wǎng)站建設(shè)和互聯(lián)網(wǎng)程序開發(fā),在前進(jìn)的路上,與客戶一起成長!

Flume與Kafka整合


一、概念
1、Flume:Cloudera 開發(fā)的分布式日志收集系統(tǒng),是一種分布式,可靠且可用的服務(wù),用于高效地收集,匯總和移動大量日志數(shù)據(jù)。 它具有基于流式數(shù)據(jù)流的簡單而靈活的架構(gòu)。它具有可靠的可靠性機(jī)制和許多故障轉(zhuǎn)移和恢復(fù)機(jī)制,具有強(qiáng)大的容錯性和容錯能力。它使用一個簡單的可擴(kuò)展數(shù)據(jù)模型,允許在線分析應(yīng)用程序。Flume分為OG、NG版本,其中Flume OG 的最后一個發(fā)行版本 0.94.0,之后為NG版本。
 
2、Kafka:作為一個集群運(yùn)行在一臺或多臺可以跨越多個數(shù)據(jù)中心的服務(wù)器上。在Kafka中,客戶端和服務(wù)器之間的通信是通過一種簡單的,高性能的,語言不可知的TCP協(xié)議完成的。協(xié)議是版本控制的,并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端可以使用多種語言。

3、Kafka通常用于兩大類應(yīng)用,如下:
   A、構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時流數(shù)據(jù)管道
   B、構(gòu)建實(shí)時流應(yīng)用程序,用于轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流
   C、Kafka每個記錄由一個鍵,一個值和一個時間戳組成。

二、產(chǎn)述背景
    基于大數(shù)據(jù)領(lǐng)域?qū)崿F(xiàn)日志數(shù)據(jù)時時采集及數(shù)據(jù)傳遞等需要,據(jù)此需求下試著完成flume+kafka扇入、扇出功能整合,其中扇出包括:復(fù)制流、復(fù)用流等功能性測試。后續(xù)根據(jù)實(shí)際需要,將完善kafka與spark streaming進(jìn)行整合整理工作。
    注:此文檔僅限于功能性測試,性能優(yōu)化方面請大家根據(jù)實(shí)際情況增加。

三、部署安裝
1、測試環(huán)境說明:
   操作系統(tǒng):CentOS 7
   Flume版本:flume-ng-1.6.0-cdh6.7.0
   Kafka版本:kafka_2.11-0.10.0.1
   JDK版本:JDK1.8.0
   Scala版本:2.11.8
2、測試步驟:
2.1、flume部署
2.1.1、下載安裝介質(zhì),并解壓:

此處)折疊或打開

此處)折疊或打開

此處)折疊或打開

  1. cd /app/apache-flume-1.6.0-cdh6.7.0-bin

  2. vi netcatOrKafka-memory-logger.conf

  3.     netcatagent.sources = netcat_sources

  4.     netcatagent.channels = c1 c2

  5.     netcatagent.sinks = logger_sinks kafka_sinks

  6.     

  7.     netcatagent.sources.netcat_sources.type = netcat

  8.     netcatagent.sources.netcat_sources.bind = 0.0.0.0

  9.     netcatagent.sources.netcat_sources.port = 44444

  10.     

  11.     netcatagent.channels.c1.type = memory

  12.     netcatagent.channels.c1.capacity = 1000

  13.     netcatagent.channels.c1.transactionCapacity = 100

  14.     

  15.     netcatagent.channels.c2.type = memory

  16.     netcatagent.channels.c2.capacity = 1000

  17.     netcatagent.channels.c2.transactionCapacity = 100

  18.     

  19.     netcatagent.sinks.logger_sinks.type = logger

  20.     

  21.     netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink

  22.     netcatagent.sinks.kafka_sinks.topic = test

  23.     netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082

  24.     netcatagent.sinks.kafka_sinks.requiredAcks = 0

  25.     ##netcatagent.sinks.kafka_sinks.batchSize = 20

  26.     netcatagent.sinks.kafka_sinks.producer.type=sync

  27.     netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8

  28.     netcatagent.sinks.kafka_sinks.partition.key=0

  29.     netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder

  30.     netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition

  31.     netcatagent.sinks.kafka_sinks.max.message.size=1000000

  32.     

  33.     netcatagent.sources.netcat_sources.selector.type = replicating

  34.     

  35.     netcatagent.sources.netcat_sources.channels = c1 c2

  36.     netcatagent.sinks.logger_sinks.channel = c1

  37.     netcatagent.sinks.kafka_sinks.channel = c2

2.4.2、啟動各測試命令:
   A、啟動flume的agent(于192.168.137.130):
      flume-ng agent --name netcatagent \
       --conf $FLUME_HOME/conf \
       --conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
       -Dflume.root.logger=INFO,console
    B、啟動kafka消費(fèi)者(于192.168.137.132):
        kafka-console-consumer.sh \
         --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
          --from-beginning --topic test
     C、測試發(fā)送(于192.168.137.130與于192.168.137.132)
telnet發(fā)送結(jié)果

如何解析Flume與Kafka整合

kafka消費(fèi)結(jié)果

如何解析Flume與Kafka整合

最終logger接收結(jié)果

如何解析Flume與Kafka整合

         
   至此flume+kafka扇出--復(fù)制流測試(扇入源為:netcat;輸出為:kafka+Flume的Logger)測試與驗證完成。
   
2.5、flume+kafka扇出--復(fù)用流測試(扇入源為:netcat;輸出為:kafka+Flume的Logger)

   暫無,后續(xù)補(bǔ)充



四、部署安裝及驗證過程中出現(xiàn)的問題

    1、做flume+kafka扇入測試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時,一直未收到kafka數(shù)據(jù)
        主要原因是在做kafka的配置時在配置文件(server.properties)中寫成內(nèi)容:
       zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
       但在創(chuàng)建topics時,使用的是:
       kafka-topics.sh --create \
   --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
 
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在創(chuàng)建topics的時增加了/kafka
最終使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令檢查沒有topics信息才發(fā)現(xiàn)此問題
   
   解決辦法:將兩個信息同步即可
   
2、做flume+kafka扇入測試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時,啟動flume的agent時報錯。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more


   解決辦法:官網(wǎng)資料存在問題,org.apache.flume.source.kafka,KafkaSource其中不應(yīng)該包括逗號,改為:org.apache.flume.source.kafka.KafkaSource即可。詳細(xì)官網(wǎng)
如何解析Flume與Kafka整合

關(guān)于如何解析Flume與Kafka整合就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


文章名稱:如何解析Flume與Kafka整合
本文路徑:http://weahome.cn/article/gdhhgc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部