真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RocketMQ發(fā)送普通消息的所有方法以及代碼示例-創(chuàng)新互聯(lián)

RocketMQ發(fā)送普通消息的所有方法以及代碼示例
  • 一、使用RocketMQTemplate發(fā)送消息(整合Springboot)
    • (1)void send(Message\message) throws MessagingException;同步發(fā)送
    • (2)void send(D destination, Message\message) throws MessagingException;同步發(fā)送
    • (3)SendResult syncSend(String destination, Message\message);同步發(fā)送
    • (4)SendResult syncSend(String destination, Message\message, long timeout);同步發(fā)送
    • (5)\SendResult syncSend(String destination, Collection\messages);同步發(fā)送
    • (6)\SendResult syncSend(String destination, Collection\messages, long timeout);同步發(fā)送
    • (7)SendResult syncSend(String destination, Object payload);同步發(fā)送
    • (8)SendResult syncSend(String destination, Object payload, long timeout);同步發(fā)送
    • (9)void sendOneWay(String destination, Message\message);one-way模式,異步發(fā)送
    • (10)void sendOneWay(String destination, Object payload);one-way模式,異步發(fā)送
    • (11)void asyncSend(String destination, Message\message, SendCallback sendCallback);異步發(fā)送
    • (12)void asyncSend(String destination, Message\message, SendCallback sendCallback, long timeout);異步發(fā)送
    • (13)void asyncSend(String destination, Object payload, SendCallback sendCallback);異步發(fā)送
    • (14)void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);異步發(fā)送
    • (15)\void asyncSend(String destination, Collection\messages, SendCallback sendCallback);異步發(fā)送
    • (16)\void asyncSend(String destination, Collection\messages, SendCallback sendCallback, long timeout);異步發(fā)送
    • (17)void convertAndSend(Object payload) throws MessagingException;同步發(fā)送
    • (18)void convertAndSend(D destination, Object payload) throws MessagingException;同步發(fā)送
    • (19)void convertAndSend(D destination, Object payload, @Nullable Mapheaders) throws MessagingException;同步發(fā)送
    • (20)void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送
    • (21)void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送
    • (22)void convertAndSend(D destination, Object payload, @Nullable Mapheaders, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送
  • 二、使用DefaultMQProducer發(fā)送消息
  • 2.1 DefaultMQProducer的創(chuàng)建
    • (1)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook);
    • (2)DefaultMQProducer(String producerGroup, RPCHook rpcHook);
    • (3)DefaultMQProducer(String namespace, String producerGroup);
    • (4)DefaultMQProducer(String producerGroup);
    • (5)DefaultMQProducer(RPCHook rpcHook);
    • (6)DefaultMQProducer();
    • (7)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, String customizedTraceTopic);
    • (8)DefaultMQProducer(String producerGroup, boolean enableMsgTrace, String customizedTraceTopic);
    • (9)DefaultMQProducer(String producerGroup, boolean enableMsgTrace);
  • 2.2 發(fā)送普通消息
    • (1)SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
    • (2)SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
    • (3)void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;異步發(fā)送
    • (4)void send(Message msg, SendCallback sendCallback,long timeout) throws MQClientException, RemotingException, InterruptedException;異步發(fā)送
    • (5)SendResult send(Collectionmsgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
    • (6)SendResult send(Collectionmsgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
    • (7)void send(Collectionmsgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發(fā)送
    • (8)void send(Collectionmsgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發(fā)送
    • (9)void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;one-way模式,異步發(fā)送
  • 總結(jié)與展望

RocketMQ發(fā)送消息主要分3大模式:同步發(fā)送sysnc、異步發(fā)送async、直接發(fā)送one-way

成都創(chuàng)新互聯(lián)專(zhuān)注于東寶企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站開(kāi)發(fā),購(gòu)物商城網(wǎng)站建設(shè)。東寶網(wǎng)站建設(shè)公司,為東寶等地區(qū)提供建站服務(wù)。全流程按需制作,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)
  • 同步發(fā)送模式只有在消息完全發(fā)送完成之后才返回結(jié)果,此方式存在需要同步等待發(fā)送結(jié)果的時(shí)間代價(jià)。這種方式具有內(nèi)部重試機(jī)制,即在主動(dòng)聲明本次消息發(fā)送失敗之前,內(nèi)部實(shí)現(xiàn)將重試一定次數(shù),默認(rèn)為2次。 發(fā)送的結(jié)果存在同一個(gè)消息可能被多次發(fā)送給給broker,這里需要應(yīng)用的開(kāi)發(fā)者自己在消費(fèi)端處理冪等性問(wèn)題。
  • 異步發(fā)送模式在消息發(fā)送后立刻返回,當(dāng)消息完全完成發(fā)送后,會(huì)調(diào)用回調(diào)函數(shù)sendCallback來(lái)告知發(fā)送者本次發(fā)送是成功或者失敗。異步模式通常用于響應(yīng)時(shí)間敏感業(yè)務(wù)場(chǎng)景,即承受不了同步發(fā)送消息時(shí)等待返回的耗時(shí)代價(jià)。同同步發(fā)送一樣,異步模式也在內(nèi)部實(shí)現(xiàn)了重試機(jī)制,默認(rèn)次數(shù)為2次。發(fā)送的結(jié)果同樣存在同一個(gè)消息可能被多次發(fā)送給給broker,需要應(yīng)用的開(kāi)發(fā)者自己在消費(fèi)端處理冪等性問(wèn)題。
  • 采用one-way發(fā)送模式發(fā)送消息的時(shí)候,發(fā)送端發(fā)送完消息后會(huì)立即返回,不會(huì)等待來(lái)自broker的ack來(lái)告知本次消息發(fā)送是否完全完成發(fā)送。這種模式吞吐量很大,但是存在消息丟失的風(fēng)險(xiǎn),所以其適用于不重要的消息發(fā)送,比如日志收集。one-way模式本質(zhì)上是沒(méi)有sendCallback的異步發(fā)送方式。
    每種發(fā)送模式都有很多發(fā)送消息的方法,接下來(lái)對(duì)每個(gè)發(fā)送方法進(jìn)行講解。
一、使用RocketMQTemplate發(fā)送消息(整合Springboot)

使用RocketMQTemplate必須要在配置文件中配置RocketMQ的屬性,Springboot在加載時(shí)才會(huì)創(chuàng)建RocketMQTemplate的Bean。配置文件示例如下:

rocketmq:
  name-server: nameServer的集群IP
  compress-message-body-threshold: 4096
  consumer:
    access-key: username
    secret-key: password
  max-message-size: 536870912
  producer:
    access-key: username
    secret-key: password
    group: producerGroup
    retry-next-server: true
    retry-times-when-send-async-failed: 2
    retry-times-when-send-failed: 2
    send-message-timeout: 3000
(1)void send(Messagemessage) throws MessagingException;同步發(fā)送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
        MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                // 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
                .setHeader("KEYS", "消息的key的值")
                .build();
        rocketMQTemplate.send(sendMessage);
    }
}

RocketMQTemplate.send(Messagemessage) 方法只有一個(gè)Message類(lèi)型的參數(shù),沒(méi)有設(shè)置topic,這個(gè)消息會(huì)發(fā)送到RocketMQ的默認(rèn)topic,這個(gè)默認(rèn)topic是在安裝RocketMQ Client的時(shí)候配置的,如果沒(méi)有這個(gè)topic會(huì)拋出"No 'defaultDestination' configured"異常。這個(gè)方法幾乎不會(huì)被使用,我們發(fā)送消息一般都是要發(fā)送到我們想去的一個(gè)topic。此方法會(huì)將消息同步發(fā)送至topic,此方法沒(méi)有返回值,我們無(wú)法獲取SendResult。

(2)void send(D destination, Messagemessage) throws MessagingException;同步發(fā)送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
        MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                // 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
                .setHeader("KEYS", "消息的key的值")
                .build();
        // 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        rocketMQTemplate.send("topicA:tagA", sendMessage);
    }
}

