Zookeeper 使用 Zookeeper Atomic Broadcast (ZAB) 協(xié)議來保障分布式數(shù)據(jù)一致性。
在莊河等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供做網(wǎng)站、網(wǎng)站制作 網(wǎng)站設(shè)計(jì)制作按需網(wǎng)站制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),成都全網(wǎng)營銷,成都外貿(mào)網(wǎng)站建設(shè)公司,莊河網(wǎng)站建設(shè)費(fèi)用合理。ZAB是一種支持崩潰恢復(fù)的消息廣播協(xié)議,采用類似2PC的廣播模式保證正常運(yùn)行時(shí)性能,并使用基于 Paxos 的策略保證崩潰恢復(fù)時(shí)的一致性。
有的Follower服務(wù)器分發(fā)Commit消息,要求其將前一個(gè)Proposal進(jìn)行提交。
ZAB一些包括兩種基本的模式:崩潰恢復(fù)和消息廣播。
1、當(dāng)整個(gè)服務(wù)框架啟動(dòng)過程中或Leader服務(wù)器出現(xiàn)網(wǎng)絡(luò)中斷、崩潰退出與重啟等異常情況時(shí),ZAB協(xié)議就會(huì)進(jìn)入恢復(fù)模式并選舉產(chǎn)生新的Leader服務(wù)器。當(dāng)選舉產(chǎn)生了新的Leader服務(wù)器,同時(shí)集群中已經(jīng)有過半的機(jī)器與該Leader服務(wù)器完成了狀態(tài)同步之后,ZAB協(xié)議就會(huì)退出恢復(fù)模式,狀態(tài)同步是指數(shù)據(jù)同步,用來保證集群在過半的機(jī)器能夠和Leader服務(wù)器的數(shù)據(jù)狀態(tài)保持一致。
2、當(dāng)集群中已經(jīng)有過半的Follower服務(wù)器完成了和Leader服務(wù)器的狀態(tài)同步,那么整個(gè)服務(wù)框架就可以進(jìn)入消息廣播模式,當(dāng)一臺同樣遵守ZAB協(xié)議的服務(wù)器啟動(dòng)后加入到集群中,如果此時(shí)集群中已經(jīng)存在一個(gè)Leader服務(wù)器在負(fù)責(zé)進(jìn)行消息廣播,那么加入的服務(wù)器就會(huì)自覺地進(jìn)入數(shù)據(jù)恢復(fù)模式:找到Leader所在的服務(wù)器,并與其進(jìn)行數(shù)據(jù)同步,然后一起參與到消息廣播流程中去。Zookeeper只允許唯一的一個(gè)Leader服務(wù)器來進(jìn)行事務(wù)請求的處理,Leader服務(wù)器在接收到客戶端的事務(wù)請求后,會(huì)生成對應(yīng)的事務(wù)提議并發(fā)起一輪廣播協(xié)議,而如果集群中的其他機(jī)器收到客戶端的事務(wù)請求后,那么這些非Leader服務(wù)器會(huì)首先將這個(gè)事務(wù)請求轉(zhuǎn)發(fā)給Leader服務(wù)器。
3、當(dāng)Leader服務(wù)器出現(xiàn)崩潰或者機(jī)器重啟、集群中已經(jīng)不存在過半的服務(wù)器與Leader服務(wù)器保持正常通信時(shí),那么在重新開始新的一輪的原子廣播事務(wù)操作之前,所有進(jìn)程首先會(huì)使用崩潰恢復(fù)協(xié)議來使彼此到達(dá)一致狀態(tài),于是整個(gè)ZAB流程就會(huì)從消息廣播模式進(jìn)入到崩潰恢復(fù)模式。一個(gè)機(jī)器要成為新的Leader,必須獲得過半機(jī)器的支持,同時(shí)由于每個(gè)機(jī)器都有可能會(huì)崩潰,因此,ZAB協(xié)議運(yùn)行過程中,前后會(huì)出現(xiàn)多個(gè)Leader,并且每臺機(jī)器也有可能會(huì)多次成為Leader,進(jìn)入崩潰恢復(fù)模式后,只要集群中存在過半的服務(wù)器能夠彼此進(jìn)行正常通信,那么就可以產(chǎn)生一個(gè)新的Leader并再次進(jìn)入消息廣播模式。如一個(gè)由三臺機(jī)器組成的ZAB服務(wù),通常由一個(gè)Leader、2個(gè)Follower服務(wù)器組成,某一個(gè)時(shí)刻,加入其中一個(gè)Follower掛了,整個(gè)ZAB集群是不會(huì)中斷服務(wù)的。
ZAB協(xié)議中節(jié)點(diǎn)存在四種狀態(tài):
Leading: 當(dāng)前節(jié)點(diǎn)為集群 Leader,負(fù)責(zé)協(xié)調(diào)事務(wù)
Following: 當(dāng)前節(jié)點(diǎn)為 Follower 在 Leader 協(xié)調(diào)下執(zhí)行事務(wù)
Looking: 集群沒有正在運(yùn)行的 Leader, 正處于選舉過程
Observing: 節(jié)點(diǎn)跟隨 Leader 保存系統(tǒng)最新的狀態(tài)提供讀服務(wù),但不參與選舉和事務(wù)投票
Zab協(xié)議消息廣播有以下4個(gè)步驟組成:
- Leader發(fā)送PROPOSAL給集群中所有的節(jié)點(diǎn)。
- 節(jié)點(diǎn)在收到PROPOSAL之后,把PROPOSAL落盤,發(fā)送一個(gè)ACK給Leader。
- Leader在收到大多數(shù)節(jié)點(diǎn)的ACK之后,發(fā)送COMMIT給集群中所有的Follower節(jié)點(diǎn)。
- 如果存在Observer節(jié)點(diǎn),Leader同時(shí)發(fā)送INFORM信息給Observer服務(wù)節(jié)點(diǎn)同步數(shù)據(jù),Observer只接收Leader的INFORM消息同步數(shù)據(jù),不參與Leader選舉和事務(wù)提交。
在Leader服務(wù)器出現(xiàn)崩潰,或者由于網(wǎng)絡(luò)原因?qū)е翷eader服務(wù)器失去了與過半Follower的聯(lián)系,那么就會(huì)進(jìn)入崩潰恢復(fù)模式,在ZAB協(xié)議中,為了保證程序的正確運(yùn)行,整個(gè)恢復(fù)過程結(jié)束后需要選舉出一個(gè)新的Leader服務(wù)器,因此,ZAB協(xié)議需要一個(gè)高效且可靠的Leader選舉算法,從而保證能夠快速地選舉出新的Leader,同時(shí),Leader選舉算法不僅僅需要讓Leader自身知道已經(jīng)被選舉為Leader,同時(shí)還需要讓集群中的所有其他機(jī)器也能夠快速地感知到選舉產(chǎn)生的新的Leader服務(wù)器。
ZAB協(xié)議的基本原則
假設(shè)一個(gè)事務(wù)在Leader服務(wù)器上被提交了,并且已經(jīng)得到了過半Follower服務(wù)器的Ack反饋,但是在它Commit消息發(fā)送給所有Follower機(jī)器之前,Leader服務(wù)掛了。如下圖所示:
在集群正常運(yùn)行過程中的某一個(gè)時(shí)刻,Server1是Leader服務(wù)器,其先后廣播了P1、P2、C1、P3、C2(C2是Commit Of Proposal2的縮寫),其中,當(dāng)Leader服務(wù)器發(fā)出C2后就立即崩潰退出了,針對這種情況,ZAB協(xié)議就需要確保事務(wù)Proposal2最終能夠在所有的服務(wù)器上都被提交成功,否則將出現(xiàn)不一致。
如果在崩潰恢復(fù)過程中出現(xiàn)一個(gè)需要被丟棄的提議,那么在崩潰恢復(fù)結(jié)束后需要跳過該事務(wù)Proposal,如下圖所示:
假設(shè)初始的Leader服務(wù)器Server1在提出一個(gè)事務(wù)Proposal3之后就崩潰退出了,從而導(dǎo)致集群中的其他服務(wù)器都沒有收到這個(gè)事務(wù)Proposal,于是,當(dāng)Server1恢復(fù)過來再次加入到集群中的時(shí)候,ZAB協(xié)議需要確保丟棄Proposal3這個(gè)事務(wù)。
能夠確保提交已經(jīng)被Leader提交的事務(wù)的Proposal,同時(shí)丟棄已經(jīng)被跳過的事務(wù)Proposal。如果讓Leader選舉算法能夠保證新選舉出來的Leader服務(wù)器擁有集群中所有機(jī)器最高編號(ZXID大)的事務(wù)Proposal,那么就可以保證這個(gè)新選舉出來的Leader一定具有所有已經(jīng)提交的提議,更為重要的是如果讓具有最高編號事務(wù)的Proposal機(jī)器稱為Leader,就可以省去Leader服務(wù)器查詢Proposal的提交和丟棄工作這一步驟了。
完成Leader選舉后,在正式開始工作前,Leader服務(wù)器首先會(huì)確認(rèn)日志中的所有Proposal是否都已經(jīng)被集群中的過半機(jī)器提交了,即是否完成了數(shù)據(jù)同步。Leader服務(wù)器需要確所有的Follower服務(wù)器都能夠接收到每一條事務(wù)Proposal,并且能夠正確地將所有已經(jīng)提交了的事務(wù)Proposal應(yīng)用到內(nèi)存數(shù)據(jù)庫中。Leader服務(wù)器會(huì)為每個(gè)Follower服務(wù)器維護(hù)一個(gè)隊(duì)列,并將那些沒有被各Follower服務(wù)器同步的事務(wù)以Proposal消息的形式逐個(gè)發(fā)送給Follower服務(wù)器,并在每一個(gè)Proposal消息后面緊接著再發(fā)送一個(gè)Commit消息,以表示該事務(wù)已經(jīng)被提交,等到Follower服務(wù)器將所有其尚未同步的事務(wù)Proposal都從Leader服務(wù)器上同步過來并成功應(yīng)用到本地?cái)?shù)據(jù)庫后,Leader服務(wù)器就會(huì)將該Follower服務(wù)器加入到真正的可用Follower列表并開始之后的其他流程。
1、 發(fā)現(xiàn),選舉產(chǎn)生Leader,產(chǎn)生最新的epoch(每次選舉產(chǎn)生新Leader的同時(shí)產(chǎn)生新epoch)。
2、 同步,各Follower和Leader完成數(shù)據(jù)同步。
3、廣播,Leader處理客戶端的寫操作,并將狀態(tài)變更廣播至Follower,F(xiàn)ollower多數(shù)通過之后Leader發(fā)起將狀態(tài)變更落地Commit。
在正常運(yùn)行過程中,ZAB協(xié)議會(huì)一直運(yùn)行于階段三來反復(fù)進(jìn)行消息廣播流程,如果出現(xiàn)崩潰或其他原因?qū)е翷eader缺失,那么此時(shí)ZAB協(xié)議會(huì)再次進(jìn)入發(fā)現(xiàn)階段,選舉新的Leader。
ProposalRequestProcessor.proce***equest()方法發(fā)送PROPOSAL 給每一個(gè)節(jié)點(diǎn)。它調(diào)用Leader.propose()方法把PROPOSAL
入隊(duì)到各個(gè)follower的queuedPackets,然后直接把PROPOSAL提交給leader節(jié)點(diǎn)自己的SyncRequestProcessor 。
以下是大概的代碼路徑:
ProposalRequestProcessor.proce***equest(request)
zks.getLeader().propose(request)
sendPacket(pp)
for f in forwardingFollowers
f.queuePacket(qp)
queuedPackets.add(p)
syncProcessor.proce***equest(request)
SyncRequestProcessor先處理
SyncRequestProcessor.run()
zks.getZKDatabase().append(si)
flush(toFlush)
zks.getZKDatabase().commit()
while (!toFlush.isEmpty())
Request i = toFlush.remove()
if (nextProcessor != null)
nextProcessor.proce***equest(i)
然后是Leader的ACK處理器處理,返回給Leader自己ACK結(jié)果
AckRequestProcessor.proce***equest()
proce***equest()
leader.processAck(self.getId(), request.zxid, null)
Follower. followLeader()方法處理接收到的QuorumPacket, case Leader.PROPOSAL分支處理的就是PROPOSAL。
Follower.followLeader()
loop
readPacket(qp)
leaderIs.readRecord(pp, "packet")
processPacket(qp)
case Leader.PROPOSAL
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr)
fzk.logRequest(hdr, txn)
syncProcessor.proce***equest(request)
case Leader.COMMIT:
fzk.commit(qp.getZxid())
commitProcessor.commit(request)
SyncRequestProcessor的處理邏輯
SyncRequestProcessor.run()
zks.getZKDatabase().append(si)
flush(toFlush)
zks.getZKDatabase().commit()
while (!toFlush.isEmpty())
Request i = toFlush.remove()
if (nextProcessor != null)
nextProcessor.proce***equest(i)
QuorumPacket qp = new QuorumPacket(Leader.ACK)
learner.writePacket(qp, false)
leaderOs.writeRecord(pp, "packet")
((Flushable)nextProcessor).flush()
learner.writePacket(null, true)
bufferedOutput.flush()
Leader的processAck()處理ACK消息,如果收到大多數(shù)節(jié)點(diǎn)的ACK,發(fā)送COMMIT給所有的follower節(jié)點(diǎn),并調(diào)用leader自己 的CommitProcessor。 processAck()有兩個(gè)調(diào)用入口:1. LeaderHandler的run()方法處理來自follower的ACK。2. AckRequestProcessor的proce***equest方法處理leader自己的ACK。
Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress())
Proposal p = outstandingProposals.get(zxid)
p.addAck(sid)
tryToCommit(p, zxid, followerAddr)
if !p.hasAllQuorums()
return false;
// Commit on all followers
commit(zxid)
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null)
sendPacket(qp)
// Commit on Leader
zk.commitProcessor.commit(p.request)
CommitProcessor.run()
request = queuedRequests.poll()
processCommitted()
sendToNextProcessor(pending)
已經(jīng)提交的請求,交給ToBeAppliedRequestProcessor準(zhǔn)備應(yīng)用到內(nèi)存數(shù)據(jù)庫
ToBeAppliedRequestProcessor.proce***equest()
next.proce***equest(request)
最后交給FinalRequestProcessor,返回響應(yīng)結(jié)果
CommitProcessor.run()
request = queuedRequests.poll()
processCommitted()
sendToNextProcessor(pending)
//返回響應(yīng)結(jié)果
FinalRequestProcessor.proce***equest()
創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開啟,新人活動(dòng)云服務(wù)器買多久送多久。