真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

ZooKeeperJavaAPI編程的示例分析

這篇文章主要為大家展示了“ZooKeeper Java API編程的示例分析”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“ZooKeeper Java API編程的示例分析”這篇文章吧。

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:國際域名空間、雅安服務器托管、營銷軟件、網(wǎng)站建設、鞏義網(wǎng)站維護、網(wǎng)站推廣。

開發(fā)應用程序的ZooKeeper Java綁定主要由兩個Java包組成:

org.apache.zookeeper

org.apache.zookeeper.data

org.apache.zookeeper包由ZooKeeper監(jiān)視的接口定義和ZooKeeper的各種回調(diào)處理程序組成。 它定義了ZooKeeper客戶端類庫的主要類以及許多ZooKeeper事件類型和狀態(tài)的靜態(tài)定義。 org.apache.zookeeper.data包定義了與數(shù)據(jù)寄存器(也稱為znode)相關的特性,例如訪問控制列表(ACL),IDs,stats等。

ZooKeeper Java API中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是服務器實現(xiàn)的一部分。 org.apache.zookeeper.client包用于查詢ZooKeeper服務器的狀態(tài)。

一 準備開發(fā)環(huán)境

Apache ZooKeeper是一個復雜的軟件,因此它需要運行許多其他類庫。 依賴庫作為jar文件在ZooKeeper發(fā)行版中附帶在lib目錄中。 核心ZooKeeper jar文件名字為zookeeper-3.4.6.jar,位于主目錄下。

要開發(fā)Java的ZooKeeper應用程序,我們必須設置指向ZooKeeper jar的類路徑,以及ZooKeeper所依賴的所有第三方庫。在 bin 目錄下有一個 zkEnv.sh文件,可以用來設置CLASSPATH。

我們需要將腳本如下設置,在命令行中執(zhí)行以下語句:

$ ZOOBINDIR=${ZK_HOME}/bin
$ source ${ZOOBINDIR}/zkEnv.sh

shell變量ZK_HOME被設置為安裝ZooKeeper的路徑,在我的設置中,它是/usr/share/zookeeper。 之后,CLASSPATH變量被正確設置,在我的系統(tǒng)中,如下所示:

$ echo $CLASSPATH 
/usr/share/zookeeper-3.4.6/bin/../build/classes :/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar :/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar :/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar :/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar :/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../conf:

在Windows操作系統(tǒng)中,需要運行zkEnv.cmd腳本。 現(xiàn)在可以使用CLASSPATH變量來編譯和運行使用ZooKeeper API編寫的Java程序。 可以在Uni/Linux中的主目錄的.bashrc文件中找到zkEnv.sh腳本,避免每次啟動shell會話時都采用它。

二 第一個ZooKeeper程序

為了引入ZooKeeper Java API,讓我們從一個非常簡單的程序開始,它可以連接到localhost中的ZooKeeper實例,如果連接成功,它將在ZooKeeper名稱空間的根路徑下打印znode的列表。

這個程序的代碼如下所示:

