真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RocketMQ

RocketMQ

本文檔主要是rocketmq實(shí)際代碼使用,常見詞語介紹等查看其他文檔

在越城等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站制作、做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計(jì),營(yíng)銷型網(wǎng)站,成都外貿(mào)網(wǎng)站制作,越城網(wǎng)站建設(shè)費(fèi)用合理。

一 下載

http://rocketmq.apache.org/release_notes/release-notes-4.3.2/ 二進(jìn)制文件下載地址,下載后可以直接解壓運(yùn)行

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-source-release.zip 源碼方式下載地址, 下載后需要自己打包

二 啟動(dòng)

2.1 啟動(dòng)nameserver

進(jìn)入rocketmq的bin目錄

nohup sh mqnamesrv &

2.2 啟動(dòng)broker server

進(jìn)入bin目錄

nohup sh mqbroker -n localhost:9876? autoCreateTopicEnable=true &
集群方式參考集群配置文件RocketMQ集群

2.3 啟動(dòng)失敗

默認(rèn)情況下,我們的服務(wù)器都是單獨(dú)的獨(dú)立服務(wù)器,不會(huì)出現(xiàn)這種情況,但是我們?cè)跍y(cè)試過程中使用的是虛擬機(jī), 配置不夠,會(huì)導(dǎo)致無法啟動(dòng)

修改runbroker.sh 和 runserver.sh

分別找到下圖中的指示位置

修改內(nèi)存大小即可,大小請(qǐng)自己按照自己虛擬機(jī)的配置適當(dāng)調(diào)整,比如我修改為了以下值

RocketMQ

RocketMQ

三 圖形化界面

此處非必須,實(shí)際開發(fā)中使用較少

下載rocketmq-console源碼:https://github.com/apache/rocketmq-externals

進(jìn)入子目錄rocketmq-console

執(zhí)行mvn命令打包

mvn clean package -DskipTests

進(jìn)入target目錄

rocketmq-console-ng-1.0.0.jar即為springBoot項(xiàng)目

在該目錄下CMD執(zhí)行命令:

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.89.0.65:9876?
其中
--server.port為運(yùn)行的這個(gè)web應(yīng)用的端口,如果不設(shè)置的話默認(rèn)為8080--rocketmq.config.namesrvAddrRocketMQ命名服務(wù)地址,如果不設(shè)置的話默認(rèn)為“”
OK了,訪問下http://localhost:12581試試吧。

或者打包成 war 包扔到 tomcat 中運(yùn)行

四 入門案例

此案例中使用的是一個(gè)消費(fèi)者,所以消費(fèi)者代碼只有一個(gè)

4.1 pom.xml

???

? ??????
???????
??????????? org.apache.rocketmq
??????????? rocketmq-client
??????????? 4.3.2
???????

???

4.2 同步消息模式

原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包的通訊方式。

應(yīng)用場(chǎng)景:此種方式應(yīng)用場(chǎng)景非常廣泛,例如重要通知郵件、報(bào)名短信通知、營(yíng)銷短信系統(tǒng)等。

RocketMQ

4.2.1 生產(chǎn)者

/**
?* Created by jackiechan on 18-8-19/下午8:37.
?* 原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包的通訊方式。
?*
?* 應(yīng)用場(chǎng)景:此種方式應(yīng)用場(chǎng)景非常廣泛,例如重要通知郵件、報(bào)名短信通知、營(yíng)銷短信系統(tǒng)等
?*/
public class SyncProducer01 {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new
??????????????? DefaultMQProducer("group1");//groupname 同一個(gè)group代表是集群
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設(shè)置nameserver地址
??????? //設(shè)置實(shí)例名字
??????? producer.setInstanceName("producer");//默認(rèn)不需要設(shè)置,會(huì)以ip@pid作為名字, ip是機(jī)器ip,pid是jvmpid
??????? producer.start();
??????? for (int i = 0; i < 100; i++) {
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? //topic和tags在消費(fèi)者那邊獲取到消息后都可以獲取, 可以通過tag區(qū)分消息
??????????? Message msg = new Message("TopicTest" /* Topic 消息所屬的topic */,
??????????????????? "TagA" /* Tag */,
??????????????????? ("Hello RocketMQ " +
??????????????????????????? i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
??????????? );
??????????? //Call send message to deliver message to one of brokers.
??????????? SendResult sendResult = producer.send(msg);
??????????? System.out.printf("%s%n", sendResult);
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? producer.shutdown();
??? }
}

4.3 異步消息模式

原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包的通訊方式。MQ 的異步發(fā)送,需要用戶實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),在執(zhí)行消息的異步發(fā)送時(shí),應(yīng)用不需要等待服務(wù)器響應(yīng)即可直接返回,通過回調(diào)接口接收務(wù)器響應(yīng),并對(duì)服務(wù)器的響應(yīng)結(jié)果進(jìn)行處理。

