這篇文章主要為大家展示了“RocketMQ中broker消息存儲(chǔ)之如何添加消息”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“RocketMQ中broker消息存儲(chǔ)之如何添加消息”這篇文章吧。
創(chuàng)新互聯(lián)始終致力于在企業(yè)網(wǎng)站建設(shè)領(lǐng)域發(fā)展。秉承“創(chuàng)新、求實(shí)、誠信、拼搏”的企業(yè)精神,致力為企業(yè)提供全面的網(wǎng)絡(luò)宣傳與技術(shù)應(yīng)用整體策劃方案,為企業(yè)提供包括“網(wǎng)站建設(shè)、響應(yīng)式網(wǎng)站、手機(jī)網(wǎng)站建設(shè)、微信網(wǎng)站建設(shè)、微信小程序定制開發(fā)、商城網(wǎng)站建設(shè)、平臺(tái)網(wǎng)站建設(shè)秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
broker存儲(chǔ)子系統(tǒng)追加消息的入口是DefaultMessageStore.asyncPutMessage,該方法會(huì)調(diào)用CommitLog.asyncPutMessage,主要實(shí)現(xiàn)如下:
// CommitLog.java public CompletableFutureasyncPutMessage(final MessageExtBrokerInner msg) { // msg準(zhǔn)備... // 1. 獲取最后一個(gè)block,準(zhǔn)備寫入 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { //.... if (null == mappedFile || mappedFile.isFull()) { // 加鎖后重試一次,嘗試新建一個(gè)block mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { // 新建block失敗,返回 log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } // 2. append到block末尾 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: // 3. block剩余空間不足,新建下一個(gè)block,再次嘗試寫入 unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; // 發(fā)送錯(cuò)誤,失敗返回... } // ... } finally { putMessageLock.unlock(); } // 4. 提交刷盤請(qǐng)求 CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); // 5. 提交同步請(qǐng)求 CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); }
MappedFile在執(zhí)行數(shù)據(jù)追加時(shí)實(shí)際調(diào)用的是MappedFile.appendMessagesInner方法:
// MappedFile public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; // 調(diào)用callback執(zhí)行實(shí)際的數(shù)據(jù)寫入,callback實(shí)現(xiàn)決定了數(shù)據(jù)的存儲(chǔ)格式 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
msg實(shí)際是由CommitLog.DefaultAppendMessageCallback執(zhí)行寫入的,DefaultAppendMessageCallback決定了msg在文件中的存儲(chǔ)格式:
// CommitLog.DefaultAppendMessageCallback public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, // 數(shù)據(jù)準(zhǔn)備,字段超長(zhǎng)則直接返回... // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { // 一. block剩余空間不足,則寫入一個(gè)特殊的msg,標(biāo)示已達(dá)block末尾 this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // 二、按約定格式寫入msg // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); //... return result; }
數(shù)據(jù)追加的整體流程如下圖
以上是“RocketMQ中broker消息存儲(chǔ)之如何添加消息”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!