k8s為實(shí)現(xiàn)容器探活worker的管理構(gòu)建了一個(gè)Manager組件,該組件負(fù)責(zé)底層探活worker的管理,并且緩存當(dāng)前的容器的狀態(tài),并對(duì)外同步容器的當(dāng)前狀態(tài),今天我們就來(lái)分析下其部分核心組件
創(chuàng)新互聯(lián)成立十多年來(lái),這條路我們正越走越好,積累了技術(shù)與客戶資源,形成了良好的口碑。為客戶提供成都網(wǎng)站建設(shè)、成都做網(wǎng)站、網(wǎng)站策劃、網(wǎng)頁(yè)設(shè)計(jì)、主機(jī)域名、網(wǎng)絡(luò)營(yíng)銷、VI設(shè)計(jì)、網(wǎng)站改版、漏洞修補(bǔ)等服務(wù)。網(wǎng)站是否美觀、功能強(qiáng)大、用戶體驗(yàn)好、性價(jià)比高、打開快等等,這些對(duì)于網(wǎng)站建設(shè)都非常重要,創(chuàng)新互聯(lián)通過對(duì)建站技術(shù)性的掌握、對(duì)創(chuàng)意設(shè)計(jì)的研究為客戶提供一站式互聯(lián)網(wǎng)解決方案,攜手廣大客戶,共同發(fā)展進(jìn)步。
Manager緩存的狀態(tài)主要是會(huì)被kubelet、狀態(tài)組件消費(fèi),并且在Pod同步狀態(tài)的時(shí)候,會(huì)通過當(dāng)前Manager里面的探測(cè)狀態(tài)來(lái)更新Pod的容器的就緒與啟動(dòng)狀態(tài)的更新,讓我們一起看看Manager自身的一些關(guān)鍵實(shí)現(xiàn)吧
即prober/results/results_manager組件,其主要作用是:存儲(chǔ)探測(cè)結(jié)果和通知探測(cè)結(jié)果
cache負(fù)責(zé)容器的探測(cè)結(jié)果的保存,updates則負(fù)責(zé)對(duì)外更新狀態(tài)的訂閱,其通過新的結(jié)果和cache中的狀態(tài)進(jìn)行對(duì)比,從而決定是否對(duì)外通知
// Manager implementation.
type manager struct {
// 保護(hù)cache
sync.RWMutex
// 容器ID->探測(cè)結(jié)果
cache map[kubecontainer.ContainerID]Result
// 更新管道
updates chan Update
}
更新緩存的時(shí)候回通過對(duì)比前后狀態(tài)來(lái)進(jìn)行是否發(fā)布變更事件,從而通知到外部訂閱容器變更的kubelet核心流程
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) {
// 修改內(nèi)部狀態(tài)
if m.setInternal(id, result) {
// 同步更新事件
m.updates <- Update{id, result, pod.UID}
}
}
內(nèi)部狀態(tài)修改與判斷是否進(jìn)行同步實(shí)現(xiàn)
// 如果之前的緩存不存在,或者前后狀態(tài)不一致則會(huì)返回true觸發(fā)更新
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
m.Lock()
defer m.Unlock()
prev, exists := m.cache[id]
if !exists || prev != result {
m.cache[id] = result
return true
}
return false
}
func (m *manager) Updates() <-chan Update {
return m.updates
}
探測(cè)管理器是指的prober/prober)manager的Manager組件,其負(fù)責(zé)當(dāng)前kubelet上面探活組件的管理,并且進(jìn)行探測(cè)狀態(tài)結(jié)果的緩存與同步,并且內(nèi)部還通過statusManager來(lái)進(jìn)行apiserver狀態(tài)的同步
每個(gè)探測(cè)Key包含要探測(cè)的目標(biāo)信息:pod的ID、容器名、探測(cè)類型
type probeKey struct {
podUID types.UID
containerName string
probeType probeType
}
statusManager組件在后續(xù)章節(jié)里面會(huì)進(jìn)行詳細(xì)分析,說(shuō)下livenessManager該組件即探活的結(jié)果,所以當(dāng)一個(gè)容器探測(cè)失敗,則會(huì)由kubelet本地先進(jìn)行處理,而readlinessManager和startupManager則需要通過statusManager同步給apiserver進(jìn)行同步
type manager struct {
//探測(cè)Key與worker映射
workers map[probeKey]*worker
// 讀寫鎖
workerLock sync.RWMutex
//statusManager緩存為探測(cè)提供pod IP和容器id。
statusManager status.Manager
// 存儲(chǔ)readiness探測(cè)結(jié)果
readinessManager results.Manager
// 存儲(chǔ)liveness探測(cè)結(jié)果
livenessManager results.Manager
// 存儲(chǔ)startup探測(cè)結(jié)果
startupManager results.Manager
// 執(zhí)行探測(cè)操作
prober *prober
}
func (m *manager) updateStartup() {
// 從管道獲取數(shù)據(jù)進(jìn)行同步
update := <-m.startupManager.Updates()
started := update.Result == results.Success
m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
func (m *manager) Start() {
// Start syncing readiness.
go wait.Forever(m.updateReadiness, 0)
// Start syncing startup.
go wait.Forever(m.updateStartup, 0)
}
添加 Pod的時(shí)候會(huì)遍歷Pod的所有容器,并根據(jù)探測(cè)類型來(lái)進(jìn)行對(duì)應(yīng)探測(cè)worker的構(gòu)建
func (m *manager) AddPod(pod *v1.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
// 針對(duì)startupProbe的探測(cè)任務(wù)的構(gòu)建
if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
key.probeType = startup
if _, ok := m.workers[key]; ok {
klog.Errorf("Startup probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
// 構(gòu)建新的worker
w := newWorker(m, startup, pod, c)
m.workers[key] = w
go w.run()
}
// 針對(duì)ReadinessProbe的探測(cè)任務(wù)的構(gòu)建
if c.ReadinessProbe != nil {
key.probeType = readiness
if _, ok := m.workers[key]; ok {
klog.Errorf("Readiness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}
// 針對(duì)LivenessProbe的探測(cè)任務(wù)的構(gòu)建
if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
klog.Errorf("Liveness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}
更新Pod狀態(tài)主要是根據(jù)當(dāng)前Manager里面緩存的之前的狀態(tài)信息來(lái)更新Pod里面對(duì)應(yīng)容器的狀態(tài),這些狀態(tài)是Pod里面容器最新的探測(cè)狀態(tài),獲取這些狀態(tài)則是檢測(cè)當(dāng)前的容器是否已經(jīng)就緒和啟動(dòng),為后續(xù)更新流程做基礎(chǔ)數(shù)據(jù)
for i, c := range podStatus.ContainerStatuses {
var ready bool
// 檢測(cè)容器狀態(tài)
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
// 檢測(cè)readinessMnager里面的狀態(tài),如果是成功則就是已經(jīng)就緒
ready = result == results.Success
} else {
// 檢查是否有尚未運(yùn)行的探測(cè)器。只要存在則認(rèn)為就緒
_, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists
}
podStatus.ContainerStatuses[i].Ready = ready
var started bool
if c.State.Running == nil {
started = false
} else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
// 容器正在運(yùn)行,如果StartupProbe功能被禁用,則假定它已啟動(dòng)
started = true
} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
// 如果startupManager里面的狀態(tài)是成功的則認(rèn)為是已經(jīng)啟動(dòng)的
started = result == results.Success
} else {
// 檢查是否有尚未運(yùn)行的探測(cè)器。
_, exists := m.getWorker(podUID, c.Name, startup)
started = !exists
}
podStatus.ContainerStatuses[i].Started = &started
}
針對(duì)初始化容器主要容器已經(jīng)終止并且退出的狀態(tài)碼為0,則認(rèn)為初始化容器已經(jīng)就緒
for i, c := range podStatus.InitContainerStatuses {
var ready bool
if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
// 容器狀態(tài)
ready = true
}
podStatus.InitContainerStatuses[i].Ready = ready
}
存活狀態(tài)通知主要是在kubelet的核心流程循環(huán)中進(jìn)行的,如果檢測(cè)到容器的狀態(tài)失敗,會(huì)立刻進(jìn)行對(duì)應(yīng)pod的容器狀態(tài)的同步,從而決定下一步的操作是做什么
case update := <-kl.livenessManager.Updates():
// 如果探測(cè)狀態(tài)失敗
if update.Result == proberesults.Failure {
// 省略代碼
handler.HandlePodSyncs([]*v1.Pod{pod})
}
探活整體的設(shè)計(jì)大概就是這樣,接下來(lái)會(huì)分期其statusManager組件,即將將探測(cè)的狀態(tài)與apiserver的同步的實(shí)現(xiàn), k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3