RocketMQTemplate.send(D destination, Messagemessage) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息。這個(gè)方法是一個(gè)委托方法,其實(shí)最終調(diào)用的是RocketMQTemplate.syncSend(String destination, Messagemessage)方法,也就是說(shuō)destination雖然是個(gè)泛型,但是我們應(yīng)該傳入一個(gè)字符串類(lèi)型的topic,此方法會(huì)將消息同步發(fā)送至topic。此方法沒(méi)有返回值,我們無(wú)法獲取SendResult。

(3)SendResult syncSend(String destination, Messagemessage);同步發(fā)送
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
        MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                // 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
                .setHeader("KEYS", "消息的key的值")
                .build();
        // 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage);
        System.out.println(sendResult);
    }
}

RocketMQTemplate.syncSend(String destination, Messagemessage) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息。此方法的返回值為SendResult,我們可以通過(guò)這個(gè)類(lèi)來(lái)確定消息是否發(fā)送成功,獲取消息的MessageId等。

(4)SendResult syncSend(String destination, Messagemessage, long timeout);同步發(fā)送
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
        MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                // 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
                .setHeader("KEYS", "消息的key的值")
                .build();
        // 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage, 3000L);
        System.out.println(sendResult);
    }
}

RocketMQTemplate.syncSend(String destination, Messagemessage, long timeout) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息,第三個(gè)參數(shù)是超時(shí)時(shí)間。其實(shí)方法(3)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。

