真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RocketMQDLedger中日志追加流程是怎樣的

本篇文章為大家展示了RocketMQ DLedger中日志追加流程是怎樣的,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

建網(wǎng)站原本是網(wǎng)站策劃師、網(wǎng)絡程序員、網(wǎng)頁設計師等,應用各種網(wǎng)絡程序開發(fā)技術和網(wǎng)頁設計技術配合操作的協(xié)同工作。成都創(chuàng)新互聯(lián)專業(yè)提供做網(wǎng)站、成都網(wǎng)站制作,網(wǎng)頁設計,網(wǎng)站制作(企業(yè)站、響應式網(wǎng)站建設、電商門戶網(wǎng)站)等服務,從網(wǎng)站深度策劃、搜索引擎友好度優(yōu)化到用戶體驗的提升,我們力求做到極致!

我們詳細分析了源碼分析 RocketMQ DLedger 多副本之 Leader 選主,將詳細分析日志復制的實現(xiàn)。

根據(jù) raft 協(xié)議可知,當整個集群完成 Leader 選主后,集群中的主節(jié)點就可以接受客戶端的請求,而集群中的從節(jié)點只負責從主節(jié)點同步數(shù)據(jù),而不會處理讀寫請求,與M-S結構的讀寫分離有著巨大的區(qū)別。

有了前篇文章的基礎,本文將直接從 Leader 處理客戶端請求入口開始,其入口為:DLedgerServer 的 handleAppend 方法開始講起。

1、日志復制基本流程

在正式分析 RocketMQ DLedger 多副本復制之前,我們首先來了解客戶端發(fā)送日志的請求協(xié)議字段,其類圖如下所示: RocketMQ DLedger中日志追加流程是怎樣的

我們先一一介紹各個字段的含義:

  • String group 該集群所屬組名。

  • String remoteId 請求目的節(jié)點ID。

  • String localId 節(jié)點ID。

  • int code 請求響應字段,表示返回響應碼。

  • String leaderId = null 集群中的Leader Id。

  • long term 集群當前的選舉輪次。

  • byte[] body 待發(fā)送的數(shù)據(jù)。

日志的請求處理處理入口為 DLedgerServer 的 handleAppend 方法。

DLedgerServer#handleAppend

PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
reConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

Step1:首先驗證請求的合理性:

  • 如果請求的節(jié)點ID不是當前處理節(jié)點,則拋出異常。

  • 如果請求的集群不是當前節(jié)點所在的集群,則拋出異常。

  • 如果當前節(jié)點不是主節(jié)點,則拋出異常。

DLedgerServer#handleAppend

long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) {  // [@1](https://my.oschina.net/u/1198)
    AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
    appendEntryResponse.setGroup(memberState.getGroup());
    appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
    appendEntryResponse.setTerm(currTerm);
    appendEntryResponse.setLeaderId(memberState.getSelfId());
    return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {   // @2
    DLedgerEntry dLedgerEntry = new DLedgerEntry();
    dLedgerEntry.setBody(request.getBody());
    DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
    return dLedgerEntryPusher.waitAck(resEntry);
}

Step2:如果預處理隊列已經(jīng)滿了,則拒絕客戶端請求,返回 LEADER_PENDING_FULL 錯誤碼;如果未滿,將請求封裝成 DledgerEntry,則調(diào)用 dLedgerStore 方法追加日志,并且通過使用 dLedgerEntryPusher 的 waitAck 方法同步等待副本節(jié)點的復制響應,并最終將結果返回給調(diào)用方法。

  • 代碼@1:如果 dLedgerEntryPusher 的 push 隊列已滿,則返回追加一次,其錯誤碼為 LEADER_PENDING_FULL。

  • 代碼@2:追加消息到 Leader 服務器,并向從節(jié)點廣播,在指定時間內(nèi)如果未收到從節(jié)點的確認,則認為追加失敗。

接下來就按照上述三個要點進行展開:

  • 判斷 Push 隊列是否已滿

  • Leader 節(jié)點存儲消息

  • 主節(jié)點等待從節(jié)點復制 ACK

1.1 如何判斷 Push 隊列是否已滿

DLedgerEntryPusher#isPendingFull

public boolean isPendingFull(long currTerm) {
    checkTermForPendingMap(currTerm, "isPendingFull");     // [@1](https://my.oschina.net/u/1198)
    return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); // @2
}

