真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖

使用redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖,針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

創(chuàng)新互聯(lián)公司是一家專注于成都做網(wǎng)站、成都網(wǎng)站制作與策劃設(shè)計(jì),威寧網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:威寧等地區(qū)。威寧做網(wǎng)站價(jià)格咨詢:028-86922220

Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時(shí),重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來做Java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細(xì)節(jié)進(jìn)行一一分析。

鎖的接口定義了一下方法:

使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖

分布式鎖當(dāng)中加鎖,我們常用的加鎖接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我們來看一下方法的具體實(shí)現(xiàn):

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return true;
  }
  
  time -= (System.currentTimeMillis() - current);
  if (time <= 0) {
   acquireFailed(threadId);
   return false;
  }
  
  current = System.currentTimeMillis();
  final RFuture subscribeFuture = subscribe(threadId);
  if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
   if (!subscribeFuture.cancel(false)) {
    subscribeFuture.addListener(new FutureListener() {
     @Override
     public void operationComplete(Future future) throws Exception {
      if (subscribeFuture.isSuccess()) {
       unsubscribe(subscribeFuture, threadId);
      }
     }
    });
   }
   acquireFailed(threadId);
   return false;
  }

  try {
   time -= (System.currentTimeMillis() - current);
   if (time <= 0) {
    acquireFailed(threadId);
    return false;
   }
  
   while (true) {
    long currentTime = System.currentTimeMillis();
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     return true;
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time = 0 && ttl < time) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
     acquireFailed(threadId);
     return false;
    }
   }
  } finally {
   unsubscribe(subscribeFuture, threadId);
  }
//  return get(tryLockAsync(waitTime, leaseTime, unit));
 }

首先我們看到調(diào)用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據(jù)鎖名稱的過期時(shí)間TTL來判定的(TTL

下面我們接著看一下tryAcquire的實(shí)現(xiàn):

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
 return get(tryAcquireAsync(leaseTime, unit, threadId));
}

可以看到真正獲取鎖的操作經(jīng)過一層get操作里面執(zhí)行的,這里為何要這么操作,本人也不是太理解,如有理解錯(cuò)誤,歡迎指正。

get 是由CommandAsyncExecutor(一個(gè)線程Executor)封裝的一個(gè)Executor

設(shè)置一個(gè)單線程的同步控制器CountDownLatch,用于控制單個(gè)線程的中斷信息。個(gè)人理解經(jīng)過中間的這么一步:主要是為了支持線程可中斷操作。

public V get(RFuture future) {
 if (!future.isDone()) {
  final CountDownLatch l = new CountDownLatch(1);
  future.addListener(new FutureListener() {
   @Override
   public void operationComplete(Future future) throws Exception {
    l.countDown();
   }
  });
  
  boolean interrupted = false;
  while (!future.isDone()) {
   try {
    l.await();
   } catch (InterruptedException e) {
    interrupted = true;
   }
  }
  
  if (interrupted) {
   Thread.currentThread().interrupt();
  }
 }

 // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個(gè)線程的阻塞問題,每分鐘高達(dá)200毫秒
 // future.awaitUninterruptibly();
 if (future.isSuccess()) {
  return future.getNow();
 }

 throw convertException(future);
}

我們進(jìn)一步往下看:

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
 if (leaseTime != -1) {
  return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 }
 RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 ttlRemainingFuture.addListener(new FutureListener() {
  @Override
  public void operationComplete(Future future) throws Exception {
   if (!future.isSuccess()) {
    return;
   }

   Long ttlRemaining = future.getNow();
   // lock acquired
   if (ttlRemaining == null) {
    scheduleExpirationRenewal(threadId);
   }
  }
 });
 return ttlRemainingFuture;
}

首先判斷鎖是否有超時(shí)時(shí)間,有過期時(shí)間的話,會(huì)在后面獲取鎖的時(shí)候設(shè)置進(jìn)去。沒有過期時(shí)間的話,則會(huì)用默認(rèn)的

private long lockWatchdogTimeout = 30 * 1000;

下面我們在進(jìn)一步往下分析真正獲取鎖的操作:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);

 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
     "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

我把里面的重點(diǎn)信息做了以下三點(diǎn)總結(jié):

1:真正執(zhí)行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執(zhí)行。

2:鎖真正持久化到Redis時(shí),用的hash類型key field value

3:獲取鎖的三個(gè)參數(shù):getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過期時(shí)間;getLockName則是鎖對應(yīng)的線程級(jí)別的名稱,因?yàn)橹С窒嗤€程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學(xué)可能會(huì)問,這樣不是很縝密:不同的JVM可能會(huì)生成相同的threadId,所以Redission這里加了一個(gè)區(qū)分度很高的UUID;

Lua腳本中的執(zhí)行分為以下三步:

1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時(shí)把邏輯鎖名稱KEYS[1],線程級(jí)別的鎖名稱[ARGV[2],value=1,設(shè)置到redis。并設(shè)置邏輯鎖名稱的過期時(shí)間ARGV[2],返回;

2:如果檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時(shí)會(huì)自增對應(yīng)的value值,記錄重入次數(shù);并更新鎖的過期時(shí)間

3:key不存,直接返回key的剩余過期時(shí)間(-2)

關(guān)于使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。


網(wǎng)站欄目:使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖
標(biāo)題路徑:http://weahome.cn/article/jhpcds.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部