(5)SendResult syncSend(String destination, Collectionmessages);同步發(fā)送
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {Collection>messages = new ArrayList<>();
        for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
                    .setHeader("消息的屬性的key", "消息的屬性的值")
                    .setHeader("KEYS", "消息的key的值")
                    .build();
            messages.add(sendMessage);
        }
        SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages);
        System.out.println(sendResult);
    }
}

RocketMQTemplate.syncSend(String destination, Collectionmessages) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的集合。此方法會(huì)將消息批量發(fā)送到topicA下的tagA下。

(6)SendResult syncSend(String destination, Collectionmessages, long timeout);同步發(fā)送
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {Collection>messages = new ArrayList<>();
        for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
                    .setHeader("消息的屬性的key", "消息的屬性的值")
                    .setHeader("KEYS", "消息的key的值")
                    .build();
            messages.add(sendMessage);
        }
        SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages, 3000L);
        System.out.println(sendResult);
    }
}

RocketMQTemplate.syncSend(String destination, Collectionmessages, long timeout) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的集合,第三個(gè)參數(shù)是超時(shí)時(shí)間。此方法會(huì)將消息批量發(fā)送到topicA下的tagA下。其實(shí)方法(5)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。

(7)SendResult syncSend(String destination, Object payload);同步發(fā)送
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "這里設(shè)置消息體");
        System.out.println(sendResult);
    }
}

RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的消息體。此方法不需要我們自己創(chuàng)建Message對(duì)象了,底層會(huì)幫我們創(chuàng)建。但是缺點(diǎn)就是不能設(shè)置消息的屬性和key。

(8)SendResult syncSend(String destination, Object payload, long timeout);同步發(fā)送
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "這里設(shè)置消息體", 3000L);
        System.out.println(sendResult);
    }
}

RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的消息體,第三個(gè)參數(shù)是超時(shí)時(shí)間。此方法就是不需要我們自己創(chuàng)建Message對(duì)象了,底層會(huì)幫我們創(chuàng)建。但是缺點(diǎn)就是不能設(shè)置消息的屬性和key。其實(shí)方法(7)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。

(9)void sendOneWay(String destination, Messagemessage);one-way模式,異步發(fā)送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類(lèi)型
        MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                // 想發(fā)送帶key的消息,請(qǐng)求頭的鍵必須寫(xiě)成KEYS
                .setHeader("KEYS", "消息的key的值")
                .build();
        // 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        rocketMQTemplate.sendOneWay("topicA:tagA", "這里設(shè)置消息體");
    }
}

RocketMQTemplate.sendOneWay(String destination, Messagemessage) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息。此方法可以異步發(fā)送消息,具有很高的發(fā)送效率,但是沒(méi)有返回值,我們無(wú)法獲取SendResult。

(10)void sendOneWay(String destination, Object payload);one-way模式,異步發(fā)送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 這里指定將消息發(fā)送到topicA的tagA下,也可以不指定tagA只寫(xiě)topicA
        rocketMQTemplate.sendOneWay("topicA:tagA", "這里設(shè)置消息體");
    }
}

