真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

這篇文章主要介紹“redis發(fā)布訂閱怎么實(shí)現(xiàn)”,在日常操作中,相信很多人在Redis發(fā)布訂閱怎么實(shí)現(xiàn)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Redis發(fā)布訂閱怎么實(shí)現(xiàn)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

成都創(chuàng)新互聯(lián)公司自2013年起,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元江州做網(wǎng)站,已為上家服務(wù),為江州各地企業(yè)和個人服務(wù),聯(lián)系電話:13518219792

假設(shè)我們有這么一個業(yè)務(wù)場景,在網(wǎng)站下單支付以后,需要通知庫存服務(wù)進(jìn)行發(fā)貨處理。

上面業(yè)務(wù)實(shí)現(xiàn)不難,我們只要讓庫存服務(wù)提供給相關(guān)的給口,下單支付之后只要調(diào)用庫存服務(wù)即可。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

后面如果又有新的業(yè)務(wù),比如說積分服務(wù),他需要獲取下單支付的結(jié)果,然后增加用戶的積分。

這個實(shí)現(xiàn)也不難,讓積分服務(wù)同樣提供一個接口,下單支付之后只要調(diào)用庫存服務(wù)即可。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

如果就兩個業(yè)務(wù)需要獲取下單支付的結(jié)果,那也還好,程序改造也快??墒请S著業(yè)務(wù)不斷的發(fā)展,越來越多的新業(yè)務(wù)說是要下單支付的結(jié)果。

這時(shí)我們會發(fā)現(xiàn)上面這樣的系統(tǒng)架構(gòu)存在很多問題:

第一,下單支付業(yè)務(wù)與其他業(yè)務(wù)重度耦合,每當(dāng)有個新業(yè)務(wù)需要支付結(jié)果,就需要改動下單支付的業(yè)務(wù)。

第二,如果調(diào)用業(yè)務(wù)過多,會導(dǎo)致下單支付接口響應(yīng)時(shí)間變長。另外,如果有任一下游接口響應(yīng)變慢,就會同步導(dǎo)致下單支付接口響應(yīng)也變長。

第三,如果任一下游接口失敗,可能導(dǎo)致數(shù)據(jù)不一致的情況。比如說下圖,先調(diào)用 A,成功之后再調(diào)用 B,最后再調(diào)用 C。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

如果在調(diào)用 B 接口的發(fā)生異常,此時(shí)可能就導(dǎo)致下單支付接口返回失敗,但是此時(shí) A 接口其實(shí)已經(jīng)調(diào)用成功,這就代表它內(nèi)部已經(jīng)處理下單支付成功的結(jié)果。

這樣就會導(dǎo)致 A,B,C 三個下游接口,A 獲取成功獲取支付結(jié)果,但是 B,C 沒有拿到,導(dǎo)致三者系統(tǒng)數(shù)據(jù)不一致的情況。

其實(shí)我們仔細(xì)想一下,對于下單支付業(yè)務(wù)來講,它其實(shí)不需要關(guān)心下游調(diào)用結(jié)果,只要有某種機(jī)制通知能通知到他們就可以了。

講到這里,這就需要引入今天需要介紹發(fā)布訂閱機(jī)制。

Redis 發(fā)布與訂閱

Redis 提供了基于「發(fā)布/訂閱」模式的消息機(jī)制,在這種模式下,消息發(fā)布者與訂閱者不需要進(jìn)行直接通信。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

如上圖所示,消息發(fā)布者只需要想指定的頻道發(fā)布消息,訂閱該頻道的每個客戶端都可以接受到到這個消息。

使用 Redis 發(fā)布訂閱這種機(jī)制,對于上面業(yè)務(wù),下單支付業(yè)務(wù)只需要向支付結(jié)果這個頻道發(fā)送消息,其他下游業(yè)務(wù)訂閱支付結(jié)果這個頻道,就能收相應(yīng)消息,然后做出業(yè)務(wù)處理即可。

這樣就可以解耦系統(tǒng)上下游之間調(diào)用關(guān)系。

接下來我們來看下,我們來看下如何使用 Redis 發(fā)布訂閱功能。