主要分兩個步驟: 代碼@1:檢查當前投票輪次是否在 PendingMap 中,如果不在,則初始化,其結構為:Map< Long/* 投票輪次*/, ConcurrentMap>>。

代碼@2:檢測當前等待從節(jié)點返回結果的個數(shù)是否超過其最大請求數(shù)量,可通過maxPendingRequests Num 配置,該值默認為:10000。

上述邏輯比較簡單,但疑問隨著而來,ConcurrentMap> 中的數(shù)據(jù)是從何而來的呢?我們不妨接著往下看。

1.2 Leader 節(jié)點存儲數(shù)據(jù)

Leader 節(jié)點的數(shù)據(jù)存儲主要由 DLedgerStore 的 appendAsLeader 方法實現(xiàn)。DLedger 分別實現(xiàn)了基于內(nèi)存、基于文件的存儲實現(xiàn),本文重點關注基于文件的存儲實現(xiàn),其實現(xiàn)類為:DLedgerMmapFileStore。

下面重點來分析一下數(shù)據(jù)存儲流程,其入口為DLedgerMmapFileStore 的 appendAsLeader 方法。

DLedgerMmapFileStore#appendAsLeader

PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);

Step1:首先判斷是否可以追加數(shù)據(jù),其判斷依據(jù)主要是如下兩點:

  • 當前節(jié)點的狀態(tài)是否是 Leader,如果不是,則拋出異常。

  • 當前磁盤是否已滿,其判斷依據(jù)是 DLedger 的根目錄或數(shù)據(jù)文件目錄的使用率超過了允許使用的最大值,默認值為85%。

ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();

Step2:從本地線程變量獲取一個數(shù)據(jù)與索引 buffer。其中用于存儲數(shù)據(jù)的 ByteBuffer,其容量固定為 4M ,索引的 ByteBuffer 為兩個索引條目的長度,固定為64個字節(jié)。

DLedgerEntryCoder.encode(entry, dataBuffer);
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
    byteBuffer.clear();
    int size = entry.computSizeInBytes();
    //always put magic on the first position
    byteBuffer.putInt(entry.getMagic());
    byteBuffer.putInt(size);
    byteBuffer.putLong(entry.getIndex());
    byteBuffer.putLong(entry.getTerm());
    byteBuffer.putLong(entry.getPos());
    byteBuffer.putInt(entry.getChannel());
    byteBuffer.putInt(entry.getChainCrc());
    byteBuffer.putInt(entry.getBodyCrc());
    byteBuffer.putInt(entry.getBody().length);
    byteBuffer.put(entry.getBody());
    byteBuffer.flip();
}

Step3:將 DLedgerEntry,即將數(shù)據(jù)寫入到 ByteBuffer中,從這里看出,每一次寫入會調(diào)用 ByteBuffer 的 clear 方法,將數(shù)據(jù)清空,從這里可以看出,每一次數(shù)據(jù)追加,只能存儲4M的數(shù)據(jù)。

DLedgerMmapFileStore#appendAsLeader

synchronized (memberState) {
    PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
	// ... 省略代碼
}

Step4:鎖定狀態(tài)機,并再一次檢測節(jié)點的狀態(tài)是否是 Leader 節(jié)點。

DLedgerMmapFileStore#appendAsLeader

long nextIndex = ledgerEndIndex + 1;
entry.setIndex(nextIndex);
entry.setTerm(memberState.currTerm());
entry.setMagic(CURRENT_MAGIC);
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);

Step5:為當前日志條目設置序號,即 entryIndex 與 entryTerm (投票輪次)。并將魔數(shù)、entryIndex、entryTerm 等寫入到 bytebuffer 中。

DLedgerMmapFileStore#appendAsLeader

long prePos = dataFileList.preAppend(dataBuffer.remaining());
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos);

Step6:計算新的消息的起始偏移量,關于 dataFileList 的 preAppend 后續(xù)詳細介紹其實現(xiàn),然后將該偏移量寫入日志的 bytebuffer 中。

DLedgerMmapFileStore#appendAsLeader

for (AppendHook writeHook : appendHooks) {
    writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}

Step7:執(zhí)行鉤子函數(shù)。

DLedgerMmapFileStore#appendAsLeader

long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);

Step8:將數(shù)據(jù)追加到 pagecache 中。該方法稍后詳細介紹。

DLedgerMmapFileStore#appendAsLeader

DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);

Step9:構建條目索引并將索引數(shù)據(jù)追加到 pagecache。

DLedgerMmapFileStore#appendAsLeader

