一、阿里云官網(wǎng)---幫助文檔
創(chuàng)新互聯(lián)主要從事成都網(wǎng)站設(shè)計、網(wǎng)站制作、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)疏附,10余年網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh
按照官網(wǎng)步驟,創(chuàng)建Topic、申請發(fā)布(生產(chǎn)者)、申請訂閱(消費者)
二、代碼
1、配置:
public class MqConfig { /** * 啟動測試之前請?zhí)鎿Q如下 XXX 為您的配置 */ public static final String PUBLIC_TOPIC = "test";//公網(wǎng)測試 public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER"; public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE"; public static final String ACCESS_KEY = "123"; public static final String SECRET_KEY = "123"; public static final String TAG = ""; public static final String THREAD_NUM = "25";//消費端線程數(shù) /** * ONSADDR 請根據(jù)不同Region進(jìn)行配置 * 公網(wǎng)測試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet * 公有云生產(chǎn): http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */ public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"; }
ONSADDR 阿里云用 公有云生產(chǎn),測試用公網(wǎng)
不同的業(yè)務(wù)可以設(shè)置不同的tag,但是如果發(fā)送消息量大的話,建議新建TOPIC
2、生產(chǎn)者
方式1:
配置文件:producer.xml
<?xml version="1.0" encoding="UTF-8"?>
啟動方式1,在使用類的全局里設(shè)置:
//初始化生產(chǎn)者 private ApplicationContext ctx; private ProducerBean producer; @Value("${producerConfig.enabled}")//開關(guān),spring配置項,true為開啟,false關(guān)閉 private boolean producerConfigEnabled; @PostConstruct public void init(){ if (true == producerConfigEnabled) { ctx = new ClassPathXmlApplicationContext("producer.xml"); producer = (ProducerBean) ctx.getBean("producer"); } }
PS:最近發(fā)現(xiàn)一個坑,如果producer用上面這種方式啟動的話,一旦啟動的多了,會造成fullGC,所以可以換成下面這種注解方式啟動,在用到的地方手動start、shutdown
方式2:配置類(不需要xml)
@Configuration public class ProducerBeanConfig { @Value("${openservices.ons.producerBean.producerId}") private String producerId; @Value("${openservices.ons.producerBean.accessKey}") private String accessKey; @Value("${openservices.ons.producerBean.secretKey}") private String secretKey; private ProducerBean producerBean; @Value("${openservices.ons.producerBean.ONSAddr}") private String ONSAddr; @Bean public ProducerBean oneProducer() { ProducerBean producerBean = new ProducerBean(); Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.ProducerId, producerId); properties.setProperty(PropertyKeyConst.AccessKey, accessKey); properties.setProperty(PropertyKeyConst.SecretKey, secretKey); properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producerBean.setProperties(properties); return producerBean; } }
PS:經(jīng)過這次雙11發(fā)現(xiàn),以上2種方式在大數(shù)據(jù)量,多線程情況下都不太適用, 性能很差,所以推薦用3
方式3:(不需要xml)
@Component public class ProducerBeanSingleTon { @Value("${openservices.ons.producerBean.producerId}") private String producerId; @Value("${openservices.ons.producerBean.accessKey}") private String accessKey; @Value("${openservices.ons.producerBean.secretKey}") private String secretKey; @Value("${openservices.ons.producerBean.ONSAddr}") private String ONSAddr; private static Producer producer; private static class SingletonHolder { private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon(); } private ProducerBeanSingleTon (){} public static final ProducerBeanSingleTon getInstance() { return SingletonHolder.INSTANCE; } @PostConstruct public void init(){ // producer 實例配置初始化 Properties properties = new Properties(); //您在控制臺創(chuàng)建的Producer ID properties.setProperty(PropertyKeyConst.ProducerId, producerId); // AccessKey 阿里云身份驗證,在阿里云服務(wù)器管理控制臺創(chuàng)建 properties.setProperty(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里云身份驗證,在阿里云服務(wù)器管理控制臺創(chuàng)建 properties.setProperty(PropertyKeyConst.SecretKey, secretKey); //設(shè)置發(fā)送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設(shè)置 TCP 接入域名(此處以公共云生產(chǎn)環(huán)境為例) properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producer = ONSFactory.createProducer(properties); // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可 producer.start(); } public Producer getProducer(){ return producer; } }
spring配置
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect consumerConfig.enabled = true producerConfig.enabled = true #方式1: scheduling.enabled = false #方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E openservices.ons.producerBean.producerId = pid openservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.ONSAddr = 公網(wǎng)、杭州公有云生產(chǎn)
方式1投遞消息代碼:
try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info(".Send mq message success!”; } else { logger.warn(".sendResult is null........."); } } catch (Exception e) { logger.warn("DoubleElevenAllPreService"); Thread.sleep(1000);//如果有異常,休眠1秒 }
方式2投遞消息代碼:(可以每發(fā)1000個啟動/關(guān)閉一次)
producerBean.start(); try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info(".Send mq message success!”; } else { logger.warn(".sendResult is null........."); } } catch (Exception e) { logger.warn("DoubleElevenAllPreService"); Thread.sleep(1000);//如果有異常,休眠1秒 } producerBean.shutdown();
方式3:投遞消息
try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); Producer producer = ProducerBeanSingleTon.getInstance().getProducer(); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”; } else { logger.warn("DoubleElevenMidService.sendResult is null........."); } } catch (Exception e) { logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e); Thread.sleep(1000);//如果有異常,休眠1秒 }
發(fā)送消息的代碼一定要捕獲異常,不然會重復(fù)發(fā)送。
這里的TOPIC用自己創(chuàng)建的,elevenMessage是要發(fā)送的內(nèi)容,我這里是自己建的對象
3、消費者
配置啟動類:
@Configuration @ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true) public class ConsumerConfig { private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name()); @Bean public Consumer consumerFactory(){//不同消費者 這里不能重名 Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID); consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY); consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY); //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM); consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR); Consumer consumer = ONSFactory.createConsumer(consumerProperties); consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new對應(yīng)的監(jiān)聽器 consumer.start(); logger.info("ConsumerConfig start success."); return consumer; } }
CID和ONSADDR一點要選對,用自己的,消費者線程數(shù)等可以在這里配置
創(chuàng)建消息監(jiān)聽器類,消費消息:
@Component public class MessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger("remind"); protected static ElevenReposity elevenReposity; @Resource public void setElevenReposity(ElevenReposity elevenReposity){ MessageListener .elevenReposity=elevenReposity; } @Override public Action consume(Message message, ConsumeContext consumeContext) { if(message.getTopic().equals("自己的TOPIC")){//避免消費到其他消息 json轉(zhuǎn)換報錯 try { byte[] body = message.getBody(); String res = new String(body); //res 是生產(chǎn)者傳過來的消息內(nèi)容 //業(yè)務(wù)代碼 }else{ logger.warn("!"); } } catch (Exception e) { logger.error("MessageListener.consume error:" + e.getMessage(), e); } logger.info("MessageListener.Receive message”); //如果想測試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater return Action.CommitMessage; }else{ logger.warn(); return Action.ReconsumeLater; } }
注意,由于消費者是多線程的,所以對象要用static+set注入,把對象的級別提升到進(jìn)程,這樣多個線程就可以共用,但是無法調(diào)用父類的方法和變量
消費者狀態(tài)可以查看消費者是否連接成功,消費是否延遲,消費速度等
重置消費位點可以清空所有消息
三、注意事項
1、發(fā)送的消息體 最大為256KB
2、消息最多存在3天
3、消費端默認(rèn)線程數(shù)是20
4、如果運行過程中出現(xiàn)java掛掉或者cpu占用異常高,可以在發(fā)送消息的時候,每發(fā)送1000條讓線程休息1s
5、本地測試或啟動的時候,把ONSADDR換成公網(wǎng),不然報錯無法啟動
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。