應(yīng)用場(chǎng)景:異步發(fā)送一般用于鏈路耗時(shí)較長(zhǎng),對(duì) RT 響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場(chǎng)景,例如用戶視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。

RocketMQ

4.3.1 生產(chǎn)者


/**
?* Created by jackiechan on 18-8-19/下午10:05
?*
?* @author jackiechan
?* 原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包的通訊方式。MQ 的異步發(fā)送,需要用戶實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),在執(zhí)行消息的異步發(fā)送時(shí),應(yīng)用不需要等待服務(wù)器響應(yīng)即可直接返回,通過回調(diào)接口接收務(wù)器響應(yīng),并對(duì)服務(wù)器的響應(yīng)結(jié)果進(jìn)行處理。
?*
?* 應(yīng)用場(chǎng)景:異步發(fā)送一般用于鏈路耗時(shí)較長(zhǎng),對(duì) RT 響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場(chǎng)景,例如用戶視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。
?*/
public class AsyncProducer02 {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
??????? //Launch the instance.
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設(shè)置nameserver地址
??????? producer.start();
??????? producer.setRetryTimesWhenSendAsyncFailed(0);
??????? for (int i = 0; i < 100; i++) {
??????????? final int index = i;
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? //消息的keys可以作為標(biāo)記或者傳遞其他消息內(nèi)容,可以在消費(fèi)者獲取到消息后獲取keys進(jìn)行區(qū)分
??????????? Message msg = new Message("TopicTest",
??????????????????? "TagA",
? ??????????????????"OrderID188",
??????????????????? "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
??????????? //發(fā)送異步消息, 通過設(shè)置回調(diào)來接受服務(wù)器給我們返回的消息
??????????? producer.send(msg, new SendCallback() {
??????????????? //當(dāng)發(fā)送成功的時(shí)候執(zhí)行的方法
??????????????? @Override
??????????????? public void onSuccess(SendResult sendResult) {
??????????????????? System.out.printf("%-10d OK %s %n", index,
??????????????????????????? sendResult.getMsgId());
??????????????? }
??????????????? //當(dāng)發(fā)送失敗的時(shí)候執(zhí)行
??????????????? @Override
??????????????? public void onException(Throwable e) {
??????????????????? System.out.printf("%-10d Exception %s %n", index, e);
??????????????????? e.printStackTrace();
??????????????? }
??????????? });
??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? //當(dāng)發(fā)送異步消息的時(shí)候,producer 不要shutdown,因?yàn)榛卣{(diào)是異步的,可能在收到回調(diào)的時(shí)候producer關(guān)閉了會(huì)出錯(cuò)
????? //? producer.shutdown();
??? }
}

4.4 單向模式

原理:?jiǎn)蜗颍∣neway)發(fā)送特點(diǎn)為只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請(qǐng)求不等待應(yīng)答。此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級(jí)別。

應(yīng)用場(chǎng)景:適用于某些耗時(shí)非常短,但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集。

RocketMQ

4.4.1 生產(chǎn)者


