真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Zookeeper中怎么實現(xiàn)一個分布式鎖

本篇文章為大家展示了Zookeeper中怎么實現(xiàn)一個分布式鎖,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

創(chuàng)新互聯(lián)從2013年成立,公司以成都做網(wǎng)站、網(wǎng)站設(shè)計、系統(tǒng)開發(fā)、網(wǎng)絡(luò)推廣、文化傳媒、企業(yè)宣傳、平面廣告設(shè)計等為主要業(yè)務(wù),適用行業(yè)近百種。服務(wù)企業(yè)客戶千余家,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設(shè)開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設(shè)、創(chuàng)意設(shè)計、宣傳推廣等服務(wù)。 通過專業(yè)的設(shè)計、獨特的風格,為不同客戶提供各種風格的特色服務(wù)。

I. 方案設(shè)計

1. 創(chuàng)建節(jié)點方式實現(xiàn)

zk有四種節(jié)點,一個最容易想到的策略就是創(chuàng)建節(jié)點,誰創(chuàng)建成功了,就表示誰持有了這個鎖

這個思路與redis的setnx有點相似,因為zk的節(jié)點創(chuàng)建,也只會有一個會話會創(chuàng)建成功,其他的則會拋已存在的異常

借助臨時節(jié)點,會話丟掉之后節(jié)點刪除,這樣可以避免持有鎖的實例異常而沒有主動釋放導(dǎo)致所有實例都無法持有鎖的問題

如果采用這種方案,如果我想實現(xiàn)阻塞獲取鎖的邏輯,那么其中一個方案就需要寫一個while(true)來不斷重試

while(true) {
    if (tryLock(xxx)) return true;
    else Thread.sleep(1000);
}

另外一個策略則是借助事件監(jiān)聽,當節(jié)點存在時,注冊一個節(jié)點刪除的觸發(fā)器,這樣就不需要我自己重試判斷了;充分借助zk的特性來實現(xiàn)異步回調(diào)

public void lock() {
  if (tryLock(path,  new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            synchronized (path){
                path.notify();
            }
        }
    })) {
      return true;
  }

  synchronized (path) {
      path.wait();
  }
}

那么上面這個實現(xiàn)有什么問題呢?

每次節(jié)點的變更,那么所有的都會監(jiān)聽到變動,好處是非公平鎖的支持;缺點就是剩下這些喚醒的實例中也只會有一個搶占到鎖,無意義的喚醒浪費性能

2. 臨時順序節(jié)點方式

接下來這種方案更加常見,晚上大部分的教程也是這種case,主要思路就是創(chuàng)建臨時順序節(jié)點

只有序號最小的節(jié)點,才表示搶占鎖成功;如果不是最小的節(jié)點,那么就監(jiān)聽它前面一個節(jié)點的刪除事件,前面節(jié)點刪除了,一種可能是他放棄搶鎖,一種是他釋放自己持有的鎖,不論哪種情況,對我而言,我都需要撈一下所有的節(jié)點,要么拿鎖成功;要么換一個前置節(jié)點

II.分布式鎖實現(xiàn)

接下來我們來一步步看下,基于臨時順序節(jié)點,可以怎么實現(xiàn)分布式鎖

對于zk,我們依然采用apache的提供的包 zookeeper來操作;后續(xù)提供Curator的分布式鎖實例

1. 依賴

核心依賴



    org.apache.zookeeper
    zookeeper
    3.7.0
    
        
            org.slf4j
            slf4j-log4j12
        
    

版本說明:

  • zk版本: 3.6.2

  • SpringBoot: 2.2.1.RELEASE

2. 簡單的分布式鎖

第一步,都是實例創(chuàng)建

public class ZkLock implements Watcher {

    private ZooKeeper zooKeeper;
    // 創(chuàng)建一個持久的節(jié)點,作為分布式鎖的根目錄
    private String root;

