ZooKeeperServer,為所有服務(wù)器的父類。
QuorumZooKeeperServer,其是所有參與選舉的服務(wù)器的父類,是抽象類,其繼承了ZooKeeperServer類。
LeaderZooKeeperServer,Leader服務(wù)器,繼承了QuorumZooKeeperServer類,也會(huì)繼承ZooKeeperServer中的很多方法。
LearnerZooKeeper,其是Learner服務(wù)器的父類,為抽象類,也繼承了QuorumZooKeeperServer類。
FollowerZooKeeperServer,F(xiàn)ollower服務(wù)器,繼承了LearnerZooKeeper。
ObserverZooKeeperServer,Observer服務(wù)器,繼承了LearnerZooKeeper。
ReadOnlyZooKeeperServer,只讀服務(wù)器,不提供寫服務(wù),繼承QuorumZooKeeperServer。
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛(ài)。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名注冊(cè)、雅安服務(wù)器托管、營(yíng)銷軟件、網(wǎng)站建設(shè)、清苑網(wǎng)站維護(hù)、網(wǎng)站推廣。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}
ZooKeeperServer是ZooKeeper中所有服務(wù)器的父類,其實(shí)現(xiàn)了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定義了expire方法(表示會(huì)話過(guò)期)和getServerId方法(表示獲取服務(wù)器ID),而Provider則主要定義了獲取服務(wù)器某些數(shù)據(jù)的方法。
protected static final Logger LOG;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
Environment.logEnv("Server environment:", LOG);
}
//jmx服務(wù)
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;
// 默認(rèn)心跳頻率
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// 最小會(huì)話過(guò)期時(shí)間
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;
// 最大會(huì)話過(guò)期時(shí)間
/** value of -1 indicates unset, use default */
protected int maxSessionTimeout = -1;
protected SessionTracker sessionTracker;
// 事務(wù)日志快照
private FileTxnSnapLog txnLogFactory = null;
// Zookeeper內(nèi)存數(shù)據(jù)庫(kù)
private ZKDatabase zkDb;
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
// 請(qǐng)求處理器
protected RequestProcessor firstProcessor;
protected volatile State state = State.INITIAL;
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR
}
/**
* This is the secret that we use to generate passwords. For the moment,
* it's more of a checksum that's used in reconnection, which carries no
* security weight, and is treated internally as if it carries no
* security weight.
*/
static final private long superSecret = 0XB3415C00L;
private final AtomicInteger requestsInProcess = new AtomicInteger(0);
// 未處理的ChangeRecord
final Deque outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
// 記錄path對(duì)應(yīng)的ChangeRecord
final HashMap outstandingChangesForPath =
new HashMap();
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;
// 服務(wù)器統(tǒng)計(jì)數(shù)據(jù)
private final ServerStats serverStats;
private final ZooKeeperServerListener listener;
private ZooKeeperServerShutdownHandler zkShutdownHandler;
private volatile int createSessionTrackerServerId = 1;
該函數(shù)用于加載數(shù)據(jù),其首先會(huì)判斷內(nèi)存庫(kù)是否已經(jīng)加載設(shè)置zxid,之后會(huì)調(diào)用killSession函數(shù)刪除過(guò)期的會(huì)話
if(zkDb.isInitialized()){ // 內(nèi)存數(shù)據(jù)庫(kù)已被初始化
// 設(shè)置為最后處理的Zxid
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
// 未被初始化,則加載數(shù)據(jù)庫(kù)
setZxid(zkDb.loadDataBase());
}
// Clean up dead sessions
LinkedList deadSessions = new LinkedList();
for (Long session : zkDb.getSessions()) {// 遍歷所有的會(huì)話
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
for (long session : deadSessions) { // 刪除過(guò)期的會(huì)話
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
// Make a clean snapshot
//初始化一個(gè)快照
takeSnapshot();
提交請(qǐng)求,處理器進(jìn)行處理
public void submitRequest(Request si) {
if (firstProcessor == null) {// 第一個(gè)處理器為空
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
//服務(wù)器調(diào)用鏈還未初始化完成
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
// 是否為合法的請(qǐng)求
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
//調(diào)用鏈第一處理器開始處理
firstProcessor.proce***equest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().proce***equest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
// 提交請(qǐng)求處理器
CommitProcessor commitProcessor;
//處理鏈請(qǐng)求第一個(gè)處理處理器
PrepRequestProcessor prepRequestProcessor;
LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
}
直接調(diào)用父類QuorumZooKeeperServer的構(gòu)造函數(shù),然后再調(diào)用ZooKeeperServer的構(gòu)造函數(shù),逐級(jí)構(gòu)造。
@Override
protected void setupRequestProcessors() {
//創(chuàng)建FinalRequestProcessor,處理鏈最后一個(gè)處理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
//創(chuàng)建ToBeAppliedRequestProcessor
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
// 創(chuàng)建CommitProcessor,提交處理器
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
// 啟動(dòng)CommitProcessor
commitProcessor.start();
// 創(chuàng)建ProposalRequestProcessor
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
// 初始化ProposalProcessor
proposalProcessor.initialize();
//創(chuàng)建PrepRequestProcessor,作為以第一個(gè)處理鏈處理器
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
// firstProcessor為PrepRequestProcessor
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
setupRequestProcessors函數(shù)表示創(chuàng)建處理鏈,可以看到其處理鏈的順序?yàn)镻repRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。