RocketMQTemplate.sendOneWay(String destination, Object payload) 方法有兩個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息的消息體。此方法不需要我們自己創(chuàng)建Message對(duì)象了,底層會(huì)幫我們創(chuàng)建。但是缺點(diǎn)就是不能設(shè)置消息的屬性和key。此方法可以異步發(fā)送消息,具有很高的發(fā)送效率,但是沒(méi)有返回值,我們無(wú)法獲取SendResult。

(11)void asyncSend(String destination, Messagemessage, SendCallback sendCallback);異步發(fā)送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                .setHeader("KEYS", "消息的key的值")
                .build();
        rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        });
    }
}

RocketMQTemplate.asyncSend(String destination, Messagemessage, SendCallback sendCallback) 方法有三個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息,第三個(gè)參數(shù)是異步消息的回調(diào)對(duì)象。此方法允許我們?cè)O(shè)置回調(diào)函數(shù),知道異步消息是否發(fā)送成功以此來(lái)做相應(yīng)的事情。方法(9)和(10)底層也是調(diào)用了此方法,只不過(guò)把SendCallback對(duì)象設(shè)為了null。

(12)void asyncSend(String destination, Messagemessage, SendCallback sendCallback, long timeout);異步發(fā)送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體")
                .setHeader("消息的屬性的key", "消息的屬性的值")
                .setHeader("KEYS", "消息的key的值")
                .build();
        rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        }, 3000L);
    }
}

RocketMQTemplate.asyncSend(String destination, Messagemessage, SendCallback sendCallback, long timeout) 方法有四個(gè)參數(shù),第一個(gè)參數(shù)就是topic,第二個(gè)參數(shù)是要發(fā)送的消息,第三個(gè)參數(shù)是異步消息的回調(diào)對(duì)象,第四個(gè)參數(shù)是超時(shí)時(shí)間。此方法允許我們?cè)O(shè)置回調(diào)函數(shù),知道異步消息是否發(fā)送成功以此來(lái)做相應(yīng)的事情。方法(11)也是調(diào)用了此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。

(13)void asyncSend(String destination, Object payload, SendCallback sendCallback);異步發(fā)送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "這里設(shè)置消息體", new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        });
    }
}

同方法(11)一樣,只不過(guò)第二個(gè)參數(shù)由Message類(lèi)型換成了Object類(lèi)型,可以直接傳入要發(fā)送的消息體,不用我們自己創(chuàng)建Message對(duì)象,缺點(diǎn)就是不能設(shè)置消息的屬性和key。

(14)void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);異步發(fā)送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "這里設(shè)置消息體", new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        }, 3000L);
    }
}

方法(13)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。

(15)void asyncSend(String destination, Collectionmessages, SendCallback sendCallback);異步發(fā)送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.ArrayList;
import java.util.Collection;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {Collection>messages = new ArrayList<>();
        for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
                    .setHeader("消息的屬性的key", "消息的屬性的值")
                    .setHeader("KEYS", "消息的key的值")
                    .build();
            messages.add(sendMessage);
        }
        rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        });
    }
}

同方法(11)一樣,只不過(guò)第二個(gè)參數(shù)由Message類(lèi)型換成了Collection類(lèi)型,可以批量的異步發(fā)送消息。

(16)void asyncSend(String destination, Collectionmessages, SendCallback sendCallback, long timeout);異步發(fā)送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.ArrayList;
import java.util.Collection;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {Collection>messages = new ArrayList<>();
        for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設(shè)置消息體" + i)
                    .setHeader("消息的屬性的key", "消息的屬性的值")
                    .setHeader("KEYS", "消息的key的值")
                    .build();
            messages.add(sendMessage);
        }
        rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        }, 3000L);
    }
}

方法(15)的底層也是調(diào)用此方法,只不過(guò)由于我們沒(méi)有設(shè)置timeout,系統(tǒng)會(huì)使用默認(rèn)的timeout,默認(rèn)值為3000毫秒。注意:超時(shí)時(shí)間設(shè)置的過(guò)小會(huì)導(dǎo)致消息發(fā)送失敗。

(17)void convertAndSend(Object payload) throws MessagingException;同步發(fā)送
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 實(shí)體類(lèi)
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setName("Tom");
        rocketMQTemplate.convertAndSend(exampleEntity);
    }
}

