同步發(fā)送sysnc
、異步發(fā)送async
、直接發(fā)送one-way
。成都創(chuàng)新互聯(lián)專(zhuān)注于東寶企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站開(kāi)發(fā),購(gòu)物商城網(wǎng)站建設(shè)。東寶網(wǎng)站建設(shè)公司,為東寶等地區(qū)提供建站服務(wù)。全流程按需制作,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)使用RocketMQTemplate必須要在配置文件中配置RocketMQ的屬性,Springboot在加載時(shí)才會(huì)創(chuàng)建RocketMQTemplate的Bean。配置文件示例如下:
rocketmq:
name-server: nameServer的集群IP
compress-message-body-threshold: 4096
consumer:
access-key: username
secret-key: password
max-message-size: 536870912
producer:
access-key: username
secret-key: password
group: producerGroup
retry-next-server: true
retry-times-when-send-async-failed: 2
retry-times-when-send-failed: 2
send-message-timeout: 3000
(1)void send(Message>message) throws MessagingException;同步發(fā)送import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.send(sendMessage);
}
}
RocketMQTemplate.send(Message>message) 方法只有一個(gè)Message>類(lèi)型的參數(shù),沒(méi)有設(shè)置topic,這個(gè)消息會(huì)發(fā)送到RocketMQ的默認(rèn)topic
,這個(gè)默認(rèn)topic是在安裝RocketMQ Client的時(shí)候配置的,如果沒(méi)有這個(gè)topic會(huì)拋出"No 'defaultDestination' configured"
異常。這個(gè)方法幾乎不會(huì)被使用,我們發(fā)送消息一般都是要發(fā)送到我們想去的一個(gè)topic。此方法會(huì)將消息同步發(fā)送至topic,此方法沒(méi)有返回值,我們無(wú)法獲取SendResult。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
rocketMQTemplate.send("topicA:tagA", sendMessage);
}
}
RocketMQTemplate.send(D destination, Message>message) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息。這個(gè)方法是一個(gè)委托方法,其實(shí)最終調(diào)用的是RocketMQTemplate.syncSend(String destination, Message>message)方法,也就是說(shuō)destination雖然是個(gè)泛型,但是我們應(yīng)該傳入一個(gè)字符串類(lèi)型的topic,此方法會(huì)將消息同步發(fā)送至topic
。此方法沒(méi)有返回值,我們無(wú)法獲取SendResult。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Message>message) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息。此方法的返回值為SendResult,我們可以通過(guò)這個(gè)類(lèi)來(lái)確定消息是否發(fā)送成功,獲取消息的MessageId等
。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage, 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Message>message, long timeout) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息,第三個(gè)參數(shù)是超時(shí)時(shí)間
。其實(shí)方法(3)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Collection消息的集合
。此方法會(huì)將消息批量發(fā)送到topicA下的tagA下。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages, 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Collection消息的集合
,第三個(gè)參數(shù)是超時(shí)時(shí)間。此方法會(huì)將消息批量發(fā)送到topicA下的tagA下。其實(shí)方法(5)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "這里設(shè)置消息體");
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的消息體。此方法不需要我們自己創(chuàng)建Message對(duì)象了,底層會(huì)幫我們創(chuàng)建。但是缺點(diǎn)就是不能設(shè)置消息的屬性和key
。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "這里設(shè)置消息體", 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的消息體
,第三個(gè)參數(shù)是超時(shí)時(shí)間。此方法就是不需要我們自己創(chuàng)建Message對(duì)象了,底層會(huì)幫我們創(chuàng)建。但是缺點(diǎn)就是不能設(shè)置消息的屬性和key
。其實(shí)方法(7)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
rocketMQTemplate.sendOneWay("topicA:tagA", "這里設(shè)置消息體");
}
}
RocketMQTemplate.sendOneWay(String destination, Message>message) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息。此方法可以異步發(fā)送消息,具有很高的發(fā)送效率,但是沒(méi)有返回值,我們無(wú)法獲取SendResult。
(10)void sendOneWay(String destination, Object payload);one-way模式,異步發(fā)送import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
rocketMQTemplate.sendOneWay("topicA:tagA", "這里設(shè)置消息體");
}
}
RocketMQTemplate.sendOneWay(String destination, Object payload) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的消息體
。此方法不需要我們自己創(chuàng)建Message對(duì)象了,底層會(huì)幫我們創(chuàng)建。但是缺點(diǎn)就是不能設(shè)置消息的屬性和key。此方法可以異步發(fā)送消息,具有很高的發(fā)送效率,但是沒(méi)有返回值,我們無(wú)法獲取SendResult。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
RocketMQTemplate.asyncSend(String destination, Message>message, SendCallback sendCallback) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息,第三個(gè)參數(shù)是異步消息的回調(diào)對(duì)象。此方法允許我們?cè)O(shè)置回調(diào)函數(shù),知道異步消息是否發(fā)送成功以此來(lái)做相應(yīng)的事情。方法(9)和(10)底層也是調(diào)用了此方法,只不過(guò)把SendCallback對(duì)象設(shè)為了null。
(12)void asyncSend(String destination, Message>message, SendCallback sendCallback, long timeout);異步發(fā)送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
RocketMQTemplate.asyncSend(String destination, Message>message, SendCallback sendCallback, long timeout) 方法有四個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息,第三個(gè)參數(shù)是異步消息的回調(diào)對(duì)象,第四個(gè)參數(shù)是超時(shí)時(shí)間。此方法允許我們?cè)O(shè)置回調(diào)函數(shù),知道異步消息是否發(fā)送成功以此來(lái)做相應(yīng)的事情。方法(11)也是調(diào)用了此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。
(13)void asyncSend(String destination, Object payload, SendCallback sendCallback);異步發(fā)送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "這里設(shè)置消息體", new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
同方法(11)一樣,只不過(guò)第二個(gè)參數(shù)由Message>類(lèi)型換成了Object類(lèi)型,可以直接傳入要發(fā)送的消息體,不用我們自己創(chuàng)建Message對(duì)象,缺點(diǎn)就是不能設(shè)置消息的屬性和key。
(14)void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);異步發(fā)送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "這里設(shè)置消息體", new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
方法(13)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。
(15)import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
同方法(11)一樣,只不過(guò)第二個(gè)參數(shù)由Message>類(lèi)型換成了Collection
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
方法(15)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。
(17)void convertAndSend(Object payload) throws MessagingException;同步發(fā)送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實(shí)體類(lèi)
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend(exampleEntity);
}
}
同方法(1)一樣,此方法也是將消息發(fā)送到默認(rèn)的topic。payload可以是一個(gè)實(shí)體類(lèi)、集合等也可以是字符串。如果payload是實(shí)體類(lèi)、集合等,底層會(huì)將實(shí)體類(lèi)轉(zhuǎn)化成json對(duì)象,例如上述代碼發(fā)送消息的結(jié)果就是{"name":"Tom"}
,如果傳入的是集合對(duì)象這會(huì)轉(zhuǎn)換從jsonArray。如果payload是字符串,則發(fā)送消息的結(jié)果就是原字符串。
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實(shí)體類(lèi)
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity);
}
}
在方法(17)的基礎(chǔ)上可以指定topic將消息發(fā)送到指定的topic。
(19)void convertAndSend(D destination, Object payload, @Nullable Mapimport com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 消息的屬性
Mapmap = new HashMap<>();
map.put("消息的屬性的鍵", "消息的屬性的值");
map.put("KEYS", "消息的key");
// 實(shí)體類(lèi)
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map);
}
}
在方法(18)的基礎(chǔ)上增加了第三個(gè)Map類(lèi)型的參數(shù),我們可以使用這個(gè)參數(shù)來(lái)設(shè)置消息的屬性和key。
(20)void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實(shí)體類(lèi)
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend(exampleEntity, new MessagePostProcessor() {@Override
public Message>postProcessMessage(Message>message) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(17)的基礎(chǔ)上增加了MessagePostProcessor對(duì)象,MessagePostProcessor顧名思義就是消息的后處理。我們傳入的參數(shù)中,destination就是消息要去往的topic(這里沒(méi)有destination則發(fā)送默認(rèn)的topic),payload就是消息的消息體,headers就是消息的屬性(這里沒(méi)有headers則無(wú)法設(shè)置屬性),RocketMQ底層會(huì)根據(jù)payload和headers生成Message對(duì)象,MessagePostProcessor就是對(duì)這個(gè)生成的Message對(duì)象做一些事情,最后再發(fā)往destination。
(21)void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實(shí)體類(lèi)
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, new MessagePostProcessor() {@Override
public Message>postProcessMessage(Message>message) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(20)的基礎(chǔ)上可以指定topic將消息發(fā)送到指定的topic。
(22)void convertAndSend(D destination, Object payload, @Nullable Mapimport com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 消息的屬性
Mapmap = new HashMap<>();
map.put("消息的屬性的鍵", "消息的屬性的值");
map.put("KEYS", "消息的key");
// 實(shí)體類(lèi)
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map, new MessagePostProcessor() {@Override
public Message>postProcessMessage(Message>message) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(21)的基礎(chǔ)上增加了Map類(lèi)型的參數(shù),我們可以使用這個(gè)參數(shù)來(lái)設(shè)置消息的屬性和key。
至此,使用RocketMQTemplate發(fā)送普通消息的方法就全部講解完了,其實(shí)還有RocketMQTemplate.sendAndReceive()方法也可以發(fā)送普通消息,但是這個(gè)要配合消費(fèi)者一起使用,我會(huì)另寫(xiě)一篇文章講解這個(gè)方法。
二、使用DefaultMQProducer發(fā)送消息有些場(chǎng)景下我們不想使用springboot自動(dòng)創(chuàng)建的RocketMQTemplate的Bean來(lái)發(fā)送消息,而是想自己創(chuàng)建生產(chǎn)者以使用不同的nameServer來(lái)發(fā)送消息到不同的集群或者想在main函數(shù)中創(chuàng)建生產(chǎn)者,可以使用DefaultMQProducer來(lái)發(fā)送消息。其實(shí)RocketMQTemplate底層也是使用DefaultMQProducer來(lái)發(fā)送消息的,只不過(guò)進(jìn)行了包裝讓用戶(hù)使用起來(lái)更方便。
2.1 DefaultMQProducer的創(chuàng)建DefaultMQProducer有多個(gè)構(gòu)造函數(shù),我們可以根據(jù)不同的場(chǎng)景使用不同的構(gòu)造函數(shù)創(chuàng)建對(duì)象。
(1)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook);DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產(chǎn)者組", new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")));
此構(gòu)造函數(shù)的第一個(gè)參數(shù)是命名空間,命名空間需要在服務(wù)端提前創(chuàng)建。第二個(gè)參數(shù)是生產(chǎn)者組,一個(gè)生產(chǎn)者組可以包含多個(gè)生產(chǎn)者,生產(chǎn)者組不需要提前創(chuàng)建,在創(chuàng)建DefaultMQProducer對(duì)象的時(shí)候賦值一個(gè)生產(chǎn)者組就可以。第三個(gè)參數(shù)是RPCHook對(duì)象用于權(quán)限認(rèn)證,相當(dāng)于你登陸一個(gè)網(wǎng)站需要輸入用戶(hù)名和密碼。
(2)DefaultMQProducer(String producerGroup, RPCHook rpcHook);命名空間是RocketMQ中的一個(gè)資源管理概念。用戶(hù)不同的業(yè)務(wù)場(chǎng)景一般都可以通過(guò)命名空間做隔離,并且針對(duì)不同的業(yè)務(wù)場(chǎng)景設(shè)置專(zhuān)門(mén)的配置,例如消息保留時(shí)間。不同命名空間之間的 Topic 相互隔離,訂閱相互隔離,角色權(quán)限相互隔離。
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組", new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")));
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1)
,只不過(guò)將namespace設(shè)為了null,在沒(méi)有命名空間的時(shí)候可以使用此構(gòu)造函數(shù)。
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產(chǎn)者組");
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1)
,只不過(guò)將RPCHook 設(shè)為了null,在不需要acl認(rèn)證的時(shí)候可以使用此構(gòu)造函數(shù)。
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組");
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1)
,只不過(guò)將namespace和RPCHook設(shè)為了null,在沒(méi)有命名空間和不需要acl認(rèn)證的時(shí)候可以使用此構(gòu)造函數(shù)。
DefaultMQProducer producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")));
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1)
,只不過(guò)將namespace設(shè)為了null,由于prodcuerGroup不能為null,所以RocketMQ會(huì)使用默認(rèn)的生產(chǎn)者組:DEFAULT_PRODUCER
。
DefaultMQProducer producer = new DefaultMQProducer();
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1)
,只不過(guò)將namespace和RPCHook設(shè)為了null,由于prodcuerGroup不能為null,所以RocketMQ會(huì)使用默認(rèn)的生產(chǎn)者組:DEFAULT_PRODUCER
。
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產(chǎn)者組", new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")), true, "traceTopic");
此構(gòu)造函數(shù)的第一個(gè)參數(shù)是命名空間,命名空間需要在服務(wù)端提前創(chuàng)建。第二個(gè)參數(shù)是生產(chǎn)者組,一個(gè)生產(chǎn)者組可以包含多個(gè)生產(chǎn)者,生產(chǎn)者組不需要提前創(chuàng)建,在創(chuàng)建DefaultMQProducer對(duì)象的時(shí)候賦值一個(gè)生產(chǎn)者組就可以。第三個(gè)參數(shù)是RPCHook對(duì)象用于權(quán)限認(rèn)證,相當(dāng)于你登陸一個(gè)網(wǎng)站需要輸入用戶(hù)名和密碼。第四個(gè)參數(shù)是布爾類(lèi)型,表示是否開(kāi)啟消息追蹤。第五個(gè)參數(shù)是消息跟蹤的topic的名稱(chēng),這個(gè)topic專(zhuān)門(mén)用來(lái)做消息追蹤的,一般不會(huì)用這個(gè)topic生產(chǎn)和消費(fèi)業(yè)務(wù)數(shù)據(jù)。開(kāi)啟追蹤后,追蹤topic內(nèi)會(huì)記錄生產(chǎn)者的一些信息,比如生產(chǎn)者IP、消息的MessageID等
。例如下面的代碼就是開(kāi)啟追蹤并設(shè)置trace-topic
為追蹤topic,然后將消息發(fā)送到topicA中,于是topicA里面是業(yè)務(wù)數(shù)據(jù),trace-topic里面是用于消息追蹤的追蹤數(shù)據(jù)。也就是發(fā)送一次消息會(huì)發(fā)送一份業(yè)務(wù)數(shù)據(jù)和一份追蹤數(shù)據(jù)到業(yè)務(wù)topic和追蹤topic
,
package com.sgm.esb.gateway.service;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage);
producer.shutdown();
}
}
如下是追蹤topic中的消息內(nèi)容:
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組", true, "traceTopic");
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(7)
,只不過(guò)將namespace和RPCHook設(shè)為了null,使用于沒(méi)有命名空間和不需要acl認(rèn)證的時(shí)候。
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組", true);
此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(7)
,只不過(guò)將namespace、RPCHook和customizedTraceTopic設(shè)為了null。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
// 這里的Message類(lèi)型是org.apache.rocketmq.common.message.Message
Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
SendResult sendResult = producer.send(sendMessage);
System.out.println(sendResult);
producer.shutdown();
}
}
我們根據(jù)需要使用上面提到的某一種構(gòu)造方法創(chuàng)建生產(chǎn)者后必須調(diào)用setNamesrvAddr()方法來(lái)設(shè)置NameServer(要不然消息發(fā)到哪呢),然后調(diào)用start()開(kāi)始發(fā)送消息,調(diào)用send()方法將創(chuàng)建的Message對(duì)象發(fā)送到某個(gè)topic下,最后調(diào)用shutdown()來(lái)釋放資源。DefaultMQProducer.send(Message msg)方法只有一個(gè)Message類(lèi)型的參數(shù),這個(gè)Message的類(lèi)型為org.apache.rocketmq.common.message.Message
,不同于之前RocketMQTemplate中的org.springframework.messaging.Message
。我們可以在此Message中設(shè)置topic和tag而不是在send()方法中設(shè)置topic和tag,通過(guò)putUserProperty()方法設(shè)置消息的屬性,通過(guò)setKeys()方法設(shè)置消息的key等。在RocketMQTemplate中我們使用org.springframework.messaging.Message來(lái)創(chuàng)建消息,其實(shí)RocketMQ底層最終會(huì)將org.springframework.messaging.Message轉(zhuǎn)化為org.apache.rocketmq.common.message.Message類(lèi)型進(jìn)行消息的發(fā)送
。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
// 這里的Message類(lèi)型是org.apache.rocketmq.common.message.Message
Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
SendResult sendResult = producer.send(sendMessage, 3000L);
System.out.println(sendResult);
producer.shutdown();
}
}
在方法(1)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。
(3)void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;異步發(fā)送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
第二個(gè)參數(shù)設(shè)置異步回調(diào),同RocketMQTemplate異步發(fā)送消息一樣,不再贅述。
(4)void send(Message msg, SendCallback sendCallback,long timeout) throws MQClientException, RemotingException, InterruptedException;異步發(fā)送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
在方法(3)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。
(5)SendResult send(Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
}
批量發(fā)送消息到某一個(gè)topic。
(6)SendResult send(Collection msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送注意:這里L(fēng)ist中Message的topic都必須是同一個(gè),否則會(huì)報(bào)錯(cuò)。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
}
批量發(fā)送消息到某一個(gè)topic,在方法(5)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。
(7)void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發(fā)送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
producer.send(messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
批量異步發(fā)送消息到某一個(gè)topic。
(8)void send(Collection msgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發(fā)送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
producer.send(messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
producer.shutdown();
}
}
批量異步發(fā)送消息到某一個(gè)topic,在方法(7)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。
(9)void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;one-way模式,異步發(fā)送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.sendOneway(sendMessage);
producer.shutdown();
}
}
使用one-way模式異步發(fā)送消息。
總結(jié)與展望至此,使用RocketMQTemplate和DefaultMQProducer發(fā)送普通消息的全部方法就講解完了,本文的主要目的是幫助讀者快速學(xué)習(xí)使用RocketMQ發(fā)送普通消息,本文總結(jié)了所有的發(fā)送普通消息的方法以滿(mǎn)足實(shí)際工作中不同的業(yè)務(wù)場(chǎng)景。RocketMQTemplate和DefaultMQProducer中還有一些發(fā)送消息的方法是用來(lái)發(fā)送順序、定時(shí)/延時(shí)消息的(DefaultMQProducer不能用來(lái)發(fā)送事務(wù)消息),之后我會(huì)繼續(xù)寫(xiě)文章來(lái)講解這些方法以及所有的消費(fèi)消息的方法。
這是我在寫(xiě)的第一篇原創(chuàng)文章,之后我還準(zhǔn)備寫(xiě)一些使用RocketMQ時(shí)踩過(guò)的坑,最后可能會(huì)寫(xiě)一些RocketMQ更底層的東西,我還準(zhǔn)備寫(xiě)一些webflux、springCloudGateway這些我比較感興趣的東西,希望看到這篇文章的人能和我一起成長(zhǎng)。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