小編給大家分享一下storm集群WordCount的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
十年的山陰網(wǎng)站建設經驗,針對設計、前端、開發(fā)、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。成都全網(wǎng)營銷推廣的優(yōu)勢是能夠根據(jù)用戶設備顯示端的尺寸不同,自動調整山陰建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設計,從而大程度地提升瀏覽體驗。創(chuàng)新互聯(lián)從事“山陰網(wǎng)站設計”,“山陰網(wǎng)站推廣”以來,每個客戶項目都認真落實執(zhí)行。
storm集群實例運行
storm本地運行只需要storm的jar包就可以了,結果可以在控制臺直接看到,storm集群運行,結果要在log日志里看,或者存儲下來。并且,集群運行,execute方法里的輸出可以看到,但是cleanup里的輸出是看不到的,因為cleanup只有在topology結束后才會執(zhí)行,而storm是實時連續(xù)的運行的,所以輸出放在execute里或者保存起來查看。
wordcount實例代碼
代碼在前面的博客里已經寫了只是將WordCounter做了點修改
package com.storm.stormDemo; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; import com.storm.stormTest.MergeObjects; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt { public static Logger LOG = Logger.getLogger(WordCounter.class); Integer id; String name; Mapcounters; private OutputCollector collector; BufferedWriter output; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap (); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); try { output = new BufferedWriter(new FileWriter("/home/zhanghuan/Downloads/wordcount.txt" , true)); } catch (IOException e) { // TODO Auto-generated catch block try { output.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } e.printStackTrace(); } } public void execute(Tuple input) { String str = input.getString(0); if (!counters.containsKey(str)) { counters.put(str, 1); } else { Integer c = counters.get(str) + 1; counters.put(str, c); } Iterator iterator = counters.keySet().iterator(); while(iterator.hasNext()){ String next = iterator.next(); try { System.out.print(next + ":" + counters.get(next) + " "); output.write(next + ":" + counters.get(next) + " "); output.flush(); } catch (IOException e) { e.printStackTrace(); try { output.close(); } catch (IOException e1) { e1.printStackTrace(); } } } // 確認成功處理一個tuple collector.ack(input); } /** * Topology執(zhí)行完畢的清理工作,比如關閉連接、釋放資源等操作都會寫在這里 * 因為這只是個Demo,我們用它來打印我們的計數(shù)器 * */ public void cleanup() { LOG.info("-- Word Counter [" + name + "-" + id + "] --"); for (Map.Entry entry : counters.entrySet()) { LOG.info(entry.getKey() + ": " + entry.getValue()); } counters.clear(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub //declarer.declare(new Fields("word","number")); } public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
集群運行
storm jar StormDemo.jar com.storm.stormDemo.WordCountTopologyMain StormDemo /home/zhanghuan/Downloads/test.txt
注意:主函數(shù)路徑要寫全
如果集群報錯如下:
則打開你打的第三方jar包文件夾,在里面找到storm-core-0.10.0.jar,刪除這個jar包里的default.yarml文件,或則刪掉你打的storm jar包。
topology提交后,會啟動相應數(shù)量的worker進程和logwriter進程,ui界面上也能看到這個topology的運行
這時候你就可以查看log日志文件或者存儲位置,查看結果。
沒有數(shù)據(jù)輸入的時候,日志就像最下方一樣,保持通信。
停止topology運行
storm kill topology的名字
看完了這篇文章,相信你對“storm集群WordCount的示例分析”有了一定的了解,如果想了解更多相關知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!