同方法(1)一樣,此方法也是將消息發(fā)送到默認(rèn)的topic。payload可以是一個(gè)實(shí)體類(lèi)、集合等也可以是字符串。如果payload是實(shí)體類(lèi)、集合等,底層會(huì)將實(shí)體類(lèi)轉(zhuǎn)化成json對(duì)象,例如上述代碼發(fā)送消息的結(jié)果就是{"name":"Tom"},如果傳入的是集合對(duì)象這會(huì)轉(zhuǎn)換從jsonArray。如果payload是字符串,則發(fā)送消息的結(jié)果就是原字符串。

(18)void convertAndSend(D destination, Object payload) throws MessagingException;同步發(fā)送
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 實(shí)體類(lèi)
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setName("Tom");
        rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity);
    }
}

在方法(17)的基礎(chǔ)上可以指定topic將消息發(fā)送到指定的topic。

(19)void convertAndSend(D destination, Object payload, @Nullable Mapheaders) throws MessagingException;同步發(fā)送
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.Map;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 消息的屬性
        Mapmap = new HashMap<>();
        map.put("消息的屬性的鍵", "消息的屬性的值");
        map.put("KEYS", "消息的key");
        // 實(shí)體類(lèi)
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setName("Tom");
        rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map);
    }
}

在方法(18)的基礎(chǔ)上增加了第三個(gè)Map類(lèi)型的參數(shù),我們可以使用這個(gè)參數(shù)來(lái)設(shè)置消息的屬性和key。

(20)void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;

import java.util.HashMap;
import java.util.Map;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 實(shí)體類(lèi)
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setName("Tom");
        rocketMQTemplate.convertAndSend(exampleEntity, new MessagePostProcessor() {@Override
            public MessagepostProcessMessage(Messagemessage) {MessageHeaders headers = message.getHeaders();
                String keys = headers.get("KEYS", String.class);
                System.out.println(keys);
                return message;
            }
        });
    }
}

在方法(17)的基礎(chǔ)上增加了MessagePostProcessor對(duì)象,MessagePostProcessor顧名思義就是消息的后處理。我們傳入的參數(shù)中,destination就是消息要去往的topic(這里沒(méi)有destination則發(fā)送默認(rèn)的topic),payload就是消息的消息體,headers就是消息的屬性(這里沒(méi)有headers則無(wú)法設(shè)置屬性),RocketMQ底層會(huì)根據(jù)payload和headers生成Message對(duì)象,MessagePostProcessor就是對(duì)這個(gè)生成的Message對(duì)象做一些事情,最后再發(fā)往destination。

(21)void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;

import java.util.HashMap;
import java.util.Map;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 實(shí)體類(lèi)
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setName("Tom");
        rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, new MessagePostProcessor() {@Override
            public MessagepostProcessMessage(Messagemessage) {MessageHeaders headers = message.getHeaders();
                String keys = headers.get("KEYS", String.class);
                System.out.println(keys);
                return message;
            }
        });
    }
}

在方法(20)的基礎(chǔ)上可以指定topic將消息發(fā)送到指定的topic。

(22)void convertAndSend(D destination, Object payload, @Nullable Mapheaders, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發(fā)送
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;

import java.util.HashMap;
import java.util.Map;

public class CommonMessageProducer {@Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMessages() {// 消息的屬性
        Mapmap = new HashMap<>();
        map.put("消息的屬性的鍵", "消息的屬性的值");
        map.put("KEYS", "消息的key");
        // 實(shí)體類(lèi)
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setName("Tom");
        rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map, new MessagePostProcessor() {@Override
            public MessagepostProcessMessage(Messagemessage) {MessageHeaders headers = message.getHeaders();
                String keys = headers.get("KEYS", String.class);
                System.out.println(keys);
                return message;
            }
        });
    }
}

在方法(21)的基礎(chǔ)上增加了Map類(lèi)型的參數(shù),我們可以使用這個(gè)參數(shù)來(lái)設(shè)置消息的屬性和key。

至此,使用RocketMQTemplate發(fā)送普通消息的方法就全部講解完了,其實(shí)還有RocketMQTemplate.sendAndReceive()方法也可以發(fā)送普通消息,但是這個(gè)要配合消費(fèi)者一起使用,我會(huì)另寫(xiě)一篇文章講解這個(gè)方法。

二、使用DefaultMQProducer發(fā)送消息

