RocketMQ中怎么對DLedger進行整合,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
10年專注成都網站制作,企業(yè)網站制作,個人網站制作服務,為大家分享網站制作知識、方案,網站設計流程、步驟,成功服務上千家企業(yè)。為您提供網站建設,網站制作,網頁設計及定制高端網站建設服務,專注于企業(yè)網站制作,高端網頁制作,對成都木托盤等多個領域,擁有豐富的網站推廣經驗。
RocketMQ 的消息存儲文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲全量的消息,consumequeue、index 文件都是基于 commitlog 文件構建的。要使用 DLedger 來實現(xiàn)消息存儲的一致性,應該關鍵是要實現(xiàn) commitlog 文件的一致性,即 DLedger 要整合的對象應該是 commitlog 文件,即只需保證 raft 協(xié)議的復制組內各個節(jié)點的 commitlog 文件一致即可。
我們知道使用文件存儲消息都會基于一定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數(shù)、消息長度,消息屬性、消息體等,而我們再來回顧一下 DLedger 日志的存儲格式:
DLedger 要整合 commitlog 文件,是不是可以把 rocketmq 消息,即一個個 commitlog 條目整體當成 DLedger 的 body 字段即可。
還等什么,跟我一起來看源碼吧?。?!別急,再拋一個問題,DLedger 整合 RocketMQ commitlog,能不能做到平滑升級?
帶著這些思考和問題,一起來探究 DLedger 是如何整合 RocketMQ 的。
> 溫馨提示:本文不會詳細介紹 Broker 端的啟動流程,只會點出在啟動過程中與 DLedger 相關的代碼,如想詳細了解 Broker 的啟動流程,建議關注筆者的《RocketMQ技術內幕》一書。
Broker 涉及到 DLedger 相關關鍵點如下:
DefaultMessageStore 構造方法
if(messageStoreConfig.isEnableDLegerCommitLog()) { // [@1](https://my.oschina.net/u/1198) this.commitLog = new DLedgerCommitLog(this); else { this.commitLog = new CommitLog(this); // @2 }
代碼@1:如果開啟 DLedger ,commitlog 的實現(xiàn)類為 DLedgerCommitLog,也是本文需要關注的關鍵所在。
代碼@2:如果未開啟 DLedger,則使用舊版的 Commitlog實現(xiàn)類。
BrokerController#initialize
if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }
主要調用 LedgerLeaderElector 的 addRoleChanneHandler 方法增加 節(jié)點角色變更事件監(jiān)聽器,DLedgerRoleChangeHandler 是實現(xiàn)主從切換的另外一個關鍵點。
DefaultMessageStore#load
// load Commit Log result = result && this.commitLog.load(); // [@1](https://my.oschina.net/u/1198) // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); this.recover(lastExitOK); // @2 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); }
代碼@1、@2 最終都是委托 commitlog 對象來執(zhí)行,這里的關鍵又是如果開啟了 DLedger,則最終調用的是 DLedgerCommitLog。
經過上面的鋪墊,主角 DLedgerCommitLog “閃亮登場“了。
> 溫馨提示:由于 Commitlog 的絕大部分方法都已經在《RocketMQ技術內幕》一書中詳細介紹了,并且 DLedgerCommitLog 的實現(xiàn)原理與 Commitlog 文件的實現(xiàn)原理類同,本文會一筆帶過關于存儲部分的實現(xiàn)細節(jié)。
DLedgerCommitlog 繼承自 Commitlog。讓我們一一來看一下它的核心屬性。
DLedgerServer dLedgerServer 基于 raft 協(xié)議實現(xiàn)的集群內的一個節(jié)點,用 DLedgerServer 實例表示。
DLedgerConfig dLedgerConfig DLedger 的配置信息。
DLedgerMmapFileStore dLedgerFileStore DLedger 基于文件映射的存儲實現(xiàn)。
MmapFileList dLedgerFileList DLedger 所管理的存儲文件集合,對比 RocketMQ 中的 MappedFileQueue。
int id 節(jié)點ID,0 表示主節(jié)點,非0表示從節(jié)點
MessageSerializer messageSerializer 消息序列器。
long beginTimeInDledgerLock = 0 用于記錄 消息追加的時耗(日志追加所持有鎖時間)。
long dividedCommitlogOffset = -1 記錄的舊 commitlog 文件中的最大偏移量,如果訪問的偏移量大于它,則訪問 dledger 管理的文件。
boolean isInrecoveringOldCommitlog = false 是否正在恢復舊的 commitlog 文件。
接下來我們將詳細介紹 DLedgerCommitlog 各個核心方法及其實現(xiàn)要點。
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); // @1 dLedgerConfig = new DLedgerConfig(); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setStoreType(DLedgerConfig.FILE); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; // @2 dLedgerServer = new DLedgerServer(dLedgerConfig); // @3 dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; dLedgerFileStore.addAppendHook(appendHook); // @4 dLedgerFileList = dLedgerFileStore.getDataFileList(); this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); // @5 }
代碼@1:調用父類 即 CommitLog 的構造函數(shù),加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級 DLedger 的消息。我們稍微看一下 CommitLog 的構造函數(shù):
代碼@2:構建 DLedgerConfig 相關配置屬性,其主要屬性如下:
enableDiskForceClean 是否強制刪除文件,取自 broker 配置屬性 cleanFileForciblyEnable,默認為 true 。
storeType DLedger 存儲類型,固定為 基于文件的存儲模式。
dLegerSelfId leader 節(jié)點的 id 名稱,示例配置:n0,其配置要求第二個字符后必須是數(shù)字。
dLegerGroup DLeger group 的名稱,建議與 broker 配置屬性 brokerName 保持一致。
dLegerPeers DLeger Group 中所有的節(jié)點信息,其配置示例 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多個節(jié)點使用分號隔開。
storeBaseDir 設置 DLedger 的日志文件的根目錄,取自 borker 配件文件中的 storePathRootDir ,即 RocketMQ 的數(shù)據(jù)存儲根路徑。
mappedFileSizeForEntryData 設置 DLedger 的單個日志文件的大小,取自 broker 配置文件中的 - mapedFileSizeCommitLog,即與 commitlog 文件的單個文件大小一致。
deleteWhen DLedger 日志文件的刪除時間,取自 broker 配置文件中的 deleteWhen,默認為凌晨 4點。
fileReservedHours DLedger 日志文件保留時長,取自 broker 配置文件中的 fileReservedHours,默認為 72h。
代碼@3:根據(jù) DLedger 配置信息創(chuàng)建 DLedgerServer,即創(chuàng)建 DLedger 集群節(jié)點,集群內各個節(jié)點啟動后,就會觸發(fā)選主。
代碼@4:構建 appendHook 追加鉤子函數(shù),這是兼容 Commitlog 文件很關鍵的一步,后面會詳細介紹其作用。
代碼@5:構建消息序列化。
根據(jù)上述的流程圖,構建好 DefaultMessageStore 實現(xiàn)后,就是調用其 load 方法,在啟用 DLedger 機制后,會依次調用 DLedgerCommitlog 的 load、recover 方法。
public boolean load() { boolean result = super.load(); if (!result) { return false; } return true; }
DLedgerCommitLog 的 laod 方法實現(xiàn)比較簡單,就是調用 其父類 Commitlog 的 load 方法,即這里也是為了啟用 DLedger 時能夠兼容以前的消息。
在 Broker 啟動時會加載 commitlog、consumequeue等文件,需要恢復其相關是數(shù)據(jù)結構,特別是與寫入、刷盤、提交等指針,其具體調用 recover 方法。 DLedgerCommitLog#recover
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // @1 recover(maxPhyOffsetOfConsumeQueue); }
首先會先恢復 consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,然后根據(jù)該物理偏移量進行恢復。 接下來看一下該方法的處理流程與關鍵點。
DLedgerCommitLog#recover
dLedgerFileStore.load();
Step1:加載 DLedger 相關的存儲文件,并一一構建對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針為文件的大小。
DLedgerCommitLog#recover
if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); // @1 dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); // @2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // @3 disableDeleteDledger(); } long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { // @4 log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); } return; }
Step2:如果已存在 DLedger 的數(shù)據(jù)文件,則只需要恢復 DLedger 相關數(shù)據(jù)文建,因為在加載舊的 commitlog 文件時已經將其重要的數(shù)據(jù)指針設置為最大值。其關鍵實現(xiàn)點如下:
首先調用 DLedger 文件存儲實現(xiàn)類 DLedgerFileStore 的 recover 方法,恢復管轄的 MMapFile 對象(一個文件對應一個MMapFile實例)的相關指針,其實現(xiàn)方法與 RocketMQ 的 DefaultMessageStore 的恢復過程類似。
設置 dividedCommitlogOffset 的值為 DLedger 中所有物理文件的最小偏移量。操作消息的物理偏移量小于該值,則從 commitlog 文件中查找;物理偏移量大于等于該值的話則從 DLedger 相關的文件中查找消息。
如果存在舊的 commitlog 文件,則禁止刪除 DLedger 文件,其具體做法就是禁止強制刪除文件,并將文件的有效存儲時間設置為 10 年。
如果 consumequeue 中存儲的最大物理偏移量大于 DLedger 中最大的物理偏移量,則刪除多余的 consumequeue 文件。
>溫馨提示:為什么當存在 commitlog 文件的情況下,不能刪除 DLedger 相關的日志文件呢?
因為在此種情況下,如果 DLedger 中的物理文件有刪除,則物理偏移量會斷層。
正常情況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續(xù)的,這樣非常方便是訪問 commitlog 還是 訪問 DLedger ,但如果DLedger 部分文件刪除后,這兩個值就變的不連續(xù),就會造成中間的文件空洞,無法被連續(xù)訪問。
DLedgerCommitLog#recover
isInrecoveringOldCommitlog = true; super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false;
Step3:如果啟用了 DLedger 并且是初次啟動(還未生成 DLedger 相關的日志文件),則需要恢復 舊的 commitlog 文件。
DLedgerCommitLog#recover
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { // @1 return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; // 1 TOTAL SIZE byteBuffer.getInt(); //size int magicCode = byteBuffer.getInt(); if (magicCode == CommitLog.BLANK_MAGIC_CODE) { // @2 needWriteMagicCode = false; } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } dLedgerConfig.setEnableDiskForceClean(false); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); // @3 log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); if (needWriteMagicCode) { // @4 byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); mappedFile.flush(0); } mappedFile.setWrotePosition(mappedFile.getFileSize()); // @5 mappedFile.setCommittedPosition(mappedFile.getFileSize()); mappedFile.setFlushedPosition(mappedFile.getFileSize()); dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); }
Step4:如果存在舊的 commitlog 文件,需要將最后的文件剩余部分全部填充,即不再接受新的數(shù)據(jù)寫入,新的數(shù)據(jù)全部寫入到 DLedger 的數(shù)據(jù)文件中。其關鍵實現(xiàn)點如下:
嘗試查找最后一個 commitlog 文件,如果未找到,則結束。
從最后一個文件的最后寫入點(原 commitlog 文件的 待寫入位點)嘗試去查找寫入的魔數(shù),如果存在魔數(shù)并等于 CommitLog.BLANK_MAGIC_CODE,則無需再寫入魔數(shù),在升級 DLedger 第一次啟動時,魔數(shù)為空,故需要寫入魔數(shù)。
初始化 dividedCommitlogOffset ,等于最后一個文件的起始偏移量加上文件的大小,即該指針指向最后一個文件的結束位置。
將最后一個 commitlog 未寫滿的數(shù)據(jù)全部寫入,其方法為 設置消息體的 size 與 魔數(shù)即可。
設置最后一個文件的 wrotePosition、flushedPosition、committedPosition 為文件的大小,同樣有意味者最后一個文件已經寫滿,下一條消息將寫入 DLedger 中。
在啟用 DLedger 機制時 Broker 的啟動流程就介紹到這里了,相信大家已經了解 DLedger 在整合 RocketMQ 上做的努力,接下來我們從消息追加、消息讀取兩個方面再來探討 DLedger 是如何無縫整合 RocketMQ 的,實現(xiàn)平滑升級的。
> 溫馨提示:本節(jié)同樣也不會詳細介紹整個消息追加(存儲流程),只是要點出與 DLedger(多副本、主從切換)相關的核心關鍵點。如果想詳細了解消息追加的流程,可以閱讀筆者所著的《RocketMQ技術內幕》一書。
DLedgerCommitLog#putMessage
AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.data); dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); }
關鍵點一:消息追加時,則不再寫入到原先的 commitlog 文件中,而是調用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有集群內的 Leader 節(jié)點負責消息追加以及在消息復制,只有超過集群內的半數(shù)節(jié)點成功寫入消息后,才會返回寫入成功。如果追加成功,將會返回本次追加成功后的起始偏移量,即 pos 屬性,即類似于 rocketmq 中 commitlog 的偏移量,即物理偏移量。
DLedgerCommitLog#putMessage
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);
關鍵點二:根據(jù) DLedger 的起始偏移量計算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲協(xié)議,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結構,返回給客戶端的消息偏移量為 body 字段的開始偏移量,即通過 putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是一樣的,即從開偏移量開始,可以正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關于 pos 以及 wroteOffset 的圖解如下:
DLedgerCommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { // @1 return super.getMessage(offset, size); } int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); // @2 if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return convertSbr(mappedFile.selectMappedBuffer(pos, size)); // @3 } return null; }
消息查找比較簡單,因為返回給客戶端消息,轉發(fā)給 consumequeue 的消息物理偏移量并不是 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實現(xiàn)關鍵點如下:
如果查找的物理偏移量小于 dividedCommitlogOffset,則從原先的 commitlog 文件中查找。
然后根據(jù)物理偏移量按照二分方找到具體的物理文件。
對物理偏移量取模,得出在該物理文件中中的絕對偏移量,進行消息查找即可,因為只有知道其物理偏移量,從該處先將消息的長度讀取出來,然后即可讀出一條完整的消息。
根據(jù)上面詳細的介紹,我想讀者朋友們應該不難得出如下結論:
DLedger 在整合時,使用 DLedger 條目包裹 RocketMQ 中的 commitlog 條目,即在 DLedger 條目的 body 字段來存儲整條 commitlog 條目。
引入 dividedCommitlogOffset 變量,表示物理偏移量小于該值的消息存在于舊的 commitlog 文件中,實現(xiàn) 升級 DLedger 集群后能訪問到舊的數(shù)據(jù)。
新 DLedger 集群啟動后,會將最后一個 commitlog 填充,即新的數(shù)據(jù)不會再寫入到 原先的 commitlog 文件。
消息追加到 DLedger 數(shù)據(jù)日志文件中,返回的偏移量不是 DLedger 條目的起始偏移量,而是DLedger 條目中 body 字段的起始偏移量,即真實消息的起始偏移量,保證消息物理偏移量的語義與 RocketMQ Commitlog一樣。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。