真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flume如何整合kafka

這篇文章主要為大家展示了“Flume如何整合kafka”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Flume如何整合kafka”這篇文章吧。

成都做網(wǎng)站、成都網(wǎng)站制作的開(kāi)發(fā),更需要了解用戶,從用戶角度來(lái)建設(shè)網(wǎng)站,獲得較好的用戶體驗(yàn)。成都創(chuàng)新互聯(lián)多年互聯(lián)網(wǎng)經(jīng)驗(yàn),見(jiàn)的多,溝通容易、能幫助客戶提出的運(yùn)營(yíng)建議。作為成都一家網(wǎng)絡(luò)公司,打造的就是網(wǎng)站建設(shè)產(chǎn)品直銷的概念。選擇成都創(chuàng)新互聯(lián),不只是建站,我們把建站作為產(chǎn)品,不斷的更新、完善,讓每位來(lái)訪用戶感受到浩方產(chǎn)品的價(jià)值服務(wù)。

Using Kafka with Flume

在CDH 5.2.0 及更高的版本中, Flume 包含一個(gè)Kafka source and sink。使用它們可以讓數(shù)據(jù)從Kafka流入Hadoop或者從任何Flume source 流入Kafka。

     重要提示:不能配置一個(gè)Kafka source發(fā)送數(shù)據(jù)到 a Kafka sink.如果這么做, the Kafka source sets the topic in the event header, overriding the sink configuration and creating an infinite loop, sending messages back and forth between the source and sink. If you need to use both a source and a sink, use an interceptor to modify the event header and set a different topic.

Kafka Source

使用Kafka source 讓數(shù)據(jù)從Kafka topics 流入 Hadoop. The Kafka source 可以與任何Flume sink合并, 這樣很容易把數(shù)據(jù)從 Kafka 寫到 HDFS, HBase, 以及Solr.

下面的 Flume 配置示例,是使用 Kafka source 發(fā)送數(shù)據(jù)到 HDFS sink:

tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1
 
 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
 tier1.sources.source1.topic = weblogs
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1
 tier1.sources.source1.interceptors = i1
 tier1.sources.source1.interceptors.i1.type = timestamp
 tier1.sources.source1.kafka.consumer.timeout.ms = 100
 
 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 
 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
 tier1.sinks.sink1.hdfs.rollInterval = 5
 tier1.sinks.sink1.hdfs.rollSize = 0
 tier1.sinks.sink1.hdfs.rollCount = 0
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1

為了更高的吞吐量, 可以配置多個(gè)Kafka sources讀取一個(gè) topic.如果所有sources配置一個(gè)相同的groupID, 并且topic 有多個(gè)分區(qū), 設(shè)置每一個(gè)source 從不同的分區(qū)讀取數(shù)據(jù),就可以改善效率.

下面的列表描述Kafka source 支持的參數(shù); 必須的參數(shù)使用粗體列出.

Table 1. Kafka Source Properties  

Property NameDefault ValueDescription
type 必須設(shè)置為org.apache.flume.source.kafka.KafkaSource.
zookeeperConnect The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181, zk03.example.com:2181).
topic source 讀取消息的Kafka topic。 Flume 每個(gè)source只支持一個(gè) topic.。
groupIDflumeThe unique identifier of the Kafka consumer group. Set the same groupID in all sources to indicate that they belong to the same consumer group.
batchSize1000向channel寫入消息的最多條數(shù)
batchDurationMillis1000向channel書寫的最大時(shí)間 (毫秒)  。 
其他Kafka consumer  支持的屬性 通過(guò)Kafka source配置Kafka consumer??梢允褂萌魏蝐onsumer 支持的屬性。 Prepend the consumer property name with the prefix kafka. (for example, kafka.fetch.min.bytes). See the Kafka documentation for the full list of Kafka consumer properties.

調(diào)優(yōu)

Kafka source 重寫了兩個(gè)Kafka consumer 的屬性:

  1. auto.commit.enable 設(shè)置為 false by the source, and every batch is committed. 為了改善性能, 設(shè)置為 true 改為使用 kafka.auto.commit.enable。 這個(gè)可能會(huì)丟失數(shù)據(jù) if the source goes down before committing.

  2. consumer.timeout.ms設(shè)置為 10, so when Flume polls Kafka for new data, it waits no more than 10 ms for the data to be available. Setting this to a higher value can reduce CPU utilization due to less frequent polling, but introduces latency in writing batches to the channel.