/**
?* Created by jackiechan on 18-8-19/下午10:25
?*
?* @author jackiechan
?* 原理:?jiǎn)蜗颍?/em>Oneway)發(fā)送特點(diǎn)為只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請(qǐng)求不等待應(yīng)答。此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級(jí)別。
?*
?* 應(yīng)用場(chǎng)景:適用于某些耗時(shí)非常短,但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集。
?*/
public class OnewayProducer03 {
??? public static void main(String[] args) throws Exception{
??????? //Instantiate with a producer group name.
??????? DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
??????? //Launch the instance.
??????? producer.setNamesrvAddr("192.168.3.8:9876");//設(shè)置nameserver地址
??????? producer.start();
??????? for (int i = 0; i < 100; i++) {
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? Message msg = new Message("TopicTest" /* Topic */,
??????????????????? "TagA" /* Tag */,
??????????????????? ("Hello RocketMQ " +
??????????????????????????? i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
??????????? );
??????????? //Call send message to deliver message to one of brokers.
??????????? producer.sendOneway(msg);

??????? }
??????? //Shut down once the producer instance is not longer in use.
??????? producer.shutdown();
??? }
}

4.5消費(fèi)者

此消費(fèi)者可以接收上面三種不同的消息


/**
?* Created by jackiechan on 18-8-19/下午9:50
?*
?* @authoe jackiechan
?*/
public class MqConsumer {

??? public static void main(String[] args) {
?????? ?//同一個(gè)group代表是集群
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
??????? consumer.setNamesrvAddr("192.168.3.8:9876");
??????? try {
??????????? consumer.subscribe("TopicTest", "TagA||TagB");//可訂閱多個(gè)tag,但是一個(gè)消息只能有一個(gè)tag
??????????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
??????????? consumer.registerMessageListener(new MessageListenerConcurrently() {
??????????????? @Override
??????????????? public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
??????????????????? Message msg = list.get(0);
??????????????????? //輸出消息內(nèi)容
??????????????????? System.out.println("收到消息了:"+new String(msg.getBody()));
??????????????????? //此處可以根據(jù)消息的tag或者keys來區(qū)分消息
??????????????????? if (msg.getTags() != null&&msg.getTags().equals("TagA")) {
??????????????????????? //執(zhí)行TagA的邏輯
??????????????????????? System.out.println("收到的是taga的消息");
??????????????????? }
??????????????????? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
??????????????? }
??????????? });
??????????? consumer.start();
??????? } catch (MQClientException e) {
??????????? System.out.println("出錯(cuò)了");
??????? }
??? }
}

五 順序消費(fèi)

消息順序

消息順序是只可以按照消息發(fā)送的順序進(jìn)行消費(fèi)。一個(gè)訂單產(chǎn)生3條消息,訂單創(chuàng)建、付款、訂單完成。消費(fèi)時(shí)只有按照順序消費(fèi)才有意義,不可能先消費(fèi)付款消息再消費(fèi)訂單創(chuàng)建消息,這樣就亂了。另外,多筆訂單又可以并行消費(fèi)。如何保證呢?

一個(gè)訂單產(chǎn)生的消息只能發(fā)送給同一個(gè)MQ服務(wù)器中的同一個(gè)分區(qū),并且按順序發(fā)送,這樣才能在理論上保證消費(fèi)者消費(fèi)時(shí)是按照順序消費(fèi)的,因?yàn)橐粋€(gè)分區(qū)就是一個(gè)邏輯隊(duì)列。生產(chǎn)者雖然按順序發(fā)送,但是第一條消息到達(dá)MQ的耗時(shí)比第二條多,那么第二條則會(huì)被先消費(fèi),這樣就又導(dǎo)致消費(fèi)時(shí)不是順序的。那么如何解決呢?可以采取只有第一條被消費(fèi)者消費(fèi)成功后再發(fā)送第二條??聪聢D:

RocketMQ

但是如果第一條被發(fā)送到消費(fèi)者后,消費(fèi)者沒有響應(yīng)(消費(fèi)者發(fā)送響應(yīng)但是因?yàn)榫W(wǎng)絡(luò)問題丟失或者消費(fèi)者就沒有收到消息),那么在這種情況下你是繼續(xù)發(fā)送第二條還是重發(fā)第一條呢?如果是嚴(yán)格消息順序,那肯定是重發(fā)第一條,但是如果是消費(fèi)者消費(fèi)后的響應(yīng)丟失了,那么重發(fā)第一條就會(huì)造成重復(fù)消費(fèi)。

