真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Storm如何實(shí)現(xiàn)單詞計(jì)數(shù)

這篇文章主要介紹“Storm如何實(shí)現(xiàn)單詞計(jì)數(shù)”,在日常操作中,相信很多人在Storm如何實(shí)現(xiàn)單詞計(jì)數(shù)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm如何實(shí)現(xiàn)單詞計(jì)數(shù)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

為雄縣等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及雄縣網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)、雄縣網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

1. 使用mvn命令創(chuàng)建項(xiàng)目

mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1

然后編輯配置文件pom.xml,添加storm依賴 


  org.apache.storm
  storm-core
  0.9.4

最后通過下述命令來編譯項(xiàng)目,編譯正確完成后導(dǎo)入到IDE中 

mvn install

當(dāng)然,也可以在IDE中安裝maven插件,從而直接在IDE中創(chuàng)建maven項(xiàng)目 


2. 實(shí)現(xiàn)數(shù)據(jù)源,用重復(fù)的靜態(tài)語句來模擬數(shù)據(jù)源

package storm.test.v1;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {
    private String[] sentences = {
            "storm integrates with the queueing",
            "and database technologies you already use",
            "a storm topology consumes streams of data",
            "and processes those streams in arbitrarily complex ways",
            "repartitioning the streams between each stage of the computation however needed"
    };
    private int index = 0;

    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }

        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
        }
    }
}

3. 實(shí)現(xiàn)語句分割bolt 

package storm.test.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

4. 實(shí)現(xiàn)單詞計(jì)數(shù)bolt 

package storm.test.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

5. 實(shí)現(xiàn)上報(bào)bolt 

package storm.test.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReportBolt extends BaseRichBolt {
    private HashMap counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        counts = new HashMap();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public void cleanup() { //本地模式下,終止topology時(shí)可以保證cleanup()被執(zhí)行
        System.out.println("--- FINAL COUNTS ---");
        List keys = new ArrayList();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("----------");
    }
}

6. 實(shí)現(xiàn)單詞計(jì)數(shù)topology 

package storm.test.v1;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout); //注冊數(shù)據(jù)源
        builder.setBolt(SPLIT_BOLT_ID, spiltBolt) //注冊bolt
                .shuffleGrouping(SENTENCE_SPOUT_ID); //該bolt訂閱spout隨機(jī)均勻發(fā)射來的數(shù)據(jù)流
        builder.setBolt(COUNT_BOLT_ID, countBolt)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); //該bolt訂閱spiltBolt發(fā)射來的數(shù)據(jù)流,并且保證"word"字段值相同的tuple會(huì)被路由到同一個(gè)countBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID); //該bolt訂閱countBolt發(fā)射來的數(shù)據(jù)流,并且所有的tuple都會(huì)被路由到唯一的一個(gè)reportBolt中

        Config config = new Config();

        //本地模式啟動(dòng)
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
        }
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

7. 運(yùn)行結(jié)果:

--- FINAL COUNTS ---
a : 302
already : 302
and : 604
arbitrarily : 302
between : 302
complex : 302
computation : 302
consumes : 302
data : 302
database : 302
each : 302
however : 302
in : 302
integrates : 302
needed : 302
of : 604
processes : 302
queueing : 302
repartitioning : 302
stage : 302
storm : 604
streams : 906
technologies : 302
the : 906
those : 302
topology : 302
use : 302
ways : 302
with : 302
you : 302
----------

到此,關(guān)于“Storm如何實(shí)現(xiàn)單詞計(jì)數(shù)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!


本文名稱:Storm如何實(shí)現(xiàn)單詞計(jì)數(shù)
當(dāng)前URL:http://weahome.cn/article/ipehph.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部