真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

ES學(xué)習(xí)筆記之-AvgAggregation的實(shí)現(xiàn)過程分析-創(chuàng)新互聯(lián)

我們需要查看數(shù)據(jù)的統(tǒng)計(jì)量時(shí),均值是最重要的特征之一。

成都網(wǎng)絡(luò)公司-成都網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司十載經(jīng)驗(yàn)成就非凡,專業(yè)從事成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè),成都網(wǎng)頁設(shè)計(jì),成都網(wǎng)頁制作,軟文發(fā)布平臺(tái),1元廣告等。十載來已成功提供全面的成都網(wǎng)站建設(shè)方案,打造行業(yè)特色的成都網(wǎng)站建設(shè)案例,建站熱線:18980820575,我們期待您的來電!

對于海量數(shù)據(jù),這類簡單的聚合ES可以做到秒級別返回。聚合是ES的特色功能。

那么ES是如何實(shí)現(xiàn)這一功能的呢?

我們知道,ES的數(shù)據(jù)存儲(chǔ)在各個(gè)節(jié)點(diǎn)中, 所以ES的實(shí)現(xiàn)AvgAggregation時(shí)基本思路就是先統(tǒng)計(jì)各個(gè)節(jié)點(diǎn),然后匯總。

先了解ES是如何統(tǒng)計(jì)單個(gè)節(jié)點(diǎn): 參考AvgAggregator

@Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
            final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final BigArrays bigArrays = context.bigArrays();
        final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
        return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                counts = bigArrays.grow(counts, bucket + 1);
                sums = bigArrays.grow(sums, bucket + 1);

                values.setDocument(doc);
                final int valueCount = values.count();
                counts.increment(bucket, valueCount);
                double sum = 0;
                for (int i = 0; i < valueCount; i++) {
                    sum += values.valueAt(i);
                }
                sums.increment(bucket, sum);
            }
        };
    }

即實(shí)現(xiàn)Collector類的collect()方法。然后通過doc_values機(jī)制獲取文檔相關(guān)字段的值,分別匯入counts和sums兩個(gè)變量中。

收集完成counts和sums過后,就需要匯總各個(gè)節(jié)點(diǎn)的值, 這在搜索的第二階段。

從第一階段到第二階段,整個(gè)鏈路如下:
s1: 前端請求發(fā)送到集群某一節(jié)點(diǎn)的TransportSearchAction.doExecute()方法中。

 switch(searchRequest.searchType()) {
               .....
           case QUERY_THEN_FETCH:
                searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
                        indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
                break;
              ......   
     }
        searchAsyncAction.start();

見到start()方法,我以為這個(gè)是另啟一個(gè)線程,后面發(fā)現(xiàn)原來不是的。 這個(gè)start()方法把整個(gè)查詢過程分為兩個(gè)階段:

階段一:
performFirstPhase(), 即把請求分發(fā)到各個(gè)節(jié)點(diǎn),然后記錄節(jié)點(diǎn)處理的結(jié)果。如果返回的分片是最后一個(gè)分片,則轉(zhuǎn)入階段二。

階段二:
performFirstPhase() -> onFirstPhaseResult() -> innerMoveToSecondPhase() -> moveToSecondPhase() 。這里利用了模板設(shè)計(jì)模式。在階段二中,會(huì)再次向各個(gè)節(jié)點(diǎn)發(fā)起請求,通過docId獲取文檔內(nèi)容。

s2: 對于聚合而言, 階段二最重要的鏈路是moveToSecondPhase() -> executeFetch() -> finishHim() -> searchPhaseController.merge() , merge()中包含了如下的業(yè)務(wù)邏輯: 合并hits, 合并suggest, 合并addAggregation 等。 這里我們關(guān)注聚合。

聚合的入口方法是InternalAggregations.reduce(), 如果熟悉hadoop, reduce方法的執(zhí)行邏輯看這個(gè)名字也能理解一部分。reduce的中文翻譯“歸納”,挺生動(dòng)形象的。整個(gè)鏈路的入口為InternalAvg.doReduce()。

@Override
    public InternalAvg doReduce(List aggregations, ReduceContext reduceContext) {
        long count = 0;
        double sum = 0;
        for (InternalAggregation aggregation : aggregations) {
            count += ((InternalAvg) aggregation).count;
            sum += ((InternalAvg) aggregation).sum;
        }
        return new InternalAvg(getName(), sum, count, valueFormatter, pipelineAggregators(), getMetaData());
    }

其邏輯相當(dāng)簡單,count相加, sum相加。獲取最終的結(jié)果就是

public double getValue() {
        return sum / count;
    }

上面講述了ES分發(fā)會(huì)匯總的關(guān)鍵節(jié)點(diǎn),那么分發(fā)到各個(gè)節(jié)點(diǎn)的業(yè)務(wù)邏輯是怎樣的呢?

首先定位入口:

class SearchQueryTransportHandler extends TransportRequestHandler {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
            QuerySearchResultProvider result = searchService.executeQueryPhase(request);
            channel.sendResponse(result);
        }
    }

然后定位到QueryPhrase.execute(), 在QueryPhrase這個(gè)階段,主要做的事情如下:

aggregationPhase.preProcess(searchContext): 解析ES的語法,生成Collector.
execute: 在調(diào)用Lucene的接口查詢數(shù)據(jù)前,組合各個(gè)Collecotr, collector = MultiCollector.wrap(subCollectors); 然后查詢Lucene索引。對于AvgAggregator, 其關(guān)鍵邏輯是:

@Override
            public void collect(int doc, long bucket) throws IOException {
                counts = bigArrays.grow(counts, bucket + 1);
                sums = bigArrays.grow(sums, bucket + 1);

                values.setDocument(doc);
                final int valueCount = values.count();
                counts.increment(bucket, valueCount);
                double sum = 0;
                for (int i = 0; i < valueCount; i++) {
                    sum += values.valueAt(i);
                }
                sums.increment(bucket, sum);
            }

這個(gè)已經(jīng)是第二次出現(xiàn)了, 它的功能就是收集每個(gè)命中查詢的doc相關(guān)信息。 這里獲取每個(gè)docId對應(yīng)的value,是基于doc_value的正向索引。

以上就是整個(gè)Avg Aggregation的實(shí)現(xiàn)流程。 通過源碼,可以確認(rèn), AvgAggregation是精確可信的。 還有幾個(gè)聚合函數(shù),其思路跟AvgAggregation是一致的,就不細(xì)說了,他們分別是: Max, Min, Sum, ValueCount, Stats 。。。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


網(wǎng)頁題目:ES學(xué)習(xí)筆記之-AvgAggregation的實(shí)現(xiàn)過程分析-創(chuàng)新互聯(lián)
轉(zhuǎn)載注明:http://weahome.cn/article/djdipi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部