真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何使用Redis實現(xiàn)一個安全可靠的分布式鎖

這篇文章給大家分享的是有關如何使用redis實現(xiàn)一個安全可靠的分布式鎖的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

創(chuàng)新互聯(lián)建站是一家專注于網(wǎng)站設計、成都網(wǎng)站制作與策劃設計,高港網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設十載,網(wǎng)設計領域的專業(yè)建站公司;建站業(yè)務涵蓋:高港等地區(qū)。高港做網(wǎng)站價格咨詢:028-86922220

并發(fā)場景下多個進程或線程共享資源的讀寫,需要保證對資源的訪問互斥。在單機系統(tǒng)中,我們可以使用Java并發(fā)包中的API、synchronized關鍵字等方式來解決;但是在分布式系統(tǒng)下,這些方式不再適用,我們需要自己實現(xiàn)分布式鎖。

常見的分布式鎖的實現(xiàn)方案有:基于數(shù)據(jù)庫、基于Redis、基于Zookeeper等。作為Redis專題的一部分,本文將基于Redis聊一聊分布式鎖的實現(xiàn)方案。

分析與實現(xiàn)


問題分析

分布式鎖與JVM內置的鎖有著共同的目的:讓應用程序以預期的順序訪問或操作共享的資源,防止多個線程同時對同一資源操作,導致系統(tǒng)運行紊亂、不可控。常常用于商品庫存扣減、優(yōu)惠券扣減等場景。

理論上來講,為了保證鎖的安全性和有效性,分布式鎖至少需要滿足以下條件:

  • 互斥性:在同一時間內,僅有一個線程能夠獲得鎖;

  • 無死鎖:線程獲取鎖后,必須保證能夠釋放,即使線程獲取鎖后應用程序宕機,也能在限定時間內釋放;

  • 加鎖和解鎖必須是同一個線程;

在實現(xiàn)方式上,分布式鎖大體分為三個步驟:

  • a-獲取資源的操作權;

  • b-對資源執(zhí)行操作;

  • c-釋放資源的操作權;

無論是Java內置的鎖,還是分布式鎖,也無論使用哪種分布式實現(xiàn)方案,都是圍繞a、c兩個步驟展開。Redis對于實現(xiàn)分布式鎖天然友好,原因如下:

  • 命令處理階段Redis使用單線程處理,同一個key同時只有一個線程能夠處理,沒有多線程競態(tài)問題。

  • SET key value NX PX milliseconds命令在不存在key的情況下添加具有過期時間的key,為安全加鎖提供支持。

  • Lua腳本和DEL命令為安全解鎖提供可靠支撐。

代碼實現(xiàn)
  • Maven依賴


    org.springframework.boot
    spring-boot-starter-data-redis
  	${your-spring-boot-version}
  • 配置文件

在application.properties增加以下內容,單機版Redis實例。

spring.redis.database=0
spring.redis.host=localhost
spring.redis.port=6379
  • RedisConfig

@Configuration
public class RedisConfig {

    // 自己定義了一個 RedisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate redisTemplate(RedisConnectionFactory factory)
        throws UnknownHostException {
        // 我們?yōu)榱俗约洪_發(fā)方便,一般直接使用 
        RedisTemplate template = new RedisTemplate();
        template.setConnectionFactory(factory);
        // Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // String 的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}
  • RedisLock

@Service
public class RedisLock {

    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 加鎖,最多等待maxWait毫秒
     *
     * @param lockKey   鎖定key
     * @param lockValue 鎖定value
     * @param timeout   鎖定時長(毫秒)
     * @param maxWait   加鎖等待時間(毫秒)
     * @return true-成功,false-失敗
     */
    public boolean tryAcquire(String lockKey, String lockValue, int timeout, long maxWait) {
        long start = System.currentTimeMillis();

        while (true) {
            // 嘗試加鎖
            Boolean ret = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, timeout, TimeUnit.MILLISECONDS);
            if (!ObjectUtils.isEmpty(ret) && ret) {
                return true;
            }

            // 計算已經(jīng)等待的時間
            long now = System.currentTimeMillis();
            if (now - start > maxWait) {
                return false;
            }

            try {
                Thread.sleep(200);
            } catch (Exception ex) {
                return false;
            }
        }
    }

    /**
     * 釋放鎖
     *
     * @param lockKey   鎖定key
     * @param lockValue 鎖定value
     * @return true-成功,false-失敗
     */
    public boolean releaseLock(String lockKey, String lockValue) {
        // lua腳本
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

        DefaultRedisScript redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = redisTemplate.opsForValue().getOperations().execute(redisScript, Collections.singletonList(lockKey), lockValue);
        return result != null && result > 0L;
    }
}
  • 測試用例

@SpringBootTest
class RedisDistLockDemoApplicationTests {

    @Resource
    private RedisLock redisLock;

