眾所周知,分布式鎖在微服務(wù)架構(gòu)中是重頭戲,尤其是在互聯(lián)網(wǎng)公司,基本上企業(yè)內(nèi)部都會(huì)有自己的一套分布式鎖開發(fā)框架。本文主要介紹使用redis如何構(gòu)建高并發(fā)分布式鎖。
成都創(chuàng)新互聯(lián)從2013年創(chuàng)立,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站建設(shè)、網(wǎng)站制作網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元??底鼍W(wǎng)站,已為上家服務(wù),為??蹈鞯仄髽I(yè)和個(gè)人服務(wù),聯(lián)系電話:18982081108
假設(shè) 存在一個(gè)SpringBoot的控制器,其扣減庫存的業(yè)務(wù)邏輯如下:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
// 將庫存取出來
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
return "finished.";
}
不難看出,在應(yīng)用服務(wù)器運(yùn)行這段代碼的時(shí)候就會(huì)有線程安全性問題。因?yàn)槎鄠€(gè)線程同時(shí)去修改Redis服務(wù)中的數(shù)據(jù)。因此考慮給這段代碼加上一把鎖:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
synchronized (this) {
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
}
return "finished.";
}
這樣一來,當(dāng)多個(gè)HTTP請(qǐng)求來請(qǐng)求數(shù)據(jù)的時(shí)候,多個(gè)線程去修改同一數(shù)據(jù)會(huì)有JVM本地鎖來進(jìn)行合理的資源限制。雖然這樣解決了線程安全性問題,但是這僅僅是JVM級(jí)別的鎖,在分布式的環(huán)境下,由于像這樣的Web應(yīng)用隨時(shí)會(huì)進(jìn)行動(dòng)態(tài)擴(kuò)容,因此當(dāng)多個(gè)應(yīng)用的時(shí)候,同樣會(huì)有線程安全性問題,當(dāng)上面這段代碼遇到類似下面的架構(gòu)時(shí)還是會(huì)有各種各樣的問題:
對(duì)于上述的情況,我們可以使用redis api提供的setnx方法解決:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
// 嘗試獲取鎖
Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");
// 判斷是否獲得鎖
if (!flag) { return "error"; }
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
// 刪除鎖
stringRedisTemplate.delete("Hello");
return "finished.";
}
setnx key value
是將key的值設(shè)置為value,當(dāng)且僅當(dāng)key不存在的時(shí)候。如果設(shè)置成功就返回1,否則就返回0。
這樣的話,首先嘗試獲取鎖,然后當(dāng)業(yè)務(wù)執(zhí)行完成的時(shí)候再刪除鎖。但是還是有問題的,當(dāng)獲取鎖的時(shí)候拋出異?;蛘邩I(yè)務(wù)執(zhí)行拋出異常怎么辦,所以加入異常處理邏輯:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
try {
// 嘗試獲取鎖
Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");
// 判斷是否獲得鎖
if (!flag) { return "error"; }
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
} finally {
// 刪除鎖
stringRedisTemplate.delete("Hello");
}
return "finished.";
}
經(jīng)過這樣的修改,看起來沒什么問題了。但是當(dāng)程序獲得鎖并且開始執(zhí)行業(yè)務(wù)邏輯的時(shí)候,突然程序掛掉了或者被一些粗暴的運(yùn)維工程師給kill,在finally中刪除鎖的邏輯就會(huì)得不到執(zhí)行,因此就會(huì)產(chǎn)生死鎖。對(duì)于這種情況,我們可以給這個(gè)鎖設(shè)置一個(gè)超時(shí)時(shí)間:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
try {
// 嘗試獲取鎖
Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");
// 設(shè)置超時(shí)時(shí)間, 根據(jù)業(yè)務(wù)場(chǎng)景估計(jì)超時(shí)時(shí)長
stringRedisTmplate.expire("Hello", 10, TimeUnit.SECONDS);
// 判斷是否獲得鎖
if (!flag) { return "error"; }
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
} finally {
// 刪除鎖
stringRedisTemplate.delete("Hello");
}
return "finished.";
}
如果程序這么來寫,相對(duì)來說安全一些了,但是還是存在問題。試想一下,當(dāng)獲取鎖成功時(shí),正想給這把鎖設(shè)置超時(shí)的時(shí)候,程序掛掉了,還是會(huì)出現(xiàn)死鎖的,因此在redis較高的版本中提供的setIfAbsent方法中可以同時(shí)設(shè)置鎖的超時(shí)時(shí)間。
Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World", 10, TimeUnit.SECONDS);
這樣一來,嘗試獲取鎖和設(shè)置鎖的超時(shí)時(shí)間就具備原子性了。實(shí)際上經(jīng)過我們這一番改造,這在小型企業(yè)已經(jīng)沒有太大的問題, 因?yàn)橄襁@種代碼每天也就執(zhí)行幾百次,并不算做高并發(fā)的場(chǎng)景。當(dāng)這樣的代碼被暴露在超高并發(fā)場(chǎng)景下的時(shí)候,還是會(huì)存在各種各樣的問題。試想一個(gè)場(chǎng)景,當(dāng)一個(gè)HTTP請(qǐng)求請(qǐng)求到控制器的時(shí)候,應(yīng)用獲取到鎖了,超時(shí)時(shí)間也設(shè)置成功了,但是應(yīng)用的業(yè)務(wù)邏輯超過了超時(shí)時(shí)間,我們這里的超時(shí)時(shí)間設(shè)置的是10秒,當(dāng)應(yīng)用的業(yè)務(wù)邏輯執(zhí)行15秒的時(shí)候,鎖就被redis服務(wù)刪除了。假設(shè)恰好此時(shí)又有一個(gè)HTTP請(qǐng)求來請(qǐng)求控制器,此時(shí)應(yīng)用服務(wù)器會(huì)再啟動(dòng)一個(gè)線程來獲取鎖,而且還獲取成功了,但是這次的HTTP請(qǐng)求對(duì)應(yīng)的業(yè)務(wù)邏輯還沒有執(zhí)行完。新來的TTTP請(qǐng)求也在執(zhí)行,由于新來的HTTP請(qǐng)求也在執(zhí)行,因?yàn)殒i超時(shí)后被刪除,新的HTTP請(qǐng)求也成功獲取鎖了。當(dāng)原來的HTTP請(qǐng)求對(duì)應(yīng)的業(yè)務(wù)邏輯執(zhí)行完成以后,嘗試刪除鎖,這樣正好刪除的是新來的HTTP請(qǐng)求對(duì)應(yīng)的鎖。這個(gè)時(shí)候redis中又沒有鎖了,這樣第三個(gè)HTTP請(qǐng)求又會(huì)獲得鎖,所以情況就不妙了。
為了解決上面的問題,我們可以將代碼優(yōu)化為下面的樣子:
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
String clientUuid = UUID.randomUUID().toString();
try {
// 嘗試獲取鎖,設(shè)置超時(shí)時(shí)間, 根據(jù)業(yè)務(wù)場(chǎng)景估計(jì)超時(shí)時(shí)長
Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", clientUuid, 10, TimeUnit.SECONDS);
// 判斷是否獲得鎖
if (!flag) { return "error"; }
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
} finally {
// 刪除鎖的時(shí)候判斷是不是自己的鎖
if (clientUuid.equals(stringRedisTemplate.opsForValue().get("Hello"))) {
stringRedisTemplate.delete("Hello");
}
}
return "finished.";
}
但是由于程序的不可預(yù)知性,誰也不能保證極端情況下,同時(shí)會(huì)有多個(gè)線程同時(shí)執(zhí)行這段業(yè)務(wù)邏輯。我們可以在當(dāng)執(zhí)行業(yè)務(wù)邏輯的時(shí)候同時(shí)開一個(gè)定時(shí)器線程,每隔幾秒就重新將這把鎖設(shè)置為10秒,也就是給這把鎖進(jìn)行“續(xù)命”。這樣就用擔(dān)心業(yè)務(wù)邏輯到底執(zhí)行多長時(shí)間了。但是這樣程序的復(fù)雜性就會(huì)增加,每個(gè)業(yè)務(wù)邏輯都要寫好多的代碼,因此這里推薦在分布式環(huán)境下使用redisson。因此我們使用redisson實(shí)現(xiàn)分支線程的代碼:
org.redisson
redisson
3.6.5
@Bean
public Redisson redisson () {
Config cfg = new Config();
cfg.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
return (Redisson) Redisson.create(cfg);
}
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
// 獲取鎖對(duì)象
RLock lock = redisson.getLock("Hello");
try {
// 嘗試加鎖, 默認(rèn)30秒, 自動(dòng)后臺(tái)開一個(gè)線程實(shí)現(xiàn)鎖的續(xù)命
lock.tryLock();
int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));
// 判斷庫存夠不夠減
if (stock > 0) {
// 將庫存回寫到redis
int tmp = stock - 1;
stringRedisTemplate.opsForValue().set("stock", tmp.toString());
logger.info("庫存扣減成功");
} else {
logger.info("庫存扣減失敗");
}
} finally {
// 釋放鎖
lock.unlock();
}
return "finished.";
}
Redisson分布式鎖的實(shí)現(xiàn)原理如下:
但是這個(gè)架構(gòu)還是存在問題的,因?yàn)閞edis服務(wù)器是主從的架構(gòu),當(dāng)在master節(jié)點(diǎn)設(shè)置鎖之后,slave節(jié)點(diǎn)會(huì)立刻同步。但是如果剛在master節(jié)點(diǎn)設(shè)置上了鎖,slave節(jié)點(diǎn)還沒來得及設(shè)置,master節(jié)點(diǎn)就掛掉了。還是會(huì)產(chǎn)生上同樣的問題,新的線程獲得鎖。
因此使用redis構(gòu)建高并發(fā)的分布式鎖,僅適合單機(jī)架構(gòu),當(dāng)使用主從架構(gòu)的redis時(shí)還是會(huì)出現(xiàn)線程安全性問題。