這篇文章主要介紹“RocketMQ的刷盤策略以及實現同步刷盤和異步刷盤的實例代碼”,在日常操作中,相信很多人在RocketMQ的刷盤策略以及實現同步刷盤和異步刷盤的實例代碼問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ的刷盤策略以及實現同步刷盤和異步刷盤的實例代碼”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
我們提供的服務有:網站制作、成都網站設計、微信公眾號開發(fā)、網站優(yōu)化、網站認證、臨城ssl等。為上1000家企事業(yè)單位解決了網站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術的臨城網站制作公司
RocketMQ提供了兩種刷盤策略同步刷盤、異步刷盤
同步刷盤:在消息到達MQ后,RocketMQ需要將數據持久化,同步刷盤是指數據到達內存之后,必須刷到commitlog日志之后才算成功,然后返回producer數據已經發(fā)送成功。
異步刷盤:,同步刷盤是指數據到達內存之后,返回producer說數據已經發(fā)送成功。,然后再寫入commitlog日志。
復制方式 | 優(yōu)點 | 缺點 | 適應場景 |
---|---|---|---|
同步刷盤 | 保證了消息不丟失 | 吞吐率相對于異步刷盤要低 | 消息可靠性要求較高的場景 |
異步刷盤 | 系統(tǒng)的吞吐量提高 | 系統(tǒng)斷電等異常時會有部分丟失 | 對應吞吐量要求較高的場景 |
下面我們從源碼的角度分析其實現的邏輯
CommitLog.putMessage()方法中的刷盤的核心方法handleDiskFlush()
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush 同步刷盤 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; //客戶端確認要等待刷盤成功 if (messageExt.isWaitStoreMsgOK()) { //封裝刷盤請求對象 nextoffset : 當前內存寫的位置 + 本次要寫入的字節(jié)數 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); //添加刷盤請求(后臺定時任務進行刷盤,每隔10毫秒批量刷盤。10毫秒中如果有多個請求,則多個請求一塊刷盤) service.putRequest(request); //等待刷盤請求結果(最長等待5秒鐘,刷盤成功后馬上可以獲取結果。) boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } }else {// Asynchronous flush 異步刷盤 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { //喚醒FlushRealTimeService服務線程 flushCommitLogService.wakeup(); } else { //喚醒CommitRealTimeService服務線程 commitLogService.wakeup(); } } }
查看同步刷盤的核心類GroupCommitService中的核心屬性
private volatile List
我們查看其run()方法
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //等待通知,如果數據過來,提前結束等待執(zhí)行onWaitEnd()方法交換讀寫swapRequests() //刷盤請求的requestsWrite->requestsRead this.waitForRunning(10); //執(zhí)行刷盤 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } //省略代碼... }
waitForRunning方法中執(zhí)行了swapRequests()方法
private void swapRequests() { Listtmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; }
GroupCommitService接收到的刷盤請求通過putRequest()方法加入到requestsWrite集合中,swapRequests()方法將requestsWrite請求集合交換到requestsRead集合中供刷盤使用,我們重點查看doCommit()方法
private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { //循環(huán)每一個刷盤請求 for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { //判斷是否已經刷盤過了,刷盤的位置和當前消息下次刷盤需要的位置比較 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { //0代碼立刻刷盤,不管緩存中消息有多少 CommitLog.this.mappedFileQueue.flush(0); } } //返回刷盤的結果 req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); //設置刷盤的時間點 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } //清空requestsRead對象 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
mappedFileQueue.flush(0)立刻刷盤
public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); //刷盤,返回刷寫到磁盤指針 int offset = mappedFile.flush(flushLeastPages); //計算當前的刷盤指針,之前的所有數據已經持久化到磁盤中 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
mappedFile.flush(0);保證立刻刷盤后面異步刷盤時也會調用mappedFile.flush()方法
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { //喚醒FlushRealTimeService服務線程 flushCommitLogService.wakeup(); } else { //喚醒CommitRealTimeService服務線程 commitLogService.wakeup(); }
我們發(fā)現異步刷盤的時候有兩種方式,一種是堆外內存池開啟時啟動CommitRealTimeService服務線程,另一個是默認執(zhí)行的FlushRealTimeService服務線程進行刷盤操作,關于TransientStorePoolEnable在《RocketMQ內存映射》章節(jié)中的**“創(chuàng)建映射文件MappedFile”**中有介紹
圖3-1
查看其run()方法
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每次刷盤的間隔時間,默認 200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 每次commit最少的頁數 默認4頁 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 如果上次刷新的時間+該值 小于當前時間,則改變flushPhysicQueueLeastPages =0 默認為200 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); //距離上一次刷盤時間超過200ms則立刻刷盤,commit最少的頁數置為0 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { //刷盤 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
這種方式和同步刷盤一樣就是mappedFileQueue.commit(commitDataLeastPages)參數有限制,數據達到一定量的時候才進行刷盤操作提高數據的刷盤性能。
查看其run()方法
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每次刷盤的間隔時間,默認 200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 每次commit最少的頁數 默認4頁 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 如果上次刷新的時間+該值 小于當前時間,則改變flushPhysicQueueLeastPages =0 默認為200 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); //距離上一次刷盤時間超過200ms則立刻刷盤,commit最少的頁數置為0 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { //刷盤 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); //返回的是false說明數據已經commit到了fileChannel中 if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
我們發(fā)現其刷盤方法不一樣mappedFileQueue.commit()調用MappedFile.commit()方法
public int commit(final int commitLeastPages) { if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. return this.wrotePosition.get(); } //如果提交的數據不滿commitLeastPages則不執(zhí)行本次的提交,待下一次提交 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } return this.committedPosition.get(); }
查看其核心刷盤方法
protected void commit0(final int commitLeastPages) { int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() > 0) { try { //創(chuàng)建writeBuffer的共享緩存區(qū) ByteBuffer byteBuffer = writeBuffer.slice(); //將指針回退到上一次提交的位置 byteBuffer.position(lastCommittedPosition); //設置limit為writePos byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); //將committedPosition指針到wrotePosition的數據復制(寫入)到fileChannel中 this.fileChannel.write(byteBuffer); //更新committedPosition指針為writePos this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
commit0()只是將緩存數據加入到fileChannel中,我們在CommitRealTimeService.run()方法中看到喚醒flushCommitLogService線程需要將fileChannel中的數據flush到磁盤中,我們發(fā)現兩種方式都需要走flushCommitLogService.run()方法最后都執(zhí)行MappedFile.flush(int)
public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { //We only append data to fileChannel or mappedByteBuffer, never both. if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } //設置刷盤后的指針 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
兩種緩存方式走的刷盤邏輯也不同,可以查看**“圖3-1”**兩種方式的處理流程圖
我們還發(fā)現一個方法isAbleToFlush()判斷是否需要刷盤
private boolean isAbleToFlush(final int flushLeastPages) { int flush = this.flushedPosition.get(); int write = getReadPosition(); if (this.isFull()) { return true; } if (flushLeastPages > 0) { return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } return write > flush; }
同步刷盤時flushLeastPages=0立刻刷盤
異步刷盤時flushLeastPages=4 ,默認是4,需要刷盤的數據達到PageCache的頁數4倍時才會刷盤,或者距上一次刷盤時間>=200ms則設置flushLeastPages=0立刻刷盤
同步刷盤時無論消息的大小都立刻刷盤,線程阻塞等待刷盤結果
異步刷盤有兩種方式但是其邏輯都是需要刷盤的數據OS_PAGE_SIZE的4倍即(1024 * 4)*4=16k或者距上一次刷盤時間>=200ms時才刷盤,提高數據的刷盤性能
到此,關于“RocketMQ的刷盤策略以及實現同步刷盤和異步刷盤的實例代碼”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯網站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
分享名稱:RocketMQ的刷盤策略以及實現同步刷盤和異步刷盤的實例代碼
本文來源:http://weahome.cn/article/jjpidg.html