    @Test
    public void testLock() {
        redisLock.tryAcquire("abcd", "abcd", 5 * 60 * 1000, 5 * 1000);
        redisLock.releaseLock("abcd", "abcd");
    }
}
安全隱患

可能很多同學(也包括我)在日常工作中都是使用上面的實現(xiàn)方式,看似是穩(wěn)妥的:

  • 使用set命令NXPX選項進行加鎖,保證了加鎖互斥,避免了死鎖;

  • 使用lua腳本解鎖,防止解除其他線程的鎖;

  • 加鎖、解鎖命令都是原子操作;

其實以上實現(xiàn)的穩(wěn)妥有個前提條件:單機版Redis、開啟AOF持久化方式并設置appendfsync=always

但是在哨兵模式和集群模式下可能存在問題,為什么呢?

哨兵模式和集群模式基于主從架構,主從之間通過命令傳播實現(xiàn)數(shù)據(jù)同步,而命令傳播是異步的。

所以就存在主節(jié)點數(shù)據(jù)寫入成功,在還未通知從節(jié)點情況下,主節(jié)點就宕機的可能。

當從節(jié)點通過故障轉移提升為新的主節(jié)點后,其他線程就有機會重新加鎖成功,導致不滿足分布式鎖的互斥條件。

官方RedLock


集群模式下,若集群所有節(jié)點穩(wěn)定運行,不出現(xiàn)故障轉移的情況下,安全性是有保障的。但是,沒有什么系統(tǒng)能夠保證100%穩(wěn)定,基于Redis的分布式鎖必須考慮容錯。

由于主從同步基于異步復制原理,所以哨兵模式和集群模式天生無法滿足此條件。為此,Redis作者專門提出了一種解決方案——RedLock(Redis Distribute Lock)。

設計思路

根據(jù)官方文檔的說明,把RedLock的設計思路進行介紹。

先說環(huán)境要求,需要N(N>=3)個獨立部署的Redis實例,相互之間不需要主從復制、故障轉移等技術。

為了獲取鎖,客戶端將按照以下流程進行操作:

  • 獲取當前時間(毫秒)作為開始時間start;

  • 使用相同的key和隨機value,按順序向所有N個節(jié)點發(fā)起獲取鎖的請求。當向每個實例設置鎖時,客戶端會使用一個過期時間(小于鎖的自動釋放時間)。比如鎖的自動釋放時間是10秒,這個超時時間應該是5-50毫秒。這是為了防止客戶端在一個已經(jīng)宕機的實例浪費太多時間:如果Redis實例宕機,客戶端盡快處理下一個實例。

  • 客戶端計算加鎖消耗的時間cost(cost=start-now)。只有客戶端在半數(shù)以上實例加鎖成功,并且整個耗時小于整個有效時間(ttl),才能認為當前客戶端加鎖成功。

  • 如果客戶端加鎖成功,那么整個鎖的真正有效時間應該是:validTime=ttl-cost。

  • 如果客戶端加鎖失?。赡苁谦@取鎖成功實例數(shù)未過半,也可能是耗時超過ttl),那么客戶端應該向所有實例嘗試解鎖(即使剛剛客戶端認為加鎖失敗)。

RedLock的設計思路延續(xù)了Redis內部多種場景的投票方案,通過多個實例分別加鎖解決競態(tài)問題,雖然加鎖消耗了時間,但是消除了主從機制下的安全問題。

代碼實現(xiàn)

官方推薦Java實現(xiàn)為Redisson,它具備可重入特性,按照RedLock進行實現(xiàn),支持獨立實例模式、集群模式、主從模式、哨兵模式等;API比較簡單,上手容易。示例如下(直接通過測試用例):

    @Test
    public void testRedLock() throws InterruptedException {

        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        final RedissonClient client = Redisson.create(config);

        // 獲取鎖實例
        final RLock lock = client.getLock("test-lock");

        // 加鎖
        lock.lock(60 * 1000, TimeUnit.MILLISECONDS);
        try {
            // 假裝做些什么事情
            Thread.sleep(50 * 1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //解鎖
            lock.unlock();
        }
    }

Redisson封裝的非常好,我們可以像使用Java內置的鎖一樣去使用,代碼簡潔的不能再少了。關于Redisson源碼的分析,網(wǎng)上有很多文章大家可以找找看。

全文總結


分布式鎖是我們研發(fā)過程中常用的的一種解決并發(fā)問題的方式,Redis是只是一種實現(xiàn)方式。

關鍵的是要弄清楚加鎖、解鎖背后的原理,以及實現(xiàn)分布式鎖需要解決的核心問題,同時考慮我們所采用的中間件有什么特性可以支撐。了解這些后,實現(xiàn)起來就不是什么問題了。

感謝各位的閱讀!關于“如何使用Redis實現(xiàn)一個安全可靠的分布式鎖”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!


名稱欄目:如何使用Redis實現(xiàn)一個安全可靠的分布式鎖
當前網(wǎng)址:http://weahome.cn/article/jcpdcp.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部