這篇文章主要介紹了redis如何實現(xiàn)隊列的阻塞、延時、發(fā)布和訂閱的相關(guān)知識,內(nèi)容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇redis如何實現(xiàn)隊列的阻塞、延時、發(fā)布和訂閱文章都會有所收獲,下面我們一起來看看吧。
成都創(chuàng)新互聯(lián)專注于梁園企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站建設(shè),商城系統(tǒng)網(wǎng)站開發(fā)。梁園網(wǎng)站建設(shè)公司,為梁園等地區(qū)提供建站服務(wù)。全流程定制網(wǎng)站制作,專業(yè)設(shè)計,全程項目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
Redis不僅可作為緩存服務(wù)器,還可以用作消息隊列。它的列表類型天生支持用作消息隊列。如下圖所示:
由于Redis的列表是使用雙向鏈表實現(xiàn)的,保存了頭節(jié)點和尾節(jié)點,所以在列表的頭部和尾部兩邊插入或獲取元素都是非??斓?,時間復(fù)雜度為O(1)。
可以直接使用Redis的list數(shù)據(jù)類型實現(xiàn)消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。
lpush+rpop:左進右出的隊列
rpush+lpop:左出右進的隊列
下面使用redis的命令來模擬普通隊列。
使用lpush命令生產(chǎn)消息:
>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
使用rpop命令消費消息:
>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
下面使用Java代碼來實現(xiàn)普通隊列。
生產(chǎn)者SingleProducer
package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/** * 生產(chǎn)者 */public class SingleProducer { public static final String SINGLE_QUEUE_NAME = "queue:single"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i < 100; i++) { jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i); } jedis.close(); }}
消費者SingleConsumer:
package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/** * 消費者 */public class SingleConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME); if(Objects.nonNull(message)) { System.out.println(message); } else { TimeUnit.MILLISECONDS.sleep(500); } } }}
上面的代碼已經(jīng)基本實現(xiàn)了普通隊列的生產(chǎn)與消費,但是上述的例子中消息的消費者存在兩個問題:
消費者需要不停的調(diào)用rpop方法查看redis的list中是否有待處理的數(shù)據(jù)(消息)。每調(diào)用一次都會發(fā)起一次連接,有可能list中沒有數(shù)據(jù),造成大量的空輪詢,導(dǎo)致造成不必要的浪費。也許你可以使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,如果睡眠時間過長,這樣不能處理一些時效性要求高的消息,睡眠時間過短,也會在連接上造成比較大的開銷。
如果生產(chǎn)者速度大于消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內(nèi)存空間。
消費者可以使用brpop指令從redis的list中獲取數(shù)據(jù),這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,于是消費端就不需要休眠后獲取數(shù)據(jù)了,這樣就相當(dāng)于實現(xiàn)了一個阻塞隊列,
使用redis的brpop命令來模擬阻塞隊列。
>brpop queue:single 30
可以看到命令行阻塞在了brpop這里了,30s后沒數(shù)據(jù)就返回。
Java代碼實現(xiàn)如下:
生產(chǎn)者與普通隊列的生產(chǎn)者一致。
消費者BlockConsumer:
package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/** * 消費者 */public class BlockConsumer { public static void main(String[] args) { Jedis jedis = new Jedis(); while (true) { // 超時時間為1s ListmessageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME); if (null != messageList && !messageList.isEmpty()) { System.out.println(messageList); } } }}
缺點:無法實現(xiàn)一次生產(chǎn)多次消費。
Redis除了對消息隊列提供支持外,還提供了一組命令用于支持發(fā)布/訂閱模式。利用Redis的pub/sub模式可以實現(xiàn)一次生產(chǎn)多次消費的隊列。
發(fā)布:PUBLISH指令可用于發(fā)布一條消息,格式:
PUBLISH channel message
返回值表示訂閱了該消息的數(shù)量。
訂閱:SUBSCRIBE指令用于接收一條消息,格式:
SUBSCRIBE channel
使用SUBSCRIBE指令后進入了訂閱模式,但是不會接收到訂閱之前publish發(fā)送的消息,這是因為只有在消息發(fā)出去前訂閱才會接收到。在這個模式下其他指令,只能看到回復(fù)。
回復(fù)分為三種類型:
如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是已訂閱的頻道的數(shù)量
如果為message(消息),第二個值為產(chǎn)生該消息的頻道,第三個值為消息
如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當(dāng)前客戶端的訂閱數(shù)量。
下面使用redis的命令來模擬發(fā)布訂閱模式。
生產(chǎn)者:
127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1
消費者:
127.0.0.1:6379> subscribe queue Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
Java代碼實現(xiàn)如下:
生產(chǎn)者PubsubProducer:
package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/** * 生產(chǎn)者 */public class PubsubProducer { public static final String PUBSUB_QUEUE_NAME = "queue:pubsub"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i < 100; i++) { jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i); } jedis.close(); }}
消費者PubsubConsumer:
package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * 消費者 */public class PubsubConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onMessage(String channel, String message) { System.out.println("receive message: " + message); if(message.indexOf("99") > -1) { this.unsubscribe(); } } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("subscribe channel: " + channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("unsubscribe channel " + channel); } }; jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME); }}
消費者可以啟動多個,每個消費者都能收到所有的消息。
可以使用指令UNSUBSCRIBE退訂,如果不加參數(shù),則會退訂所有由SUBSCRIBE指令訂閱的頻道。
Redis還支持基于通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:
psubscribe channel.*
用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。
同時PUNSUBSCRIBE指令通配符不會展開。例如:PUNSUBSCRIBE \*
不會匹配到channel.\*
,所以要取消訂閱channel.\*
就要這樣寫PUBSUBSCRIBE channel.\*
。
Redis的pub/sub也有其缺點,那就是如果消費者下線,生產(chǎn)者的消息會丟失。
Redis中有個數(shù)據(jù)類型叫Zset,其本質(zhì)就是在數(shù)據(jù)類型Set的基礎(chǔ)上加了個排序的功能而已,除了保存原始的數(shù)據(jù)value之外,還提供另一個屬性score,這一屬性在添加修改元素時候可以進行指定,每次指定后,Zset會自動重新按新的score值進行排序。
如果score字段設(shè)置為消息的優(yōu)先級,優(yōu)先級最高的消息排在第一位,這樣就能實現(xiàn)一個優(yōu)先級隊列。
如果score字段代表的是消息想要執(zhí)行時間的時間戳,將它插入Zset集合中,便會按照時間戳大小進行排序,也就是對執(zhí)行時間先后進行排序,集合中最先要執(zhí)行的消息就會排在第一位,這樣的話,只需要起一個死循環(huán)線程不斷獲取集合中的第一個元素,如果當(dāng)前時間戳大于等于該元素的score就將它取出來進行消費刪除,就可以達到延時執(zhí)行的目的,注意不需要遍歷整個Zset集合,以免造成性能浪費。
下面使用redis的zset來模擬延時隊列。
生產(chǎn)者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0
消費者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1
Java代碼如下:
生產(chǎn)者DelayProducer:
package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/** * 生產(chǎn)者 */public class DelayProducer { public static final String DELAY_QUEUE_NAME = "queue:delay"; public static void main(String[] args) { Jedis jedis = new Jedis(); long now = new Date().getTime(); Random random = new Random(); for (int i = 0; i < 10; i++) { int second = random.nextInt(30); // 隨機訂單失效時間 jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i); } jedis.close(); }}
消費者:
package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/** * 消費者 */public class DelayConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { long now = new Date().getTime(); SettupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0); if(tupleSet.isEmpty()) { TimeUnit.MILLISECONDS.sleep(500); } else { for (Tuple tuple : tupleSet) { Double score = tuple.getScore(); long time = score.longValue(); if(time < now) { jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement()); System.out.println("order[" + tuple.getElement() +"] is timeout at " + time); } else { TimeUnit.MILLISECONDS.sleep(500); } break; } } } }}
延時隊列可用于訂單超時失效的場景
二級緩存(local+redis)中,當(dāng)有緩存需要更新時,可以使用發(fā)布訂閱模式通知其他服務(wù)器使得本地緩存失效。
關(guān)于“redis如何實現(xiàn)隊列的阻塞、延時、發(fā)布和訂閱”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“redis如何實現(xiàn)隊列的阻塞、延時、發(fā)布和訂閱”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。