對(duì)于請(qǐng)求處理鏈而言,所有請(qǐng)求處理器的父接口為RequestProcessor。
創(chuàng)新互聯(lián)是工信部頒發(fā)資質(zhì)IDC服務(wù)器商,為用戶提供優(yōu)質(zhì)的服務(wù)器托管雅安服務(wù)RequestProcessor內(nèi)部類RequestProcessorException,用來表示處理過程中的出現(xiàn)的異常,而proceequest和shutdown方法則是核心方法,是子類必須要實(shí)現(xiàn)的方法,處理的主要邏輯在proceequest中,通過proce***equest方法可以將請(qǐng)求傳遞到下個(gè)處理器。而shutdown表示關(guān)閉處理器,其意味著該處理器要關(guān)閉和其他處理器的連接。
public interface RequestProcessor {
@SuppressWarnings("serial")
public static class RequestProcessorException extends Exception {
public RequestProcessorException(String msg, Throwable t) {
super(msg, t);
}
}
void proce***equest(Request request) throws RequestProcessorException;
void shutdown();
}
實(shí)現(xiàn)RequestProcessor的processor有很多,PrepRequestProcessor,通常是請(qǐng)求處理鏈的第一個(gè)處理器。
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}
PrepRequestProcessor繼承了ZooKeeperCriticalThread類并實(shí)現(xiàn)了RequestProcessor接口,表示其可以作為線程使用。
//已提交的請(qǐng)求隊(duì)列
LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue();
//下一個(gè)處理器
private final RequestProcessor nextProcessor;
// zk服務(wù)器
ZooKeeperServer zks;
while (true) {
//從隊(duì)列獲取請(qǐng)求
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
//requestOfDeath類型的請(qǐng)求,代表當(dāng)前處理器已經(jīng)關(guān)閉,不再處理請(qǐng)求。
if (Request.requestOfDeath == request) {
break;
}
//調(diào)用關(guān)鍵函數(shù)
pRequest(request);
}
pRequest會(huì)確定請(qǐng)求類型,并根據(jù)請(qǐng)求類型不同生成不同的請(qǐng)求對(duì)象,我們以創(chuàng)建節(jié)點(diǎn)為例子分析
//設(shè)置消息頭和事務(wù)為空
request.setHdr(null);
request.setTxn(null);
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
//創(chuàng)建節(jié)點(diǎn)請(qǐng)求
CreateRequest create2Request = new CreateRequest();
//處理請(qǐng)求
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
//省略其他代碼
//給請(qǐng)求的zxid賦值
request.zxid = zks.getZxid();
//交給下一個(gè)處理器繼續(xù)處理
nextProcessor.proce***equest(request);
pRequest2Txn函數(shù)是實(shí)際的處理請(qǐng)求的函數(shù),對(duì)于創(chuàng)建方法會(huì)調(diào)用pRequest2TxnCreate函數(shù)
//設(shè)置請(qǐng)求頭
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
switch (type) {
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
pRequest2TxnCreate(type, request, record, deserialize);
break;
}
pRequest2TxnCreate方法如下:
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
if (deserialize) {
//反序列化,將ByteBuffer轉(zhuǎn)化為Record
ByteBufferInputStream.byteBuffer2Record(request.request, record);
}
int flags;
String path;
List acl;
byte[] data;
long ttl;
if (type == OpCode.createTTL) {
CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
flags = createTtlRequest.getFlags();
path = createTtlRequest.getPath();
acl = createTtlRequest.getAcl();
data = createTtlRequest.getData();
ttl = createTtlRequest.getTtl();
} else {
//轉(zhuǎn)換createRequest對(duì)象
CreateRequest createRequest = (CreateRequest)record;
flags = createRequest.getFlags();
path = createRequest.getPath();
acl = createRequest.getAcl();
data = createRequest.getData();
ttl = -1;
}
CreateMode createMode = CreateMode.fromFlag(flags);
validateCreateRequest(path, createMode, request, ttl);
//獲取父節(jié)點(diǎn)路徑
String parentPath = validatePathForCreate(path, request.sessionId);
List listACL = fixupACL(path, request.authInfo, acl);
//獲取父節(jié)點(diǎn)的record
ChangeRecord parentRecord = getRecordForPath(parentPath);
//檢查ACL列表
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
//是否創(chuàng)建順序節(jié)點(diǎn)
if (createMode.isSequential()) {
//子路徑后追加一串?dāng)?shù)字,順序的
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
//父節(jié)點(diǎn)不能是臨時(shí)節(jié)點(diǎn)
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
//新的子節(jié)點(diǎn)版本號(hào)
int newCversion = parentRecord.stat.getCversion()+1;
//新生事務(wù)
if (type == OpCode.createContainer) {
request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
} else if (type == OpCode.createTTL) {
request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
} else {
request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
newCversion));
}
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) { //是否臨時(shí)節(jié)點(diǎn)
s.setEphemeralOwner(request.sessionId);
}
//拷貝
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
//子節(jié)點(diǎn)數(shù)量+1
parentRecord.childCount++;
//設(shè)置新版本號(hào)
parentRecord.stat.setCversion(newCversion);
//將parentRecord添加至outstandingChanges和outstandingChangesForPath中
addChangeRecord(parentRecord);
// 將新生成的ChangeRecord(包含了StatPersisted信息)添加至outstandingChanges和outstandingChangesForPath中
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
}
addChangeRecord函數(shù)將ChangeRecord添加至ZooKeeperServer的outstandingChanges和outstandingChangesForPath中。
private void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
}
}
outstandingChanges 位于ZooKeeperServer 中,用于存放剛進(jìn)行更改還沒有同步到ZKDatabase中的節(jié)點(diǎn)信息。
znode節(jié)點(diǎn)會(huì)由于用戶的讀寫操作頻繁發(fā)生變化,為了提升數(shù)據(jù)的訪問效率,ZooKeeper中有一個(gè)三層的數(shù)據(jù)緩沖層用于存放節(jié)點(diǎn)數(shù)據(jù)。
outstandingChanges->ZKDatabase->FileSnap+FileTxnLog
創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開啟,新人活動(dòng)云服務(wù)器買多久送多久。