這篇文章將為大家詳細(xì)講解有關(guān)RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
專注于為中小企業(yè)提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)玄武免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
1.同步刷盤是怎么工作的?
2.異步刷盤是怎么工作的?
3.上篇文章的疑問,寫入堆外內(nèi)存的消息如何落盤的?
//初始化鏈條
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
final BrokerController controller = new BrokerController(...)
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore(...);
@4 DefaultMessageStore#DefaultMessageStore()
this.commitLog = new CommitLog(this);
@5 CommitLog#CommitLog()
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig()
.getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
//啟動(dòng)鏈條
@6 BrokerStartup#start
controller.start();
@7 BrokerController#start()
this.messageStore.start();
@8 DefaultMessageStore#start()
this.commitLog.start();
@9 CommitLog#start()
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
this.commitLogService.start();
}
小結(jié):由調(diào)用鏈可以看出,初始化并啟動(dòng)了以下線程類
1.同步刷盤 GroupCommitService
2.異步刷盤 FlushRealTimeService
3.如果開啟堆外內(nèi)存并且為異步刷盤 CommitRealTimeService
既然線程類在Broker啟動(dòng)時(shí)就啟動(dòng)了,他們?cè)谧錾赌兀?/p>
小結(jié):
1.CommitRealTimeService主要工作是將寫入堆外內(nèi)存(writeBuffer)的消息,寫入到fileChannel中,fileChannel為commitLog文件通道
2.committedPosition用于記錄將writeBuffer數(shù)據(jù)寫入到fileChannel中的內(nèi)存位點(diǎn)(相對(duì)偏移量offset)
3.committedWhere用于記錄寫入fileChannel中的物理偏移量(文件名稱+相對(duì)偏移量offset)
1.執(zhí)行onWaitEnd時(shí)交換讀寫容器,該線程類提供兩個(gè)容器來裝GroupCommitRequest
2.requestsWrite和requestsRead,每次執(zhí)行提交(刷盤)前都會(huì)進(jìn)行容器交換
3.好處:讀寫請(qǐng)求容器分離,避免潛在的鎖競(jìng)爭(zhēng)
private void swapRequests() {
List tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
1.flushedPosition 標(biāo)記已經(jīng)刷盤內(nèi)存的位點(diǎn)。即刷盤相對(duì)偏移量,刷盤到什么位置了,下次從此處刷盤即可
2.flushedWhere 標(biāo)記已經(jīng)刷盤的物理偏移量,根據(jù)此位置可精確查找到文件中消息的存儲(chǔ)位置。flushedWhere = 當(dāng)前刷盤文件名稱(該日志文件的起始物理偏移量) + flushedPosition
小結(jié):同步刷盤線程類GroupCommitService主要工作
將請(qǐng)求從讀容器中取出并通過mappedByteBuffer.force()將數(shù)據(jù)落盤。
3.異步刷盤線程類FlushRealTimeService工作流程
小結(jié):FlushRealTimeService主要工作
1.不開啟堆外外內(nèi)存刷盤方式為mappedByteBuffer.force()
2.開啟堆外內(nèi)存刷盤方式為fileChannel.force
疑問:同步刷盤線程類GroupCommitService每執(zhí)行一次都會(huì)交換讀寫容器,那刷盤請(qǐng)求什么時(shí)候放到寫容器(requestsWrite)呢?
分析完線程類后,把鏡頭切換到消息追加,看看消息進(jìn)來后是如何跟線程類交互的?
@1 CommitLog#putMessage
//同步刷盤或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
@2 CommitLog#handleDiskFlush
同步刷盤時(shí)構(gòu)造刷盤請(qǐng)求,將請(qǐng)求提交給線程類GroupCommitService,service.putRequest(request),并獲取刷盤結(jié)果。
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待MappedFile刷盤成功狀態(tài)通過countDownLatch來控制
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
}
}
未開啟堆外內(nèi)存喚醒FlushRealTimeServicee,開啟堆外內(nèi)存喚醒CommitRealTimeService。
if (!this.defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
關(guān)于“RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。