本文小編為大家詳細介紹“基于curator怎么實現(xiàn)分布式鎖”,內(nèi)容詳細,步驟清晰,細節(jié)處理妥當(dāng),希望這篇“基于curator怎么實現(xiàn)分布式鎖”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學(xué)習(xí)新知識吧。
創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供漣源網(wǎng)站建設(shè)、漣源做網(wǎng)站、漣源網(wǎng)站設(shè)計、漣源網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、漣源企業(yè)網(wǎng)站模板建站服務(wù),十余年漣源做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。
1、鎖的應(yīng)用場景:
在單體應(yīng)用中,我們會使用ReentrantLock或Synchronized來應(yīng)對并發(fā)場景。比如最常見的賣票場景,假如總共有100張票,線程A和線程B同時操作,如下圖:
這時有一個共享變量100,線程A和B將100拷貝到自己的工作內(nèi)存中,當(dāng)線程A搶到執(zhí)行權(quán)的時候,此時A工作內(nèi)存中的值是100,然后售票,進行自減操作,將自己工作內(nèi)存中的值變成了99。當(dāng)A還沒來得及將99刷回到主內(nèi)存的時候,線程B進來了,此時B拿到的主內(nèi)存的值還是100,然后售票,進行自減,也是99。這就出現(xiàn)了同一張票出售了兩次的情況。所以我們會加鎖加volatile保證原子性保證可見性。
2、分布式鎖是什么?
上面的場景中,我們可以通過ReentrantLock或者Synchronized搞定,因為你的項目只運行在一臺服務(wù)器上,只有一個JVM,所有的共享變量都加載到同一個主內(nèi)存中。而分布式應(yīng)用中,一個項目部署在多臺服務(wù)器上,最基本的架構(gòu)如下圖:
比如現(xiàn)在server1、server2和server3讀取到數(shù)據(jù)庫的票數(shù)都是100,在每一個server中,我們可以用JDK的鎖來保證多個用戶同時訪問我這臺server時不會出問題。但問題是,如果client1訪問到的是server1,票數(shù)是100,然后購票,還沒來得及將數(shù)據(jù)庫票數(shù)改為99,client2也開始訪問系統(tǒng)購票了,client2如果訪問的是server1,自然不會出問題,如果訪問的是server2,這時server2讀取到數(shù)據(jù)庫的票數(shù)還是100,那么就出問題了,又出現(xiàn)了同一張票賣了兩次的情況。在分布式應(yīng)用中,JDK的鎖機制就無法滿足需求了,所以就出現(xiàn)了分布式鎖。
3、分布式鎖應(yīng)該滿足的條件:
4、分布式鎖的實現(xiàn)方式:
set key value NX EX 30000
;也可以用redis的第三方庫比如Redisson1、建表:
CREATE TABLE `tb_distributed_lock` (
`dl_id` INT NOT NULL auto_increment COMMENT '主鍵,自增',
`dl_method_name` VARCHAR (64) NOT NULL DEFAULT '' COMMENT '方法名',
`dl_device_info` VARCHAR (100) NOT NULL DEFAULT '' COMMENT 'ip+線程id',
`dl_operate_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '數(shù)據(jù)被操作的時間',
PRIMARY KEY (`dl_id`),
UNIQUE KEY `uq_method_name` (`dl_method_name`) USING BTREE
) ENGINE = INNODB DEFAULT charset = utf8 COMMENT = '分布式鎖表';
2、思路:
當(dāng)執(zhí)行一個方法的時候,我們首先嘗試往表中插入一條數(shù)據(jù)。如果插入成功,則占鎖成功,繼續(xù)往下執(zhí)行,執(zhí)行完刪除該記錄。如果插入失敗,我們再以當(dāng)前方法名、當(dāng)前機器ip+線程id、數(shù)據(jù)被操作時間為5分鐘內(nèi)(5分鐘表示鎖失效的時間)
為條件去查詢,如果有記錄,表示該機器的該線程在5分鐘內(nèi)占有過鎖了,直接往下執(zhí)行最后刪除記錄;如果沒有記錄,占有鎖失敗。一個用戶就是一個線程,所以我們可以把機器ip和用戶id組合一起當(dāng)成dl_device_info
。
3、占有鎖和釋放鎖:
INSERT INTO tb_distributed_lock (
dl_method_name,
dl_device_info
)
VALUES
('方法名', 'ip&用戶id');
如果insert失敗,則:
SELECT
count(*)
FROM
tb_distributed_lock
WHERE
dl_method_name = '方法名'
AND dl_device_info = 'ip&用戶id'
AND dl_operate_time < SYSDATE() - 5;
DELETE
FROM
tb_distributed_lock
WHERE
dl_method_name = '方法名'
AND dl_device_info = 'ip&用戶id';
4、小總結(jié):
以上表結(jié)構(gòu)可能并不是很好,只是提供了這么一個思路。下面說它的優(yōu)缺點:
1、原理:
基于redis的set key value nx ex 30
,這條語句的意思就是如果key不存在就設(shè)置,并且過期時間為30s,如果key已經(jīng)存在就會返回false。如果要以毫秒為單位,把ex
換成px
就好了。我們執(zhí)行方法前,先將方法名當(dāng)成key,執(zhí)行這條語句,如果執(zhí)行成功就是獲取鎖成功,執(zhí)行失敗就是獲取鎖失敗。
2、代碼實現(xiàn):
/**
* key不存在時就設(shè)置,返回true,key已存在就返回false
* @param key
* @param value
* @param timeout
* @return
*/
public static boolean setIfAbsent(String key, String value, Long timeout) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
}
/**
* 獲取key-value
* @param key
* @return
*/
public static String getString(String key) {
return (String) redisTemplate.opsForValue().get(key);
}
/**
* 刪除key
* @param key
* @return
*/
public static boolean delKey(String key) {
return redisTemplate.delete(key);
}
public String hello() {
// 方法名當(dāng)作key
String key = "hello";
String value = "hellolock";
if (RedisUtil.setIfAbsent(key, value, 60 * 2L)) {
System.out.println("成功獲取到鎖,開始執(zhí)行業(yè)務(wù)邏輯……");
// 假如執(zhí)行業(yè)務(wù)邏輯需要1分鐘
try {TimeUnit.MINUTES.sleep(1L); } catch (Exception e) { e.printStackTrace();};
// 釋放鎖先校驗value,避免釋放錯
if (value.equals(RedisUtil.getString(key))) {
RedisUtil.delKey(key);
System.out.println("執(zhí)行完業(yè)務(wù)邏輯,釋放鎖成功");
}
return "success";
} else {
System.out.println("鎖被別的線程占有,獲取鎖失敗");
return "acquire lock failed";
}
}
3、小總結(jié):
優(yōu)點:簡單易用,一條redis命令就搞定。可以設(shè)置過期時間,避免釋放鎖失敗造成其他線程長時間無法獲取鎖的問題。
缺點:這種做法只適合redis是單機的時候,如果redis有集群,這樣做就會出問題。假如一個線程在master上獲取鎖成功了,在master還沒來得及將數(shù)據(jù)同步到slave上的時候,master掛了,slave升級為master。第二個線程進來嘗試獲取鎖,因為新的master上并沒有這個key,所以,也能成功獲取到鎖。
解決辦法:針對上面的缺點,我們可以采用redis的RedLock算法。假如集群中有n個redis
,我們先從這n個redis中嘗試獲取鎖(鎖的過期時間為x
),并記錄獲取鎖的消耗的總時間t
,獲取鎖成功數(shù)量為s
,當(dāng)且僅當(dāng)t < x 并且 s >= (n/2 + 1)
時,認為獲取鎖成功。
1、是什么?
官網(wǎng)地址:https://github.com/redisson/redisson/wiki/Table-of-Content Redisson是一個功能十分強大的redis客戶端,封裝了很多分布式操作,比如分布式對象、分布式集合、分布式鎖等。它的分布式鎖也很多,什么公平鎖、可重入鎖、redlock等一應(yīng)俱全,下面來看看如何在springboot項目中使用它。
2、使用redisson做分布式鎖:
org.redisson
redisson-spring-boot-starter
3.12.3
io.netty
netty-all
spring:
application:
name: distributed-lock
redis:
# redis單機版的寫法
host: 192.168.2.43
port: 6379
# 集群的寫法
#cluster:
#nodes:
#- 192.168.0.106,192.168.0.107
#哨兵的寫法
#sentinel:
#master: 192.168.0.106
#nodes:
#- 192.168.0.107,192.168.0.108
@Autowired
private RedissonClient redisson;
/**
* 未設(shè)置過期時間,沒獲取到就會一直阻塞著
* @return
*/
@GetMapping("/testLock")
public String testLock() {
log.info("進入testLock方法,開始獲取鎖");
String key = "testLock";
RLock lock = redisson.getLock(key);
lock.lock();
log.info("獲取鎖成功,開始執(zhí)行業(yè)務(wù)邏輯……");
try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
log.info("執(zhí)行完業(yè)務(wù)邏輯,釋放鎖");
lock.unlock();
return "success";
}
/**
* 嘗試獲取鎖,沒獲取到就直接失敗,不會阻塞
* @return
*/
@GetMapping("/testTryLock")
public String testTryLock() {
log.info("進入testTryLock方法,開始獲取鎖");
String key = "testTryLock";
RLock lock = redisson.getLock(key);
boolean res = lock.tryLock();
if (!res) {
log.error("嘗試獲取鎖失敗");
return "fail";
} else {
log.info("獲取鎖成功,開始執(zhí)行業(yè)務(wù)邏輯……");
try {TimeUnit.SECONDS.sleep(30L); } catch (Exception e) { e.printStackTrace();};
log.info("執(zhí)行完業(yè)務(wù)邏輯,釋放鎖");
lock.unlock();
return "success";
}
}
/**
* 鎖設(shè)置了過期時間,即使最后面的unlock失敗,20秒后也會自動釋放鎖
* @return
*/
@GetMapping("/testLockTimeout")
public String testLockTimeout() {
log.info("進入testLockTimeout方法,開始獲取鎖");
String key = "testLockTimeout";
RLock lock = redisson.getLock(key);
// 20秒后自動釋放鎖
lock.lock(20, TimeUnit.SECONDS);
log.info("獲取鎖成功,開始執(zhí)行業(yè)務(wù)邏輯……");
try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
lock.unlock();
return "success";
}
/**
* 嘗試獲取鎖,15秒還沒獲取到就獲取鎖失?。猾@取到了會持有20秒,20秒后自動釋放鎖
* @return
*/
@GetMapping("/testTryLockTimeout")
public String testTryLockTimeout() {
log.info("進入testTryLockTimeout方法,開始獲取鎖");
String key = "testTryLockTimeout";
RLock lock = redisson.getLock(key);
boolean res = false;
try {
res = lock.tryLock(15, 20, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
if (!res) {
log.error("嘗試獲取鎖失敗");
return "fail";
} else {
log.info("獲取鎖成功,開始執(zhí)行業(yè)務(wù)邏輯……");
try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
log.info("執(zhí)行完業(yè)務(wù)邏輯,釋放鎖");
lock.unlock();
return "success";
}
}
3、小總結(jié):
以上就是使用redisson做分布式鎖的簡單demo,用起來十分的方便。上面是與springboot項目集成,直接用它提供的springboot的starter就好了。用它來做分布式鎖的更多用法請移步至官網(wǎng):redisson分布式鎖。
1、zookeeper知識點回顧:
zookeeper有四種類型的節(jié)點:
持久節(jié)點:默認的節(jié)點類型,客戶端與zookeeper斷開連接后,節(jié)點依然存在
持久順序節(jié)點:首先是持久節(jié)點,順序的意思是,zookeeper會根據(jù)節(jié)點創(chuàng)建的順序編號
臨時節(jié)點:客戶端與zookeeper斷開連接后節(jié)點不復(fù)存在
臨時順序節(jié)點:客戶端與zookeeper斷開連接后節(jié)點不復(fù)存在,zookeeper會根據(jù)節(jié)點創(chuàng)建的順序編號
2、基于zookeeper實現(xiàn)分布式鎖的原理:
我們正是利用了zookeeper的臨時順序節(jié)點來實現(xiàn)分布式鎖。首先我們創(chuàng)建一個名為lock
(節(jié)點名稱隨意)的持久節(jié)點。線程1獲取鎖時,就在lock
下面創(chuàng)建一個名為lock1
的臨時順序節(jié)點,然后查找lock
下所有的節(jié)點,判斷自己的lock1
是不是第一個,如果是,獲取鎖成功,繼續(xù)執(zhí)行業(yè)務(wù)邏輯,執(zhí)行完后刪除lock1
節(jié)點;如果不是第一個,獲取鎖失敗,就watch排在自己前面一位的節(jié)點,當(dāng)排在自己前一位的節(jié)點被干掉時,再檢查自己是不是排第一了,如果是,獲取鎖成功。圖解過程如下:
線程1創(chuàng)建了一個lock1,發(fā)現(xiàn)lock1的第一個節(jié)點,占鎖成功;在線程1還沒釋放鎖的時候,線程2來了,創(chuàng)建了一個lock2,發(fā)現(xiàn)lock2不是第一個,便監(jiān)控lock1,線程3此時進行就監(jiān)控lock2。直到自己是第一個節(jié)點時才占鎖成功。假如某個線程釋放鎖的時候zookeeper崩了也沒關(guān)系,因為是臨時節(jié)點,斷開連接節(jié)點就沒了,其他線程還是可以正常獲取鎖,這就是要用臨時節(jié)點的原因。
說清楚了原理,用代碼實現(xiàn)也就不難了,可以引入zookeeper的客戶端zkClient
,自己寫代碼實現(xiàn)(偷個懶,自己就不寫了,有興趣的可以參考我zookeeper的文章,肯定可以自己寫出來的)。不過有非常優(yōu)秀的開源解決方案比如curator,下面就看看curator怎么用。
1、springboot整合curator:
org.apache.zookeeper
zookeeper
3.4.14
org.apache.curator
curator-framework
4.2.0
org.apache.curator
curator-recipes
4.2.0
org.seleniumhq.selenium
selenium-java
curator:
retryCount: 5 # 連接失敗的重試次數(shù)
retryTimeInterval: 5000 # 每隔5秒重試一次
url: 192.168.2.43:2181 # zookeeper連接地址
sessionTimeout: 60000 # session超時時間1分鐘
connectionTimeout: 5000 # 連接超時時間5秒鐘
@Configuration
public class CutatorConfig {
@Value("${curator.retryCount}")
private Integer retryCount;
@Value("${curator.retryTimeInterval}")
private Integer retryTimeInterval;
@Value("${curator.url}")
private String url;
@Value("${curator.sessionTimeout}")
private Integer sessionTimeout;
@Value("${curator.connectionTimeout}")
private Integer connectionTimeout;
@Bean
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(url, sessionTimeout, connectionTimeout,
new RetryNTimes(retryCount, retryTimeInterval));
}
}
@SpringBootTest(classes = {DistributedLockApplication.class})
@RunWith(SpringRunner.class)
public class DistributedLockApplicationTests {
@Autowired
private CuratorFramework curatorFramework;
@Test
public void contextLoads() {
curatorFramework.start();
try {
curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/zhusl", "test".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}
在確保zookeeper成功啟動了的情況下,執(zhí)行這個單元測試,最后回到linux中,用zkCli.sh連接,查看是否成功創(chuàng)建節(jié)點。
2、使用Curator做分布式鎖:
Curator封裝了很多鎖,比如可重入共享鎖、不可重入共享鎖、可重入讀寫鎖、聯(lián)鎖等。具體可以參考官網(wǎng):curator分布式鎖的用法。
@Component
@Slf4j
public class ZookeeperUtil {
private static CuratorFramework curatorFramework;
private static InterProcessLock lock;
/** 持久節(jié)點 */
private final static String ROOT_PATH = "/lock/";
/** 可重入共享鎖 */
private static InterProcessMutex interProcessMutex;
/** 不可重入共享鎖 */
private static InterProcessSemaphoreMutex interProcessSemaphoreMutex;
/** 可重入讀寫鎖 */
private static InterProcessReadWriteLock interProcessReadWriteLock;
/** 多共享鎖(將多把鎖當(dāng)成一把來用) */
private static InterProcessMultiLock interProcessMultiLock;
@Autowired
private void setCuratorFramework(CuratorFramework curatorFramework) {
ZookeeperUtil.curatorFramework = curatorFramework;
ZookeeperUtil.curatorFramework.start();
}
/**
* 獲取可重入排他鎖
*
* @param lockName
* @return
*/
public static boolean interProcessMutex(String lockName) {
interProcessMutex = new InterProcessMutex(curatorFramework, ROOT_PATH + lockName);
lock = interProcessMutex;
return acquireLock(lockName, lock);
}
/**
* 獲取不可重入排他鎖
*
* @param lockName
* @return
*/
public static boolean interProcessSemaphoreMutex(String lockName) {
interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(curatorFramework, ROOT_PATH + lockName);
lock = interProcessSemaphoreMutex;
return acquireLock(lockName, lock);
}
/**
* 獲取可重入讀鎖
*
* @param lockName
* @return
*/
public static boolean interProcessReadLock(String lockName) {
interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_PATH + lockName);
lock = interProcessReadWriteLock.readLock();
return acquireLock(lockName, lock);
}
/**
* 獲取可重入寫鎖
*
* @param lockName
* @return
*/
public static boolean interProcessWriteLock(String lockName) {
interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_PATH + lockName);
lock = interProcessReadWriteLock.writeLock();
return acquireLock(lockName, lock);
}
/**
* 獲取聯(lián)鎖(多把鎖當(dāng)成一把來用)
* @param lockNames
* @return
*/
public static boolean interProcessMultiLock(List lockNames) {
if (lockNames == null || lockNames.isEmpty()) {
log.error("no lockNames found");
return false;
}
interProcessMultiLock = new InterProcessMultiLock(curatorFramework, lockNames);
try {
if (!interProcessMultiLock.acquire(10, TimeUnit.SECONDS)) {
log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail");
return false;
} else {
log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success");
return true;
}
} catch (Exception e) {
log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
return false;
}
}
/**
* 釋放鎖
*
* @param lockName
*/
public static void releaseLock(String lockName) {
try {
if (lock != null && lock.isAcquiredInThisProcess()) {
lock.release();
curatorFramework.delete().inBackground().forPath(ROOT_PATH + lockName);
log.info("Thread:" + Thread.currentThread().getId() + " release lock success");
}
} catch (Exception e) {
log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
}
}
/**
* 釋放聯(lián)鎖
*/
public static void releaseMultiLock(List lockNames) {
try {
if (lockNames == null || lockNames.isEmpty()) {
log.error("no no lockNames found to release");
return;
}
if (interProcessMultiLock != null && interProcessMultiLock.isAcquiredInThisProcess()) {
interProcessMultiLock.release();
for (String lockName : lockNames) {
curatorFramework.delete().inBackground().forPath(ROOT_PATH + lockName);
}
log.info("Thread:" + Thread.currentThread().getId() + " release lock success");
}
} catch (Exception e) {
log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
}
}
/**
* 獲取鎖
*
* @param lockName
* @param interProcessLock
* @return
*/
private static boolean acquireLock(String lockName, InterProcessLock interProcessLock) {
int flag = 0;
try {
while (!interProcessLock.acquire(2, TimeUnit.SECONDS)) {
flag++;
if (flag > 1) {
break;
}
}
} catch (Exception e) {
log.error("acquire lock occured an exception = " + e);
return false;
}
if (flag > 1) {
log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail");
return false;
} else {
log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success");
return true;
}
}
}
@RestController
@RequestMapping("/zookeeper-lock")
public class ZookeeperLockController {
@GetMapping("/testLock")
public String testLock() {
// 獲取鎖
boolean lockResult = ZookeeperUtil.interProcessMutex("testLock");
if (lockResult) {
try {
// 模擬執(zhí)行業(yè)務(wù)邏輯
TimeUnit.MINUTES.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 釋放鎖
ZookeeperUtil.releaseLock("testLock");
return "success";
} else {
return "fail";
}
}
}
打開一個瀏覽器窗口訪問,后臺打印出獲取鎖成功的日志,在1分鐘之內(nèi),開啟另一個窗口再次訪問,打印出獲取鎖失敗的日志,說明分布式鎖生效了。
讀到這里,這篇“基于curator怎么實現(xiàn)分布式鎖”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領(lǐng)會,如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。