本篇內(nèi)容介紹了“Zookeeper Queue隊列怎么實現(xiàn) ”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
10年積累的網(wǎng)站設(shè)計制作、做網(wǎng)站經(jīng)驗,可以快速應(yīng)對客戶對網(wǎng)站的新想法和需求。提供各種問題對應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認識你,你也不認識我。但先網(wǎng)站設(shè)計制作后付款的網(wǎng)站建設(shè)流程,更有高平免費網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
1: Barries: 柵欄,見面知意。
2:Queue:Queue也就是我們所說的隊列
1:Barries:
1.1: 是指所有的現(xiàn)場都達到 barrier后才能進行后續(xù)的計算
1.2:所有的線程都完成自己的計算以后才能離開barrier
進入柵欄: 1,新建一個根節(jié)點 "/root" 2, 想進入barrier的線程在 “/root”下建立一個字節(jié)點"/root/c-i" 3,循環(huán)監(jiān)聽"/root"孩子節(jié)點數(shù)的變化,每當(dāng)其達到Size的時候就說明有Size個線程都已經(jīng)達到了Barrier的 要求。
2:Queue:就是指一個生產(chǎn)者或消費者的模型
離開Barrier 1: 想離開Barrier的現(xiàn)場刪除掉在"/root" 下建立的子節(jié)點 2: 循環(huán)監(jiān)聽"/root" 孩子節(jié)點數(shù)目的變化,當(dāng)Size減少到0的時候它就可以離開了。
3 :Queue 隊列的實現(xiàn)
1 : 建立一個根節(jié)點"/root" 2 : 生產(chǎn)線程在"/root" 下建立一個SEQUENTAIL的節(jié)點 3 : 消費線程檢查"/root" 如果沒有就循環(huán)的監(jiān)聽"/root" 節(jié)點的變化,直到它有自己的子節(jié)點,刪除序號 最小子字節(jié)點。
package sync; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.List; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher { static ZooKeeper zk = null; static Integer mutex; String root; //同步原語 SyncPrimitive(String address) { if (zk == null) { try { System.out.println("Starting ZK:"); //建立Zookeeper連接,并且指定watcher zk = new ZooKeeper(address, 3000, this); //初始化鎖對象 mutex = new Integer(-1); System.out.println("Finished starting ZK:" + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } } @Override synchronized public void process(WatchedEvent event) { synchronized (mutex) { //有事件發(fā)生時,調(diào)用notify,使其他wait()點得以繼續(xù) mutex.notify(); } } static public class Barrier extends SyncPrimitive { int size; String name; Barrier(String address, String root, int size) { super(address); this.root = root; this.size = size; if (zk != null) { try { //一個barrier建立一個根目錄 Stat s = zk.exists(root, false); //不注冊watcher if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } try { //獲取自己的主機名 name = new String(InetAddress.getLocalHost() .getCanonicalHostName().toString()); } catch (UnknownHostException e) { System.out.println(e.toString()); } } boolean enter() throws KeeperException, InterruptedException { //在根目錄下創(chuàng)建一個子節(jié)點.create和delete都會觸發(fā)children wathes,這樣getChildren就會收到通知,process()就會被調(diào)用 zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //一直等,直到根目錄下的子節(jié)點數(shù)目達到size時,函數(shù)退出 while (true) { synchronized (mutex) { Listlist = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); //釋放mutex上的鎖 } else { return true; } } } } boolean leave() throws KeeperException, InterruptedException { //刪除自己創(chuàng)建的節(jié)點 zk.delete(root + "/" + name, 0); //一直等,直到根目錄下有子節(jié)點時,函數(shù)退出 while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } } static public class Queue extends SyncPrimitive { Queue(String address, String name) { super(address); this.root = name; if (zk != null) { try { //一個queue建立一個根目錄 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } } //參數(shù)i是要創(chuàng)建節(jié)點的data boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); //根目錄下創(chuàng)建一個子節(jié)點,因為是SEQUENTIAL的,所以先創(chuàng)建的節(jié)點具有較小的序號 zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); //并不能保證list[0]就是序號最小的 //如果根目錄下沒有子節(jié)點就一直等 if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } //找到序號最小的節(jié)點將其刪除 else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tmp = new Integer(s.substring(7)); if (tmp < min) min = tmp; } System.out.println("Temporary value:" + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } public static void main(String[] args) { if (args[0].equals("qTest")) queueTest(args); else barrierTest(args); } private static void barrierTest(String[] args) { Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); try { boolean flag = b.enter(); System.out.println("Enter barrier:" + args[2]); if (!flag) System.out.println("Error when entering the barrier"); } catch (KeeperException e) { } catch (InterruptedException e) { } Random rand = new Random(); int r = rand.nextInt(100); for (int i = 0; i < r; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { } } try { b.leave(); } catch (KeeperException e) { } catch (InterruptedException e) { } System.out.println("Left barrier"); } private static void queueTest(String[] args) { Queue q = new Queue(args[1], "/app1"); System.out.println("Input:" + args[1]); int i; Integer max = new Integer(args[2]); if (args[3].equals("p")) { System.out.println("Producer"); for (i = 0; i < max; i++) try { q.produce(10 + 1); } catch (KeeperException e) { } catch (InterruptedException e) { } } else { System.out.println("Consumer"); for (i = 0; i < max; i++) try { int r = q.consume(); System.out.println("Item:" + r); } catch (KeeperException e) { i--; } catch (InterruptedException e) { } } } }
“Zookeeper Queue隊列怎么實現(xiàn) ”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!