/*Our First ZooKeeper Program*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class HelloZooKeeper {
public static void main(String[] args) throws IOException {
String hostPort = "localhost:2181";
String zpath = "/";
List  zooChildren = new ArrayList();
ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
if (zk != null) {
try {
zooChildren = zk.getChildren(zpath, false);
System.out.println("Znodes of '/': ");
for (String child: zooChildren) {
//print the children
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

在構(gòu)建和執(zhí)行前面的代碼片段之前,讓我們來看看它具體做了什么。代碼從導入語句開始。使用這些語句,我們導入了程序各個組件所需的包。如前所述,org.apache.zookeeper包包含客戶端與ZooKeeper服務器進行交互所需的所有類和接口。在導入包之后,定義了一個名為HelloZooKeeper的類。由于我們要連接到在同一系統(tǒng)中運行的ZooKeeper實例,在main方法中將主機和端口字符串定義為localhost:2181。代碼行zk = new ZooKeeper(hostPort, 2000, null)調(diào)用ZooKeeper構(gòu)造方法,該構(gòu)造方法嘗試連接到ZooKeeper服務器并返回一個引用。對于連接到ZooKeeper服務器實例并維護該連接的客戶端程序,需要維護一個實時會話。在此例中,構(gòu)造方法實例化的zk對象返回的引用表示這個會話。 ZooKeeper API是圍繞這個引用構(gòu)建的,每個方法調(diào)用都需要一個引用來執(zhí)行。

ZooKeeper類的構(gòu)造方法使用以下代碼創(chuàng)建ZooKeeper實例的引用:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

使用的參數(shù)含義如下:

connectString:以逗號分隔的主機:端口號列表,每個對應一個ZooKeeper服務器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三個節(jié)點的ZooKeeper ensemble的有效的主機:端口匹配對。 sessionTimeout:這是以毫秒為單位的會話超時時間。這是ZooKeeper在宣布session結(jié)束之前,沒有從客戶端獲得心跳的時間。 watcher:一個watcher對象,如果創(chuàng)建,當狀態(tài)改變和發(fā)生節(jié)點事件時會收到通知。這個watcher對象需要通過一個用戶定義的類單獨創(chuàng)建,通過實現(xiàn)Watcher接口并將實例化的對象傳遞給ZooKeeper構(gòu)造方法??蛻舳藨贸绦蚩梢允盏礁鞣N類型的事件的通知,例如連接丟失、會話過期等。

ZooKeeper Java API定義了另外帶有三個參數(shù)的構(gòu)造方法,以指定更高級的操作。代碼如下:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

在ZooKeeper類的上面的構(gòu)造方法中,如果設置為true,boolean canBeReadOnly參數(shù)允許創(chuàng)建的客戶端在網(wǎng)絡分區(qū)的情況下進入只讀模式。只讀模式是客戶端無法找到任何多數(shù)服務器的場景,但有一個可以到達的分區(qū)服務器,以只讀模式連接到它,這樣就允許對服務器的讀取請求,而寫入請求則不允許??蛻舳死^續(xù)嘗試在后臺連接到大多數(shù)服務器,同時仍然保持只讀模式。分區(qū)服務器僅僅是ZooKeeper組的一個子集,它是由于集群中的網(wǎng)絡分配而形成的。大多數(shù)服務器構(gòu)成了ensemble中的大多數(shù)quorum。

以下構(gòu)造方法顯示了兩個附加參數(shù)的定義:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

這個構(gòu)造方法允許ZooKeeper客戶端對象創(chuàng)建兩個額外的參數(shù):

sessionId:在客戶端重新連接到ZooKeeper服務器的情況下,可以使用特定的會話ID來引用先前連接的會話 sessionPasswd:如果指定的會話需要密碼,可以在這里指定

以下構(gòu)造方法是前兩個調(diào)用的組合:

ZooKeeper(String connectString, int sessionTimeout,Watcher watcher, long sessionId, byte[] sessionPasswd,boolean canBeReadOnly)

此構(gòu)造方法是前兩個調(diào)用的組合,允許在啟用只讀模式的情況下重新連接到指定的會話。

Note
ZooKeeper類的詳細Java API文檔可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查詢。

現(xiàn)在,回到我們的ZooKeeper程序。 在調(diào)用構(gòu)造方法后,如果連接成功,我們將得到ZooKeeper服務器的引用。 我們通過下面的代碼將引用傳遞給getChildren方法:

zooChildren = zk.getChildren(zpath, false)

ZooKeeper類的getChildren(String path,boolean watch)方法返回給定路徑上znode的子級列表。 我們只是迭代這個方法返回的列表,并將字符串打印到控制臺。

將程序命名為HelloZooKeeper.java,并編譯我們的程序如下:

$ javac -cp $CLASSPATH HelloZooKeeper.java

在我們運行的程序之前,需要使用以下命令來啟動ZooKeeper服務器實例:

$ ${ZK_HOME}/bin/zkServer.sh start

運行程序如下:

$ java -cp $CLASSPATH HelloZooKeeper

執(zhí)行程序會在控制臺上打印日志消息,顯示所使用的ZooKeeper版本,Java版本,Java類路徑,服務器體系結(jié)構(gòu)等等。 這里顯示了這些日志消息的一部分:

ZooKeeper Java API編程的示例分析

ZooKeeper Java API生成的日志消息對調(diào)試非常有用。 它為我們提供了關于客戶端連接到ZooKeeper服務器,建立會話等后臺得信息。 上面顯示的最后三條日志消息告訴我們客戶端如何使用程序中指定的參數(shù)來啟動連接,以及在成功連接后,服務器如何為客戶端分配會話ID。

最后,程序執(zhí)行最后在控制臺中輸出以下內(nèi)容:

ZooKeeper Java API編程的示例分析

我們可以使用ZooKeeper shell來驗證程序的正確性:

$ $ZK_HOME/bin/zkCli.sh -server localhost

ZooKeeper Java API編程的示例分析

恭喜! 我們剛剛成功編寫了我們的第一個ZooKeeper客戶端程序。

二 實現(xiàn)Watcher接口

ZooKeeper Watcher監(jiān)視使客戶端能夠接收來自ZooKeeper服務器的通知,并在發(fā)生時處理這些事件。 ZooKeeper Java API提供了一個名為Watcher的公共接口,客戶端事件處理程序類必須實現(xiàn)該接口才能接收有關來自ZooKeeper服務器的事件通知。 以編程方式,使用這種客戶端的應用程序通過向客戶端注冊回調(diào)(callback)對象來處理這些事件。

我們將實現(xiàn)Watcher接口,處理與znode關聯(lián)的數(shù)據(jù)更改時由ZooKeeper生成的事件。

Watcher接口在org.apache.zookeeper包中聲明如下:

public interface Watcher {
void process(WatchedEvent event);
}

為了演示znode數(shù)據(jù)監(jiān)視器(Watcher),有兩個Java類:DataWatcher和DataUpdater。 DataWatcher將一直運行,并在/MyConfig指定znode路徑中偵聽來自ZooKeeper服務器的NodeDataChange事件。DataUpdater類將定期更新此znode路徑中的數(shù)據(jù)字段,這將生成事件,并且在接收到這些事件后,DataWatcher類將把更改后的數(shù)據(jù)打印到控制臺上。

以下是DataWatcher.java類的代碼:

import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class DataWatcher implements Watcher, Runnable {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
byte zoo_data[] = null;
ZooKeeper zk;

public DataWatcher() {
try {
zk = new ZooKeeper(hostPort, 2000, this);
if (zk != null) {
try {
//Create the znode if it doesn't exist, with the following code:
if (zk.exists(zooDataPath, this) == null) {
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void printData() throws InterruptedException, KeeperException {
zoo_data = zk.getData(zooDataPath, this, null);
String zString = new String(zoo_data);
// The following code prints the current content of the znode to the console:
System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
}
@Override
public void process(WatchedEvent event) {
System.out.printf(
"\nEvent Received: %s", event.toString());
//We will process only events of type NodeDataChanged
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
printData();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args)
throws InterruptedException, KeeperException {
DataWatcher dataWatcher = new DataWatcher();
dataWatcher.printData();
dataWatcher.run();
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

我們來看一下DataWatcher.java類的代碼來理解一個ZooKeeper監(jiān)視器的實現(xiàn)。 DataWatcher公共類實現(xiàn)Watcher接口以及Runnable接口,打算將監(jiān)視器作為線程運行。 main方法創(chuàng)建DataWatcher類的一個實例。 在前面的代碼中,DataWatcher構(gòu)造方法嘗試連接到在本地主機上運行的ZooKeeper實例。 如果連接成功,我們用下面的代碼檢查znode路徑/MyConfig是否存在:

if (zk.exists(zooDataPath, this) == null) {

如果znode不存在ZooKeeper命名空間中,那么exists方法調(diào)用將返回null,并且嘗試使用代碼將其創(chuàng)建為持久化znode,如下所示:

zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

接下來是process方法,它在org.apache.ZooKeeper的Watcher接口中聲明,并由DataWatcher類使用以下代碼實現(xiàn):

public void process(WatchedEvent event) {

為了簡單起見,在process方法中,打印從ZooKeeper實例接收的事件,并僅對NodeDataChanged類型的事件進行進一步處理,如下所示:

if (event.getType() == Event.EventType.NodeDataChanged)

當znode路徑/MyConfig的數(shù)據(jù)字段發(fā)生任何更新或更改而收到NodeDataChanged類型的事件時,調(diào)用printData方法來打印znode的當前內(nèi)容。 在znode上執(zhí)行一個getData調(diào)用時,我們再次設置一個監(jiān)視,這是該方法的第二個參數(shù),如下面的代碼所示:

zoo_data = zk.getData(zooDataPath, this, null);

監(jiān)視事件是發(fā)送給設置監(jiān)視的客戶端的一次性觸發(fā)器,為了不斷接收進一步的事件通知,客戶端應該重置監(jiān)視器。

DataUpdater.java是一個簡單的類,它連接到運行本地主機的ZooKeeper實例,并用隨機字符串更新znode路徑/MyConfig的數(shù)據(jù)字段。 在這里,我們選擇使用通用唯一標識符(UUID)字符串更新znode,因為后續(xù)的UUID生成器調(diào)用將保證生成唯一的字符串。

DataUpdater.java類代碼如下:

import java.io.IOException;
import java.util.UUID;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DataUpdater implements Watcher {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
ZooKeeper zk;
public DataUpdater() throws IOException {
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
}

// updates the znode path /MyConfig every 5 seconds with a new UUID string.
public void run() throws InterruptedException, KeeperException {
while (true) {
String uuid = UUID.randomUUID().toString();
byte zoo_data[] = uuid.getBytes();
zk.setData(zooDataPath, zoo_data, -1);
try {
Thread.sleep(5000); // Sleep for 5 secs
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public static void main(String[] args) throws
IOException, InterruptedException, KeeperException {
DataUpdater dataUpdater = new DataUpdater();
dataUpdater.run();
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}

上面的代碼使ZooKeeper服務器觸發(fā)一個NodeDataChanged事件。 由于DataWatcher為此znode路徑設置了監(jiān)視,因此它會接收數(shù)據(jù)更改事件的通知。 然后它檢索更新的數(shù)據(jù),重置監(jiān)視,并在控制臺上打印數(shù)據(jù)。

使用以下命令編譯DataWatcher和DataUpdater類:

$ javac –cp $CLASSPATH DataWatcher.java
$ javac –cp $CLASSPATH DataUpdater.java

要執(zhí)行監(jiān)視器和更新程序,需要打開兩個終端窗口。 我要先運行監(jiān)視器,因為它創(chuàng)建了/MyConfig的znode(如果還未在ZooKeeper的命名空間中創(chuàng)建的話)。 運行監(jiān)視器之前,請確保ZooKeeper服務器在本地主機上已經(jīng)運行。

在其中一個終端窗口中,通過運行以下命令來執(zhí)行watcher類:

$ java –cp $CLASSPATH DataWatcher

輸出類似于以下屏幕截圖所示的消息:
ZooKeeper Java API編程的示例分析

如前面的截圖所示,znode路徑/MyConfig是由DataWatcher類創(chuàng)建的。 它也打印znode的內(nèi)容,但沒有打印在控制臺中,因為我們在創(chuàng)建znode時沒有設置任何數(shù)據(jù)。 當znode被創(chuàng)建時,類中的監(jiān)視者收到了NodeCreated類型的事件通知,這個通知被打印在控制臺中。 DataWatcher類繼續(xù)運行,并從ZooKeeper服務器偵聽/MyConfig節(jié)點上的事件。

讓我們在另一個終端窗口中運行DataUpdater類:

$ java -cp $CLASSPATH DataUpdater

將最初的ZooKeeper特定日志消息記錄到控制臺后,DataUpdater類運行時沒有提示。 它將一個新的UUID字符串設置到ZooKeeper路徑/MyConfig的數(shù)據(jù)字段中。 因此,看到每隔5秒鐘,在下面的屏幕截圖中顯示的輸出內(nèi)容打印在運行DataWatche的終端窗口中:

ZooKeeper Java API編程的示例分析

DataWatcher也可以使用ZooKeeper shell進行測試。 繼續(xù)像以前一樣在終端中運行DataWatcher類,并在另一個終端中調(diào)用ZooKeeper shell并運行以下屏幕截圖中所示的命令:

ZooKeeper Java API編程的示例分析

在DataWatcher正在運行的終端中,將打印以下消息:

ZooKeeper Java API編程的示例分析

三 示例——群集監(jiān)視器

通過互聯(lián)網(wǎng)提供的流行服務,如電子郵件,文件服務平臺,在線游戲等,都是通過跨越多個數(shù)據(jù)中心的高度可用的成百上千臺服務器來服務的,而這些服務器通常在地理位置上分開。 在這種集群中,設置了一些專用的服務器節(jié)點來監(jiān)視生產(chǎn)網(wǎng)絡中承載服務或應用程序的服務器的活躍性。 在云計算環(huán)境中,也用于管理云環(huán)境的這種監(jiān)控節(jié)點被稱為云控制器。 這些控制器節(jié)點的一個重要工作是實時檢測生產(chǎn)服務器的故障,并相應地通知管理員,并采取必要的措施,例如將故障服務器上的應用程序故障轉(zhuǎn)移到另一個服務器,從而確保容錯性和高可用性。

在本節(jié)中,我們將使用ZooKeeper Java客戶端API開發(fā)一個簡約的分布式集群監(jiān)視器模型。 使用ZooKeeper的ephemeral znode概念來構(gòu)建這個監(jiān)視模型相當簡單和優(yōu)雅,如以下步驟所述:

每個生產(chǎn)服務器運行一個ZooKeeper客戶端作為守護進程。 這個過程連接到ZooKeeper服務器,并在/ZooKeeper命名空間的預定義路徑(比如/Members)下創(chuàng)建一個帶有名稱(最好是其網(wǎng)絡名稱或主機名)的ephemeral znode。云控制器節(jié)點運行ZooKeeper監(jiān)視器進程,該進程監(jiān)視路徑/Members并監(jiān)聽NodeChildrenChanged類型的事件。 這個監(jiān)視器進程作為服務或守護進程運行,并設置或重置路徑上的監(jiān)視,并且實現(xiàn)其邏輯以調(diào)用適當?shù)哪K來為監(jiān)視事件采取必要的行動?,F(xiàn)在,如果生產(chǎn)服務器由于硬件故障或軟件崩潰而關閉,ZooKeeper客戶端進程就會被終止,導致服務器和ZooKeeper服務之間的會話被終止。 由于ephemeral znode的屬性唯一,每當客戶端連接關閉時,ZooKeeper服務會自動刪除路徑/Members中的znode。路徑中znode的刪除引發(fā)了NodeChildrenChanged事件,因此云控制器中的觀察器進程會收到通知。 通過調(diào)用路徑/Members中的getChildren方法,可以確定哪個服務器節(jié)點已經(jīng)關閉。然后,控制器節(jié)點可以采取適當?shù)拇胧?,比如?zhí)行恢復邏輯以重啟另一臺服務器中的故障服務。這個邏輯可以構(gòu)建為實時工作,保證接近于零停機的時間和高度可用的服務。

為實現(xiàn)這個集群監(jiān)控模型,我們將開發(fā)兩個Java類。 ClusterMonitor類將持續(xù)運行監(jiān)視器,以監(jiān)視ZooKeeper樹中的路徑/Members。 處理完引發(fā)事件后,我們將在控制臺中打印znode列表并重置監(jiān)視。 另一個類ClusterClient將啟動到ZooKeeper服務器的連接,在/Members下創(chuàng)建一個ephemeral znode。

要模擬具有多個節(jié)點的集群,我們在同一臺計算機上啟動多個客戶端,并使用客戶端進程的進程ID創(chuàng)建ephemeral znode。 通過查看進程標識,ClusterMonitor類可以確定哪個客戶進程已經(jīng)關閉,哪些進程還在。 在實際情況中,客戶端進程通常會使用當前正在運行的服務器的主機名創(chuàng)建ephemeral znode。 下面顯示了這兩個類的源代碼。

ClusterMonitor.java類定義如下:

import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterMonitor implements Runnable {
private static String membershipRoot = "/Members";
private final Watcher connectionWatcher;
private final Watcher childrenWatcher;
private ZooKeeper zk;
boolean alive=true;
public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
connectionWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getType()==Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
};
childrenWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
//Get current list of child znode, 
//reset the watch
List children = zk.getChildren( membershipRoot, this);
wall("!!!Cluster Membership Change!!!");
wall("Members: " + children);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
alive = false;
throw new RuntimeException(e);
}
}
}
};
zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
// Ensure the parent znode exists
if(zk.exists(membershipRoot, false) == null) {
zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

// Set a watch on the parent znode
List children = zk.getChildren(membershipRoot, childrenWatcher);
System.err.println("Members: " + children);
}
public synchronized void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void wall (String message) {
System.out.printf("\nMESSAGE: %s", message);
}
public void run() {
try {
synchronized (this) {
while (alive) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
if (args.length != 1) {
System.err.println("Usage: ClusterMonitor ");
System.exit(0);
}
String hostPort = args[0];
new ClusterMonitor(hostPort).run();
}
}

ClusterClient.java類定義如下:

import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ClusterClient implements Watcher, Runnable {
private static String membershipRoot = "/Members";
ZooKeeper zk;
public ClusterClient(String hostPort, Long pid) {
String processId = pid.toString();
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
if (zk != null) {
try {
zk.create(membershipRoot + '/' + processId, processId.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (
KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void close() {
try {
zk.close();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}

public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
this.close();
}
}
public static void main(String[] args) {
if (args.length != 1) {
System.err.println("Usage: ClusterClient ");
System.exit(0);
}
String hostPort = args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0, index));
new ClusterClient(hostPort, processId).run();
}
}

使用下面命令編譯這兩個類:

$ javac -cp $CLASSPATH ClusterMonitor.java
$ javac -cp $CLASSPATH ClusterClient.java

要執(zhí)行群集監(jiān)控模型,打開兩個終端。 在其中一個終端中,運行ClusterMonitor類。 在另一個終端中,通過在后臺運行ClusterClient類來執(zhí)行多個實例。

在第一個終端中,執(zhí)行ClusterMonitor類:

$ java -cp $CLASSPATH ClusterMonitorlocalhost:2181

如前面的示例所示,看到來自客戶端API的調(diào)試日志消息,最后,ClusterMonitor類開始監(jiān)視事件,輸入如下內(nèi)容:
ZooKeeper Java API編程的示例分析

現(xiàn)在,執(zhí)行ClusterClient類的五個實例來模擬一個集群的五個節(jié)點。ClusterClient在ZooKeeper樹的/Members路徑中使用自己的進程ID創(chuàng)建ephemeral znode:

$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[1] 4028
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[2] 4045
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[3] 4057
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[4] 4072
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[5] 4084

與此相對應,將觀察到ClusterMonitor類檢測到這些新的ClusterClient類實例,因為它正在監(jiān)視ZooKeeper樹的/Members路徑上的事件。 這模擬了一個真正的集群中的節(jié)點加入事件。 可以在ClusterMonitor類的終端中看到輸出,這與下面的截圖中顯示的類似:

ZooKeeper Java API編程的示例分析

現(xiàn)在,如果殺死一個ClusterClient.java進程,那么它與ZooKeeper服務器一起維護的會話將被終止。因此,客戶端創(chuàng)建的ephemeral znode將被刪除。刪除將觸發(fā)NodeChildrenChanged事件,該事件將被ClusterMonitor類捕獲。該模擬在集群中一個節(jié)點離開的場景。

讓我們用ID 4084終止ClusterClient進程:

$ kill -9 4084

以下屏幕截圖顯示了ClusterMonitor類的終端中的輸出。 它列出了當前可用的進程及其進程ID,這些進程ID模擬了實時服務器:

ZooKeeper Java API編程的示例分析

上面的簡單而優(yōu)雅的集群監(jiān)控模型的示例實現(xiàn)展示了ZooKeeper的真正威力。 在沒有ZooKeeper的情況下,開發(fā)這樣一個能夠?qū)崟r監(jiān)控節(jié)點活躍度的模型將是一項真正的艱巨任務。

以上是“ZooKeeper Java API編程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


當前文章:ZooKeeperJavaAPI編程的示例分析
標題來源:http://weahome.cn/article/ipiihh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部