從另外一方面看,如果不考慮網(wǎng)絡(luò)異常,那么要實(shí)現(xiàn)嚴(yán)格消息,就必須采取一種一對(duì)一關(guān)系,生產(chǎn)者A的消息對(duì)應(yīng)到MQ服務(wù)器1的X隊(duì)列,消費(fèi)者A消費(fèi)X隊(duì)列。這樣串行結(jié)構(gòu)就會(huì)造成系統(tǒng)吞吐量太低;更多異常需要處理比如消費(fèi)端出現(xiàn)問題,那么整個(gè)消息隊(duì)列就會(huì)出現(xiàn)阻塞。RocketMQ通過輪詢所有隊(duì)列來確定消息發(fā)送到哪一個(gè)隊(duì)列(負(fù)載均衡),比如相同訂單號(hào)的消息會(huì)被先后發(fā)送到統(tǒng)一隊(duì)列中。所以RocketMQ

消息重復(fù)

造成消費(fèi)重復(fù)的根本原因是網(wǎng)絡(luò)不可達(dá),只要有網(wǎng)絡(luò),這種網(wǎng)絡(luò)的不穩(wěn)定因素就存在你無法規(guī)避。所以解決這個(gè)問題的最好辦法就是繞過它。這就變成了,消費(fèi)端收到兩個(gè)一樣的消息后如何處理,而不是從發(fā)送端解決不發(fā)送2個(gè)一樣的消息。對(duì)于消費(fèi)端的要求就是:

  • 消費(fèi)端處理業(yè)務(wù)消息要保持冪等性,也就是同一個(gè)東西執(zhí)行多次會(huì)得到相同結(jié)果

  • 保證每條消息都有唯一編號(hào)切保證消息處理成功與去重表的日志同時(shí)出現(xiàn)

第一條好理解,第二條就是利用一張日志表來記錄已經(jīng)處理成功的消息ID,如果新到的消息ID已經(jīng)存在表中那么就不再處理這個(gè)消息。第一條是在消費(fèi)端實(shí)現(xiàn)的,不屬于消息系統(tǒng)的功能;第二條可以是消息系統(tǒng)實(shí)現(xiàn)也可以是業(yè)務(wù)端實(shí)現(xiàn),處于對(duì)消息系統(tǒng)的吞吐量和高可用考慮最好還是由消費(fèi)端去處理。所以這也就是RocketMQ不解決消息重復(fù)的原因

5.1 生產(chǎn)者


/**
?* Created by jackiechan on 18-8-20/上午12:08
?*
?* @author jackiechan
?*/
public class OrderedProducer {
??? public static void main(String[] args) throws Exception {
??????? //Instantiate with a producer group name.
??????? MQProducer producer = new DefaultMQProducer("example_group_name");
??????? ((DefaultMQProducer) producer).setNamesrvAddr(ServerUtil.SERVERADD);//設(shè)置服務(wù)器地址,請(qǐng)?zhí)鎿Q為自己的服務(wù)器地址
??? ????//Launch the instance.
??????? producer.start();
??????? String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
??????? for (int i = 0; i < 100; i++) {
??????????? int orderId = i % 10;
??????????? int a=i;
??????????? //Create a message instance, specifying topic, tag and message body.
??????????? Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
??????????????????? ("Hello RocketMQ==> " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
???????? ???SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
??????????????? @Override
??????????????? public MessageQueue select(List mqs, Message msg, Object arg) {

??????????????????? // arg的值其實(shí)就是orderId
??????????????????? Integer id = (Integer) arg;

??????????????????? // mqs是隊(duì)列集合,也就是topic所對(duì)應(yīng)的所有隊(duì)列
??????????????????? int index = id % mqs.size();

??????????????????? // 這里根據(jù)前面的id對(duì)隊(duì)列集合大小求余來返回所對(duì)應(yīng)的隊(duì)列
??????????????????? System.out.println(index+"====>"+a);
??????????????????? return mqs.get(index);

??????????????? }
??????????? }, orderId);

?????????? // System.out.printf("%s%n", sendResult);
??????? }
??????? //server shutdown
??????? producer.shutdown();
??? }
}

5.2 消費(fèi)者

消費(fèi)者有多個(gè),代碼一致


