小編給大家分享一下使用redis實現(xiàn)分布式鎖的方法,希望大家閱讀完這篇文章后大所收獲,下面讓我們一起去探討吧!
創(chuàng)新互聯(lián)公司2013年成立,公司以成都做網(wǎng)站、成都網(wǎng)站建設、系統(tǒng)開發(fā)、網(wǎng)絡推廣、文化傳媒、企業(yè)宣傳、平面廣告設計等為主要業(yè)務,適用行業(yè)近百種。服務企業(yè)客戶上千多家,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設、創(chuàng)意設計、宣傳推廣等服務。 通過專業(yè)的設計、獨特的風格,為不同客戶提供各種風格的特色服務。
使用Redis實現(xiàn)分布式鎖
redis特性介紹
1、支持豐富的數(shù)據(jù)類型,如String、List、Map、Set、ZSet等。
2、支持數(shù)據(jù)持久化,RDB和AOF兩種方式
3、支持集群工作模式,分區(qū)容錯性強
4、單線程,順序處理命令
5、支持事務
6、支持發(fā)布與訂閱
Redis實現(xiàn)分布式鎖使用了SETNX命令:
SETNX key value
將key的值設為value ,當且僅當key不存在。
若給定的key已經(jīng)存在,則SETNX不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
可用版本:>= 1.0.0時間復雜度:O(1)返回值:
設置成功,返回 1 。
設置失敗,返回 0 。
redis> EXISTS job # job 不存在 (integer) 0 redis> SETNX job "programmer" # job 設置成功 (integer) 1 redis> SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗 (integer) 0 redis> GET job # 沒有被覆蓋 "programmer"
首先,我們需要封裝一個公共的Redis訪問工具類。該類需要注入RedisTemplate實例和ValueOperations實例,使用ValueOperations實例是因為Redis實現(xiàn)的分布式鎖使用了最簡單的String類型。另外,我們需要封裝3個方法,分別是setIfObsent (String key, String value)、 expire (String key, long timeout, TimeUnit unit) 、delete (String key) ,分別對應Redis的SETNX、expire、del命令。以下是Redis訪問工具類的具體實現(xiàn):
@Component public class RedisDao { @Autowired private RedisTemplate redisTemplate; @Resource(name="redisTemplate") private ValueOperations
完成了Redis訪問工具類的實現(xiàn),現(xiàn)在需要考慮的是如何去模擬競爭分布式鎖。因為Redis本身就是支持分布式集群的,所以只需要模擬出多線程處理業(yè)務場景。這里采用線程池來模擬,以下是測試類的具體實現(xiàn):
@RestController @RequestMapping("test") public class TestController { private static final Logger LOG = LoggerFactory.getLogger(TestController.class); //日志對象 @Autowired private RedisDao redisDao; //定義的分布式鎖key private static final String LOCK_KEY = "MyTestLock"; @RequestMapping(value={"testRedisLock"}, method=RequestMethod.GET) public void testRedisLock () { ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.submit(new Runnable() { @Override public void run() { //獲取分布式鎖 boolean flag = redisDao.setIfObsent(LOCK_KEY, "lock"); if (flag) { LOG.info(Thread.currentThread().getName() + ":獲取Redis分布式鎖成功"); //獲取鎖成功后設置失效時間 redisDao.expire(LOCK_KEY, 2, TimeUnit.SECONDS); try { LOG.info(Thread.currentThread().getName() + ":處理業(yè)務開始"); Thread.sleep(1000); //睡眠1000ms模擬處理業(yè)務 LOG.info(Thread.currentThread().getName() + ":處理業(yè)務結束"); //處理業(yè)務完成后刪除鎖 redisDao.delete(LOCK_KEY); } catch (InterruptedException e) { LOG.error("處理業(yè)務異常:", e); } } else { LOG.info(Thread.currentThread().getName() + ":獲取Redis分布式鎖失敗"); } } }); } } }
通過上面這段代碼,可能會產(chǎn)生以下幾個疑問:
線程如果獲取分布式鎖失敗,為什么不嘗試重新獲取鎖?
線程獲取分布式鎖成功后,設置了鎖的失效時間,這個失效時間長短如何確定?
線程業(yè)務處理結束后,為什么要做刪除鎖的操作?
針對這幾個疑問,我們可以來討論下。
第一,Redis的SETNX命令,如果key已經(jīng)存在,則不會做任何操作,所以SETNX實現(xiàn)的分布式鎖并不是可重入鎖。當然,也可以自己通過代碼實現(xiàn)重試n次或者直至獲取到分布式鎖為止。但是,這不能保證競爭的公平性,某個線程會因為一直等待鎖而阻塞。因此,Redis實現(xiàn)的分布式鎖更適用于對共享資源一寫多讀的場景。
第二,分布式鎖必須設置失效時間,而且失效時間必須大于業(yè)務處理所需的時間(保證數(shù)據(jù)一致性)。所以,在測試階段盡可能準確的預測出業(yè)務正常處理所需的時間,設置失效時間是防止因為業(yè)務處理過程的某些原因導致死鎖的情況。
第三,業(yè)務處理結束,必須要做刪除鎖的操作。
上面設置分布式鎖和為鎖設置失效時間是通過兩個操作步驟完成的,更合理的方式應該是把設置分布式鎖和為鎖設置失效時間通過一個操作完成。要么都成功,要么都失敗。實現(xiàn)代碼如下:
/** * Redis訪問工具類 */ @Component public class RedisDao { private static Logger logger = LoggerFactory.getLogger(RedisDao.class); @Autowired private StringRedisTemplate stringRedisTemplate; /** * 設置分布式鎖 * @param key 鍵 * @param value 值 * @param timeout 失效時間 * @return */ public boolean setDistributeLock (String key, String value, long timeout) { RedisConnection connection = null; boolean flag = false; try { //獲取一個連接 connection = stringRedisTemplate.getConnectionFactory().getConnection(); //設置分布式鎖的同時為鎖設置失效時間 connection.set(key.getBytes(), value.getBytes(), Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT); flag = true; } catch (Exception e) { logger.error("set automic lock error:", e); } finally { //使用后關閉連接 connection.close(); } return flag; } /** * 查詢key的失效時間 * @param key 鍵 * @param timeUnit 時間單位 * @return */ public long ttl (String key, TimeUnit timeUnit) { return stringRedisTemplate.getExpire(key, timeUnit); } } /** * 單元測試類 */ @RunWith(SpringRunner.class) @SpringBootTest public class Demo1ApplicationTests { private static final Logger LOG = LoggerFactory.getLogger(Demo1ApplicationTests.class); @Autowired private RedisDao redisDao; @Test public void testDistributeLock () { String key = "MyDistributeLock"; //設置分布式鎖,失效時間20s boolean result = redisDao.setDistributeLock(key, "1", 20); if (result) { LOG.info("設置分布式鎖成功"); long ttl = redisDao.ttl(key, TimeUnit.SECONDS); LOG.info("{}距離失效還有{}s", key, ttl); } } }
運行單元測試類,結果如下:
2019-05-15 13:07:10.827 - 設置分布式鎖成功 2019-05-15 13:07:10.838 - MyDistributeLock距離失效還有19s
看完了這篇文章,相信你對使用redis實現(xiàn)分布式鎖的方法有了一定的了解,想了解更多相關知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!