Redis 中提供了一組命令,可以用于發(fā)布消息,訂閱頻道,取消訂閱以及按照模式訂閱。

首先我們來看下如何發(fā)布一條消息,其實(shí)很簡單只要使用 publish指令:

publish channel message

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

上圖中,我們使用 publish指令向 pay_result這個頻道發(fā)送了一條消息。我們可以看到 redis 向我們返回 0 ,這其實(shí)代表當(dāng)前訂閱者個數(shù),由于此時(shí)沒有訂閱,所以返回結(jié)果為 0 。

接下來我們使用 subscribe訂閱一個或多個頻道

subscribe channel [channel ...]

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

如上圖所示,我們訂閱 pay_result這個頻道,當(dāng)有其他客戶端往這個頻道發(fā)送消息,

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

當(dāng)前訂閱者就會收到消息。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

我們子在使用訂閱命令,需要主要幾點(diǎn):

第一,客戶端執(zhí)行訂閱指令之后,就會進(jìn)入訂閱狀態(tài),之后就只能接收 subscribe、psubscribe、unsubscribe、punsubscribe這四個命令。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

第二,新訂閱的客戶端,是無法收到這個頻道之前的消息,這是因?yàn)?Redis 并不會對發(fā)布的消息持久化的。

相比于很多專業(yè) MQ,比如 kafka、rocketmq 來說, redis 發(fā)布訂閱功能就顯得有點(diǎn)簡陋了。不過 redis 發(fā)布訂閱功能勝在簡單,如果當(dāng)前場景可以容忍這些缺點(diǎn),還是可以選擇使用的。

除了上面的功能以外的,Redis 還支持模式匹配的訂閱方式。簡單來說,客戶端可以訂閱一個帶 * 號的模式,如果某些頻道的名字與這個模式匹配,那么當(dāng)其他客戶端發(fā)送給消息給這些頻道時(shí),訂閱這個模式的客戶端也將會到收到消息。

使用 Redis 訂閱模式,我們需要使用一個新的指令 psubscribe

我們執(zhí)行下面這個指令:

psubscribe pay.*

那么一旦有其他客戶端往 pay開頭的頻道,比如 pay_resultpay_xxx,我們都可以收到消息。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

如果需要取消訂閱模式,我們需要使用相應(yīng)punsubscribe 指令,比如取消上面訂閱的模式:

punsubscribe pay.*

Redis 客戶端發(fā)布訂閱使用方式

基于 Jedis 開發(fā)發(fā)布/訂閱

聊完 Redis 發(fā)布訂閱指令,我們來看下 Java Redis 客戶端如何使用發(fā)布訂閱。

下面的例子主要基于 Jedis,maven 版本為:


 redis.clients
 jedis
 3.1.0

其他 Redis 客戶端大同小異。

jedis 發(fā)布代碼比較簡單,只需要調(diào)用 Jedis 類的 publish 方法。

// 生產(chǎn)環(huán)境千萬不要這么使用哦,推薦使用 JedisPool 線程池的方式 
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxxxx");
jedis.publish("pay_result", "hello world");

訂閱的代碼就相對復(fù)雜了,我們需要繼承 JedisPubSub 實(shí)現(xiàn)里面的相關(guān)方法,一旦有其他客戶端往訂閱的頻道上發(fā)送消息,將會調(diào)用 JedisPubSub 相應(yīng)的方法。

private static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("收到訂閱頻道:" + channel + " 消息:" + message);

    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("收到具體訂閱頻道:" + channel + "訂閱模式:" + pattern + " 消息:" + message);
    }

}

其次我們需要調(diào)用 Jedis 類的 subscribe 方法:

Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");

當(dāng)有其他客戶端往 pay_result頻道發(fā)送消息時(shí),訂閱將會收到消息。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

不過需要注意的是,jedis#subscribe 是一個阻塞方法,調(diào)用之后將會阻塞主線程的,所以如果需要在正式項(xiàng)目使用需要使用異步線程運(yùn)行,這里就不演示具體的代碼了。

基于 Spring-Data-Redis 開發(fā)發(fā)布訂閱

