本篇文章為大家展示了Zookeeper中怎么實現(xiàn)一個分布式鎖,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
創(chuàng)新互聯(lián)從2013年成立,公司以成都做網(wǎng)站、網(wǎng)站設(shè)計、系統(tǒng)開發(fā)、網(wǎng)絡(luò)推廣、文化傳媒、企業(yè)宣傳、平面廣告設(shè)計等為主要業(yè)務(wù),適用行業(yè)近百種。服務(wù)企業(yè)客戶千余家,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設(shè)開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設(shè)、創(chuàng)意設(shè)計、宣傳推廣等服務(wù)。 通過專業(yè)的設(shè)計、獨特的風格,為不同客戶提供各種風格的特色服務(wù)。
zk有四種節(jié)點,一個最容易想到的策略就是創(chuàng)建節(jié)點,誰創(chuàng)建成功了,就表示誰持有了這個鎖
這個思路與redis的setnx
有點相似,因為zk的節(jié)點創(chuàng)建,也只會有一個會話會創(chuàng)建成功,其他的則會拋已存在的異常
借助臨時節(jié)點,會話丟掉之后節(jié)點刪除,這樣可以避免持有鎖的實例異常而沒有主動釋放導(dǎo)致所有實例都無法持有鎖的問題
如果采用這種方案,如果我想實現(xiàn)阻塞獲取鎖的邏輯,那么其中一個方案就需要寫一個while(true)來不斷重試
while(true) { if (tryLock(xxx)) return true; else Thread.sleep(1000); }
另外一個策略則是借助事件監(jiān)聽,當節(jié)點存在時,注冊一個節(jié)點刪除的觸發(fā)器,這樣就不需要我自己重試判斷了;充分借助zk的特性來實現(xiàn)異步回調(diào)
public void lock() { if (tryLock(path, new Watcher() { @Override public void process(WatchedEvent event) { synchronized (path){ path.notify(); } } })) { return true; } synchronized (path) { path.wait(); } }
那么上面這個實現(xiàn)有什么問題呢?
每次節(jié)點的變更,那么所有的都會監(jiān)聽到變動,好處是非公平鎖的支持;缺點就是剩下這些喚醒的實例中也只會有一個搶占到鎖,無意義的喚醒浪費性能
接下來這種方案更加常見,晚上大部分的教程也是這種case,主要思路就是創(chuàng)建臨時順序節(jié)點
只有序號最小的節(jié)點,才表示搶占鎖成功;如果不是最小的節(jié)點,那么就監(jiān)聽它前面一個節(jié)點的刪除事件,前面節(jié)點刪除了,一種可能是他放棄搶鎖,一種是他釋放自己持有的鎖,不論哪種情況,對我而言,我都需要撈一下所有的節(jié)點,要么拿鎖成功;要么換一個前置節(jié)點
接下來我們來一步步看下,基于臨時順序節(jié)點,可以怎么實現(xiàn)分布式鎖
對于zk,我們依然采用apache的提供的包 zookeeper
來操作;后續(xù)提供Curator
的分布式鎖實例
核心依賴
org.apache.zookeeper zookeeper 3.7.0 org.slf4j slf4j-log4j12
版本說明:
zk版本: 3.6.2
SpringBoot: 2.2.1.RELEASE
第一步,都是實例創(chuàng)建
public class ZkLock implements Watcher { private ZooKeeper zooKeeper; // 創(chuàng)建一個持久的節(jié)點,作為分布式鎖的根目錄 private String root; public ZkLock(String root) throws IOException { try { this.root = root; zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this); Stat stat = zooKeeper.exists(root, false); if (stat == null) { // 不存在則創(chuàng)建 createNode(root, true); } } catch (Exception e) { e.printStackTrace(); } } // 簡單的封裝節(jié)點創(chuàng)建,這里只考慮持久 + 臨時順序 private String createNode(String path, boolean persistent) throws Exception { return zooKeeper.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL_SEQUENTIAL); } }
在我們的這個設(shè)計中,我們需要持有當前節(jié)點和監(jiān)聽前一個節(jié)點的變更,所以我們在ZkLock實例中,添加兩個成員
/** * 當前節(jié)點 */ private String current; /** * 前一個節(jié)點 */ private String pre;
接下來就是嘗試獲取鎖的邏輯
current不存在,在表示沒有創(chuàng)建過,就創(chuàng)建一個臨時順序節(jié)點,并賦值current
current存在,則表示之前已經(jīng)創(chuàng)建過了,目前處于等待鎖釋放過程
接下來根據(jù)當前節(jié)點順序是否最小,來表明是否持有鎖成功
當順序不是最小時,找前面那個節(jié)點,并賦值 pre;
監(jiān)聽pre的變化
/** * 嘗試獲取鎖,創(chuàng)建順序臨時節(jié)點,若數(shù)據(jù)最小,則表示搶占鎖成功;否則失敗 * * @return */ public boolean tryLock() { try { String path = root + "/"; if (current == null) { // 創(chuàng)建臨時順序節(jié)點 current = createNode(path, false); } Listlist = zooKeeper.getChildren(root, false); Collections.sort(list); if (current.equalsIgnoreCase(path + list.get(0))) { // 獲取鎖成功 return true; } else { // 獲取鎖失敗,找到前一個節(jié)點 int index = Collections.binarySearch(list, current.substring(path.length())); // 查詢當前節(jié)點前面的那個 pre = path + list.get(index - 1); } } catch (Exception e) { e.printStackTrace(); } return false; }
請注意上面的實現(xiàn),這里并沒有去監(jiān)聽前一個節(jié)點的變更,在設(shè)計tryLock
,因為是立馬返回成功or失敗,所以使用這個接口的,不需要注冊監(jiān)聽
我們的監(jiān)聽邏輯,放在 lock()
同步阻塞里面
嘗試搶占鎖,成功則直接返回
拿鎖失敗,則監(jiān)聽前一個節(jié)點的刪除事件
public boolean lock() { if (tryLock()) { return true; } try { // 監(jiān)聽前一個節(jié)點的刪除事件 Stat state = zooKeeper.exists(pre, true); if (state != null) { synchronized (pre) { // 阻塞等待前面的節(jié)點釋放 pre.wait(); // 這里不直接返回true,因為前面的一個節(jié)點刪除,可能并不是因為它持有鎖并釋放鎖,如果是因為這個會話中斷導(dǎo)致臨時節(jié)點刪除,這個時候需要做的是換一下監(jiān)聽的 preNode return lock(); } } else { // 不存在,則再次嘗試拿鎖 return lock(); } } catch (Exception e) { e.printStackTrace(); } return false; }
注意:
當節(jié)點不存在時,或者事件觸發(fā)回調(diào)之后,重新調(diào)用lock()
,表明我胡漢三又來競爭鎖了?
為啥不是直接返回 true? 而是需要重新競爭呢?
因為前面節(jié)點的刪除,有可能是因為前面節(jié)點的會話中斷導(dǎo)致的;但是鎖還在另外的實例手中,這個時候我應(yīng)該做的是重新排隊
最后別忘了釋放鎖
public void unlock() { try { zooKeeper.delete(current, -1); current = null; zooKeeper.close(); } catch (Exception e) { e.printStackTrace(); } }
到此,我們的分布式鎖就完成了,接下來我們復(fù)盤下實現(xiàn)過程
所有知識點來自前一篇的zk基礎(chǔ)使用(創(chuàng)建節(jié)點,刪除節(jié)點,獲取所有自己點,監(jiān)聽事件)
搶鎖過程 =》 創(chuàng)建序號最小的節(jié)點
若節(jié)點不是最小的,那么就監(jiān)聽前面的節(jié)點刪除事件
這個實現(xiàn),支持了鎖的重入(why? 因為鎖未釋放時,我們保存了current,當前節(jié)點存在時則直接判斷是不是最小的;而不是重新創(chuàng)建)
最后寫一個測試case,來看下
@SpringBootApplication public class Application { private void tryLock(long time) { ZkLock zkLock = null; try { zkLock = new ZkLock("/lock"); System.out.println("嘗試獲取鎖: " + Thread.currentThread() + " at: " + LocalDateTime.now()); boolean ans = zkLock.lock(); System.out.println("執(zhí)行業(yè)務(wù)邏輯:" + Thread.currentThread() + " at:" + LocalDateTime.now()); Thread.sleep(time); } catch (Exception e) { e.printStackTrace(); } finally { if (zkLock != null) { zkLock.unlock(); } } } public Application() throws IOException, InterruptedException { new Thread(() -> tryLock(10_000)).start(); Thread.sleep(1000); // 獲取鎖到執(zhí)行鎖會有10s的間隔,因為上面的線程搶占到鎖,并持有了10s new Thread(() -> tryLock(1_000)).start(); System.out.println("---------over------------"); Scanner scanner = new Scanner(System.in); String ans = scanner.next(); System.out.println("---> over --->" + ans); } public static void main(String[] args) { SpringApplication.run(Application.class); } }
輸出結(jié)果如下
上述內(nèi)容就是Zookeeper中怎么實現(xiàn)一個分布式鎖,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。