有些場(chǎng)景下我們不想使用springboot自動(dòng)創(chuàng)建的RocketMQTemplate的Bean來(lái)發(fā)送消息,而是想自己創(chuàng)建生產(chǎn)者以使用不同的nameServer來(lái)發(fā)送消息到不同的集群或者想在main函數(shù)中創(chuàng)建生產(chǎn)者,可以使用DefaultMQProducer來(lái)發(fā)送消息。其實(shí)RocketMQTemplate底層也是使用DefaultMQProducer來(lái)發(fā)送消息的,只不過(guò)進(jìn)行了包裝讓用戶(hù)使用起來(lái)更方便。

2.1 DefaultMQProducer的創(chuàng)建

DefaultMQProducer有多個(gè)構(gòu)造函數(shù),我們可以根據(jù)不同的場(chǎng)景使用不同的構(gòu)造函數(shù)創(chuàng)建對(duì)象。

(1)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook);
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產(chǎn)者組", new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")));

此構(gòu)造函數(shù)的第一個(gè)參數(shù)是命名空間,命名空間需要在服務(wù)端提前創(chuàng)建。第二個(gè)參數(shù)是生產(chǎn)者組,一個(gè)生產(chǎn)者組可以包含多個(gè)生產(chǎn)者,生產(chǎn)者組不需要提前創(chuàng)建,在創(chuàng)建DefaultMQProducer對(duì)象的時(shí)候賦值一個(gè)生產(chǎn)者組就可以。第三個(gè)參數(shù)是RPCHook對(duì)象用于權(quán)限認(rèn)證,相當(dāng)于你登陸一個(gè)網(wǎng)站需要輸入用戶(hù)名和密碼。

命名空間是RocketMQ中的一個(gè)資源管理概念。用戶(hù)不同的業(yè)務(wù)場(chǎng)景一般都可以通過(guò)命名空間做隔離,并且針對(duì)不同的業(yè)務(wù)場(chǎng)景設(shè)置專(zhuān)門(mén)的配置,例如消息保留時(shí)間。不同命名空間之間的 Topic 相互隔離,訂閱相互隔離,角色權(quán)限相互隔離。

(2)DefaultMQProducer(String producerGroup, RPCHook rpcHook);
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組", new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")));

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1),只不過(guò)將namespace設(shè)為了null,在沒(méi)有命名空間的時(shí)候可以使用此構(gòu)造函數(shù)。

(3)DefaultMQProducer(String namespace, String producerGroup);
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產(chǎn)者組");

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1),只不過(guò)將RPCHook 設(shè)為了null,在不需要acl認(rèn)證的時(shí)候可以使用此構(gòu)造函數(shù)。

(4)DefaultMQProducer(String producerGroup);
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組");

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1),只不過(guò)將namespace和RPCHook設(shè)為了null,在沒(méi)有命名空間和不需要acl認(rèn)證的時(shí)候可以使用此構(gòu)造函數(shù)。

(5)DefaultMQProducer(RPCHook rpcHook);
DefaultMQProducer producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")));

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1),只不過(guò)將namespace設(shè)為了null,由于prodcuerGroup不能為null,所以RocketMQ會(huì)使用默認(rèn)的生產(chǎn)者組:DEFAULT_PRODUCER。

(6)DefaultMQProducer();
DefaultMQProducer producer = new DefaultMQProducer();

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(1),只不過(guò)將namespace和RPCHook設(shè)為了null,由于prodcuerGroup不能為null,所以RocketMQ會(huì)使用默認(rèn)的生產(chǎn)者組:DEFAULT_PRODUCER。

(7)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, String customizedTraceTopic);
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產(chǎn)者組", new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")), true, "traceTopic");

