本篇內(nèi)容主要講解“Storm怎么實(shí)現(xiàn)單詞計(jì)數(shù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Storm怎么實(shí)現(xiàn)單詞計(jì)數(shù)”吧!
“只有客戶發(fā)展了,才有我們的生存與發(fā)展!”這是創(chuàng)新互聯(lián)的服務(wù)宗旨!把網(wǎng)站當(dāng)作互聯(lián)網(wǎng)產(chǎn)品,產(chǎn)品思維更注重全局思維、需求分析和迭代思維,在網(wǎng)站建設(shè)中就是為了建設(shè)一個(gè)不僅審美在線,而且實(shí)用性極高的網(wǎng)站。創(chuàng)新互聯(lián)對成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、網(wǎng)站制作、網(wǎng)站開發(fā)、網(wǎng)頁設(shè)計(jì)、網(wǎng)站優(yōu)化、網(wǎng)絡(luò)推廣、探索永無止境。
在上一次單詞計(jì)數(shù)的基礎(chǔ)上做如下改動(dòng): 使用 自定義 分組策略,將首字母相同的單詞發(fā)送給同一個(gè)task計(jì)數(shù)
自定義 CustomStreamGrouping
package com.zhch.v4; import backtype.storm.generated.GlobalStreamId; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.task.WorkerTopologyContext; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ModuleGrouping implements CustomStreamGrouping, Serializable { private Listtasks; @Override public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List targetTasks) { this.tasks = targetTasks; } @Override public List chooseTasks(int taskId, List
數(shù)據(jù)源spout
package com.zhch.v4; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import java.io.BufferedReader; import java.io.FileReader; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class SentenceSpout extends BaseRichSpout { private FileReader fileReader = null; private boolean completed = false; private ConcurrentHashMappending; private SpoutOutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; this.pending = new ConcurrentHashMap (); try { this.fileReader = new FileReader(map.get("wordsFile").toString()); } catch (Exception e) { throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]"); } } @Override public void nextTuple() { if (completed) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } String line; BufferedReader reader = new BufferedReader(fileReader); try { while ((line = reader.readLine()) != null) { Values values = new Values(line); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, values); this.collector.emit(values, msgId); } } catch (Exception e) { throw new RuntimeException("Error reading tuple", e); } finally { completed = true; } } @Override public void ack(Object msgId) { this.pending.remove(msgId); } @Override public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId); } }
實(shí)現(xiàn)語句分割bolt
package com.zhch.v4; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.Map; public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { collector.emit(tuple, new Values(word)); } this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
實(shí)現(xiàn)單詞計(jì)數(shù)bolt
package com.zhch.v4; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import java.io.BufferedWriter; import java.io.FileWriter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMapcounts = null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap (); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L; } count++; this.counts.put(word, count); BufferedWriter writer = null; try { writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt")); List keys = new ArrayList (); keys.addAll(this.counts.keySet()); Collections.sort(keys); for (String key : keys) { Long c = this.counts.get(key); writer.write(key + " : " + c); writer.newLine(); writer.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { if (writer != null) { try { writer.close(); } catch (Exception e) { e.printStackTrace(); } writer = null; } } this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); } }
實(shí)現(xiàn)單詞計(jì)數(shù)topology
package com.zhch.v4; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; public class WordCountTopology { public static final String SENTENCE_SPOUT_ID = "sentence-spout"; public static final String SPLIT_BOLT_ID = "split-bolt"; public static final String COUNT_BOLT_ID = "count-bolt"; public static final String TOPOLOGY_NAME = "word-count-topology-v4"; public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout(); SplitSentenceBolt spiltBolt = new SplitSentenceBolt(); WordCountBolt countBolt = new WordCountBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout, 2); builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4) .shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(COUNT_BOLT_ID, countBolt, 2) .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); //使用 自定義 分組策略 Config config = new Config(); config.put("wordsFile", args[0]); if (args != null && args.length > 1) { config.setNumWorkers(2); //集群模式啟動(dòng) StormSubmitter.submitTopology(args[1], config, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { } cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } } }
提交到Storm集群
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
運(yùn)行結(jié)果:
[grid@hadoop5 stormData]$ cat result.txt Apache : 1 ETL : 1 It : 1 Storm : 4 a : 4 analytics : 1 and : 5 any : 1 at : 1 can : 1 cases: : 1 clocked : 1 computation : 2 continuous : 1 easy : 2 guarantees : 1 is : 6 it : 2 machine : 1 makes : 1 many : 1 million : 1 more : 1 of : 2 online : 1 open : 1 operate : 1 over : 1 scalable : 1 second : 1 set : 1 simple : 1 source : 1 streams : 1 system : 1 unbounded : 1 up : 1 use : 2 used : 1 what : 1 will : 1 with : 1 your : 1 [grid@hadoop6 stormData]$ cat result.txt Hadoop : 1 RPC : 1 batch : 1 be : 2 benchmark : 1 data : 2 did : 1 distributed : 2 doing : 1 fast: : 1 fault-tolerant : 1 for : 2 free : 1 fun : 1 has : 1 language : 1 learning : 1 lot : 1 node : 1 per : 2 process : 1 processed : 2 processing : 2 programming : 1 realtime : 3 reliably : 1 to : 3 torm : 1 tuples : 1
到此,相信大家對“Storm怎么實(shí)現(xiàn)單詞計(jì)數(shù)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!