    public ZkLock(String root) throws IOException {
        try {
            this.root = root;
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 500_000, this);
            Stat stat = zooKeeper.exists(root, false);
            if (stat == null) {
                // 不存在則創(chuàng)建
                createNode(root, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    // 簡單的封裝節(jié)點創(chuàng)建,這里只考慮持久 + 臨時順序
    private String createNode(String path, boolean persistent) throws Exception {
        return zooKeeper.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL_SEQUENTIAL);
    }
}

在我們的這個設(shè)計中,我們需要持有當前節(jié)點和監(jiān)聽前一個節(jié)點的變更,所以我們在ZkLock實例中,添加兩個成員

/**
 * 當前節(jié)點
 */
private String current;

/**
 * 前一個節(jié)點
 */
private String pre;

接下來就是嘗試獲取鎖的邏輯

  • current不存在,在表示沒有創(chuàng)建過,就創(chuàng)建一個臨時順序節(jié)點,并賦值current

  • current存在,則表示之前已經(jīng)創(chuàng)建過了,目前處于等待鎖釋放過程

  • 接下來根據(jù)當前節(jié)點順序是否最小,來表明是否持有鎖成功

  • 當順序不是最小時,找前面那個節(jié)點,并賦值 pre;

  • 監(jiān)聽pre的變化

/**
 * 嘗試獲取鎖,創(chuàng)建順序臨時節(jié)點,若數(shù)據(jù)最小,則表示搶占鎖成功;否則失敗
 *
 * @return
 */
public boolean tryLock() {
    try {
        String path = root + "/";
        if (current == null) {
            // 創(chuàng)建臨時順序節(jié)點
            current = createNode(path, false);
        }
        List list = zooKeeper.getChildren(root, false);
        Collections.sort(list);

        if (current.equalsIgnoreCase(path + list.get(0))) {
            // 獲取鎖成功
            return true;
        } else {
            // 獲取鎖失敗,找到前一個節(jié)點
            int index = Collections.binarySearch(list, current.substring(path.length()));
            // 查詢當前節(jié)點前面的那個
            pre = path + list.get(index - 1);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

請注意上面的實現(xiàn),這里并沒有去監(jiān)聽前一個節(jié)點的變更,在設(shè)計tryLock,因為是立馬返回成功or失敗,所以使用這個接口的,不需要注冊監(jiān)聽

我們的監(jiān)聽邏輯,放在 lock() 同步阻塞里面

  • 嘗試搶占鎖,成功則直接返回

  • 拿鎖失敗,則監(jiān)聽前一個節(jié)點的刪除事件

public boolean lock() {
    if (tryLock()) {
        return true;
    }

    try {
        // 監(jiān)聽前一個節(jié)點的刪除事件
        Stat state = zooKeeper.exists(pre, true);
        if (state != null) {
            synchronized (pre) {
                // 阻塞等待前面的節(jié)點釋放
                pre.wait();
                // 這里不直接返回true,因為前面的一個節(jié)點刪除,可能并不是因為它持有鎖并釋放鎖,如果是因為這個會話中斷導(dǎo)致臨時節(jié)點刪除,這個時候需要做的是換一下監(jiān)聽的 preNode
                return lock();
            }
        } else {
          // 不存在,則再次嘗試拿鎖
          return lock();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

注意:

  • 當節(jié)點不存在時,或者事件觸發(fā)回調(diào)之后,重新調(diào)用lock(),表明我胡漢三又來競爭鎖了?

為啥不是直接返回 true? 而是需要重新競爭呢?

  • 因為前面節(jié)點的刪除,有可能是因為前面節(jié)點的會話中斷導(dǎo)致的;但是鎖還在另外的實例手中,這個時候我應(yīng)該做的是重新排隊

最后別忘了釋放鎖

public void unlock() {
    try {
        zooKeeper.delete(current, -1);
        current = null;
        zooKeeper.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

到此,我們的分布式鎖就完成了,接下來我們復(fù)盤下實現(xiàn)過程

  • 所有知識點來自前一篇的zk基礎(chǔ)使用(創(chuàng)建節(jié)點,刪除節(jié)點,獲取所有自己點,監(jiān)聽事件)

  • 搶鎖過程 =》 創(chuàng)建序號最小的節(jié)點

  • 若節(jié)點不是最小的,那么就監(jiān)聽前面的節(jié)點刪除事件

這個實現(xiàn),支持了鎖的重入(why? 因為鎖未釋放時,我們保存了current,當前節(jié)點存在時則直接判斷是不是最小的;而不是重新創(chuàng)建)

3. 測試

最后寫一個測試case,來看下

@SpringBootApplication
public class Application {

    private void tryLock(long time) {
        ZkLock zkLock = null;
        try {
            zkLock = new ZkLock("/lock");
            System.out.println("嘗試獲取鎖: " + Thread.currentThread() + " at: " + LocalDateTime.now());
            boolean ans = zkLock.lock();
            System.out.println("執(zhí)行業(yè)務(wù)邏輯:" + Thread.currentThread() + " at:" + LocalDateTime.now());
            Thread.sleep(time);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (zkLock != null) {
                zkLock.unlock();
            }
        }
    }

    public Application() throws IOException, InterruptedException {
        new Thread(() -> tryLock(10_000)).start();

        Thread.sleep(1000);
        // 獲取鎖到執(zhí)行鎖會有10s的間隔,因為上面的線程搶占到鎖,并持有了10s
        new Thread(() -> tryLock(1_000)).start();
        System.out.println("---------over------------");

        Scanner scanner = new Scanner(System.in);
        String ans = scanner.next();
        System.out.println("---> over --->" + ans);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

輸出結(jié)果如下

Zookeeper中怎么實現(xiàn)一個分布式鎖

上述內(nèi)容就是Zookeeper中怎么實現(xiàn)一個分布式鎖,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


文章名稱:Zookeeper中怎么實現(xiàn)一個分布式鎖
當前網(wǎng)址:http://weahome.cn/article/gjigpd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部