此構(gòu)造函數(shù)的第一個(gè)參數(shù)是命名空間,命名空間需要在服務(wù)端提前創(chuàng)建。第二個(gè)參數(shù)是生產(chǎn)者組,一個(gè)生產(chǎn)者組可以包含多個(gè)生產(chǎn)者,生產(chǎn)者組不需要提前創(chuàng)建,在創(chuàng)建DefaultMQProducer對(duì)象的時(shí)候賦值一個(gè)生產(chǎn)者組就可以。第三個(gè)參數(shù)是RPCHook對(duì)象用于權(quán)限認(rèn)證,相當(dāng)于你登陸一個(gè)網(wǎng)站需要輸入用戶(hù)名和密碼。第四個(gè)參數(shù)是布爾類(lèi)型,表示是否開(kāi)啟消息追蹤。第五個(gè)參數(shù)是消息跟蹤的topic的名稱(chēng),這個(gè)topic專(zhuān)門(mén)用來(lái)做消息追蹤的,一般不會(huì)用這個(gè)topic生產(chǎn)和消費(fèi)業(yè)務(wù)數(shù)據(jù)。開(kāi)啟追蹤后,追蹤topic內(nèi)會(huì)記錄生產(chǎn)者的一些信息,比如生產(chǎn)者IP、消息的MessageID等。例如下面的代碼就是開(kāi)啟追蹤并設(shè)置trace-topic為追蹤topic,然后將消息發(fā)送到topicA中,于是topicA里面是業(yè)務(wù)數(shù)據(jù),trace-topic里面是用于消息追蹤的追蹤數(shù)據(jù)。也就是發(fā)送一次消息會(huì)發(fā)送一份業(yè)務(wù)數(shù)據(jù)和一份追蹤數(shù)據(jù)到業(yè)務(wù)topic和追蹤topic,

package com.sgm.esb.gateway.service;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");

        producer.start();
        Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
        sendMessage.setKeys("消息的key");

        producer.send(sendMessage);

        producer.shutdown();
    }
}

如下是追蹤topic中的消息內(nèi)容:
跟蹤topic的消息內(nèi)容

(8)DefaultMQProducer(String producerGroup, boolean enableMsgTrace, String customizedTraceTopic);
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組", true, "traceTopic");

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(7),只不過(guò)將namespace和RPCHook設(shè)為了null,使用于沒(méi)有命名空間和不需要acl認(rèn)證的時(shí)候。

(9)DefaultMQProducer(String producerGroup, boolean enableMsgTrace);
DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組", true);

此構(gòu)造函數(shù)底層還是調(diào)用了構(gòu)造方法(7),只不過(guò)將namespace、RPCHook和customizedTraceTopic設(shè)為了null。

2.2 發(fā)送普通消息 (1)SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        // 這里的Message類(lèi)型是org.apache.rocketmq.common.message.Message
        Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
        sendMessage.setKeys("消息的key");
        SendResult sendResult = producer.send(sendMessage);
        System.out.println(sendResult);
        producer.shutdown();
    }
}

我們根據(jù)需要使用上面提到的某一種構(gòu)造方法創(chuàng)建生產(chǎn)者后必須調(diào)用setNamesrvAddr()方法來(lái)設(shè)置NameServer(要不然消息發(fā)到哪呢),然后調(diào)用start()開(kāi)始發(fā)送消息,調(diào)用send()方法將創(chuàng)建的Message對(duì)象發(fā)送到某個(gè)topic下,最后調(diào)用shutdown()來(lái)釋放資源。DefaultMQProducer.send(Message msg)方法只有一個(gè)Message類(lèi)型的參數(shù),這個(gè)Message的類(lèi)型為org.apache.rocketmq.common.message.Message,不同于之前RocketMQTemplate中的org.springframework.messaging.Message。我們可以在此Message中設(shè)置topic和tag而不是在send()方法中設(shè)置topic和tag,通過(guò)putUserProperty()方法設(shè)置消息的屬性,通過(guò)setKeys()方法設(shè)置消息的key等。在RocketMQTemplate中我們使用org.springframework.messaging.Message來(lái)創(chuàng)建消息,其實(shí)RocketMQ底層最終會(huì)將org.springframework.messaging.Message轉(zhuǎn)化為org.apache.rocketmq.common.message.Message類(lèi)型進(jìn)行消息的發(fā)送。

(2)SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        // 這里的Message類(lèi)型是org.apache.rocketmq.common.message.Message
        Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
        sendMessage.setKeys("消息的key");
        SendResult sendResult = producer.send(sendMessage, 3000L);
        System.out.println(sendResult);
        producer.shutdown();
    }
}

在方法(1)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。

(3)void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;異步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
        sendMessage.setKeys("消息的key");
        producer.send(sendMessage, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        });
        producer.shutdown();
    }
}

第二個(gè)參數(shù)設(shè)置異步回調(diào),同RocketMQTemplate異步發(fā)送消息一樣,不再贅述。

