這篇文章主要介紹“Storm的Topology怎么配置”,在日常操作中,相信很多人在Storm的Topology怎么配置問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm的Topology怎么配置”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名與空間、網(wǎng)頁空間、營銷軟件、網(wǎng)站建設、臨洮網(wǎng)站維護、網(wǎng)站推廣。
設計一個拓撲時,你要做的最重要的事情之一就是定義如何在各組件之間交換數(shù)據(jù)(數(shù)據(jù)流是如何被bolts消費的)。一個數(shù)據(jù)流組指定了每個bolt會消費哪些數(shù)據(jù)流,以及如何消費它們。
NOTE:一個節(jié)點能夠發(fā)布一個以上的數(shù)據(jù)流,一個數(shù)據(jù)流組允許我們選擇接收哪個。
數(shù)據(jù)流組在定義拓撲時設置,就像我們在第二章看到的:
··· builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); ···
在前面的代碼塊里,一個bolt由TopologyBuilder對象設定, 然后使用隨機數(shù)據(jù)流組指定數(shù)據(jù)源。數(shù)據(jù)流組通常將數(shù)據(jù)源組件的ID作為參數(shù),取決于數(shù)據(jù)流組的類型不同還有其它可選參數(shù)。
NOTE:每個InputDeclarer可以有一個以上的數(shù)據(jù)源,而且每個數(shù)據(jù)源可以分到不同的組。
隨機流組是最常用的數(shù)據(jù)流組。它只有一個參數(shù)(數(shù)據(jù)源組件),并且數(shù)據(jù)源會向隨機選擇的bolt發(fā)送元組,保證每個消費者收到近似數(shù)量的元組。
隨機數(shù)據(jù)流組用于數(shù)學計算這樣的原子操作。然而,如果操作不能被隨機分配,就像第二章為單詞計數(shù)的例子,你就要考慮其它分組方式了。
域數(shù)據(jù)流組允許你基于元組的一個或多個域控制如何把元組發(fā)送給bolts。它保證擁有相同域組合的值集發(fā)送給同一個bolt?;氐絾卧~計數(shù)器的例子,如果你用word域為數(shù)據(jù)流分組,word-normalizer bolt將只會把相同單詞的元組發(fā)送給同一個word-counterbolt實例。
··· builder.setBolt("word-counter", new WordCounter(),2) .fieldsGrouping("word-normalizer", new Fields("word")); ···
NOTE: 在域數(shù)據(jù)流組中的所有域集合必須存在于數(shù)據(jù)源的域聲明中。
全部數(shù)據(jù)流組,為每個接收數(shù)據(jù)的實例復制一份元組副本。這種分組方式用于向bolts發(fā)送信號。比如,你要刷新緩存,你可以向所有的bolts發(fā)送一個刷新緩存信號。在單詞計數(shù)器的例子里,你可以使用一個全部數(shù)據(jù)流組,添加清除計數(shù)器緩存的功能(見拓撲示例)
public void execute(Tuple input) { String str = null; try{ if(input.getSourceStreamId().equals("signals")){ str = input.getStringByField("action"); if("refreshCache".equals(str)) counters.clear(); } }catch (IllegalArgumentException e){ //什么也不做 } ··· }
我們添加了一個if分支,用來檢查源數(shù)據(jù)流。Storm允許我們聲明具名數(shù)據(jù)流(如果你不把元組發(fā)送到一個具名數(shù)據(jù)流,默認發(fā)送到名為”default“的數(shù)據(jù)流)。這是一個識別元組的極好的方式,就像這個例子中,我們想識別signals一樣。 在拓撲定義中,你要向word-counter bolt添加第二個數(shù)據(jù)流,用來接收從signals-spout數(shù)據(jù)流發(fā)送到所有bolt實例的每一個元組。
builder.setBolt("word-counter", new WordCounter(),2) .fieldsGroupint("word-normalizer",new Fields("word")) .allGrouping("signals-spout","signals");
signals-spout的實現(xiàn)請參考git倉庫。
你可以通過實現(xiàn)backtype.storm.grouping.CustormStreamGrouping接口創(chuàng)建自定義數(shù)據(jù)流組,讓你自己決定哪些bolt接收哪些元組。
讓我們修改單詞計數(shù)器示例,使首字母相同的單詞由同一個bolt接收。
public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public ListchooseTasks(List
這是一個CustomStreamGrouping的簡單實現(xiàn),在這里我們采用單詞首字母字符的整數(shù)值與任務數(shù)的余數(shù),決定接收元組的bolt。
按下述方式word-normalizer修改即可使用這個自定義數(shù)據(jù)流組。
builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping());
這是一個特殊的數(shù)據(jù)流組,數(shù)據(jù)源可以用它決定哪個組件接收元組。與前面的例子類似,數(shù)據(jù)源將根據(jù)單詞首字母決定由哪個bolt接收元組。要使用直接數(shù)據(jù)流組,在WordNormalizer bolt中,使用emitDirect方法代替emit。
public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //對元組做出應答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } }
在prepare方法中計算任務數(shù)
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); }
在拓撲定義中指定數(shù)據(jù)流將被直接分組:
builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer");
全局數(shù)據(jù)流組把所有數(shù)據(jù)源創(chuàng)建的元組發(fā)送給單一目標實例(即擁有最低ID的任務)。
寫作本書時(Stom0.7.1版),這個數(shù)據(jù)流組相當于隨機數(shù)據(jù)流組。也就是說,使用這個數(shù)據(jù)流組時,并不關心數(shù)據(jù)流是如何分組的。
到目前為止,你已經(jīng)用一個叫做LocalCluster的工具在你的本地機器上運行了一個拓撲。Storm的基礎工具,使你能夠在自己的計算機上方便的運行和調(diào)試不同的拓撲。但是你怎么把自己的拓撲提交給運行中的Storm集群呢?Storm有一個有趣的功能,在一個真實的集群上運行自己的拓撲是很容易的事情。要實現(xiàn)這一點,你需要把LocalCluster換成StormSubmitter并實現(xiàn)submitTopology方法, 它負責把拓撲發(fā)送給集群。
下面是修改后的代碼:
//LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, //builder.createTopology()); StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf, builder.createTopology()); //Thread.sleep(1000); //cluster.shutdown();
NOTE: 當你使用StormSubmitter時,你就不能像使用LocalCluster時一樣通過代碼控制集群了。
接下來,把源碼壓縮成一個jar包,運行Storm客戶端命令,把拓撲提交給集群。如果你已經(jīng)使用了Maven, 你只需要在命令行進入源碼目錄運行:mvn package。
現(xiàn)在你生成了一個jar包,使用storm jar命令提交拓撲(關于如何安裝Storm客戶端請參考附錄A)。命令格式:storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3。
對于這個例子,在拓撲工程目錄下面運行:
storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt
通過這些命令,你就把拓撲發(fā)布集群上了。
如果想停止或殺死它,運行:
storm kill Count-Word-Topology-With-Refresh-Cache
NOTE:拓撲名稱必須保證惟一性。
NOTE:如何安裝Storm客戶端,參考附錄A
有一種特殊的拓撲類型叫做分布式遠程過程調(diào)用(DRPC),它利用Storm的分布式特性執(zhí)行遠程過程調(diào)用(RPC)(見下圖)。Storm提供了一些用來實現(xiàn)DRPC的工具。第一個是DRPC服務器,它就像是客戶端和Storm拓撲之間的連接器,作為拓撲的spout的數(shù)據(jù)源。它接收一個待執(zhí)行的函數(shù)和函數(shù)參數(shù),然后對于函數(shù)操作的每一個數(shù)據(jù)塊,這個服務器都會通過拓撲分配一個請求ID用來識別RPC請求。拓撲執(zhí)行最后的bolt時,它必須分配RPC請求ID和結果,使DRPC服務器把結果返回正確的客戶端。
NOTE:單實例DRPC服務器能夠執(zhí)行許多函數(shù)。每個函數(shù)由一個惟一的名稱標識。
Storm提供的第二個工具(已在例子中用過)是LineDRPCTopologyBuilder,一個輔助構建DRPC拓撲的抽象概念。生成的拓撲創(chuàng)建DRPCSpouts——它連接到DRPC服務器并向拓撲的其它部分分發(fā)數(shù)據(jù)——并包裝bolts,使結果從最后一個bolt返回。依次執(zhí)行所有添加到LinearDRPCTopologyBuilder對象的bolts。
作為這種類型的拓撲的一個例子,我們創(chuàng)建了一個執(zhí)行加法運算的進程。雖然這是一個簡單的例子,但是這個概念可以擴展到復雜的分布式計算。
bolt按下面的方式聲明輸出:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","result")); }
因為這是拓撲中惟一的bolt,它必須發(fā)布RPC ID和結果。execute方法負責執(zhí)行加法運算。
public void execute(Tuple input) { String[] numbers = input.getString(1).split("\\+"); Integer added = 0; if(numbers.length<2){ throw new InvalidParameterException("Should be at least 2 numbers"); } for(String num : numbers){ added += Integer.parseInt(num); } collector.emit(new Values(input.getValue(0),added)); }
包含加法bolt的拓撲定義如下:
public static void main(String[] args) { LocalDRPC drpc = new LocalDRPC(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add"); builder.addBolt(AdderBolt(),2); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpcder-topology", conf, builder.createLocalTopology(drpc)); String result = drpc.execute("add", "1+-1"); checkResult(result,0); result = drpc.execute("add", "1+1+5+10"); checkResult(result,17); cluster.shutdown(); drpc.shutdown(); }
創(chuàng)建一個LocalDRPC對象在本地運行DRPC服務器。接下來,創(chuàng)建一個拓撲構建器(譯者注:LineDRpctopologyBuilder對象),把bolt添加到拓撲。運行DRPC對象(LocalDRPC對象)的execute方法測試拓撲。
NOTE:使用DRPCClient類連接遠程DRPC服務器。DRPC服務器暴露了Thrift API,因此可以跨語言編程;并且不論是在本地還是在遠程運行DRPC服務器,它們的API都是相同的。 對于采用Storm配置的DRPC配置參數(shù)的Storm集群,調(diào)用構建器對象的createRemoteTopology向Storm集群提交一個拓撲,而不是調(diào)用createLocalTopology。
到此,關于“Storm的Topology怎么配置”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
本文標題:Storm的Topology怎么配置
標題鏈接:http://weahome.cn/article/pdceeh.html