原生 jedis 發(fā)布訂閱操作,相對來說還是有點(diǎn)復(fù)雜?,F(xiàn)在我們很多應(yīng)用已經(jīng)基于 SpringBoot 開發(fā),使用 spring-boot-starter-data-redis ,可以簡化發(fā)布訂閱開發(fā)。

首先我們需要引入相應(yīng)的 startter 依賴:


    org.springframework.boot
    spring-boot-starter-data-redis
    
        
            lettuce-core
            io.lettuce
        
    


    redis.clients
    jedis

這里我們使用 Jedis 當(dāng)做底層連接客戶端,所以需要排除 lettuce,然后引入 Jedis 依賴。

然后我們需要創(chuàng)建一個消息接收類,里面需要有方法消費(fèi)消息:

@Slf4j
public class Receiver {
    private AtomicInteger counter = new AtomicInteger();

    public void receiveMessage(String message) {
        log.info("Received <" + message + ">");
        counter.incrementAndGet();
    }

    public int getCount() {
        return counter.get();
    }
}

接著我們只需要注入 Spring- Redis 相關(guān) Bean,比如:

  • StringRedisTemplate,用來操作 Redis 命令

  • MessageListenerAdapter ,消息監(jiān)聽器,可以在這個類注入我們上面創(chuàng)建消息接受類 Receiver

  • RedisConnectionFactory, 創(chuàng)建 Redis 底層連接

@Configuration
public class MessageConfiguration {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 訂閱指定頻道使用 ChannelTopic
        // 訂閱模式使用 PatternTopic
        container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // 注入 Receiver,指定類中的接受方法
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

}

最后我們使用 StringRedisTemplate#convertAndSend 發(fā)送消息,同時(shí) Receiver 將會收到一條消息。

@SpringBootApplication
public class MessagingRedisApplication {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        Receiver receiver = ctx.getBean(Receiver.class);

        while (receiver.getCount() == 0) {
            template.convertAndSend("pay_result", "Hello from Redis!");
            Thread.sleep(500L);
        }

        System.exit(0);
    }
}

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

Redis 發(fā)布訂閱實(shí)際應(yīng)用

Redis Sentinel 節(jié)點(diǎn)發(fā)現(xiàn)

Redis Sentinel是 Redis 一套高可用方案,可以在主節(jié)點(diǎn)故障的時(shí)候,自動將從節(jié)點(diǎn)提升為主節(jié)點(diǎn),從而轉(zhuǎn)移故障。

今天這里我們不詳細(xì)解釋 Redis Sentinel詳細(xì)原理,主要來看下 Redis Sentinel如何使用發(fā)布訂閱機(jī)制。

Redis Sentinel節(jié)點(diǎn)主要使用發(fā)布訂閱機(jī)制,實(shí)現(xiàn)新節(jié)點(diǎn)的發(fā)現(xiàn),以及交換主節(jié)點(diǎn)的之間的狀態(tài)。

如下所示,每一個 Sentinel節(jié)點(diǎn)將會定時(shí)向 _sentinel_:hello 頻道發(fā)送消息,并且每個 Sentinel都會訂閱這個節(jié)點(diǎn)。

Redis發(fā)布訂閱怎么實(shí)現(xiàn)

這樣一旦有節(jié)點(diǎn)往這個頻道發(fā)送消息,其他節(jié)點(diǎn)就可以立刻收到消息。

這樣一旦有的新節(jié)點(diǎn)加入,它往這個頻道發(fā)送消息,其他節(jié)點(diǎn)收到之后,判斷本地列表并沒有這個節(jié)點(diǎn),于是就可以當(dāng)做新的節(jié)點(diǎn)加入本地節(jié)點(diǎn)列表。

除此之外,每次往這個頻道發(fā)送消息內(nèi)容可以包含節(jié)點(diǎn)的狀態(tài)信息,這樣可以作為后面 Sentinel領(lǐng)導(dǎo)者選舉的依據(jù)。

以上都是對于 Redis 服務(wù)端來講,對于客戶端來講,我們也可以用到發(fā)布訂閱機(jī)制。

當(dāng) Redis Sentinel進(jìn)行主節(jié)點(diǎn)故障轉(zhuǎn)移,這個過程各個階段會通過發(fā)布訂閱對外提供。