Kafka Sink

使用Kafka sink 從一個(gè) Flume source發(fā)送數(shù)據(jù)到 Kafka . You can use the Kafka sink in addition to Flume sinks such as HBase or HDFS.

The following Flume configuration example uses a Kafka sink with an exec source:

tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1
 
 tier1.sources.source1.type = exec
 tier1.sources.source1.command = /usr/bin/vmstat 1
 tier1.sources.source1.channels = channel1
 
 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 
 tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
 tier1.sinks.sink1.topic = sink1
 tier1.sinks.sink1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
 tier1.sinks.sink1.channel = channel1
 tier1.sinks.sink1.batchSize = 20

The following table describes parameters the Kafka sink supports; required properties are listed in bold.

Table 2. Kafka Sink Properties  

Property NameDefault ValueDescription
type 必須設(shè)置為: org.apache.flume.sink.kafka.KafkaSink.
brokerList The brokers the Kafka sink uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.
topicdefault-flume-topicThe Kafka topic to which messages are published by default. If the event header contains a topic field, the event is published to the designated topic, overriding the configured topic.
batchSize100The number of messages to process in a single batch. Specifying a larger batchSize can improve throughput and increase latency.
requiredAcks1The number of replicas that must acknowledge a message before it is written successfully. Possible values are 0 (do not wait for an acknowledgement), 1 (wait for the leader to acknowledge only), and -1 (wait for all replicas to acknowledge). To avoid potential loss of data in case of a leader failure, set this to -1.
其他Kafka producer所支持的屬性 Used to configure the Kafka producer used by the Kafka sink. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties.

Kafka sink 使用 topic 以及 key properties from the FlumeEvent headers to determine where to send events in Kafka. If the header contains the topic property, that event is sent to the designated topic, overriding the configured topic. If the header contains the key property, that key is used to partition events within the topic. Events with the same key are sent to the same partition. If the key parameter is not specified, events are distributed randomly to partitions. Use these properties to control the topics and partitions to which events are sent through the Flume source or interceptor.

Kafka Channel

CDH 5.3 以及更高的版本包含一個(gè)Kafka channel to Flume in addition to the existing memory and file channels. 可以使用Kafka channel:

  • To write to Hadoop directly from Kafka without using a source.不使用source,從Kafka直接向hadoop中寫數(shù)據(jù)。

  • To write to Kafka directly from Flume sources without additional buffering.不使用額外的緩沖區(qū)直接從Flume source向Kafka寫數(shù)據(jù)。

  • As a reliable and highly available channel for any source/sink combination.可以與任何source/sink結(jié)合。

如下的 Flume 配置使用了一個(gè)Kafka channel 以及一個(gè)exec source 和 hdfs sink:  

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat 1
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.brokerList = kafka02.example.com:9092,kafka03.example.com:9092
tier1.channels.channel1.topic = channel2
tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
tier1.channels.channel1.parseAsFlumeEvent = true

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

下面的列表描述了Kafka channel 所支持的參數(shù); 粗體為必要參數(shù).

Table 3. Kafka Channel Properties  

Property NameDefault ValueDescription
type 必須設(shè)置為:org.apache.flume.channel.kafka.KafkaChannel.
brokerList The brokers the Kafka channel uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.
zookeeperConnect The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181, zk03.example.com:2181).
topicflume-channelThe Kafka topic the channel will use.
groupIDflumeThe unique identifier of the Kafka consumer group the channel uses to register with Kafka.
parseAsFlumeEventtrueSet to true if a Flume source is writing to the channel and expects AvroDataums with the FlumeEvent schema (org.apache.flume.source.avro.AvroFlumeEvent) in the channel. Set to false if other producers are writing to the topic that the channel is using.
readSmallestOffsetfalseIf true, reads all data in the topic. If false, reads only data written after the channel has started. Only used when parseAsFlumeEvent is false.
kafka.consumer.timeout.ms100當(dāng)向sink寫數(shù)據(jù)時(shí)輪詢的間隔時(shí)間.
其他Kafka producer所支持的屬性 Used to configure the Kafka producer. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties.

<< Using Kafka with Spark Streaming?2015 Cloudera, Inc. All rights reservedAdditional Information >>

Terms and Conditions  Privacy Policy

以上是“Flume如何整合kafka”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


當(dāng)前名稱:Flume如何整合kafka
文章來(lái)源:http://weahome.cn/article/gddpph.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部