本篇文章給大家分享的是有關(guān)kafka流量監(jiān)控的原理及實(shí)現(xiàn)方法是什么,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
10多年專注成都網(wǎng)站制作,企業(yè)網(wǎng)站制作,個(gè)人網(wǎng)站制作服務(wù),為大家分享網(wǎng)站制作知識、方案,網(wǎng)站設(shè)計(jì)流程、步驟,成功服務(wù)上千家企業(yè)。為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計(jì)及定制高端網(wǎng)站建設(shè)服務(wù),專注于企業(yè)網(wǎng)站制作,高端網(wǎng)頁制作,對紗窗等多個(gè)領(lǐng)域,擁有豐富的網(wǎng)站設(shè)計(jì)經(jīng)驗(yàn)。
工程能力
作為一個(gè)優(yōu)秀的開發(fā)人員,項(xiàng)目開發(fā)的過程中監(jiān)控告警系統(tǒng)的可靠性是可以體現(xiàn)出一個(gè)人的工程管理能力的。優(yōu)秀的監(jiān)控告警系統(tǒng)可以免去很多精力消耗,比如維護(hù),故障預(yù)判,故障及時(shí)準(zhǔn)確通知,故障定位排查等。
可以想像項(xiàng)目上線后,假如沒有監(jiān)控告警系統(tǒng),這么一個(gè)暗箱是多么可怕。
對于大數(shù)據(jù)項(xiàng)目,數(shù)據(jù)一般需要先入消息隊(duì)列,如kafka,然后分離線和實(shí)時(shí)將數(shù)據(jù)進(jìn)行解耦分流,用于實(shí)時(shí)處理和離線處理。消息隊(duì)列存在的好處:
消息隊(duì)列的訂閱者可以根據(jù)需要隨時(shí)擴(kuò)展,可以很好的擴(kuò)展數(shù)據(jù)的使用者。
消息隊(duì)列的橫向擴(kuò)展,增加吞吐量,做起來還是很簡單的。這個(gè)用傳統(tǒng)數(shù)據(jù)庫,分庫分表還是很麻煩的。
由于消息隊(duì)列的存在,也可以幫助我們抗高峰,避免高峰時(shí)期后端處理壓力過大導(dǎo)致整個(gè)業(yè)務(wù)處理宕機(jī)。
kafka在大數(shù)據(jù)項(xiàng)目中作用至關(guān)重要,那么對其的監(jiān)控告警就至關(guān)重要了,我們這里主要是講針對kafka流量的監(jiān)控告警,其目的也是很明顯的便于我們了解數(shù)據(jù)的整體情況及波動情況,以調(diào)整處理后端,如spark streaming,flume等。
kafka 監(jiān)控工具很多,常見的有kafka manager,KafkaOffsetMonitor,kafka eagle,kafka tools等,浪尖最經(jīng)常使用的是kafka manager,也建議大家使用該工具,其不僅有監(jiān)控功能還有管理功能。具體使用方法可以參看:
kafka管理神器-kafkamanager
監(jiān)控指標(biāo)
kafka的指標(biāo)服務(wù)器和客戶端都有的。具體指標(biāo)內(nèi)容,可以參看kafka官網(wǎng):
http://kafka.apache.org/0102/documentation.html#monitoring
查看可用指標(biāo)的最簡單方法是啟動jconsole并將其指向正在運(yùn)行的kafka客戶端或服務(wù)器; 這將允許使用JMX瀏覽所有指標(biāo)。
對于熟悉kafka manager的朋友都應(yīng)該看過broker相關(guān)信息,比如每秒鐘的流入的消息條數(shù),每秒鐘的流入的消息大小,流出的消息大小等。
使用kafka manager可以很方便的查看。但是,這其實(shí)不能讓我們及時(shí)的發(fā)現(xiàn)數(shù)據(jù)流量波動,或者說我們想畫個(gè)曲線的詳細(xì)對比歷史流量,它是做不到的。所以,我們要想辦法去獲取出來這些指標(biāo),然后做我們自己的展示。還有一點(diǎn)就是,流量波動告警。
浪尖這里只做了圖中幾個(gè)指標(biāo)的接口:
def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption)
}
def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption)
}
def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption)
}
def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption)
}
def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption)
}
def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption)
}
jmx客戶端
連接jmx的server是可以使用jconsole,但是滿足不了我們的需求。所以,我們使用JMXConnectorFactory 方式連接jmx。使用JMXConnectorFactory 鏈接jmx時(shí),JMXServiceURL 的參數(shù) url 必須使用 service:jmx 方式進(jìn)行連接,具體鏈接創(chuàng)建方式很簡單,幾行代碼而已,如下:
val jmxHost = "hostname"
val jmxPort = 9999
val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"
val url = new JMXServiceURL(urlString)
val jmxc = JMXConnectorFactory.connect(url )
val mbsc = jmxc.getMBeanServerConnection;
println(KafkaMetrics.getMessagesInPerSec(Kafka_0_10_2_1,mbsc,Some("test")).fifteenMinuteRate)
jmxc.close()
開啟kafka的jmx端口
kafka的jmx服務(wù)默認(rèn)時(shí)關(guān)閉的,開啟的話很簡單,只需要在kafka server的啟動腳本kafka-server-start.sh里增加一行代碼即可,內(nèi)容export JMX_PORT="9999",增加位置如下:
if [ "x$KAFKA_HEAP_OPTS" = "x"]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
測試
我這里測試就比較簡單了,主要是將消息條數(shù)打出來,大家可以根據(jù)需要自行調(diào)整,比如均值大于閾值發(fā)短信告警等。
一套完整的kafka監(jiān)控,包括:
消費(fèi)者監(jiān)控,主要是存活告警,消費(fèi)滯后告警。
生產(chǎn)者監(jiān)控,主要是存活告警,生產(chǎn)者消費(fèi)上游數(shù)據(jù)能力告警。
broker監(jiān)控,主要是存活告警,流量告警,isr列表,topic異常告警,control變換告警。
以上就是kafka流量監(jiān)控的原理及實(shí)現(xiàn)方法是什么,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。