/**
?* Created by jackiechan on 18-8-20/上午12:08
?*
?* @author jackiechan
?* 順序消費(fèi)的場(chǎng)景,一個(gè)業(yè)務(wù)需要從頭到尾按照固定順序執(zhí)行, 比如訂單的順序是 創(chuàng)建訂單-支付-發(fā)貨,必須按照這個(gè)順序執(zhí)行, 就可以通過順序消費(fèi)來解決這個(gè)問題
?*/
public class OrderedConsumer {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
??????? consumer.setNamesrvAddr(ServerUtil.SERVERADD);//設(shè)置服務(wù)器地址,實(shí)際開發(fā)替換為自己的地址
??????? /**
???????? * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
???????? * 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
???????? * 這里設(shè)置的是一個(gè)consumer的消費(fèi)策略
???????? *? CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊(duì)列最尾開始消費(fèi),即跳過歷史消息
???????? *? CONSUME_FROM_FIRST_OFFSET 從隊(duì)列最開始開始消費(fèi),即歷史消息(還儲(chǔ)存在broker的)全部消費(fèi)一遍
???????? *? CONSUME_FROM_TIMESTAMP 從某個(gè)時(shí)間點(diǎn)開始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個(gè)小時(shí)以前
???????? *
???????? */
??????? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

??????? consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
??????? //設(shè)置一個(gè)Listener,主要進(jìn)行消息的邏輯處理
??????? //注意這里使用的是MessageListenerOrderly這個(gè)接口
??????? consumer.registerMessageListener(new MessageListenerOrderly() {

??????????? AtomicLong consumeTimes = new AtomicLong(0);
??????????? @Override
??????????? public ConsumeOrderlyStatus consumeMessage(List msgs,
?????? ????????????????????????????????????????????????ConsumeOrderlyContext context) {
??????????????? //返回消費(fèi)狀態(tài)
??????????????? //SUCCESS 消費(fèi)成功
??????????????? //SUSPEND_CURRENT_QUEUE_A_MOMENT 消費(fèi)失敗,暫停當(dāng)前隊(duì)列的消費(fèi)

??????????????? context.setAutoCommit(false);//手動(dòng)提交
??????????????? System.out.printf(Thread.currentThread().getName()+"消費(fèi)者1===>" + msgs.get(0).getQueueId() +? "%n"+new String(msgs.get(0).getBody())+ "%n");
??????????????? this.consumeTimes.incrementAndGet();
??????????????? //以下內(nèi)容模擬收消息失敗,或者回滾等操作
// ???????????????if ((this.consumeTimes.get() % 2) == 0) {
//??????????????????? return ConsumeOrderlyStatus.SUCCESS;
//??????????????? } else if ((this.consumeTimes.get() % 3) == 0) {
//??????????????????? return ConsumeOrderlyStatus.ROLLBACK;
//?????????? ?????} else if ((this.consumeTimes.get() % 4) == 0) {
//??????????????????? return ConsumeOrderlyStatus.COMMIT;
//??????????????? } else if ((this.consumeTimes.get() % 5) == 0) {
//??????????????????? context.setSuspendCurrentQueueTimeMillis(3000);
//???? ???????????????return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
//??????????????? }
??????????????? return ConsumeOrderlyStatus.SUCCESS;

??????????? }
??????? });

??????? consumer.start();

??????? System.out.printf("Consumer Started.%n");
??? }
}

經(jīng)過測(cè)試發(fā)現(xiàn),不同隊(duì)列的消息收取是無序的,但是同一隊(duì)列中消息的收取順序是按照發(fā)送順序收取的

六 廣播模式

6.1 生產(chǎn)者

/**
?* Created by jackiechan on 2018/8/20/上午10:22
?*/
public class BroadcastProducer {
??? public static void main(String[] args) throws Exception {
??????? DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
??????? producer.setNamesrvAddr(ServerUtil.SERVERADD);//設(shè)置服務(wù)器地址
??????? producer.start();
??????? for (int i = 0; i < 100; i++){
??????????? //發(fā)送消息
??????????? Message msg = new Message("TopicTest",
??????????????????? "TagA",
??????????????????? "OrderID188",
??????????????????? ("Hello world==>"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
??????????? SendResult sendResult = producer.send(msg);
?? ?????????System.out.printf("%s%n", sendResult);
??????? }
??????? producer.shutdown();
??? }
}

6.2 消費(fèi)者

消費(fèi)者有多個(gè),代碼一致


分享標(biāo)題:RocketMQ
本文地址:http://weahome.cn/article/pscopc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部