生產(chǎn)-消費者隊列,用于多節(jié)點的分布式數(shù)據(jù)結構,生產(chǎn)和消費數(shù)據(jù)。生產(chǎn)者創(chuàng)建一個數(shù)據(jù)對象,并放到隊列中;消費者從隊列中取出一個數(shù)據(jù)對象并進行處理。在ZooKeeper中,隊列可以使用一個容器節(jié)點下創(chuàng)建多個子節(jié)點來實現(xiàn);創(chuàng)建子節(jié)點時,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper會自動在節(jié)點名稱后面添加唯一序列號。EPHEMERAL_SEQUENTIAL也有同樣的特點,區(qū)別在于會話結束后是否會自動刪除。
成都創(chuàng)新互聯(lián)公司2013年至今,先為青海等服務建站,青海等地企業(yè),進行企業(yè)商務咨詢服務。為青海企業(yè)網(wǎng)站制作PC+手機+微官網(wǎng)三網(wǎng)同步一站式服務解決您的所有建站問題。
敲小黑板:*_SEQUENTIAL是ZooKeeper的一個很重要的特性,分布式鎖、選舉制度都依靠這個特性實現(xiàn)的。
之前的文章,我們已經(jīng)用實現(xiàn)了Watcher和Barrier,創(chuàng)建ZooKeeper連接的代碼已經(jīng)復制了一遍。后續(xù)還需要類似的工作,因此先對原有代碼做一下重構,讓代碼味道干凈一點。
?
以下是 process(WatchedEvent)的代碼
final public void process(WatchedEvent event) {
if (Event.EventType.None.equals(event.getType())) {
// 連接狀態(tài)發(fā)生變化
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
// 連接建立成功
connectedSemaphore.countDown();
}
} else if (Event.EventType.NodeCreated.equals(event.getType())) {
processNodeCreated(event);
} else if (Event.EventType.NodeDeleted.equals(event.getType())) {
processNodeDeleted(event);
} else if (Event.EventType.NodeDataChanged.equals(event.getType())) {
processNodeDataChanged(event);
} else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {
processNodeChildrenChanged(event);
}
}
以ZooKeeperBarrier為例,看看重構之后的構造函數(shù)和監(jiān)聽Event的代碼
ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)
throws IOException {
super(address);
this.tableSerial = createRootNode(tableSerial);
this.tableCapacity = tableCapacity;
this.customerName = customerName;
}
protected void processNodeChildrenChanged(WatchedEvent event) {
log.info("{} 接收到了通知 : {}", customerName, event.getType());
// 子節(jié)點有變化
synchronized (mutex) {
mutex.notify();
}
}
生產(chǎn)者的關鍵代碼
String elementName = queueName + "/element";
ArrayList ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;
CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;
getZooKeeper().create(elementName, value, ids, createMode);
注意,重點是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存儲直到有命令刪除,SEQUENTIAL表示自動在后面添加自增的唯一序列號。這樣,盡管elementName都一樣,但實際生成的zNode名字在 “element”后面會添加格式為%010d的10個數(shù)字,如0000000001。如一個完整的zNode名可能為/queue/element0000000021。
消費者嘗試從子節(jié)點列表獲取zNode名最小的一個子節(jié)點,如果隊列為空則等待NodeChildrenChanged事件。關鍵代碼
/** 隊列的同步信號 */
private static Integer queueMutex = Integer.valueOf(1);
@Override
protected void processNodeChildrenChanged(WatchedEvent event) {
synchronized (queueMutex) {
queueMutex.notify();
}
}
/**
* 從隊列中刪除第一個對象
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException {
while (true) {
synchronized (queueMutex) {
List list = getZooKeeper().getChildren(queueName, true);
if (list.size() == 0) {
queueMutex.wait();
} else {
// 獲取第一個子節(jié)點的名稱
String firstNodeName = getFirstElementName(list);
// 刪除節(jié)點,并返回節(jié)點的值
return deleteNodeAndReturnValue(firstNodeName);
}
}
}
}
把測試結果放源碼前面,免得大家被無聊的代碼晃暈。
測試代碼創(chuàng)建了兩個線程,一個線程是生產(chǎn)者,按隨機間隔往隊列中添加對象;一個線程是消費者,隨機間隔嘗試從隊列中取出第一個,如果當時隊列為空,會等到直到新的數(shù)據(jù)。
兩個進程都加上隨機間隔,是為了模擬生產(chǎn)可能比消費更快的情況。以下是測試日志,為了更突出,生產(chǎn)和消費的日志我增加了不同的文字樣式。
49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 開始ZooKeeper隊列測試,本次將測試 10 個數(shù)據(jù)
49:48.076 [DEBUG] ZooKeeperQueue.log(201)
+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]
|-- elapsed time [開始鏈接] 119.863 milliseconds.
|-- elapsed time [等待連接成功的Event] 40.039 milliseconds.
|-- Total [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper] 159.911 milliseconds.
49:48.082 [DEBUG] ZooKeeperQueue.log(201)
+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]
|-- elapsed time [開始鏈接] 103.795 milliseconds.
|-- elapsed time [等待連接成功的Event] 65.899 milliseconds.
|-- Total [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper] 170.263 milliseconds.
49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 1 , 然后等待 1700 毫秒
49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 1 , 然后等待 4000 毫秒
49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 2 , 然后等待 900 毫秒
49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 3 , 然后等待 1300 毫秒
49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 4 , 然后等待 3700 毫秒
49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 2 , 然后等待 2800 毫秒
49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 3 , 然后等待 4500 毫秒
49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 5 , 然后等待 3500 毫秒
49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 6 , 然后等待 4200 毫秒
49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 4 , 然后等待 2400 毫秒
50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 5 , 然后等待 4900 毫秒
50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 7 , 然后等待 4500 毫秒
50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 6 , 然后等待 3600 毫秒
50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 8 , 然后等待 1900 毫秒
50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 9 , 然后等待 3200 毫秒
50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 7 , 然后等待 2900 毫秒
50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生產(chǎn)對象 : 10 , 然后等待 4900 毫秒
50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 8 , 然后等待 300 毫秒
50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 9 , 然后等待 4800 毫秒
50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 10 , 然后等待 2400 毫秒