真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Storm分布式RPC怎么配置

這篇文章主要介紹“Storm分布式RPC怎么配置”,在日常操作中,相信很多人在Storm分布式RPC怎么配置問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Storm分布式RPC怎么配置”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

創(chuàng)新互聯(lián)長(zhǎng)期為近1000家客戶(hù)提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為龍鳳企業(yè)提供專(zhuān)業(yè)的成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè),龍鳳網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。

首先需要在storm集群上把DRPC的環(huán)境準(zhǔn)備好,在storm.yaml當(dāng)中增加如下內(nèi)容

 drpc.servers:
  - "192.168.1.118"

之后通過(guò)storm drpc啟動(dòng)分布式RPC服務(wù)。

之后,跟其他的topology并沒(méi)有什么不同,我們需要寫(xiě)點(diǎn)代碼,我這邊直接從storm的例子當(dāng)中找了個(gè):

public class BasicDRPCTopology {
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }

    }

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
    }
}

從main函數(shù)開(kāi)始,簡(jiǎn)單解釋一下:

首先new一個(gè)LinearDRPCTopologyBuilder對(duì)象,其中的參數(shù)【exclamation】就是我們?cè)趫?zhí)行rpc調(diào)用時(shí)候的方法名。

之后我們加入一個(gè)自己的bolt,并行數(shù)量為3

之后用StormSubmitter把這個(gè)topology提交上去就行了。

代碼完成之后,打一個(gè)jar包,用storm jar把topology提交到集群上。

客戶(hù)端調(diào)用,非常簡(jiǎn)單

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("exclamation", "china");
        System.out.println(result);

到此為止,一個(gè)最簡(jiǎn)單的DRPC調(diào)用的工作已經(jīng)完成了。

等等,還有點(diǎn)問(wèn)題,LinearDRPCTopologyBuilder 這個(gè)東西是不建議使用的(我這里的版本是0.9.3)。

源碼上有這么一行:

Trident subsumes the functionality provided by this class, so it's deprecated

大概意思就是trident這個(gè)東西已經(jīng)包含了LinearDRPCTopologyBuilder 當(dāng)中的功能。

trident是什么意思?翻譯了一下,【三叉戟】,靠,看起來(lái)很牛逼的樣子。必須試試。

那么上第二份代碼:

public class TridentDRPCTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
    }

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).
                groupBy(new Fields("word")).
                aggregate(new One(), new Fields("one")).
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
        return topology.build();
    }

    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static class One implements CombinerAggregator {
        @Override
        public Integer init(TridentTuple tuple) {
            return 1;
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return 1;
        }

        @Override
        public Integer zero() {
            return 1;
        }
    }
}

這個(gè)topology的功能要稍稍復(fù)雜一些,給出一句話(huà),查一下一共有多少個(gè)詞,當(dāng)然了,不能重復(fù)計(jì)數(shù)。main函數(shù)當(dāng)中非常簡(jiǎn)單,提交一個(gè)topology。而這個(gè)topology的構(gòu)建過(guò)程是在buildTopology當(dāng)中完成的。

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).    //用空格分詞
                groupBy(new Fields("word")).    //分組
                aggregate(new One(), new Fields("one")).    //給每組的數(shù)量設(shè)定為1
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));    //sum計(jì)算總和

這樣的方式看起來(lái)跟spark當(dāng)中對(duì)RDD的操作是有些像的。

好了,還是打包,提交。

然后是客戶(hù)端測(cè)試:

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
        System.out.println(result);

到此,關(guān)于“Storm分布式RPC怎么配置”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!


文章名稱(chēng):Storm分布式RPC怎么配置
網(wǎng)頁(yè)URL:http://weahome.cn/article/psedje.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部