小編給大家分享一下Storm-kafka中如何實(shí)現(xiàn)一個對于kafkaBroker動態(tài)讀取的Class,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
企業(yè)建站必須是能夠以充分展現(xiàn)企業(yè)形象為主要目的,是企業(yè)文化與產(chǎn)品對外擴(kuò)展宣傳的重要窗口,一個合格的網(wǎng)站不僅僅能為公司帶來巨大的互聯(lián)網(wǎng)上的收集和信息發(fā)布平臺,創(chuàng)新互聯(lián)面向各種領(lǐng)域:高空作業(yè)車租賃等成都網(wǎng)站設(shè)計(jì)、成都營銷網(wǎng)站建設(shè)解決方案、網(wǎng)站設(shè)計(jì)等建站排名服務(wù)。
實(shí)現(xiàn)一個對于kafkaBroker 動態(tài)讀取的Class - DynamicBrokersReader
DynamicBrokersReader
package com.mixbox.storm.kafka; import backtype.storm.Config; import backtype.storm.utils.Utils; import com.mixbox.storm.kafka.trident.GlobalPartitionInformation; import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFrameworkFactory; import com.netflix.curator.retry.RetryNTimes; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; /** * 動態(tài)的Broker讀 我們維護(hù)了有一個與zk之間的連接,提供了獲取指定topic的每一個partition正在活動著的leader所對應(yīng)的broker * 這樣你有能力知道,當(dāng)前的這些topic,哪一些broker是活動的 * @author Yin Shuai */ public class DynamicBrokersReader { public static final Logger LOG = LoggerFactory .getLogger(DynamicBrokersReader.class); // 對于Client CuratorFrameWork的封裝 private CuratorFramework _curator; // 在Zk上注冊的位置 private String _zkPath; // 指定的_topic private String _topic; public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) { _zkPath = zkPath; _topic = topic; try { _curator = CuratorFrameworkFactory .newClient( zkStr, Utils.getInt(conf .get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), 15000, new RetryNTimes( Utils.getInt(conf .get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } _curator.start(); } public DynamicBrokersReader(String zkPath) { this._zkPath = zkPath; } /** * 確定指定topic下,每一個partition的leader,所對應(yīng)的 主機(jī)和端口, 并將它們存入到全部分區(qū)信息中 * */ public GlobalPartitionInformation getBrokerInfo() { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(); try { // 拿到當(dāng)前的分區(qū)數(shù)目 int numPartitionsForTopic = getNumPartitions(); /** * /brokers/ids */ String brokerInfoPath = brokerPath(); // 默認(rèn)的我們的分區(qū)數(shù)目就只有 0, 1 兩個 for (int partition = 0; partition < numPartitionsForTopic; partition++) { // 這里請主要參考分區(qū)和領(lǐng)導(dǎo)者的關(guān)系 int leader = getLeaderFor(partition); // 拿到領(lǐng)導(dǎo)者以后的zookeeper路徑 String leaderPath = brokerInfoPath + "/" + leader; try { byte[] brokerData = _curator.getData().forPath(leaderPath); /** * 在這里, 我們拿到的brokerData為: * {"jmx_port":-1,"timestamp":"1403076810435" * ,"host":"192.168.50.207","version":1,"port":9092} 注意 * 這里是字節(jié)數(shù)組開始轉(zhuǎn)json */ Broker hp = getBrokerHost(brokerData); /** * 記錄好 每一個分區(qū) partition 所對應(yīng)的 Broker */ globalPartitionInformation.addPartition(partition, hp); } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { LOG.error("Node {} does not exist ", leaderPath); } } } catch (Exception e) { throw new RuntimeException(e); } LOG.info("Read partition info from zookeeper: " + globalPartitionInformation); return globalPartitionInformation; } /** * @return 拿到指定topic下的分區(qū)數(shù)目 */ private int getNumPartitions() { try { String topicBrokersPath = partitionPath(); Listchildren = _curator.getChildren().forPath( topicBrokersPath); return children.size(); } catch (Exception e) { throw new RuntimeException(e); } } /** * @return 拿到的topic在zookeeper注冊的分區(qū)地址 * brokers/topics/storm-sentence/partitions */ public String partitionPath() { return _zkPath + "/topics/" + _topic + "/partitions"; } /** * 持有的是Broker節(jié)點(diǎn)的id號碼,這個id號是在配置的過程中為每一個Broker分配的 * @return /brokers/ids */ public String brokerPath() { return _zkPath + "/ids"; } /** * get /brokers/topics/distributedTopic/partitions/1/state { * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, * "version":1 } * * 說明一下,在kafka之中,每一個分區(qū)都會有一個Leader,有0個或者多個的followers, 一個leader會處理這個分區(qū)的所有請求 * @param partition * @return */ private int getLeaderFor(long partition) { try { String topicBrokersPath = partitionPath(); byte[] hostPortData = _curator.getData().forPath( topicBrokersPath + "/" + partition + "/state"); @SuppressWarnings("unchecked") Map
對于以上代碼須知:
1 : 我們持有了一個ZkPath , 在Storm-kafka的class之中我們默認(rèn)的是/brokers
2 : _topic , 目前我們是針對的是Topic, 也就是說我們的partition,leader都是針對于單個Topic的
3:
1 int numPartitionsForTopic = getNumPartitions();
針對與一個Topic,首先我們要取當(dāng)前的分區(qū)數(shù),一般的情況,我們在kafka之中默認(rèn)的分區(qū)數(shù)為2
2 String brokerInfoPath = brokerPath();
拿到 /brokers/ids 的分區(qū)號
3: for (int partition = 0; partition < numPartitionsForTopic; partition++) {
依次的遍歷每一個分區(qū)
4:int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);
再通過分區(qū)拿到領(lǐng)導(dǎo)者,以及領(lǐng)導(dǎo)者的路徑,最后拿到領(lǐng)導(dǎo)者的數(shù)據(jù):
我們舉一個小例子
* 在這里, 我們拿到的brokerData為:
* {"jmx_port":-1,"timestamp":"1403076810435"
* ,"host":"192.168.50.207","version":1,"port":9092}
4:Broker hp = getBrokerHost(brokerData);
拿到某一個Topic自己的分區(qū)在kafka所對應(yīng)的Broker,并且其封裝到 globalPartitionInformation
5 globalPartitionInformation.addPartition(partition, hp);
GlobalPartitionInformaton底層維護(hù)了一個HashMap
簡單的來說:DynamicBrokersReader 針對某一個Topic維護(hù)了 每一個分區(qū) partition 所對應(yīng)的 Broker
以上是“Storm-kafka中如何實(shí)現(xiàn)一個對于kafkaBroker動態(tài)讀取的Class”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!