真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤

這篇文章將為大家詳細(xì)講解有關(guān)RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

專注于為中小企業(yè)提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)玄武免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

一、問題思考

1.同步刷盤是怎么工作的?
2.異步刷盤是怎么工作的?
3.上篇文章的疑問,寫入堆外內(nèi)存的消息如何落盤的?

二、Broker啟動(dòng)刷盤有關(guān)調(diào)用鏈
1.調(diào)用鏈

//初始化鏈條
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
final BrokerController controller = new BrokerController(...)
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore(...);
@4 DefaultMessageStore#DefaultMessageStore()
this.commitLog = new CommitLog(this);
@5 CommitLog#CommitLog()
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig()
.getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
//啟動(dòng)鏈條
@6 BrokerStartup#start
controller.start();
@7 BrokerController#start()
this.messageStore.start();
@8 DefaultMessageStore#start()
this.commitLog.start();
@9 CommitLog#start()
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
this.commitLogService.start();
}

小結(jié):由調(diào)用鏈可以看出,初始化并啟動(dòng)了以下線程類

1.同步刷盤 GroupCommitService

2.異步刷盤 FlushRealTimeService

3.如果開啟堆外內(nèi)存并且為異步刷盤 CommitRealTimeService


2.線程類關(guān)系圖

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤

三、線程類工作流程

既然線程類在Broker啟動(dòng)時(shí)就啟動(dòng)了,他們?cè)谧錾赌兀?/p>

1.堆外內(nèi)存線程類CommitRealTimeService工作流程

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤


小結(jié):
1.CommitRealTimeService主要工作是將寫入堆外內(nèi)存(writeBuffer)的消息,寫入到fileChannel中,fileChannel為commitLog文件通道

2.committedPosition用于記錄將writeBuffer數(shù)據(jù)寫入到fileChannel中的內(nèi)存位點(diǎn)(相對(duì)偏移量offset)
3.committedWhere用于記錄寫入fileChannel中的物理偏移量(文件名稱+相對(duì)偏移量offset)

2.同步刷盤線程類GroupCommitService工作流程

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤

注1:

1.執(zhí)行onWaitEnd時(shí)交換讀寫容器,該線程類提供兩個(gè)容器來裝GroupCommitRequest

2.requestsWrite和requestsRead,每次執(zhí)行提交(刷盤)前都會(huì)進(jìn)行容器交換

3.好處:讀寫請(qǐng)求容器分離,避免潛在的鎖競(jìng)爭(zhēng)

private void swapRequests() {
List tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}

注2:

1.flushedPosition 標(biāo)記已經(jīng)刷盤內(nèi)存的位點(diǎn)。即刷盤相對(duì)偏移量,刷盤到什么位置了,下次從此處刷盤即可

2.flushedWhere 標(biāo)記已經(jīng)刷盤的物理偏移量,根據(jù)此位置可精確查找到文件中消息的存儲(chǔ)位置。flushedWhere = 當(dāng)前刷盤文件名稱(該日志文件的起始物理偏移量) + flushedPosition

注3:流程圖中標(biāo)記紅色部分,將刷盤結(jié)果通知給等待線程

小結(jié):同步刷盤線程類GroupCommitService主要工作
將請(qǐng)求從讀容器中取出并通過mappedByteBuffer.force()將數(shù)據(jù)落盤。

3.異步刷盤線程類FlushRealTimeService工作流程

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤

小結(jié):FlushRealTimeService主要工作
1.不開啟堆外外內(nèi)存刷盤方式為mappedByteBuffer.force()
2.開啟堆外內(nèi)存刷盤方式為fileChannel.force


疑問:同步刷盤線程類GroupCommitService每執(zhí)行一次都會(huì)交換讀寫容器,那刷盤請(qǐng)求什么時(shí)候放到寫容器(requestsWrite)呢?


四、消息追加與線程類的交互

分析完線程類后,把鏡頭切換到消息追加,看看消息進(jìn)來后是如何跟線程類交互的?


1.調(diào)用鏈

@1 CommitLog#putMessage
//同步刷盤或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
@2 CommitLog#handleDiskFlush

2.同步刷盤主要代碼

同步刷盤時(shí)構(gòu)造刷盤請(qǐng)求,將請(qǐng)求提交給線程類GroupCommitService,service.putRequest(request),并獲取刷盤結(jié)果。

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待MappedFile刷盤成功狀態(tài)通過countDownLatch來控制
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
}
}

3.異步刷盤主要代碼

未開啟堆外內(nèi)存喚醒FlushRealTimeServicee,開啟堆外內(nèi)存喚醒CommitRealTimeService。

if (!this.defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}

五、刷盤方式示意圖
1.同步刷盤示意圖

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤


2.異步刷盤未開啟堆外緩存示意圖

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤


3.異步刷盤開啟堆外緩存示意圖

RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤

關(guān)于“RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。


本文題目:RocketMQ存儲(chǔ)中如何實(shí)現(xiàn)同步刷盤和異步刷盤
文章位置:http://weahome.cn/article/gepcce.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部