小編給大家分享一下RocketMQ如何獲取指定消息,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)新互聯(lián)建站專注于新余網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供新余營銷型網(wǎng)站建設(shè),新余網(wǎng)站制作、新余網(wǎng)頁設(shè)計(jì)、新余網(wǎng)站官網(wǎng)定制、成都小程序開發(fā)服務(wù),打造新余網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供新余網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。概要
消息查詢是什么?
消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息
RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?
問題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?
猜想1:逐個(gè)訪問broker節(jié)點(diǎn)查詢數(shù)據(jù)
猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容
實(shí)際:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。
2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。
問題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?
實(shí)際:單個(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。
源碼閱讀
0.使用方式
MessageExt msg = consumer.viewMessage(msgId);
1.消息ID解析
這個(gè)了解下就可以了
public class MessageId { private SocketAddress address; private long offset; public MessageId(SocketAddress address, long offset) { this.address = address; this.offset = offset; } //get-set } //from MQAdminImpl.java public MessageExt viewMessage( String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MessageId messageId = null; try { //從msgId字符串中解析出address和offset //address = ip:port //offset為消息在CommitLog文件中的偏移量 messageId = MessageDecoder.decodeMessageId(msgId); } catch (Exception e) { throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); } return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()), messageId.getOffset(), timeoutMillis); } //from MessageDecoder.java public static MessageId decodeMessageId(final String msgId) throws UnknownHostException { SocketAddress address; long offset; //ipv4和ipv6的區(qū)別 //如果msgId總長度超過32字符,則為ipv6 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2; byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength)); byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8)); ByteBuffer bb = ByteBuffer.wrap(port); int portInt = bb.getInt(0); address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt); // offset byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16)); bb = ByteBuffer.wrap(data); offset = bb.getLong(0); return new MessageId(address, offset); }