ledgerEndIndex++;
ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
    ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();

Step10:ledgerEndeIndex 加一(下一個條目)的序號。并設置 leader 節(jié)點的狀態(tài)機的 ledgerEndIndex 與 ledgerEndTerm。

Leader 節(jié)點數(shù)據(jù)追加就介紹到這里,稍后會重點介紹與存儲相關方法的實現(xiàn)細節(jié)。

1.3 主節(jié)點等待從節(jié)點復制 ACK

其實現(xiàn)入口為 dLedgerEntryPusher 的 waitAck 方法。

DLedgerEntryPusher#waitAck

public CompletableFuture waitAck(DLedgerEntry entry) {
    updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());    // @1
    if (memberState.getPeerMap().size() == 1) {                                                                  // @2
        AppendEntryResponse response = new AppendEntryResponse();
        response.setGroup(memberState.getGroup());
        response.setLeaderId(memberState.getSelfId());
        response.setIndex(entry.getIndex());
        response.setTerm(entry.getTerm());
        response.setPos(entry.getPos());
        return AppendFuture.newCompletedFuture(entry.getPos(), response);
    } else {
        checkTermForPendingMap(entry.getTerm(), "waitAck");                                            
        AppendFuture future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // @3
        future.setPos(entry.getPos());
        CompletableFuture old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);     // @4
        if (old != null) {
            logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
        }
        wakeUpDispatchers();                                       // @5
        return future;
    }
}

代碼@1:更新當前節(jié)點的 push 水位線。 代碼@2:如果集群的節(jié)點個數(shù)為1,無需轉(zhuǎn)發(fā),直接返回成功結果。 代碼@3:構建 append 響應 Future 并設置超時時間,默認值為:2500 ms,可以通過 maxWaitAckTimeMs 配置改變其默認值。 代碼@4:將構建的 Future 放入等待結果集合中。 代碼@5:喚醒 Entry 轉(zhuǎn)發(fā)線程,即將主節(jié)點中的數(shù)據(jù) push 到各個從節(jié)點。

接下來分別對上述幾個關鍵點進行解讀。

1.3.1 updatePeerWaterMark 方法

DLedgerEntryPusher#updatePeerWaterMark

private void updatePeerWaterMark(long term, String peerId, long index) {    // 代碼@1
    synchronized (peerWaterMarksByTerm) { 
       checkTermForWaterMark(term, "updatePeerWaterMark");                     // 代碼@2
        if (peerWaterMarksByTerm.get(term).get(peerId) < index) {                   // 代碼@3
            peerWaterMarksByTerm.get(term).put(peerId, index);
        }
    }
}

代碼@1:先來簡單介紹該方法的兩個參數(shù):

  • long term 當前的投票輪次。

  • String peerId 當前節(jié)點的ID。

  • long index 當前追加數(shù)據(jù)的序號。

代碼@2:初始化 peerWaterMarksByTerm 數(shù)據(jù)結構,其結果為 < Long /** term */, Map< String /** peerId */, Long /** entry index*/>。

代碼@3:如果 peerWaterMarksByTerm 存儲的 index 小于當前數(shù)據(jù)的 index,則更新。

1.3.2 wakeUpDispatchers 詳解

DLedgerEntryPusher#updatePeerWaterMark

public void wakeUpDispatchers() {
    for (EntryDispatcher dispatcher : dispatcherMap.values()) {
        dispatcher.wakeup();
    }
}

該方法主要就是遍歷轉(zhuǎn)發(fā)器并喚醒。本方法的核心關鍵就是 EntryDispatcher,在詳細介紹它之前我們先來看一下該集合的初始化。

DLedgerEntryPusher 構造方法

for (String peer : memberState.getPeerMap().keySet()) {
    if (!peer.equals(memberState.getSelfId())) {
        dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
    }
}

原來在構建 DLedgerEntryPusher 時會為每一個從節(jié)點創(chuàng)建一個 EntryDispatcher 對象。

顯然,日志的復制由 DLedgerEntryPusher 來實現(xiàn)。由于篇幅的原因,該部分內(nèi)容將在下篇文章中繼續(xù)。

上面在講解 Leader 追加日志時并沒有詳細分析存儲相關的實現(xiàn),為了知識體系的完備,接下來我們來分析一下其核心實現(xiàn)。

2、日志存儲實現(xiàn)詳情

本節(jié)主要對 MmapFileList 的 preAppend 與 append 方法進行詳細講解。

