真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

這篇文章將為大家詳細(xì)講解有關(guān)如何在Java中使用redisson實(shí)現(xiàn)一個(gè)分布式鎖,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

成都創(chuàng)新互聯(lián)專注于企業(yè)營(yíng)銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、雷山網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5技術(shù)、商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為雷山等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

1. 基本用法


  org.redisson
  redisson
  3.8.2
Config config = new Config();
config.useClusterServers()
  .setScanInterval(2000) // cluster state scan interval in milliseconds
  .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
  .addNodeAddress("redis://127.0.0.1:7002");

RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getLock("anyLock");

lock.lock();

try {
  ...
} finally {
  lock.unlock();
}

針對(duì)上面這段代碼,重點(diǎn)看一下Redisson是如何基于Redis實(shí)現(xiàn)分布式鎖的

Redisson中提供的加鎖的方法有很多,但大致類似,此處只看lock()方法

更多請(qǐng)參見https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

2. 加鎖

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

可以看到,調(diào)用getLock()方法后實(shí)際返回一個(gè)RedissonLock對(duì)象,在RedissonLock對(duì)象的lock()方法主要調(diào)用tryAcquire()方法

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個(gè)方法才是關(guān)鍵

首先,看一下evalWriteAsync方法的定義

復(fù)制代碼 代碼如下:

RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params);

最后兩個(gè)參數(shù)分別是keys和params

實(shí)際調(diào)用是這樣的:

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

單獨(dú)將調(diào)用的那一段摘出來看

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
         "if (redis.call('exists', KEYS[1]) == 0) then " +
           "redis.call('hset', KEYS[1], ARGV[2], 1); " +
           "redis.call('pexpire', KEYS[1], ARGV[1]); " +
           "return nil; " +
         "end; " +
         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
           "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
           "redis.call('pexpire', KEYS[1], ARGV[1]); " +
           "return nil; " +
         "end; " +
         "return redis.call('pttl', KEYS[1]);",
          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

結(jié)合上面的參數(shù)聲明,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假設(shè)前面獲取鎖時(shí)傳的name是“abc”,假設(shè)調(diào)用的線程ID是Thread-1,假設(shè)成員變量UUID類型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,這段腳本的意思是

1、判斷有沒有一個(gè)叫“abc”的key

2、如果沒有,則在其下設(shè)置一個(gè)字段為“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值為“1”的鍵值對(duì) ,并設(shè)置它的過期時(shí)間

3、如果存在,則進(jìn)一步判斷“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,則其值加1,并重新設(shè)置過期時(shí)間

4、返回“abc”的生存時(shí)間(毫秒)

這里用的數(shù)據(jù)結(jié)構(gòu)是hash,hash的結(jié)構(gòu)是: key 字段1 值1 字段2 值2 。。。

用在鎖這個(gè)場(chǎng)景下,key就表示鎖的名稱,也可以理解為臨界資源,字段就表示當(dāng)前獲得鎖的線程

所有競(jìng)爭(zhēng)這把鎖的線程都要判斷在這個(gè)key下有沒有自己線程的字段,如果沒有則不能獲得鎖,如果有,則相當(dāng)于重入,字段值加1(次數(shù))

3. 解鎖

protected RFuture unlockInnerAsync(long threadId) {
  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
      "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; " +
      "end;" +
      "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
        "return nil;" +
      "end; " +
      "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
      "if (counter > 0) then " +
        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
        "return 0; " +
      "else " +
        "redis.call('del', KEYS[1]); " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; "+
      "end; " +
      "return nil;",
      Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

我們還是假設(shè)name=abc,假設(shè)線程ID是Thread-1

同理,我們可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存時(shí)間

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面腳本的意思是:

1、判斷是否存在一個(gè)叫“abc”的key

2、如果不存在,向Channel中廣播一條消息,廣播的內(nèi)容是0,并返回1

3、如果存在,進(jìn)一步判斷字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

4、若字段不存在,返回空,若字段存在,則字段值減1

5、若減完以后,字段值仍大于0,則返回0

6、減完后,若字段值小于或等于0,則廣播一條消息,廣播內(nèi)容是0,并返回1;

可以猜測(cè),廣播0表示資源可用,即通知那些等待獲取鎖的線程現(xiàn)在可以獲得鎖了

4. 等待

以上是正常情況下獲取到鎖的情況,那么當(dāng)無法立即獲取到鎖的時(shí)候怎么辦呢?

再回到前面獲取鎖的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return;
  }

  //  訂閱
  RFuture future = subscribe(threadId);
  commandExecutor.syncSubscription(future);

  try {
    while (true) {
      ttl = tryAcquire(leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        break;
      }

      // waiting for message
      if (ttl >= 0) {
        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
        getEntry(threadId).getLatch().acquire();
      }
    }
  } finally {
    unsubscribe(future, threadId);
  }
//    get(lockAsync(leaseTime, unit));
}


protected static final LockPubSub PUBSUB = new LockPubSub();

protected RFuture subscribe(long threadId) {
  return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

protected void unsubscribe(RFuture future, long threadId) {
  PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

這里會(huì)訂閱Channel,當(dāng)資源可用時(shí)可以及時(shí)知道,并搶占,防止無效的輪詢而浪費(fèi)資源

當(dāng)資源可用用的時(shí)候,循環(huán)去嘗試獲取鎖,由于多個(gè)線程同時(shí)去競(jìng)爭(zhēng)資源,所以這里用了信號(hào)量,對(duì)于同一個(gè)資源只允許一個(gè)線程獲得鎖,其它的線程阻塞

5. 小結(jié)

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖

關(guān)于如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。


網(wǎng)頁(yè)名稱:如何在Java中使用Redisson實(shí)現(xiàn)一個(gè)分布式鎖
網(wǎng)頁(yè)鏈接:http://weahome.cn/article/ihogde.html

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部