org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-client 4.9.3
發(fā)送消息// 同步發(fā)送
SendResult sendResult = producer.send(msg);
// 異步發(fā)送,指定回調
producer.send(msg, new SendCallback() {// 當producer接收到MQ發(fā)送來的ACK后就會觸發(fā)該回調方法的執(zhí)行
@Override
public void onSuccess(SendResult sendResult) {System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {e.printStackTrace();
}
});
// 消息發(fā)送的狀態(tài)
public enum SendStatus {SEND_OK, // 發(fā)送成功
FLUSH_DISK_TIMEOUT, // 刷盤超時。當Broker設置的刷盤策略為同步刷盤時才可能出現這種異常狀態(tài)。異步刷盤不會出現
FLUSH_SLAVE_TIMEOUT, // Slave同步超時。當Broker集群設置的Master-Slave的復制方式為同步復制時才可能出現這種異常狀態(tài)。異步復制不會出現
SLAVE_NOT_AVAILABLE, // 沒有可用的Slave。當Broker集群設置為Master-Slave的復制方式為同步復制時才可能出現這種異常狀態(tài)。異步復制不會出現
}
消費消息DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("rocketmqOS:9876");
// 指定 從第一條消息開始消費
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("someTopicA", "*");
// 指定 每次可以 消費10條消息,默認為1
consumer.setConsumeMessageBatchMaxSize (10);
// 指定 每次 可以從Broker拉取40條消息, 默認為32
consumer.setPullBatchSize (40);
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println (msg);
}
// 消費成功的返回結果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消費異常時的返回結果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start ();
消息的可靠性(不丟失、高可用)消息可能在哪些階段丟失呢?可能會在這三個階段發(fā)生丟失:生產階段、存儲階段、消費階段
生產階段:事務消息,寫到OS Cache就返回 成功發(fā)送!
存儲階段:調整 MQ的刷盤策略,我們需要調整broker.conf配置文件,將其中的flushDiskType配置設置為:SYNC_FLUSH,默認他的值是ASYNC_FLUSH,即默認是異步刷盤
消費階段
不能 異步 消費消息
// 開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 業(yè)務 處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動接收消息的服務
consumer.start ();
也可以 使用如下結構:
目前RocketMQ支持的延時級別:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
生產者:
// 這是 訂單系統(tǒng)的生產者
DefaultMQProducer producer = new DefaultMQProducer ("OrderGroup");
// 啟動生產者
producer.start ();
Message msg = new Message (
"CreateOrderInformTopic",
"create success".getBytes ());
// 設置消息為延時消息,延遲級別為 16
msg.setDelayTimeLevel (16);
producer.send (msg);
消費者:
// 訂單掃描服務 的消費者
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer ("OrderScanServiceGroup");
// 訂閱 訂單創(chuàng)建 通知Topic
consumer.subscribe ("CreateOrderInformTopic", "*");
// 注冊 消息監(jiān)聽者
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listlist,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {// 將 msg轉為 訂單對象
Order order = buildOrder (msg);
// 根據Id 查詢數據庫
order = getById (order.getId ());
// 如果 已支付
if (order.getStatus () == 1) {}
// 如果 未支付
else {}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
消息過濾// 創(chuàng)建消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("aGroup");
// 訂閱主題,tag為 a,b,c的都監(jiān)聽
consumer.subscribe ("orderInfoTopic", "a || b || c");
修改broker.conf 配置文件才能生效
// 創(chuàng)建消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("bGroup");
// 只有訂閱的消息有這個屬性a, a >=0 and a<= 3
consumer.subscribe ("TopicTest", MessageSelector.bySql ("a between 0 and 3"));
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {// 處理業(yè)務邏輯
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start ();
RocketMQ還是 支持比較豐富的數據過濾語法的,如下所示:
(1)數值比較,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比較,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)邏輯符號 AND,OR,NOT;
(5)數值,比如:123,3.1415;
(6)字符,比如:‘abc’,必須用單引號包裹起來;
(7)NULL,特殊的常量
(8)布爾值,TRUE 或 FALSE
順序消息是指消息的消費順序和產生順序相同,在有些業(yè)務邏輯下,必須保證順序,比如訂單的生成、付款、發(fā)貨,這個消息必須按順序處理才行。
「消息亂序解決方案 不能和重試隊列混用?!?/p>全局順序消息
創(chuàng)建一個 Topic ,默認八個寫隊列,八個讀隊列
要保證全局順序消息, 需要先把 Topic 的讀寫隊列數設置為 一,然后Producer Consumer 的并發(fā)設置,也要是一
部分順序消息部分順序消息相對比較好實現
,生產端需要做到把同 ID 的消息發(fā)送到同一個 Message Queue ;在消費過程中,要做到從同一個Message Queue讀取的消息順序處理
DefaultMQProducer producer = new DefaultMQProducer ();
producer.setSendLatencyFaultEnable (true);
producer.setSendMsgTimeout (50);
producer.setNamesrvAddr ("192.168.111.101:9876");
producer.start ();
Message message = new Message ("Topic", "hello world".getBytes ());
SendResult sendResult = producer.send (message, new MessageQueueSelector () {@Override
public MessageQueue select(Listmqs,
Message message,
Object arg) {Long orderId = (Long) arg; // 根據 訂單id選擇發(fā)送 queue
long index = orderId % mqs.size (); // 用訂單id 對MessageQueue數量取模
return mqs.get ((int) index); // 返回一個MessageQueue
}
}, orderId //這里傳入訂單id
);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("192.168.111.101:9876");
// 從頭 開始消費
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("Topic", "*");
consumer.registerMessageListener (new MessageListenerOrderly () {@Override
public ConsumeOrderlyStatus consumeMessage(Listmsgs,
ConsumeOrderlyContext context) { context.setAutoCommit (true);
try { for (MessageExt msg : msgs) { // 對有序的消息 進行處理
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) { // 如果 消息處理 有問題
// 讓這批消息 暫停一會兒 再繼續(xù)處理這批消息
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start ();
消息積壓發(fā)生了消息積壓,這時候就得想辦法趕緊把積壓的消息消費完,就得考慮提高消費能力,一般有兩種辦法
處理消息重復問題,主要有業(yè)務端自己保證,主要的方式有兩種:業(yè)務去重 和 消息去重。
業(yè)務去重:保證 業(yè)務消費的冪等性即可
消息去重:依據在 生產者方設置消息的 messageKey,然后 每一條消息 在消費方依據這個唯一的 messageKey,進行冪等判斷
流程:** insert 失敗,說明已經消費---->捕獲異常返回已消費,insert 成功---->處理業(yè)務 提交事務后 再確認成功消費~~ **
// 設置 同步發(fā)送失敗時 重試發(fā)送的次數,默認為2次
producer.setRetryTimesWhenSendFailed(3);
// 設置 發(fā)送 超時時限為5s,默認3s
producer.setSendMsgTimeout(5000);
// 指定 異步發(fā)送失敗后 不進行重試發(fā)送
producer.setRetryTimesWhenSendAsyncFailed(0);
// 順序消息消費失敗的消費重試時間間隔,單位毫秒,默認為1000,其取值范圍為[10,30000]
// 重試期間 應用會出現 消息消費被阻塞
consumer.setSuspendCurrentQueueTimeMillis(100);
// 修改消費重試次數,默認16次
consumer.setMaxReconsumeTimes(10);
消息 刷盤失敗策略
消息刷盤超時(Master或Slave)或slave不可用(slave在做數據同步時向master返回狀態(tài)不是SEND_OK)時,默認是不會將消息嘗試發(fā)送到其他Broker的。不過,對于重要消息可以通過在Broker的配置文件設置retryAnotherBrokerWhenNotStoreOK
屬性為true
來開啟
重試隊列里面的消息會再次發(fā)給消費組,默認 最多重試 16 次,如果重試 16 次失敗則進入「死信隊列」
「死信隊列:」
對于死信隊列,一般我們可以專門開一個后臺線程,訂閱這個死信隊列,對死信隊列中的消息,一直不停的嘗試。
消息量比較大,不建議同步,影響消息消費速度,造成消息堆積
消費者處理失敗后,立刻寫入重試表,有個 定時任務專門重試
消費失敗,自己給同一個topic發(fā)一條消息—>對消息順序要求不高的場景可以使用
事務消息原理
Rocketmq 未收到rollback、commit也會 補償回調,MQ也會有補償機制 :checkLocalTransaction方法
讓我們自己處理
TransactionListenerImpl 事務監(jiān)聽器
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {// 發(fā)送成功給 broker后,可以 執(zhí)行本地事務
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {// 執(zhí)行 訂單本地事務
try {// 如果 本地事務都執(zhí)行成功了,返回 commit
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {// 本地事務 執(zhí)行失敗,回滾 本地事務
// 更新 broker中的消息狀態(tài)為 刪除
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 因為超時 等原因,沒有返回 commit或者 rollback
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 這里默認是 回滾
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
生產者:
// 接受 RocketMQ 回調的一個 監(jiān)聽器接口
// 會執(zhí)行 訂單本地事務,commit、rollback,回調查詢 等邏輯
TransactionListenerImpl transactionListener = new TransactionListenerImpl ();
TransactionMQProducer producer = new TransactionMQProducer ();
ThreadPoolExecutor executorService = new ThreadPoolExecutor (
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000)
);
// 設置 對應的線程池,負責執(zhí)行 回調請求
producer.setExecutorService (executorService);
// 開啟 容錯機制
producer.setSendLatencyFaultEnable (true);
// 設置 事務監(jiān)聽器
producer.setTransactionListener (transactionListener);
// 設置 發(fā)送失敗時,最多重試幾次
producer.setRetryTimesWhenSendFailed (2);
// 構建 消息體
Message message = new Message (
"PayOrderSuccessTopic",
"Tag",
"MyKey",
"Pay Success".getBytes ());
// 可以 查詢發(fā)送結果
SendResult sendResult = producer.send (message, 10);
問題
MQ集群掛掉降級方案通常的思路:
發(fā)送消息到MQ代碼里去try catch捕獲異常,如果你發(fā)現發(fā)送消息到MQ有異常,此時你需要進行重試
重試了,比如超過3次還是失敗,說明此時可能就是你的MQ集群徹底崩潰了
此時你必須把這條重要的消息寫入到本地存儲中去,可以是寫入數據庫里
要不停的嘗試發(fā)送消息到MQ去
發(fā)現MQ集群恢復了,你 必須有一個后臺線程可以 把之前持久化存儲的消息都查詢出來,然后依次 按照順序 發(fā)送到MQ集群里去
這里要有一個很關鍵的注意點,就是你把消息寫入存儲中 暫存時,一定要保證他的順序
,比如按照順序一條一條的寫入本地磁盤文件去
暫存消息
**流量 太多:**解決?法就是 對線上系統(tǒng)擴容雙段緩沖的??,從 512kb 擴容到?個緩沖區(qū) 10mb。
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網查看詳情吧