真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

zookeeper(13)源碼分析-請(qǐng)求處理鏈(3)-創(chuàng)新互聯(lián)

FinalRequestProcessor是請(qǐng)求處理鏈中最后的一個(gè)處理器。

成都創(chuàng)新互聯(lián)提供高防主機(jī)、云服務(wù)器、香港服務(wù)器、綿陽(yáng)服務(wù)器托管
public class FinalRequestProcessor implements RequestProcessor {
    ZooKeeperServer zks;
}

FinalRequestProcessor只實(shí)現(xiàn)了RequestProcessor接口,需要實(shí)現(xiàn)process Request方法和shutdown方法。

核心屬性為zks,表示Zookeeper服務(wù)器,可以通過zks訪問到Zookeeper內(nèi)存數(shù)據(jù)庫(kù)。

我們看一下核心方法process Request代碼:

同步代碼塊

synchronized (zks.outstandingChanges) {
            // Need to process local session requests
            // 當(dāng)前節(jié)點(diǎn),處理請(qǐng)求,若為事務(wù)性請(qǐng)求,則提交到ZooKeeper內(nèi)存數(shù)據(jù)庫(kù)中。
            // 對(duì)于processTxn函數(shù)而言,其最終會(huì)調(diào)用DataTree的processTxn
            rc = zks.processTxn(request);

            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            //只有寫請(qǐng)求才會(huì)有消息頭
            if (request.getHdr() != null) {
                TxnHeader hdr = request.getHdr();
                Record txn = request.getTxn();
                long zxid = hdr.getZxid();
                //當(dāng)outstandingChanges不為空且其首元素的zxid小于等于請(qǐng)求的zxid時(shí),
                // 就會(huì)一直從outstandingChanges中取出首元素,并且對(duì)outstandingChangesForPath做相應(yīng)的操作
               while (!zks.outstandingChanges.isEmpty()
                       && zks.outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = zks.outstandingChanges.remove();
                    if (cr.zxid < zxid) {
                        LOG.warn("Zxid outstanding " + cr.zxid
                                 + " is less than current " + zxid);
                    }
                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                        zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
            }

            // do not add non quorum packets to the queue.
            //判斷是否為事務(wù)性請(qǐng)求則是通過調(diào)用isQuorum函數(shù)
            //只將quorum包(事務(wù)性請(qǐng)求)添加進(jìn)隊(duì)列
            //addCommittedProposal函數(shù)將請(qǐng)求添加至ZKDatabase的committedLog結(jié)構(gòu)中
            if (request.isQuorum()) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }

如果請(qǐng)求是ping

根據(jù)請(qǐng)求的創(chuàng)建時(shí)間來更新Zookeeper服務(wù)器的延遲,updateLatency函數(shù)中會(huì)記錄大延遲、最小延遲、總的延遲和延遲次數(shù)。
然后更新響應(yīng)中的狀態(tài),如請(qǐng)求創(chuàng)建到響應(yīng)該請(qǐng)求總共花費(fèi)的時(shí)間、最后的操作類型等。然后設(shè)置響應(yīng)后返回

case OpCode.ping: {
                //更新延遲
                zks.serverStats().updateLatency(request.createTime);

                lastOp = "PING";
                // 更新響應(yīng)的狀態(tài)
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                        request.createTime, Time.currentElapsedTime());
                // 設(shè)置響應(yīng)
                cnxn.sendResponse(new ReplyHeader(-2,
                        zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
                return;
            }

其他請(qǐng)求與此類似,
最后會(huì)根據(jù)其他請(qǐng)求再次更新服務(wù)器的延遲,設(shè)置響應(yīng)的狀態(tài)等

// 獲取最后處理的zxid
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
// 響應(yīng)頭
        ReplyHeader hdr =
            new ReplyHeader(request.cxid, lastZxid, err.intValue());
// 更新服務(wù)器延遲
        zks.serverStats().updateLatency(request.createTime);
                // 更新狀態(tài)
        cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                    request.createTime, Time.currentElapsedTime());

最后使用sendResponse函數(shù)將響應(yīng)發(fā)送給請(qǐng)求方。

try {
            //返回相應(yīng)
            cnxn.sendResponse(hdr, rsp, "response");
            if (request.type == OpCode.closeSession) {
                //關(guān)閉會(huì)話
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error("FIXMSG",e);
        }

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。


當(dāng)前題目:zookeeper(13)源碼分析-請(qǐng)求處理鏈(3)-創(chuàng)新互聯(lián)
網(wǎng)站地址:http://weahome.cn/article/dpesgd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部