> 存儲部分的設計請查閱筆者的博客:源碼分析 RocketMQ DLedger 多副本存儲實現(xiàn),MmapFileList 對標 RocketMQ 的MappedFileQueue。

2.1 MmapFileList 的 preAppend 詳解

該方法最終會調(diào)用兩個參數(shù)的 preAppend方法,故我們直接來看兩個參數(shù)的 preAppend 方法。

MmapFileList#preAppend

public long preAppend(int len, boolean useBlank) {                // @1
    MmapFile mappedFile = getLastMappedFile();                   // @2 start
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = getLastMappedFile(0);
    }
    if (null == mappedFile) {
        logger.error("Create mapped file for {}", storePath);
        return -1;
    }                                                                                            // @2 end
    int blank = useBlank ? MIN_BLANK_LEN : 0;
    if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {   // @3
        if (blank < MIN_BLANK_LEN) {
            logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);
            return -1;
        } else {
            ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition());     // @4
            byteBuffer.putInt(BLANK_MAGIC_CODE);                                                                                                      // @5
            byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());                                               // @6
            if (mappedFile.appendMessage(byteBuffer.array())) {                                                                                     // @7
                //need to set the wrote position
                mappedFile.setWrotePosition(mappedFile.getFileSize());
            } else {
                logger.error("Append blank error for {}", storePath);
                return -1;
            }
            mappedFile = getLastMappedFile(0);
            if (null == mappedFile) {
                logger.error("Create mapped file for {}", storePath);
                return -1;
            }
        }
    }
    return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();// @8
}

代碼@1:首先介紹其參數(shù)的含義:

  • int len 需要申請的長度。

  • boolean useBlank 是否需要填充,默認為true。

代碼@2:獲取最后一個文件,即獲取當前正在寫的文件。

代碼@3:如果需要申請的資源超過了當前文件可寫字節(jié)時,需要處理的邏輯。代碼@4-@7都是其處理邏輯。

代碼@4:申請一個當前文件剩余字節(jié)的大小的bytebuffer。

代碼@5:先寫入魔數(shù)。

代碼@6:寫入字節(jié)長度,等于當前文件剩余的總大小。

代碼@7:寫入空字節(jié),代碼@4-@7的用意就是寫一條空Entry,填入魔數(shù)與 size,方便解析。

代碼@8:如果當前文件足以容納待寫入的日志,則直接返回其物理偏移量。

經(jīng)過上述代碼解讀,我們很容易得出該方法的作用,就是返回待寫入日志的起始物理偏移量。

2.2 MmapFileList 的 append 詳解

最終會調(diào)用4個參數(shù)的 append 方法,其代碼如下: MmapFileList#append

public long append(byte[] data, int pos, int len, boolean useBlank) {  // @1
    if (preAppend(len, useBlank) == -1) {
		return -1;
    }
    MmapFile mappedFile = getLastMappedFile();                               // @2
    long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();   // @3
    if (!mappedFile.appendMessage(data, pos, len)) {            // @4
        logger.error("Append error for {}", storePath);
        return -1;
    }
    return currPosition;
}

代碼@1:首先介紹一下各個參數(shù):

  • byte[] data 待寫入的數(shù)據(jù),即待追加的日志。

  • int pos 從 data 字節(jié)數(shù)組哪個位置開始讀取。

  • int len 待寫入的字節(jié)數(shù)量。

  • boolean useBlank 是否使用填充,默認為 true。

代碼@2:獲取最后一個文件,即當前可寫的文件。

代碼@3:獲取當前寫入指針。

代碼@4:追加消息。

最后我們再來看一下 appendMessage,具體的消息追加實現(xiàn)邏輯。

DefaultMmapFile#appendMessage

public boolean appendMessage(final byte[] data, final int offset, final int length) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + length) <= this.fileSize) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // @1
        byteBuffer.position(currentPos);
        byteBuffer.put(data, offset, length);
        this.wrotePosition.addAndGet(length);
        return true;
    }
    return false;
}

該方法我主要是想突出一下寫入的方式是 mappedByteBuffer,是通過 FileChannel 的 map 方法創(chuàng)建,即我們常說的 PageCache,即消息追加首先是寫入到 pageCache 中。

上述內(nèi)容就是RocketMQ DLedger中日志追加流程是怎樣的,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


標題名稱:RocketMQDLedger中日志追加流程是怎樣的
網(wǎng)頁鏈接:http://weahome.cn/article/psjisi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部