RocketMQ如何快速入門,相信很多沒有經(jīng)驗(yàn)的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
我們提供的服務(wù)有:成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、林甸ssl等。為千余家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的林甸網(wǎng)站制作公司
本章簡單講講RocketMQ的入門操作,消息發(fā)送和消息接收。
org.apache.rocketmq rocketmq-client 4.2.0
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { //構(gòu)建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
查看結(jié)果
public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_test_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(ListparamList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功 } }); consumer.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
查看結(jié)果
看到消費(fèi)的結(jié)果大家可能有疑問,我們生產(chǎn)消息的時(shí)候是按照順序生產(chǎn)的消息,消費(fèi)時(shí)候?yàn)槭裁床皇琼樞蛳M(fèi)下來的。
MQ消息的無序性,每個(gè)主題對應(yīng)多個(gè)隊(duì)列,生產(chǎn)消息時(shí)是根據(jù)算法放置不同的隊(duì)列中,消費(fèi)則就是無序了(有序消息后面討論)
也有可能出現(xiàn)一條消息被消費(fèi)了多次,RocketMQ的目標(biāo)就是不丟數(shù)據(jù),每條消息至少發(fā)送一次,內(nèi)部通過ACK的確認(rèn)機(jī)制實(shí)現(xiàn)的后面會(huì)重點(diǎn)討論
為了方便的查看消息的詳情我們可以通過消息的管控臺(tái)更好的管理和查看消息詳情,當(dāng)然我們也可以通過后臺(tái)的提供的命令來為運(yùn)維提供更多的管理。
RocketMQ-Console地址: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
可以直接下載到本地之后通過mavne進(jìn)行編譯獲取jar,該項(xiàng)目是SpringBoot項(xiàng)目
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
丟到linux服務(wù)器上啟動(dòng)
(1)啟動(dòng)時(shí)設(shè)置具體的RocketMQ的參數(shù)
java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.10.12.203:9876;10.10.12.204:9876
(2)直接修改rocketmq-console-ng-1.0.0.jar中的配置文件,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據(jù)自己的NamesrvAddr進(jìn)行修改rocketmq.config.namesrvAddr的值,默認(rèn)端口12581
瀏覽器登錄查看控制臺(tái)信息
查看RocketMQ集群的節(jié)點(diǎn)信息
根據(jù)主題時(shí)間段查詢消息
查看某條消息的具體信息
管控臺(tái)提供了很多運(yùn)維功能能極大的提高我們的運(yùn)維效率,里面的功能包括創(chuàng)建主題、修改主題、發(fā)送消息、對消費(fèi)者的信息進(jìn)行查看等功能我們不一一介紹,可以簡單的了解使用。
看完上述內(nèi)容,你們掌握RocketMQ如何快速入門的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!