如何理解redis的Pub/Sub模式,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。
為遷安等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及遷安網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都做網(wǎng)站、成都網(wǎng)站建設(shè)、遷安網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專(zhuān)業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
Redis同樣支持消息的發(fā)布/訂閱(Pub/Sub)模式,這和中間件activemq有些類(lèi)似。訂閱者(Subscriber)可以訂閱自己感興趣的頻道(Channel),發(fā)布者(Publisher)可以將消息發(fā)往指定的頻道(Channel),正是通過(guò)這種方式,可以將消息的發(fā)送者和接收者解耦。另外,由于可以動(dòng)態(tài)的Subscribe和Unsubscribe,也可以提高系統(tǒng)的靈活性和可擴(kuò)展性。
這里假設(shè)有一個(gè)可用的Redis環(huán)境(單節(jié)點(diǎn)和集群均可)。
先用一個(gè)客戶端來(lái)訂閱頻道:
上圖中先使用redis-cli作為客戶端連接了Redis,之后使用了SUBSCRIBE命令,后面的參數(shù)表示訂閱了china和hongkong兩個(gè)channel??梢钥吹?SUBSCRIBE china hongkong"這條命令的輸出是6行(可以分為2組,每一組是一個(gè)Message)。因?yàn)橛嗛?、取消訂閱的操作跟發(fā)布的消息都是通過(guò)消息(Message)的方式發(fā)送的,消息的第一個(gè)元素就是消息類(lèi)型,它可以是以下幾種類(lèi)型:
subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.
unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.
message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.
--from http://redis.io/topics/pubsub
上圖的訂閱命令將使得發(fā)往這兩個(gè)channel的消息會(huì)被這個(gè)客戶端接收到。需要注意的是,redis-cli客戶端在進(jìn)入subscribe模式以后,將不能再響應(yīng)其他的任何命令:
A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.
The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT
--from http://redis.io/topics/pubsub
官網(wǎng)說(shuō)客戶端在subscribe下除了可以使用以上命令外,不能使用其他命令了。但是本人在Subscribe狀態(tài)下使用上述幾個(gè)命令,根本沒(méi)反應(yīng)。也就是說(shuō),使用redis-cli訂閱channel后,該客戶端將不能響應(yīng)任何命令。除非按下(ctrl+c),但該操作不是取消訂閱,而是退出redis-cli,此時(shí)將回到shell命令行下。
關(guān)于這個(gè)情況,我在官網(wǎng)上沒(méi)有找到對(duì)這種情況的解釋?zhuān)灿胁簧俚娜嗽诰W(wǎng)上問(wèn),找來(lái)找去,本人覺(jué)得還算合理的解釋是:
On this page: http://redis.io/commands/subscribe applies only to those clients.
The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.
Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).
--from http://stackoverflow.com/questions/17621371/redis-unsubscribe
就是說(shuō),官網(wǎng)中說(shuō)明的client,并不包含這里使用的redis-cli,于是它可以和其他的client有不同表現(xiàn)。(先不糾結(jié)這個(gè)問(wèn)題,稍后再用jedis來(lái)測(cè)試一下。)
接下來(lái)再用一個(gè)客戶端來(lái)發(fā)布消息:
可以看到,新的一個(gè)客戶端使用PUBLISH命令往china頻道發(fā)布了一條叫"China News"的消息,接下來(lái)再看看訂閱端:
可以看見(jiàn),這條消息已經(jīng)被接收到了??梢钥吹剑盏降南⒅械谝粋€(gè)參數(shù)是類(lèi)型"message",第二個(gè)參數(shù)是channel名字"china",第三個(gè)參數(shù)是消息內(nèi)容"China News",這和開(kāi)始說(shuō)的message類(lèi)型的結(jié)構(gòu)一致。
Redis還支持通配符的訂閱和發(fā)布??蛻舳丝梢杂嗛啙M足一個(gè)或多個(gè)規(guī)則的channel消息,相應(yīng)的命令是PSUBSCRIBE和PUNSUBSCRIBE。接下來(lái)我們?cè)儆昧硪粋€(gè)redis-cli客戶端來(lái)訂閱"chi*"的channel,如圖:
和subscribe/unsubscribe的輸出類(lèi)似,可以看到第一部分是消息類(lèi)型“psubscribe”,第二部分是訂閱的規(guī)則“chi*”,第三部分則是該客戶端目前訂閱的所有規(guī)則個(gè)數(shù)。
接下來(lái)再發(fā)布一條消息到china這個(gè)channel中,此時(shí),兩個(gè)訂閱者應(yīng)該都能收到該消息:
實(shí)際測(cè)試結(jié)果跟預(yù)期相同。需要注意的是,訂閱者2通過(guò)通配符訂閱的,收到的消息類(lèi)型是“pmessage”:
pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.
--from http://redis.io/topics/pubsub
第二部分是匹配的模式“chi*”,第三部分是實(shí)際的channel名字“china”,第四部分是消息內(nèi)容“China Daily”。
我們?cè)侔l(fā)布一條消息到chinnna中,此時(shí)只有訂閱者2能接收到消息了:
同樣,在使用PSUBSCRIBE進(jìn)入訂閱模式以后,該redis-cli也不能再監(jiān)聽(tīng)其他任何的命令,要退出該模式,只能使用ctrl+c。
Jedis是Redis客戶端的一種Java實(shí)現(xiàn),在http://redis.io/clients#java中也能找到。
這里使用maven來(lái)管理包的依賴,由于使用了Log4j來(lái)輸出日志,因此會(huì)用到log4j的jar包:
redis.clients jedis 2.8.0 log4j log4j 1.2.17
Jedis中的JedisPubSub抽象類(lèi)提供了訂閱和取消的功能。想處理訂閱和取消訂閱某些channel的相關(guān)事件,我們得擴(kuò)展JedisPubSub類(lèi)并實(shí)現(xiàn)相關(guān)的方法:
package com.demo.redis; import org.apache.log4j.Logger; import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub {//注意這里繼承了抽象類(lèi)JedisPubSub private static final Logger LOGGER = Logger.getLogger(Subscriber.class); @Override public void onMessage(String channel, String message) { LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message)); } @Override public void onPMessage(String pattern, String channel, String message) { LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", pattern, channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) { LOGGER.info("onSubscribe"); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { LOGGER.info("onUnsubscribe"); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { LOGGER.info("onPUnsubscribe"); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { LOGGER.info("onPSubscribe"); } }
有了訂閱者,我們還需要一個(gè)發(fā)布者:
package com.demo.redis; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; public class Publisher { private static final Logger LOGGER = Logger.getLogger(Publisher.class); private final Jedis publisherJedis; private final String channel; public Publisher(Jedis publisherJedis, String channel) { this.publisherJedis = publisherJedis; this.channel = channel; } /** * 不停的讀取輸入,然后發(fā)布到channel上面,遇到quit則停止發(fā)布。 */ public void startPublish() { LOGGER.info("Type your message (quit for terminate)"); try { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); while (true) { String line = reader.readLine(); if (!"quit".equals(line)) { publisherJedis.publish(channel, line); } else { break; } } } catch (IOException e) { LOGGER.error("IO failure while reading input", e); } } }
為簡(jiǎn)單起見(jiàn),這個(gè)發(fā)布者接收控制臺(tái)的輸入,然后將輸入的消息發(fā)布到指定的channel上面,如果輸入quit,則停止發(fā)布消息。
接下來(lái)是主函數(shù):
package com.demo.redis; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class Program { public static final String CHANNEL_NAME = "MyChannel"; //我這里的Redis是一個(gè)集群,192.168.56.101和192.168.56.102都可以使用 public static final String REDIS_HOST = "192.168.56.101"; public static final int REDIS_PORT = 7000; private final static Logger LOGGER = Logger.getLogger(Program.class); private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig(); private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0); public static void main(String[] args) throws Exception { final Jedis subscriberJedis = JEDIS_POOL.getResource(); final Jedis publisherJedis = JEDIS_POOL.getResource(); final Subscriber subscriber = new Subscriber(); //訂閱線程:接收消息 new Thread(new Runnable() { public void run() { try { LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked."); //使用subscriber訂閱CHANNEL_NAME上的消息,這一句之后,線程進(jìn)入訂閱模式,阻塞。 subscriberJedis.subscribe(subscriber, CHANNEL_NAME); //當(dāng)unsubscribe()方法被調(diào)用時(shí),才執(zhí)行以下代碼 LOGGER.info("Subscription ended."); } catch (Exception e) { LOGGER.error("Subscribing failed.", e); } } }).start(); //主線程:發(fā)布消息到CHANNEL_NAME頻道上 new Publisher(publisherJedis, CHANNEL_NAME).startPublish(); publisherJedis.close(); //Unsubscribe subscriber.unsubscribe(); subscriberJedis.close(); } }
主類(lèi)Program中定義了channel名字、連接redis的地址和端口,并使用JedisPool來(lái)獲取Jedis實(shí)例。由于訂閱者(subscriber)在進(jìn)入訂閱狀態(tài)后會(huì)阻塞線程,因此新起一個(gè)線程(new Thread())作為訂閱線程,并是用主線程來(lái)發(fā)布消息。待發(fā)布者(類(lèi)中的new Publisher)停止發(fā)布消息(控制臺(tái)中輸入quit即可)時(shí),解除訂閱者的訂閱(subscriber.unsubscribe()方法)。此時(shí)訂閱線程解除阻塞,打印結(jié)束的日志并退出。
運(yùn)行程序之前,還需要一個(gè)簡(jiǎn)單的log4j配置以觀察輸出:
log4j.rootLogger=INFO,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n
運(yùn)行Program,以下是執(zhí)行結(jié)果:
從結(jié)果看,當(dāng)訂閱者訂閱后,訂閱線程阻塞,主線程中的Publisher接收輸入后,發(fā)布消息到MyChannel中,此時(shí)訂閱該channel的訂閱者收到消息并打印。
開(kāi)始使用redis-cli時(shí),在subscriber進(jìn)入監(jiān)聽(tīng)狀態(tài)后,并不能使用UNSUBSCRIBE和PUNSUBSCRIBE命令,現(xiàn)在在Jedis中,在訂閱線程阻塞時(shí),通過(guò)在main線程中調(diào)用改subscriber的unsubscribe()方法來(lái)解除阻塞。查看Jedis源碼,其實(shí)該方法也就是給redis發(fā)送了一個(gè)UNSUBSCRIBE命令而已:
因此這里是支持在“客戶端”使用UNSUBSCRIBE命令的。
在接收消息前,需要訂閱channel,訂閱完成之后,會(huì)執(zhí)行一個(gè)循環(huán),這個(gè)循環(huán)會(huì)一直阻塞,直到該Client沒(méi)有訂閱數(shù)為止,如下圖:
中間省略的其他行,主要是用于解析收到的Redis響應(yīng),這段代碼也是根據(jù)響應(yīng)的第一部分確定響應(yīng)的消息類(lèi)型,然后挨個(gè)解析響應(yīng)的后續(xù)內(nèi)容,最后根據(jù)解析到消息類(lèi)型,并使用后續(xù)解析到的內(nèi)容作為參數(shù)來(lái)回調(diào)相應(yīng)的方法,省略的內(nèi)容如下:
final byte[] resp = (byte[]) firstObj; if (Arrays.equals(SUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bchannel = (byte[]) reply.get(1); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); //調(diào)用onSubscribe方法,該方法在我們的Subscriber類(lèi)中實(shí)現(xiàn) onSubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bchannel = (byte[]) reply.get(1); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); //調(diào)用onUnsubscribe方法,該方法在我們的Subscriber類(lèi)中實(shí)現(xiàn) onUnsubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(MESSAGE.raw, resp)) { final byte[] bchannel = (byte[]) reply.get(1); final byte[] bmesg = (byte[]) reply.get(2); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg); //調(diào)用onMessage方法,該方法在我們的Subscriber類(lèi)中實(shí)現(xiàn) onMessage(strchannel, strmesg); } else if (Arrays.equals(PMESSAGE.raw, resp)) { final byte[] bpattern = (byte[]) reply.get(1); final byte[] bchannel = (byte[]) reply.get(2); final byte[] bmesg = (byte[]) reply.get(3); final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg); //調(diào)用onPMessage方法,該方法在我們的Subscriber類(lèi)中實(shí)現(xiàn) onPMessage(strpattern, strchannel, strmesg); } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bpattern = (byte[]) reply.get(1); final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern); onPSubscribe(strpattern, subscribedChannels); } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bpattern = (byte[]) reply.get(1); final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern); //調(diào)用onPUnsubscribe方法,該方法在我們的Subscriber類(lèi)中實(shí)現(xiàn) onPUnsubscribe(strpattern, subscribedChannels); } else { //對(duì)于其他Redis沒(méi)有定義的返回消息類(lèi)型,則直接報(bào)錯(cuò) throw new JedisException("Unknown message type: " + firstObj); }
以上就是為什么我們需要在Subscriber中實(shí)現(xiàn)這幾個(gè)方法的原因了(這些方法并不是抽象的,可以選擇實(shí)現(xiàn)使用到的方法)。
關(guān)于如何理解Redis的Pub/Sub模式問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。