(4)void send(Message msg, SendCallback sendCallback,long timeout) throws MQClientException, RemotingException, InterruptedException;異步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Message sendMessage = new Message("topicA", "tagA", "這里設(shè)置消息體".getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
        sendMessage.setKeys("消息的key");
        producer.send(sendMessage, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        });
        producer.shutdown();
    }
}

在方法(3)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。

(5)SendResult send(Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Listmessages = new ArrayList<>();
        for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
            sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
            sendMessage.setKeys("消息的key");
            messages.add(sendMessage);
        }
        SendResult sendResult = producer.send(messages);
        System.out.println(sendResult);
        producer.shutdown();
    }
}

批量發(fā)送消息到某一個(gè)topic。

注意:這里L(fēng)ist中Message的topic都必須是同一個(gè),否則會(huì)報(bào)錯(cuò)。

(6)SendResult send(Collection msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Listmessages = new ArrayList<>();
        for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
            sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
            sendMessage.setKeys("消息的key");
            messages.add(sendMessage);
        }
        SendResult sendResult = producer.send(messages);
        System.out.println(sendResult);
        producer.shutdown();
    }
}

批量發(fā)送消息到某一個(gè)topic,在方法(5)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。

(7)void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Listmessages = new ArrayList<>();
        for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
            sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
            sendMessage.setKeys("消息的key");
            messages.add(sendMessage);
        }
        producer.send(messages, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        });
        producer.shutdown();
    }
}

批量異步發(fā)送消息到某一個(gè)topic。

(8)void send(Collection msgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Listmessages = new ArrayList<>();
        for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
            sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
            sendMessage.setKeys("消息的key");
            messages.add(sendMessage);
        }
        producer.send(messages, new SendCallback() {@Override
            public void onSuccess(SendResult sendResult) {System.out.println("Send success");
            }

            @Override
            public void onException(Throwable throwable) {System.out.println("Send fail");
            }
        }, 3000L);
        producer.shutdown();
    }
}

批量異步發(fā)送消息到某一個(gè)topic,在方法(7)的基礎(chǔ)上增加timeout參數(shù)來(lái)設(shè)置超時(shí)時(shí)間。

(9)void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;one-way模式,異步發(fā)送
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;


public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產(chǎn)者組",
                new AclClientRPCHook(new SessionCredentials("用戶(hù)名","密碼")),
                true, "trace-topic");
        producer.setNamesrvAddr("nameServer集群IP");
        producer.start();
        Message sendMessage = new Message("topicA", "tagA", ("這里設(shè)置消息體" + i).getBytes(StandardCharsets.UTF_8));
        sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
        sendMessage.setKeys("消息的key");
        producer.sendOneway(sendMessage);
        producer.shutdown();
    }
}

使用one-way模式異步發(fā)送消息。

總結(jié)與展望

至此,使用RocketMQTemplate和DefaultMQProducer發(fā)送普通消息的全部方法就講解完了,本文的主要目的是幫助讀者快速學(xué)習(xí)使用RocketMQ發(fā)送普通消息,本文總結(jié)了所有的發(fā)送普通消息的方法以滿(mǎn)足實(shí)際工作中不同的業(yè)務(wù)場(chǎng)景。RocketMQTemplate和DefaultMQProducer中還有一些發(fā)送消息的方法是用來(lái)發(fā)送順序、定時(shí)/延時(shí)消息的(DefaultMQProducer不能用來(lái)發(fā)送事務(wù)消息),之后我會(huì)繼續(xù)寫(xiě)文章來(lái)講解這些方法以及所有的消費(fèi)消息的方法。
這是我在寫(xiě)的第一篇原創(chuàng)文章,之后我還準(zhǔn)備寫(xiě)一些使用RocketMQ時(shí)踩過(guò)的坑,最后可能會(huì)寫(xiě)一些RocketMQ更底層的東西,我還準(zhǔn)備寫(xiě)一些webflux、springCloudGateway這些我比較感興趣的東西,希望看到這篇文章的人能和我一起成長(zhǎng)。

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧


網(wǎng)頁(yè)題目:RocketMQ發(fā)送普通消息的所有方法以及代碼示例-創(chuàng)新互聯(lián)
URL網(wǎng)址:http://weahome.cn/article/dheosd.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部