這篇文章主要介紹了Kafka集群優(yōu)化的方法是什么的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Kafka集群優(yōu)化的方法是什么文章都會(huì)有所收獲,下面我們一起來看看吧。
創(chuàng)新互聯(lián)是一家專業(yè)提供富拉爾基企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、H5建站、小程序制作等業(yè)務(wù)。10年已為富拉爾基眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。
背景
個(gè)推作為專業(yè)的數(shù)據(jù)智能服務(wù)商,已經(jīng)成功服務(wù)了數(shù)十萬APP,每日的消息下發(fā)量達(dá)百億級(jí)別,由此產(chǎn)生了海量日志數(shù)據(jù)。為了應(yīng)對(duì)業(yè)務(wù)上的各種需求,我們需要采集并集中化日志進(jìn)行計(jì)算,為此個(gè)推選用了高可用的、高可靠的、分布式的Flume系統(tǒng)以對(duì)海量日志進(jìn)行采集、聚合和傳輸。此外,個(gè)推也不斷對(duì)Flume進(jìn)行迭代升級(jí),以實(shí)現(xiàn)自己對(duì)日志的特定需求。
原有的異地機(jī)房日志匯聚方式,整個(gè)流程相對(duì)來說比較簡(jiǎn)單,A機(jī)房業(yè)務(wù)產(chǎn)生的日志通過多種方式寫入該機(jī)房Kafka集群,然后B機(jī)房的Flume通過網(wǎng)絡(luò)專線實(shí)時(shí)消費(fèi)A機(jī)房Kafka的日志數(shù)據(jù)后寫入本機(jī)房的Kafka集群,所有機(jī)房的數(shù)據(jù)就是通過相同方式在B機(jī)房Kakfa集群中集中化管理。如圖一所示:
圖一:原有異地日志傳輸模式
但是隨著業(yè)務(wù)量的不斷增加,日志數(shù)據(jù)在逐漸增多的過程中對(duì)帶寬要求變高,帶寬的瓶頸問題日益凸顯。按照1G的專線帶寬成本2~3w/月來計(jì)算,一個(gè)異地機(jī)房一年僅專線帶寬擴(kuò)容成本就高達(dá)30w以上。對(duì)此,如何找到一種成本更加低廉且符合當(dāng)前業(yè)務(wù)預(yù)期的傳輸方案呢?Avro有快速壓縮的二進(jìn)制數(shù)據(jù)形式,并能有效節(jié)約數(shù)據(jù)存儲(chǔ)空間和網(wǎng)絡(luò)傳輸帶寬,從而成為優(yōu)選方案。
優(yōu)化思路
Avro簡(jiǎn)介
Avro是一個(gè)數(shù)據(jù)序列化系統(tǒng)。它是Hadoop的一個(gè)子項(xiàng)目,也是Apache的一個(gè)獨(dú)立的項(xiàng)目,其主要特點(diǎn)如下:
● 豐富的數(shù)據(jù)結(jié)構(gòu);
● 可壓縮、快速的二進(jìn)制數(shù)據(jù)類型;
● 可持久化存儲(chǔ)的文件類型;
● 遠(yuǎn)程過程調(diào)用(RPC);
● 提供的機(jī)制使動(dòng)態(tài)語言可以方便地處理數(shù)據(jù)。
具體可參考官方網(wǎng)站:http://avro.apache.org/
Flume Avro方案
Flume的RPC Source是Avro Source,它被設(shè)計(jì)為高擴(kuò)展的RPC服務(wù)端,能從其他Flume Agent 的Avro Sink或者Flume SDK客戶端,接收數(shù)據(jù)到Flume Agent中,具體流程如圖二所示:
圖二:Avro Source流程
針對(duì)該模式,我們的日志傳輸方案計(jì)劃變更為A機(jī)房部署Avro Sink用以消費(fèi)該機(jī)房Kafka集群的日志數(shù)據(jù),壓縮后發(fā)送到B機(jī)房的Avro Source,然后解壓寫入B機(jī)房的Kafka集群,具體的傳輸模式如圖三所示:
圖三:Flume Avro傳輸模式
可能存在的問題
我們預(yù)估可能存在的問題主要有以下三點(diǎn):
● 當(dāng)專線故障的時(shí)候,數(shù)據(jù)是否能保證完整性;
● 該模式下CPU和內(nèi)存等硬件的消耗評(píng)估;
● 傳輸性能問題。
驗(yàn)證情況
針對(duì)以上的幾個(gè)問題,我們做了幾項(xiàng)對(duì)比實(shí)驗(yàn)。
環(huán)境準(zhǔn)備情況說明:
1. 兩臺(tái)服務(wù)器192.168.10.81和192.168.10.82,以及每臺(tái)服務(wù)器上對(duì)應(yīng)一個(gè)Kakfa集群,模擬A機(jī)房和B機(jī)房;
2. 兩個(gè)Kafka集群中對(duì)應(yīng)topicA(源端)和topicB(目標(biāo)端)。在topicA中寫入合計(jì)大小11G的日志數(shù)據(jù)用來模擬原始端日志數(shù)據(jù)。
3. 192.168.10.82上部署一個(gè)Flume,模擬原有傳輸方式。
4. 192.168.10.81服務(wù)器部署Avro Sink,192.168.10.82部署Avro Source,模擬Flume Avro傳輸模式。
原有Flume模式驗(yàn)證(非Avro)
監(jiān)控Kafka消費(fèi)情況:
81流量統(tǒng)計(jì):
82流量統(tǒng)計(jì):
消費(fèi)全部消息耗時(shí):20min
消費(fèi)總?cè)罩緱l數(shù)統(tǒng)計(jì):129,748,260
總流量:13.5G
Avro模式驗(yàn)證
配置說明:
Avro Sink配置:
#kafkasink 是kafkatokafka的sinks的名字,可配多個(gè),空格分開kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSourcekafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect =192.168.10.81:2181kafkatokafka.sources.kafka_dmc_bullet.topic = topicAkafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms =150000kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms =10000kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavrokafkatokafka.sources.kafka_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet的配置,可配置多個(gè)sink提高壓縮傳輸效率kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSinkkafkatokafka.sinks.kafkasink_dmc_bullet.hostname =192.168.10.82kafkatokafka.sinks.kafkasink_dmc_bullet.port =55555//與source的rpc端口一一對(duì)應(yīng)kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate//壓縮模式kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level =6//壓縮率1~9kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet配的channel,只配一個(gè)kafkatokafka.channels.channel_dmc_bullet.type = memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000#kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000#kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =5000kafkatokafka.channels.channel_dmc_bullet.keep-alive =60
Avro Source配置:
#kafkasink 是kafkatokafka的sinks的名字,可配多個(gè),空格分開kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type= avrokafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.bind =0.0.0.0kafkatokafka.sources.kafka_dmc_bullet.port =55555//rpc端口綁定kafkatokafka.sources.kafka_dmc_bullet.compression-type= deflate//壓縮模式kafkatokafka.sources.kafka_dmc_bullet.batchSize =100#source kafkasink_dmc_bullet的配置kafkatokafka.sinks.kafkasink_dmc_bullet.type= org.apache.flume.sink.kafka.KafkaSinkkafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitionerkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicBkafkatokafka.sinks.kafkasink_dmc_bullet.brokerList =192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =500kafkatokafka.channels.channel_dmc_bullet.type= memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =1000
監(jiān)控Kafka消費(fèi)情況
81流量統(tǒng)計(jì):
82流量統(tǒng)計(jì):
消費(fèi)全部消息耗時(shí):26min
消費(fèi)總?cè)罩緱l數(shù)統(tǒng)計(jì):129,748,260
總流量:1.69G
故障模擬
1. 模擬專線故障,在A、B兩機(jī)房不通的情況下,Avro Sink報(bào)錯(cuò)如下:
2. 監(jiān)控Kafka消費(fèi)情況,發(fā)現(xiàn)消費(fèi)者已停止消費(fèi):
3. 故障處理恢復(fù)后繼續(xù)消費(fèi)剩余日志,經(jīng)統(tǒng)計(jì),總?cè)罩緱l數(shù)為:129,747,255。
結(jié)論
1. 當(dāng)專線發(fā)生故障時(shí),正在網(wǎng)絡(luò)傳輸中的通道外數(shù)據(jù)可能會(huì)有少部分丟失,其丟失原因?yàn)榫W(wǎng)絡(luò)原因,與Avro模式無關(guān);故障后停止消費(fèi)的數(shù)據(jù)不會(huì)有任何的丟失問題,由于網(wǎng)絡(luò)原因丟失的數(shù)據(jù)需要評(píng)估其重要性以及是否需要補(bǔ)傳。
2. 流量壓縮率達(dá)80%以上,同時(shí)我們也測(cè)試了等級(jí)為1~9的壓縮率,6跟9非常接近,CPU和內(nèi)存的使用率與原有傳輸模式相差不大,帶寬的優(yōu)化效果比較明顯。
3. 傳輸性能由于壓縮的原因適當(dāng)變?nèi)?,單Sink由原先20分鐘延長至26分鐘,可適當(dāng)增加Sink的個(gè)數(shù)來提高傳輸速率。
生產(chǎn)環(huán)境實(shí)施結(jié)果
實(shí)施結(jié)果如下:
1. 由于還有其它業(yè)務(wù)的帶寬占用,總帶寬使用率節(jié)省了50%以上,現(xiàn)階段高峰期帶寬速率不超過400Mbps;
2. 每個(gè)Sink傳輸速率的極限大概是3000條每秒,壓縮傳輸速率問題通過增加Sink的方式解決,但會(huì)適當(dāng)增加CPU和內(nèi)存的損耗。
關(guān)于“Kafka集群優(yōu)化的方法是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Kafka集群優(yōu)化的方法是什么”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。