這篇文章主要介紹“Storm分布式RPC怎么配置”,在日常操作中,相信很多人在Storm分布式RPC怎么配置問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Storm分布式RPC怎么配置”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
創(chuàng)新互聯(lián)長(zhǎng)期為近1000家客戶(hù)提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為龍鳳企業(yè)提供專(zhuān)業(yè)的成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè),龍鳳網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。
首先需要在storm集群上把DRPC的環(huán)境準(zhǔn)備好,在storm.yaml當(dāng)中增加如下內(nèi)容
drpc.servers:
- "192.168.1.118"
之后通過(guò)storm drpc啟動(dòng)分布式RPC服務(wù)。
之后,跟其他的topology并沒(méi)有什么不同,我們需要寫(xiě)點(diǎn)代碼,我這邊直接從storm的例子當(dāng)中找了個(gè):
public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
}
}
從main函數(shù)開(kāi)始,簡(jiǎn)單解釋一下:
首先new一個(gè)LinearDRPCTopologyBuilder對(duì)象,其中的參數(shù)【exclamation】就是我們?cè)趫?zhí)行rpc調(diào)用時(shí)候的方法名。
之后我們加入一個(gè)自己的bolt,并行數(shù)量為3
之后用StormSubmitter把這個(gè)topology提交上去就行了。
代碼完成之后,打一個(gè)jar包,用storm jar把topology提交到集群上。
客戶(hù)端調(diào)用,非常簡(jiǎn)單
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("exclamation", "china");
System.out.println(result);
到此為止,一個(gè)最簡(jiǎn)單的DRPC調(diào)用的工作已經(jīng)完成了。
等等,還有點(diǎn)問(wèn)題,LinearDRPCTopologyBuilder 這個(gè)東西是不建議使用的(我這里的版本是0.9.3)。
源碼上有這么一行:
Trident subsumes the functionality provided by this class, so it's deprecated
大概意思就是trident這個(gè)東西已經(jīng)包含了LinearDRPCTopologyBuilder 當(dāng)中的功能。
trident是什么意思?翻譯了一下,【三叉戟】,靠,看起來(lái)很牛逼的樣子。必須試試。
那么上第二份代碼:
public class TridentDRPCTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
}
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")).
groupBy(new Fields("word")).
aggregate(new One(), new Fields("one")).
aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
return topology.build();
}
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
public static class One implements CombinerAggregator
@Override
public Integer init(TridentTuple tuple) {
return 1;
}
@Override
public Integer combine(Integer val1, Integer val2) {
return 1;
}
@Override
public Integer zero() {
return 1;
}
}
}
這個(gè)topology的功能要稍稍復(fù)雜一些,給出一句話(huà),查一下一共有多少個(gè)詞,當(dāng)然了,不能重復(fù)計(jì)數(shù)。main函數(shù)當(dāng)中非常簡(jiǎn)單,提交一個(gè)topology。而這個(gè)topology的構(gòu)建過(guò)程是在buildTopology當(dāng)中完成的。
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")). //用空格分詞
groupBy(new Fields("word")). //分組
aggregate(new One(), new Fields("one")). //給每組的數(shù)量設(shè)定為1
aggregate(new Fields("one"), new Sum(), new Fields("word-count")); //sum計(jì)算總和
這樣的方式看起來(lái)跟spark當(dāng)中對(duì)RDD的操作是有些像的。
好了,還是打包,提交。
然后是客戶(hù)端測(cè)試:
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
System.out.println(result);
到此,關(guān)于“Storm分布式RPC怎么配置”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!