使用redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖,針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
創(chuàng)新互聯(lián)公司是一家專注于成都做網(wǎng)站、成都網(wǎng)站制作與策劃設(shè)計(jì),威寧網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:威寧等地區(qū)。威寧做網(wǎng)站價(jià)格咨詢:028-86922220
Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時(shí),重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來做Java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細(xì)節(jié)進(jìn)行一一分析。
鎖的接口定義了一下方法:
分布式鎖當(dāng)中加鎖,我們常用的加鎖接口:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面我們來看一下方法的具體實(shí)現(xiàn):
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); final RFuture subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time = 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
首先我們看到調(diào)用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據(jù)鎖名稱的過期時(shí)間TTL來判定的(TTL
下面我們接著看一下tryAcquire的實(shí)現(xiàn):
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
可以看到真正獲取鎖的操作經(jīng)過一層get操作里面執(zhí)行的,這里為何要這么操作,本人也不是太理解,如有理解錯(cuò)誤,歡迎指正。
get 是由CommandAsyncExecutor(一個(gè)線程Executor)封裝的一個(gè)Executor
設(shè)置一個(gè)單線程的同步控制器CountDownLatch,用于控制單個(gè)線程的中斷信息。個(gè)人理解經(jīng)過中間的這么一步:主要是為了支持線程可中斷操作。
public V get(RFuture future) { if (!future.isDone()) { final CountDownLatch l = new CountDownLatch(1); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { l.countDown(); } }); boolean interrupted = false; while (!future.isDone()) { try { l.await(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個(gè)線程的阻塞問題,每分鐘高達(dá)200毫秒 // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); }
我們進(jìn)一步往下看:
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
首先判斷鎖是否有超時(shí)時(shí)間,有過期時(shí)間的話,會(huì)在后面獲取鎖的時(shí)候設(shè)置進(jìn)去。沒有過期時(shí)間的話,則會(huì)用默認(rèn)的
private long lockWatchdogTimeout = 30 * 1000;
下面我們在進(jìn)一步往下分析真正獲取鎖的操作:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我把里面的重點(diǎn)信息做了以下三點(diǎn)總結(jié):
1:真正執(zhí)行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執(zhí)行。
2:鎖真正持久化到Redis時(shí),用的hash類型key field value
3:獲取鎖的三個(gè)參數(shù):getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過期時(shí)間;getLockName則是鎖對應(yīng)的線程級(jí)別的名稱,因?yàn)橹С窒嗤€程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學(xué)可能會(huì)問,這樣不是很縝密:不同的JVM可能會(huì)生成相同的threadId,所以Redission這里加了一個(gè)區(qū)分度很高的UUID;
Lua腳本中的執(zhí)行分為以下三步:
1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時(shí)把邏輯鎖名稱KEYS[1],線程級(jí)別的鎖名稱[ARGV[2],value=1,設(shè)置到redis。并設(shè)置邏輯鎖名稱的過期時(shí)間ARGV[2],返回;
2:如果檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時(shí)會(huì)自增對應(yīng)的value值,記錄重入次數(shù);并更新鎖的過期時(shí)間
3:key不存,直接返回key的剩余過期時(shí)間(-2)
關(guān)于使用Redisson怎么實(shí)現(xiàn)一個(gè)分布式鎖問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。