本篇文章給大家分享的是有關(guān)如何在zookeeper中實(shí)現(xiàn)分布式鎖,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
目前成都創(chuàng)新互聯(lián)已為上1000+的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬主機(jī)、網(wǎng)站托管、服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、珠海網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
一、分布式鎖介紹
分布式鎖主要用于在分布式環(huán)境中保護(hù)跨進(jìn)程、跨主機(jī)、跨網(wǎng)絡(luò)的共享資源實(shí)現(xiàn)互斥訪問,以達(dá)到保證數(shù)據(jù)的一致性。
二、架構(gòu)介紹
在介紹使用Zookeeper實(shí)現(xiàn)分布式鎖之前,首先看當(dāng)前的系統(tǒng)架構(gòu)圖
解釋:左邊的整個(gè)區(qū)域表示一個(gè)Zookeeper集群,locker是Zookeeper的一個(gè)持久節(jié)點(diǎn),node_1、node_2、node_3是locker這個(gè)持久節(jié)點(diǎn)下面的臨時(shí)順序節(jié)點(diǎn)。client_1、client_2、client_n表示多個(gè)客戶端,Service表示需要互斥訪問的共享資源。
三、分布式鎖獲取思路
1.獲取分布式鎖的總體思路
在獲取分布式鎖的時(shí)候在locker節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn),釋放鎖的時(shí)候刪除該臨時(shí)節(jié)點(diǎn)??蛻舳苏{(diào)用createNode方法在locker下創(chuàng)建臨時(shí)順序節(jié)點(diǎn),然后調(diào)用getChildren(“l(fā)ocker”)來獲取locker下面的所有子節(jié)點(diǎn),注意此時(shí)不用設(shè)置任何Watcher。客戶端獲取到所有的子節(jié)點(diǎn)path之后,如果發(fā)現(xiàn)自己在之前創(chuàng)建的子節(jié)點(diǎn)序號(hào)最小,那么就認(rèn)為該客戶端獲取到了鎖。如果發(fā)現(xiàn)自己創(chuàng)建的節(jié)點(diǎn)并非locker所有子節(jié)點(diǎn)中最小的,說明自己還沒有獲取到鎖,此時(shí)客戶端需要找到比自己小的那個(gè)節(jié)點(diǎn),然后對(duì)其調(diào)用exist()方法,同時(shí)對(duì)其注冊(cè)事件監(jiān)聽器。之后,讓這個(gè)被關(guān)注的節(jié)點(diǎn)刪除,則客戶端的Watcher會(huì)收到相應(yīng)通知,此時(shí)再次判斷自己創(chuàng)建的節(jié)點(diǎn)是否是locker子節(jié)點(diǎn)中序號(hào)最小的,如皋是則獲取到了鎖,如果不是則重復(fù)以上步驟繼續(xù)獲取到比自己小的一個(gè)節(jié)點(diǎn)并注冊(cè)監(jiān)聽。當(dāng)前這個(gè)過程中還需要許多的邏輯判斷。
2.獲取分布式鎖的核心算法流程
下面同個(gè)一個(gè)流程圖來分析獲取分布式鎖的完整算法,如下:
解釋:客戶端A要獲取分布式鎖的時(shí)候首先到locker下創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)(node_n),然后立即獲取locker下的所有(一級(jí))子節(jié)點(diǎn)。
此時(shí)因?yàn)闀?huì)有多個(gè)客戶端同一時(shí)間爭(zhēng)取鎖,因此locker下的子節(jié)點(diǎn)數(shù)量就會(huì)大于1。對(duì)于順序節(jié)點(diǎn),特點(diǎn)是節(jié)點(diǎn)名稱后面自動(dòng)有一個(gè)數(shù)字編號(hào),先創(chuàng)建的節(jié)點(diǎn)數(shù)字編號(hào)小于后創(chuàng)建的,因此可以將子節(jié)點(diǎn)按照節(jié)點(diǎn)名稱后綴的數(shù)字順序從小到大排序,這樣排在第一位的就是最先創(chuàng)建的順序節(jié)點(diǎn),此時(shí)它就代表了最先爭(zhēng)取到鎖的客戶端!此時(shí)判斷最小的這個(gè)節(jié)點(diǎn)是否為客戶端A之前創(chuàng)建出來的node_n,如果是則表示客戶端A獲取到了鎖,如果不是則表示鎖已經(jīng)被其它客戶端獲取,因此客戶端A要等待它釋放鎖,也就是等待獲取到鎖的那個(gè)客戶端B把自己創(chuàng)建的那個(gè)節(jié)點(diǎn)刪除。
此時(shí)就通過監(jiān)聽比node_n次小的那個(gè)順序節(jié)點(diǎn)的刪除事件來知道客戶端B是否已經(jīng)釋放了鎖,如果是,此時(shí)客戶端A再次獲取locker下的所有子節(jié)點(diǎn),再次與自己創(chuàng)建的node_n節(jié)點(diǎn)對(duì)比,直到自己創(chuàng)建的node_n是locker的所有子節(jié)點(diǎn)中順序號(hào)最小的,此時(shí)表示客戶端A獲取到了鎖!
四、基于Zookeeper的分布式鎖的代碼實(shí)現(xiàn)
1.定義分布式鎖接口
定義的分布式鎖接口如下:
public interface DistributedLock { /**獲取鎖,如果沒有得到就等待*/ public void acquire() throws Exception; /** * 獲取鎖,直到超時(shí) * @param time超時(shí)時(shí)間 * @param unit time參數(shù)的單位 * @return是否獲取到鎖 * @throws Exception */ public boolean acquire (long time, TimeUnit unit) throws Exception; /** * 釋放鎖 * @throws Exception */ public void release() throws Exception; }
2.定義一個(gè)簡(jiǎn)單的互斥鎖
定義一個(gè)互斥鎖類,實(shí)現(xiàn)以上定義的鎖接口,同時(shí)繼承一個(gè)基類BaseDistributedLock,該基類主要用于與Zookeeper交互,包含一個(gè)嘗試獲取鎖的方法和一個(gè)釋放鎖。
/**鎖接口的具體實(shí)現(xiàn),主要借助于繼承的父類BaseDistributedLock來實(shí)現(xiàn)的接口方法 * 該父類是基于Zookeeper實(shí)現(xiàn)分布式鎖的具體細(xì)節(jié)實(shí)現(xiàn)*/ public class SimpleDistributedLockMutex extends BaseDistributedLock implements DistributedLock { /*用于保存Zookeeper中實(shí)現(xiàn)分布式鎖的節(jié)點(diǎn),如名稱為locker:/locker, *該節(jié)點(diǎn)應(yīng)該是持久節(jié)點(diǎn),在該節(jié)點(diǎn)下面創(chuàng)建臨時(shí)順序節(jié)點(diǎn)來實(shí)現(xiàn)分布式鎖 */ private final String basePath; /*鎖名稱前綴,locker下創(chuàng)建的順序節(jié)點(diǎn)例如都以lock-開頭,這樣便于過濾無關(guān)節(jié)點(diǎn) *這樣創(chuàng)建后的節(jié)點(diǎn)類似:lock-00000001,lock-000000002*/ private staticfinal String LOCK_NAME ="lock-"; /*用于保存某個(gè)客戶端在locker下面創(chuàng)建成功的順序節(jié)點(diǎn),用于后續(xù)相關(guān)操作使用(如判斷)*/ private String ourLockPath; /** * 用于獲取鎖資源,通過父類的獲取鎖方法來獲取鎖 * @param time獲取鎖的超時(shí)時(shí)間 * @param unit time的時(shí)間單位 * @return是否獲取到鎖 * @throws Exception */ private boolean internalLock (long time, TimeUnit unit) throws Exception { //如果ourLockPath不為空則認(rèn)為獲取到了鎖,具體實(shí)現(xiàn)細(xì)節(jié)見attemptLock的實(shí)現(xiàn) ourLockPath = attemptLock(time, unit); return ourLockPath !=null; } /** * 傳入Zookeeper客戶端連接對(duì)象,和basePath * @param client Zookeeper客戶端連接對(duì)象 * @param basePath basePath是一個(gè)持久節(jié)點(diǎn) */ public SimpleDistributedLockMutex(ZkClientExt client, String basePath){ /*調(diào)用父類的構(gòu)造方法在Zookeeper中創(chuàng)建basePath節(jié)點(diǎn),并且為basePath節(jié)點(diǎn)子節(jié)點(diǎn)設(shè)置前綴 *同時(shí)保存basePath的引用給當(dāng)前類屬性*/ super(client,basePath,LOCK_NAME); this.basePath = basePath; } /**獲取鎖,直到超時(shí),超時(shí)后拋出異常*/ public void acquire() throws Exception { //-1表示不設(shè)置超時(shí)時(shí)間,超時(shí)由Zookeeper決定 if (!internalLock(-1,null)){ throw new IOException("連接丟失!在路徑:'"+basePath+"'下不能獲取鎖!"); } } /** * 獲取鎖,帶有超時(shí)時(shí)間 */ public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } /**釋放鎖*/ public void release()throws Exception { releaseLock(ourLockPath); } }
3. 分布式鎖的實(shí)現(xiàn)細(xì)節(jié)
獲取分布式鎖的重點(diǎn)邏輯在于BaseDistributedLock,實(shí)現(xiàn)了基于Zookeeper實(shí)現(xiàn)分布式鎖的細(xì)節(jié)。
public class BaseDistributedLock { private final ZkClientExt client; private final String path; private final String basePath; private final String lockName; private static final Integer MAX_RETRY_COUNT = 10; public BaseDistributedLock(ZkClientExt client, String path, String lockName){ this.client = client; this.basePath = path; this.path = path.concat("/").concat(lockName); this.lockName = lockName; } private void deleteOurPath(String ourPath) throws Exception{ client.delete(ourPath); } private String createLockNode(ZkClient client, String path) throws Exception{ return client.createEphemeralSequential(path, null); } /** * 獲取鎖的核心方法 * @param startMillis * @param millisToWait * @param ourPath * @return * @throws Exception */ private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{ boolean haveTheLock = false; boolean doDelete = false; try{ while ( !haveTheLock ) { //該方法實(shí)現(xiàn)獲取locker節(jié)點(diǎn)下的所有順序節(jié)點(diǎn),并且從小到大排序 Listchildren = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length()+1); //計(jì)算剛才客戶端創(chuàng)建的順序節(jié)點(diǎn)在locker的所有子節(jié)點(diǎn)中排序位置,如果是排序?yàn)?,則表示獲取到了鎖 int ourIndex = children.indexOf(sequenceNodeName); /*如果在getSortedChildren中沒有找到之前創(chuàng)建的[臨時(shí)]順序節(jié)點(diǎn),這表示可能由于網(wǎng)絡(luò)閃斷而導(dǎo)致 *Zookeeper認(rèn)為連接斷開而刪除了我們創(chuàng)建的節(jié)點(diǎn),此時(shí)需要拋出異常,讓上一級(jí)去處理 *上一級(jí)的做法是捕獲該異常,并且執(zhí)行重試指定的次數(shù) 見后面的 attemptLock方法 */ if ( ourIndex<0 ){ throw new ZkNoNodeException("節(jié)點(diǎn)沒有找到: " + sequenceNodeName); } //如果當(dāng)前客戶端創(chuàng)建的節(jié)點(diǎn)在locker子節(jié)點(diǎn)列表中位置大于0,表示其它客戶端已經(jīng)獲取了鎖 //此時(shí)當(dāng)前客戶端需要等待其它客戶端釋放鎖, boolean isGetTheLock = ourIndex == 0; //如何判斷其它客戶端是否已經(jīng)釋放了鎖?從子節(jié)點(diǎn)列表中獲取到比自己次小的哪個(gè)節(jié)點(diǎn),并對(duì)其建立監(jiān)聽 String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1); if ( isGetTheLock ){ haveTheLock = true; }else{ //如果次小的節(jié)點(diǎn)被刪除了,則表示當(dāng)前客戶端的節(jié)點(diǎn)應(yīng)該是最小的了,所以使用CountDownLatch來實(shí)現(xiàn)等待 String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch ); final CountDownLatch latch = new CountDownLatch(1); final IZkDataListener previousListener = new IZkDataListener() { //次小節(jié)點(diǎn)刪除事件發(fā)生時(shí),讓countDownLatch結(jié)束等待 //此時(shí)還需要重新讓程序回到while,重新判斷一次! public void handleDataDeleted(String dataPath) throws Exception { latch.countDown(); } public void handleDataChange(String dataPath, Object data) throws Exception { // ignore } }; try{ //如果節(jié)點(diǎn)不存在會(huì)出現(xiàn)異常 client.subscribeDataChanges(previousSequencePath, previousListener); if ( millisToWait != null ){ millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ){ doDelete = true; // timed out - delete our node break; } latch.await(millisToWait, TimeUnit.MICROSECONDS); }else{ latch.await(); } }catch ( ZkNoNodeException e ){ //ignore }finally{ client.unsubscribeDataChanges(previousSequencePath, previousListener); } } } }catch ( Exception e ){ //發(fā)生異常需要?jiǎng)h除節(jié)點(diǎn) doDelete = true; throw e; }finally{ //如果需要?jiǎng)h除節(jié)點(diǎn) if ( doDelete ){ deleteOurPath(ourPath); } } return haveTheLock; } private String getLockNodeNumber(String str, String lockName) { int index = str.lastIndexOf(lockName); if ( index >= 0 ){ index += lockName.length(); return index <= str.length() ? str.substring(index) : ""; } return str; } private List getSortedChildren() throws Exception { try{ List children = client.getChildren(basePath); Collections.sort( children, new Comparator (){ public int compare(String lhs, String rhs){ return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName)); } } ); return children; }catch(ZkNoNodeException e){ client.createPersistent(basePath, true); return getSortedChildren(); } } protected void releaseLock(String lockPath) throws Exception{ deleteOurPath(lockPath); } protected String attemptLock(long time, TimeUnit unit) throws Exception{ final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; int retryCount = 0; //網(wǎng)絡(luò)閃斷需要重試一試 while ( !isDone ){ isDone = true; try{ //createLockNode用于在locker(basePath持久節(jié)點(diǎn))下創(chuàng)建客戶端要獲取鎖的[臨時(shí)]順序節(jié)點(diǎn) ourPath = createLockNode(client, path); /** * 該方法用于判斷自己是否獲取到了鎖,即自己創(chuàng)建的順序節(jié)點(diǎn)在locker的所有子節(jié)點(diǎn)中是否最小 * 如果沒有獲取到鎖,則等待其它客戶端鎖的釋放,并且稍后重試直到獲取到鎖或者超時(shí) */ hasTheLock = waitToLock(startMillis, millisToWait, ourPath); }catch ( ZkNoNodeException e ){ if ( retryCount++ < MAX_RETRY_COUNT ){ isDone = false; }else{ throw e; } } } if ( hasTheLock ){ return ourPath; } return null; }
以上就是如何在zookeeper中實(shí)現(xiàn)分布式鎖,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。