redis分布式鎖有哪些?這個問題可能是我們?nèi)粘W習或工作經(jīng)常見到的。希望通過這個問題能讓你收獲頗深。下面是小編給大家?guī)淼膮⒖純?nèi)容,讓我們一起來看看吧!
創(chuàng)新互聯(lián)自成立以來,一直致力于為企業(yè)提供從網(wǎng)站策劃、網(wǎng)站設計、成都做網(wǎng)站、成都網(wǎng)站設計、電子商務、網(wǎng)站推廣、網(wǎng)站優(yōu)化到為企業(yè)提供個性化軟件開發(fā)等基于互聯(lián)網(wǎng)的全面整合營銷服務。公司擁有豐富的網(wǎng)站建設和互聯(lián)網(wǎng)應用系統(tǒng)開發(fā)管理經(jīng)驗、成熟的應用系統(tǒng)解決方案、優(yōu)秀的網(wǎng)站開發(fā)工程師團隊及專業(yè)的網(wǎng)站設計師團隊。
我們通常使用的synchronized或者Lock都是線程鎖,對同一個JVM進程內(nèi)的多個線程有效。因為鎖的本質(zhì) 是內(nèi)存中存放一個標記,記錄獲取鎖的線程是誰,這個標記對每個線程都可見。然而我們啟動的多個訂單服務,就是多個JVM,內(nèi)存中的鎖顯然是不共享的,每個JVM進程都有自己的 鎖,自然無法保證線程的互斥了,這個時候我們就需要使用到分布式鎖了。常用的有三種解決方案:1.基于數(shù)據(jù)庫實現(xiàn) 2.基于zookeeper的臨時序列化節(jié)點實現(xiàn) 3.redis實現(xiàn)。本文我們介紹的就是redis的實現(xiàn)方式。
實現(xiàn)分布式鎖要滿足3點:多進程可見,互斥,可重入。
1)多進程可見
redis本身就是基于JVM之外的,因此滿足多進程可見的要求。
2)互斥
即同一時間只能有一個進程獲取鎖標記,我們可以通過redis的setnx實現(xiàn),只有第一次執(zhí)行的才會成功并返回1,其它情況返回0。
釋放鎖
釋放鎖其實只需要把鎖的key刪除即可,使用del xxx指令。不過,如果在我們執(zhí)行del之前,服務突然宕機,那么鎖就永遠無法刪除了。所以我們可以通過setex 命令設置過期時間即可。
import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/** * 第一種分布式鎖 */@Componentpublic class RedisService {private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired JedisPool jedisPool; // 獲取鎖之前的超時時間(獲取鎖的等待重試時間) private long acquireTimeout = 5000; // 獲取鎖之后的超時時間(防止死鎖) private int timeOut = 10000; /** * 獲取分布式鎖 * @return 鎖標識 */ public boolean getRedisLock(String lockName,String val) { Jedis jedis = null; try { jedis = jedisPool.getResource(); // 1.計算獲取鎖的時間 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.嘗試獲取鎖 while (System.currentTimeMillis() < endTime) { // 3. 獲取鎖成功就設置過期時間 if (jedis.setnx(lockName, val) == 1) { jedis.expire(lockName, timeOut/1000); return true; } } } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } return false; } /** * 釋放分布式鎖 * @param lockName 鎖名稱 */ public void unRedisLock(String lockName) { Jedis jedis = null; try { jedis = jedisPool.getResource(); // 釋放鎖 jedis.del(lockName); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } }// =============================================== public String get(String key) { Jedis jedis = null; String value = null; try { jedis = jedisPool.getResource(); value = jedis.get(key); log.info(value); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } return value; } public void set(String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } } /** * 關(guān)閉連接 */ public void returnResource(Jedis jedis) { try { if(jedis!=null) jedis.close(); } catch (Exception e) { } } }
上面的分布式鎖實現(xiàn)了,但是這時候還可能出現(xiàn)另外2個問題:
一:獲取鎖時
setnx獲取鎖成功了,還沒來得及setex服務就宕機了,由于這種非原子性的操作,死鎖又發(fā)生了。其實redis提供了 nx 與 ex連用的命令。
二:釋放鎖時
1. 3個進程:A和B和C,在執(zhí)行任務,并爭搶鎖,此時A獲取了鎖,并設置自動過期時間為10s
2. A開始執(zhí)行業(yè)務,因為某種原因,業(yè)務阻塞,耗時超過了10秒,此時鎖自動釋放了
3. B恰好此時開始嘗試獲取鎖,因為鎖已經(jīng)自動釋放,成功獲取鎖
4. A此時業(yè)務執(zhí)行完畢,執(zhí)行釋放鎖邏輯(刪除key),于是B的鎖被釋放了,而B其實還在執(zhí)行業(yè)務
5. 此時進程C嘗試獲取鎖,也成功了,因為A把B的鎖刪除了。
問題出現(xiàn)了:B和C同時獲取了鎖,違反了互斥性!如何解決這個問題呢?我們應該在刪除鎖之前,判斷這個鎖是否是自己設置的鎖,如果不是(例如自己 的鎖已經(jīng)超時釋放),那么就不要刪除了。所以我們可以在set 鎖時,存入當前線程的唯一標識!刪除鎖前,判斷下里面的值是不是與自己標識釋放一 致,如果不一致,說明不是自己的鎖,就不要刪除了。
/** * 第二種分布式鎖 */public class RedisTool { private static final String LOCK_SUCCESS = "OK"; private static final Long RELEASE_SUCCESS = 1L; /** * 嘗試獲取分布式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 釋放分布式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { if (jedis.get(lockKey).equals(requestId)) { System.out.println("釋放鎖..." + Thread.currentThread().getName() + ",identifierValue:" + requestId); jedis.del(lockKey); return true; } return false; } }
按照上面方式實現(xiàn)分布式鎖之后,就可以輕松解決大部分問題了。網(wǎng)上很多博客也都是這么實現(xiàn)的,但是仍然有些場景是不滿足的,例如一個方法獲取到鎖之后,可能在方法內(nèi)調(diào)這個方法此時就獲取不到鎖了。這個時候我們就需要把鎖改進成可重入式鎖了。
3)重入鎖:
也叫做遞歸鎖,指的是在同一線程內(nèi),外層函數(shù)獲得鎖之后,內(nèi)層遞歸函數(shù)仍然可以獲取到該鎖。換一種說法:同一個線程再次進入同步代碼時,可以使用自己已獲取到的鎖??芍厝腈i可以避免因同一線程中多次獲取鎖而導致死鎖發(fā)生。像synchronized就是一個重入鎖,它是通過moniter函數(shù)記錄當前線程信息來實現(xiàn)的。實現(xiàn)可重入鎖需要考慮兩點:
獲取鎖:首先嘗試獲取鎖,如果獲取失敗,判斷這個鎖是否是自己的,如果是則允許再次獲取, 而且必須記錄重復獲取鎖的次數(shù)。
釋放鎖:釋放鎖不能直接刪除了,因為鎖是可重入的,如果鎖進入了多次,在內(nèi)層直接刪除鎖, 導致外部的業(yè)務在沒有鎖的情況下執(zhí)行,會有安全問題。因此必須獲取鎖時累計重入的次數(shù),釋放時則減去重入次數(shù),如果減到0,則可以刪除鎖。
下面我們假設鎖的key為“ lock ”,hashKey是當前線程的id:“ threadId ”,鎖自動釋放時間假設為20 獲取鎖的步驟: 1、判斷l(xiāng)ock是否存在 EXISTS lock 2、不存在,則自己獲取鎖,記錄重入層數(shù)為1. 2、存在,說明有人獲取鎖了,下面判斷是不是自己的鎖,即判斷當前線程id作為hashKey是否存在:HEXISTS lock threadId 3、不存在,說明鎖已經(jīng)有了,且不是自己獲取的,鎖獲取失敗. 3、存在,說明是自己獲取的鎖,重入次數(shù)+1: HINCRBY lock threadId 1 ,最后更新鎖自動釋放時間, EXPIRE lock 20 釋放鎖的步驟: 1、判斷當前線程id作為hashKey是否存在: HEXISTS lock threadId 2、不存在,說明鎖已經(jīng)失效,不用管了 2、存在,說明鎖還在,重入次數(shù)減1: HINCRBY lock threadId -1 , 3、獲取新的重入次數(shù),判斷重入次數(shù)是否為0,為0說明鎖全部釋放,刪除key: DEL lock
因此,存儲在鎖中的信息就必須包含:key、線程標識、重入次數(shù)。不能再使用簡單的key-value結(jié)構(gòu), 這里推薦使用hash結(jié)構(gòu)。
獲取鎖的腳本(注釋刪掉,不然運行報錯)
local key = KEYS[1]; -- 第1個參數(shù),鎖的keylocal threadId = ARGV[1]; -- 第2個參數(shù),線程唯一標識local releaseTime = ARGV[2]; -- 第3個參數(shù),鎖的自動釋放時間if(redis.call('exists', key) == 0) then -- 判斷鎖是否已存在 redis.call('hset', key, threadId, '1'); -- 不存在, 則獲取鎖 redis.call('expire', key, releaseTime); -- 設置有效期 return 1; -- 返回結(jié)果end;if(redis.call('hexists', key, threadId) == 1) then -- 鎖已經(jīng)存在,判斷threadId是否是自己 redis.call('hincrby', key, threadId, '1'); -- 如果是自己,則重入次數(shù)+1 redis.call('expire', key, releaseTime); -- 設置有效期 return 1; -- 返回結(jié)果end;return 0; -- 代碼走到這里,說明獲取鎖的不是自己,獲取鎖失敗
釋放鎖的腳本(注釋刪掉,不然運行報錯)
local key = KEYS[1]; -- 第1個參數(shù),鎖的keylocal threadId = ARGV[1]; -- 第2個參數(shù),線程唯一標識if (redis.call('HEXISTS', key, threadId) == 0) then -- 判斷當前鎖是否還是被自己持有 return nil; -- 如果已經(jīng)不是自己,則直接返回end;local count = redis.call('HINCRBY', key, threadId, -1); -- 是自己的鎖,則重入次數(shù)-1if (count == 0) then -- 判斷是否重入次數(shù)是否已經(jīng)為0 redis.call('DEL', key); -- 等于0說明可以釋放鎖,直接刪除 return nil; end;
完整代碼
import java.util.Collections;import java.util.UUID;import org.springframework.core.io.ClassPathResource;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.scripting.support.ResourceScriptSource;/** * Redis可重入鎖 */public class RedisLock { private static final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class); private static final DefaultRedisScriptLOCK_SCRIPT; private static final DefaultRedisScript
感謝各位的閱讀!看完上述內(nèi)容,你們對Redis分布式鎖有哪些大概了解了嗎?希望文章內(nèi)容對大家有所幫助。如果想了解更多相關(guān)文章內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。