對于我們客戶端來講,比較關(guān)心切換之后的主節(jié)點(diǎn),這樣我們及時(shí)切換主節(jié)點(diǎn)的連接(舊節(jié)點(diǎn)此時(shí)已故障,不能再接受操作指令),

客戶端可以訂閱 +switch-master頻道,一旦 Redis Sentinel結(jié)束了對主節(jié)點(diǎn)的故障轉(zhuǎn)移就會發(fā)布主節(jié)點(diǎn)的的消息。

redission 分布式鎖

redission 開源框架提供一些便捷操作 Redis 的方法,其中比較出名的 redission 基于 Redis 的實(shí)現(xiàn)分布式鎖。

今天我們來看下 Redis 的實(shí)現(xiàn)分布式鎖中如何使用 Redis 發(fā)布訂閱機(jī)制,提高加鎖的性能。

PS:redission 分布式鎖實(shí)現(xiàn)原理,可以參考之前寫過的文章:

  1. 可重入分布式鎖的實(shí)現(xiàn)方式

  2. Redis 分布式鎖,看似簡單,其實(shí)真不簡單

首先我們來看下 redission 加鎖的方法:

Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();

RLock 繼承自 Java 標(biāo)準(zhǔn)的 Lock 接口,調(diào)用 lock 方法,如果當(dāng)前鎖已被其他客戶端獲取,那么當(dāng)前加鎖的線程將會被阻塞,直到其他客戶端釋放這把鎖。

這里其實(shí)有個問題,當(dāng)前阻塞的線程如何感知分布式鎖已被釋放呢?

這里其實(shí)有兩種實(shí)現(xiàn)方法:

第一鐘,定時(shí)查詢分布時(shí)鎖的狀態(tài),一旦查到鎖已被釋放(Redis 中不存在這個鍵值),那么就去加鎖。

實(shí)現(xiàn)偽碼如下:

while (true) {
  boolean result=lock();
  if (!result) {
    Thread.sleep(N);
  }
}

這種方式實(shí)現(xiàn)起來起來簡單,不過缺點(diǎn)也比較多。

如果定時(shí)任務(wù)時(shí)間過短,將會導(dǎo)致查詢次數(shù)過多,其實(shí)這些都是無效查詢。

如果定時(shí)任務(wù)休眠時(shí)間過長,那又會導(dǎo)致加鎖時(shí)間過長,導(dǎo)致加鎖性能不好。

那么第二種實(shí)現(xiàn)方案,就是采用服務(wù)通知的機(jī)制,當(dāng)分布式鎖被釋放之后,客戶端可以收到鎖釋放的消息,然后第一時(shí)間再去加鎖。

這個服務(wù)通知的機(jī)制我們可以使用 Redis 發(fā)布訂閱模式。

當(dāng)線程加鎖失敗之后,線程將會訂閱 redisson_lock__channel_xxx(xx 代表鎖的名稱) 頻道,使用異步線程監(jiān)聽消息,然后利用 Java 中 Semaphore 使當(dāng)前線程進(jìn)入阻塞。

一旦其他客戶端進(jìn)行解鎖,redission 就會往這個redisson_lock__channel_xxx 發(fā)送解鎖消息。

等異步線程收到消息,將會調(diào)用 Semaphore 釋放信號量,從而讓當(dāng)前被阻塞的線程喚醒去加鎖。

ps:這里只是簡單描述了 redission 加鎖部分原理,出于篇幅,這里就不再消息解析源碼。

感興趣的小伙伴可以自己看下 redission 加鎖的源碼。

通過發(fā)布訂閱機(jī)制,被阻塞的線程可以及時(shí)被喚醒,減少無效的空轉(zhuǎn)的查詢,有效的提高的加鎖的效率。

ps: 這種方式,性能確實(shí)提高,但是實(shí)現(xiàn)起來的復(fù)雜度也很高,這部分源碼有點(diǎn)東西,快看暈了。

到此,關(guān)于“Redis發(fā)布訂閱怎么實(shí)現(xiàn)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!


分享名稱:Redis發(fā)布訂閱怎么實(shí)現(xiàn)
鏈接地址:http://weahome.cn/article/ghcccs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部