真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

MessageQueueSelector如何實現(xiàn)順序消費

Message Queue Selector如何實現(xiàn)順序消費,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:主機(jī)域名、網(wǎng)頁空間、營銷軟件、網(wǎng)站建設(shè)、三水網(wǎng)站維護(hù)、網(wǎng)站推廣。

順序消息的定義:

順序消息是指消息的消費順序和生產(chǎn)順序相同,在某些場景下,必須保證順序消息。比如訂單的生成、付款、發(fā)貨.順序消息又分為全局順序消息和部分順序消息,全局順序消息指某一個topic下的所有消息都要保證順序;部分順序消息只要保證某一組消息被順序消費。對于訂單消息來說,只要保證同一個訂單ID的生成、付款、發(fā)貨消息按照順序消費即可。

部分順序消費實現(xiàn)原理:

1. 發(fā)送端:保證相同訂單ID的各種消息發(fā)往同一個MessageQueue(同一個Topic下的某一個queue)

2.消費端:保證同一個MessageQueue里面的消息不被并發(fā)處理 (同一個Topic的不同MessageQueue是可以同時消費的)

        DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");
		producer.setNamesrvAddr("10.76.0.38:9876");
		producer.start();
		for (int i = 0; i < 1000; i++) {
			Order order  = new Order();
			order.orderId = i;
			order.status = "生成";

			Message msg1 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult1={}",sendResult1);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="付款";

			Message msg2 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult2={}",sendResult2);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="發(fā)貨";
			Message msg3 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);


			SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List mqs, Message msg, Object arg) {
					Integer id = (Integer) arg;
					int index = id % mqs.size();
					return mqs.get(index);
				}
				//MessageQueueSelector保證同一個orderId的消息都存儲在同一個MessageQueue。
			}, order.orderId);
			log.info("sendResult3={}",sendResult1);
		}

消費端主要邏輯如下,主要MessageListenerOrderly回調(diào)實現(xiàn)同一個MessageQueue里面的消息不會被并發(fā)消費:

       //同一個MessageQueue里面的消息要順序消費,不能并發(fā)消費。
		//但是同一個Topic的不同MessageQueue是可以同時消費的
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");
		consumer.setNamesrvAddr("10.76.0.38:9876");
		consumer.subscribe("test", "");
		consumer.setPullBatchSize(1);
		consumer.setConsumeThreadMin(1);
		consumer.setConsumeThreadMax(1);
	//	consumer.registerMessageListener(new MessageListenerConcurrently() {
		consumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
				List messages = new ArrayList<>();

				for (MessageExt msg : msgs) {
					messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());
				}
				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});
		consumer.start();
		Thread.currentThread().join();

源碼分析:

我們知道在RocketMQ中是可以給一個消費者實例設(shè)置多個線程并發(fā)消費的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,

那MessageListenerOrderly是如何保證某一個時刻,只有一個消費者的某一個線程在消費某一個MessageQueue的呢?

就在Client模塊的 ConsumeMessageOrderlyService里面,消費者端并不是簡單的禁止并發(fā)處理,而是給每一個Consumer Queue加鎖,

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

在消費每個消息之前,需要先獲取這個消息對應(yīng)的Consumer Queue所對應(yīng)的鎖,保證同一個Consumer Queue的消息不會被并發(fā)消費,但是不同的Consumer Queue的消息是可以并發(fā)處理的。

看完上述內(nèi)容,你們掌握Message Queue Selector如何實現(xiàn)順序消費的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!


文章名稱:MessageQueueSelector如何實現(xiàn)順序消費
轉(zhuǎn)載源于:http://weahome.cn/article/pjcggi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部