真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

storm中如何自定義數(shù)據(jù)分組

今天就跟大家聊聊有關(guān)storm中如何自定義數(shù)據(jù)分組,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

創(chuàng)新互聯(lián)公司是一家專(zhuān)業(yè)提供來(lái)賓企業(yè)網(wǎng)站建設(shè),專(zhuān)注與成都網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、H5開(kāi)發(fā)、小程序制作等業(yè)務(wù)。10年已為來(lái)賓眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專(zhuān)業(yè)網(wǎng)站設(shè)計(jì)公司優(yōu)惠進(jìn)行中。

數(shù)據(jù)流組

設(shè)計(jì)一個(gè)拓?fù)鋾r(shí),你要做的最重要的事情之一就是定義如何在各組件之間交換數(shù)據(jù)(數(shù)據(jù)流是如何被bolts消費(fèi)的)。一個(gè)數(shù)據(jù)流組指定了每個(gè)bolt會(huì)消費(fèi)哪些數(shù)據(jù)流,以及如何消費(fèi)它們。 

storm自帶數(shù)據(jù)流組

隨機(jī)數(shù)據(jù)流組

隨機(jī)流組是最常用的數(shù)據(jù)流組。它只有一個(gè)參數(shù)(數(shù)據(jù)源組件),并且數(shù)據(jù)源會(huì)向隨機(jī)選擇的bolt發(fā)送元組,保證每個(gè)消費(fèi)者收到近似數(shù)量的元組。

 builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

 域數(shù)據(jù)流組

域數(shù)據(jù)流組允許你基于元組的一個(gè)或多個(gè)域控制如何把元組發(fā)送給bolts。它保證擁有相同域組合的值集發(fā)送給同一個(gè)bolt?;氐絾卧~計(jì)數(shù)器的例子,如果你用word域?yàn)閿?shù)據(jù)流分組,word-normalizer bolt將只會(huì)把相同單詞的元組發(fā)送給同一個(gè)word-counterbolt實(shí)例。

 builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));

全部數(shù)據(jù)流組

全部數(shù)據(jù)流組,為每個(gè)接收數(shù)據(jù)的實(shí)例復(fù)制一份元組副本。這種分組方式用于向bolts發(fā)送信號(hào)。比如,你要刷新緩存,你可以向所有的bolts發(fā)送一個(gè)刷新緩存信號(hào)。在單詞計(jì)數(shù)器的例子里,你可以使用一個(gè)全部數(shù)據(jù)流組,添加清除計(jì)數(shù)器緩存的功能 

builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGroupint("word-normalizer",new Fields("word"))
           .allGrouping("signals-spout","signals");

直接數(shù)據(jù)流組

這是一個(gè)特殊的數(shù)據(jù)流組,數(shù)據(jù)源可以用它決定哪個(gè)組件接收元組

 builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

。與前面的例子類(lèi)似,數(shù)據(jù)源將根據(jù)單詞首字母決定由哪個(gè)bolt接收元組。要使用直接數(shù)據(jù)流組,在WordNormalizer bolt中,使用emitDirect方法代替emit。

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //對(duì)元組做出應(yīng)答
        collector.ack(input);
    }
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

在prepare方法中計(jì)算任務(wù)數(shù)

 public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

全局?jǐn)?shù)據(jù)流組

全局?jǐn)?shù)據(jù)流組把所有數(shù)據(jù)源創(chuàng)建的元組發(fā)送給單一目標(biāo)實(shí)例(即擁有最低ID的任務(wù))。

不分組

這個(gè)數(shù)據(jù)流組相當(dāng)于隨機(jī)數(shù)據(jù)流組。也就是說(shuō),使用這個(gè)數(shù)據(jù)流組時(shí),并不關(guān)心數(shù)據(jù)流是如何分組的。

自定義數(shù)據(jù)流組

storm自定義數(shù)據(jù)流組和hadoop Partitioner分組很相似,storm自定義分組要實(shí)現(xiàn)CustomStreamGrouping接口,接口源碼如下:

public   interface   CustomStreamGrouping  extends   Serializable {
 
    void   prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks);
 
    List chooseTasks( int   taskId, List values);
}

targetTasks就是Storm運(yùn)行時(shí)告訴你,當(dāng)前有幾個(gè)目標(biāo)Task可以選擇,每一個(gè)都給編上了數(shù)字編號(hào)。而 chooseTasks(int taskId, List values); 就是讓你選擇,你的這條數(shù)據(jù)values,是要哪幾個(gè)目標(biāo)Task處理?

這是我寫(xiě)的一個(gè)自定義分組,總是把數(shù)據(jù)分到第一個(gè)Task:

public   class   MyFirstStreamGrouping  implements   CustomStreamGrouping {
     private   static   Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping. class );
 
     private   List tasks;
 
     @Override
     public   void   prepare(WorkerTopologyContext context, GlobalStreamId stream,
         List targetTasks) {
         this .tasks = targetTasks;
         log.info(tasks.toString());
     }  
     @Override
     public   List chooseTasks( int   taskId, List values) {
         log.info(values.toString());
         return   Arrays.asList(tasks.get( 0 ));
     }
}

從上面的代碼可以看出,該自定義分組會(huì)把數(shù)據(jù)歸并到第一個(gè)TaskArrays.asList(tasks.get(0));,也就是數(shù)據(jù)到達(dá)后總是被派發(fā)到第一組。和Hadoop不同的是,Storm允許一條數(shù)據(jù)被多個(gè)Task處理,因此返回值是List .就是讓你來(lái)在提供的 'List targetTasks' Task中選擇任意的幾個(gè)(必須至少是一個(gè))Task來(lái)處理數(shù)據(jù)。

第二個(gè)自定義分組,wordcount中使首字母相同的單詞交給同一個(gè)bolt處理:

public class ModuleGrouping implements CustormStreamGrouping{
        int numTasks = 0;
        @Override
        public List chooseTasks(List values) {
            List boltIds = new ArrayList();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }
        @Override
        public void prepare(TopologyContext context, Fields outFields, List targetTasks) {
            numTasks = targetTasks.size();
        }
    }

這是一個(gè)CustomStreamGrouping的簡(jiǎn)單實(shí)現(xiàn),在這里我們采用單詞首字母字符的整數(shù)值與任務(wù)數(shù)的余數(shù),決定接收元組的bolt。

builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());

看完上述內(nèi)容,你們對(duì)storm中如何自定義數(shù)據(jù)分組有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。


新聞名稱(chēng):storm中如何自定義數(shù)據(jù)分組
當(dāng)前地址:http://weahome.cn/article/jpoosh.html

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部