序
創(chuàng)新互聯(lián)建站自2013年創(chuàng)立以來(lái),先為蔡甸等服務(wù)建站,蔡甸等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢(xún)服務(wù)。為蔡甸企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。
以前簡(jiǎn)單用過(guò)ActiveMQ但是公司項(xiàng)目上使用的是RocketMQ,所以準(zhǔn)備多花點(diǎn)時(shí)間在這上面,搞懂項(xiàng)目的配置使用。
看了很多資料,先說(shuō)說(shuō)我自己對(duì)RocketMQ的簡(jiǎn)單理解。不管是我們寫(xiě)的消費(fèi)者還是生產(chǎn)者都屬于客戶(hù)端,而我們需要安裝RocketMQ,這是屬于服務(wù)端。和ActivieMQ、zookeeper類(lèi)似,消費(fèi)者、生成者、服務(wù)端(NameServer)之間是采取觀察者模式實(shí)現(xiàn)。
在操作系統(tǒng)上安裝RocketMQ,啟動(dòng)服務(wù)端NameServer、啟動(dòng)Broker,書(shū)寫(xiě)Consumer代碼,運(yùn)行消費(fèi)者。書(shū)寫(xiě)Producer代碼,運(yùn)行生產(chǎn)者。
基本簡(jiǎn)單邏輯是這樣的,當(dāng)然其中還有很多細(xì)節(jié)。平時(shí)在測(cè)試時(shí)我們都在window上使用,踩了點(diǎn)坑,成功完成。
安裝運(yùn)行
1、下載
建議下載發(fā)行版本,我試過(guò)自己編譯,不知道為何報(bào)錯(cuò)了。
rocketmq-all-4.2.0-bin-release.zip
解壓出來(lái)如下:
2、啟動(dòng)
NameServer
在啟動(dòng)之前需要配置系統(tǒng)環(huán)境,不然會(huì)報(bào)錯(cuò)。配置完成記得重啟電腦
Please set the ROCKETMQ_HOME variable in your environment!
系統(tǒng)環(huán)境變量名:ROCKETMQ_HOME
每個(gè)人不一樣,對(duì)比如上我的路徑—-變量值:D:\rocketMQ
進(jìn)入window命令窗口,進(jìn)入bin目錄下,執(zhí)行
start mqnamesrv.cmd
如上則NameServer啟動(dòng)成功。使用期間,窗口不要關(guān)機(jī)。
Broker
同理,再次開(kāi)一個(gè)命令窗口,進(jìn)入bin目錄下,輸入
start mqbroker.cmd -n localhost:9876
如上的 ip+port 是NameServer的進(jìn)程,因?yàn)镹ameser安裝啟動(dòng)在本地,所以這里的 ip 是 localhost。
運(yùn)行如上命令,可能會(huì)報(bào)如下錯(cuò)誤。找不到或無(wú)法加載主類(lèi)
如果出此情況,打開(kāi)bin-->runbroker.cmd,修改%CLASSPATH%成
"%CLASSPATH%"
保存再次執(zhí)行如上命令。執(zhí)行成功后,窗口并不會(huì)顯示什么,只是一個(gè)空窗口,代表成功。
書(shū)寫(xiě)代碼
依賴(lài)RocketMQ
org.apache.rocketmq rocketmq-client 4.2.0
1、Consumer
public class Consumer { public static void main(String[] args) throws MQClientException { //這里填寫(xiě)group名字 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group-name-A"); //NameServer地址 consumer.setNamesrvAddr("localhost:9876"); //1:topic名字 2:tag名字 consumer.subscribe("topic-name-A", "tag-name-A"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( Listmsgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started!"); } }
先運(yùn)行起來(lái)
2、Producer
注意匹配相應(yīng)參數(shù):group topic tag
public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message message = new Message("topic-name-A","tag-name-A","Message : My blog address guozh.net".getBytes()); producer.send(message); System.out.println("Message sended"); producer.shutdown(); } }
再次運(yùn)行 producer。
然后去 Consumer 看看是否收到消息。
監(jiān)控平臺(tái)
和其他的MQ一樣,這里也提供了Window版本可視化的監(jiān)控和 Linux監(jiān)控??梢钥吹较⑾M(fèi)的具體情況,但是其實(shí)在實(shí)際開(kāi)發(fā)過(guò)程中,Window顯示的界面數(shù)據(jù)非常少,看不到多少內(nèi)容。所以實(shí)際項(xiàng)目中都是看 Linux 數(shù)據(jù)。
我們這邊項(xiàng)目看MQ消費(fèi)情況也是在Linux上部署查看。
但是可以學(xué)習(xí)學(xué)習(xí),為L(zhǎng)inux的安裝拓展畫(huà)面感。
1、下載
rocketmq-console
其實(shí)這里提供了安裝部署的方法,可以根據(jù)實(shí)際情況來(lái)
所以一步一步來(lái)吧,首先修改配置文件。修改application.properties,具體位置如下
rocketmq-console\src\main\resources
主要如上兩處需要修改,平臺(tái)部署的端口。我這里 8080 沒(méi)被使用,這里就用 8080。下面是NameServer的啟動(dòng)位置,根據(jù)自己實(shí)際情況填寫(xiě)即可。
2、啟動(dòng)
首先,上面的 Tips 也說(shuō)了,看看自己的Maven鏡像是不是阿里云的,不然下載jar可能下載不下來(lái)或者很慢,這里不用說(shuō)了。
進(jìn)入命令窗口,進(jìn)入rocketmq-console目錄,執(zhí)行。
mvn clean package -Dmaven.test.skip=true
Build成功后,再次執(zhí)行
java -jar target/rocketmq-console-ng-1.0.0.jar
完成后,進(jìn)入網(wǎng)址即可,比如我這是 localhost:8080
ok!完成,估計(jì)后面會(huì)好好的學(xué)習(xí)RocketMQ。