如何解析k8s中的Informer機(jī)制,相信很多沒有經(jīng)驗(yàn)的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
專注于為中小企業(yè)提供網(wǎng)站設(shè)計(jì)制作、網(wǎng)站建設(shè)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)鲅魚圈免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了1000多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
Informer機(jī)制架構(gòu)設(shè)計(jì)總覽
下面是我根據(jù)理解畫的一個數(shù)據(jù)流轉(zhuǎn)圖,從全局視角看一下數(shù)據(jù)的整體走向是怎么樣的。
其中虛線的表示的是代碼中的方法。
首先講一個結(jié)論:
通過Informer機(jī)制獲取數(shù)據(jù)的情況下,在初始化的時候會從Kubernetes API Server獲取對應(yīng)Resource的全部Object,后續(xù)只會通過Watch機(jī)制接收API Server推送過來的數(shù)據(jù),不會再主動從API Server拉取數(shù)據(jù),直接使用本地緩存中的數(shù)據(jù)以減少API Server的壓力。
Watch機(jī)制基于HTTP的Chunk實(shí)現(xiàn),維護(hù)一個長連接,這是一個優(yōu)化點(diǎn),減少請求的數(shù)據(jù)量。第二個優(yōu)化點(diǎn)是SharedInformer,它可以讓同一種資源使用的是同一個Informer,例如v1版本的Deployment和v1beta1版本的Deployment同時存在的時候,共享一個Informer。
上面圖中可以看到Informer分為三個部分,可以理解為三大邏輯。
其中Reflector主要是把從API Server數(shù)據(jù)獲取到的數(shù)據(jù)放到DeltaFIFO隊(duì)列中,充當(dāng)生產(chǎn)者角色。
SharedInformer主要是從DeltaFIFIO隊(duì)列中獲取數(shù)據(jù)并分發(fā)數(shù)據(jù),充當(dāng)消費(fèi)者角色。
最后Indexer是作為本地緩存的存儲組件存在。
Reflector理解
Reflector中主要看Run、ListAndWatch、watchHandler三個地方就足夠了。
源碼位置是 tools/cache/reflector.go
// Ruvn starts a watch and handles watch events. Will restart the watch if it is closed. // Run will exit when stopCh is closed. //開始時執(zhí)行Run,上一層調(diào)用的地方是 controller.go中的Run方法 func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.Until(func() { //啟動后執(zhí)行一次ListAndWatch if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.period, stopCh) } ... // and then use the resource version to watch. // It returns error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first // list request will return the full response. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { //這里是調(diào)用了各個資源中的ListFunc函數(shù),例如如果v1版本的Deployment //則調(diào)用的是informers/apps/v1/deployment.go中的ListFunc return r.listerWatcher.List(opts) })) if r.WatchListPageSize != 0 { pager.Pa1geSize = r.WatchListPageSize } // Pager falls back to full list if paginated list calls fail due to an "Expired">
數(shù)據(jù)的生產(chǎn)就結(jié)束了,就兩點(diǎn):
初始化時從API Server請求數(shù)據(jù)
監(jiān)聽后續(xù)從Watch推送來的數(shù)據(jù)
DeltaFIFO理解
先看一下數(shù)據(jù)結(jié)構(gòu):
type DeltaFIFO struct { ... items map[string]Deltas queue []string ... } type Delta struct { Type DeltaType Object interface{} } type Deltas []Delta type DeltaType string // Change type definition const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Sync DeltaType = "Sync" )
其中queue存儲的是Object的id,而items存儲的是以O(shè)bjectID為key的這個Object的事件列表,
可以想象到是這樣的一個數(shù)據(jù)結(jié)構(gòu),左邊是Key,右邊是一個數(shù)組對象,其中每個元素都是由type和obj組成.
DeltaFIFO顧名思義存放Delta數(shù)據(jù)的先入先出隊(duì)列,相當(dāng)于一個數(shù)據(jù)的中轉(zhuǎn)站,將數(shù)據(jù)從一個地方轉(zhuǎn)移另一個地方。
主要看的內(nèi)容是queueActionLocked、Pop、Resync
queueActionLocked方法:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { ... newDeltas := append(f.items[id], Delta{actionType, obj}) //去重處理 newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { ... //pop消息 f.cond.Broadcast() ... return nil }
Pop方法:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { //阻塞 直到調(diào)用了f.cond.Broadcast() f.cond.Wait() } //取出第一個元素 id := f.queue[0] f.queue = f.queue[1:] ... item, ok := f.items[id] ... delete(f.items, id) //這個process可以在controller.go中的processLoop()找到 //初始化是在shared_informer.go的Run //最終執(zhí)行到shared_informer.go的HandleDeltas方法 err := process(item) //如果處理出錯了重新放回隊(duì)列中 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } ... } }
Resync機(jī)制:
小總結(jié):每次從本地緩存Indexer中獲取數(shù)據(jù)重新放到DeltaFIFO中執(zhí)行任務(wù)邏輯。
啟動的Resync地方是reflector.go的resyncChan()方法,在reflector.go的ListAndWatch方法中的調(diào)用開始定時執(zhí)行。
go func() { //啟動定時任務(wù) resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } //定時執(zhí)行 調(diào)用會執(zhí)行到delta_fifo.go的Resync()方法 if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }() func (f *DeltaFIFO) Resync() error { ... //從緩存中獲取到所有的key keys := f.knownObjects.ListKeys() for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err } } return nil } func (f *DeltaFIFO) syncKeyLocked(key string) error { //獲緩存拿到對應(yīng)的Object obj, exists, err := f.knownObjects.GetByKey(key) ... //放入到隊(duì)列中執(zhí)行任務(wù)邏輯 if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } return nil }
SharedInformer消費(fèi)消息理解
主要看HandleDeltas方法就好,消費(fèi)消息然后分發(fā)數(shù)據(jù)并且存儲數(shù)據(jù)到緩存的地方
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: ... //查一下是否在Indexer緩存中 如果在緩存中就更新緩存中的對象 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } //把數(shù)據(jù)分發(fā)到Listener s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { //沒有在Indexer緩存中 把對象插入到緩存中 if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } ... } } return nil }
Indexer理解
這塊不會講述太多內(nèi)容,因?yàn)槲艺J(rèn)為Informer機(jī)制最主要的還是前面數(shù)據(jù)的流轉(zhuǎn),當(dāng)然這并不代表數(shù)據(jù)存儲不重要,而是先理清楚整體的思路,后續(xù)再詳細(xì)更新存儲的部分。
Indexer使用的是threadsafe_store.go中的threadSafeMap存儲數(shù)據(jù),是一個線程安全并且?guī)в兴饕δ艿膍ap,數(shù)據(jù)只會存放在內(nèi)存中,每次涉及操作都會進(jìn)行加鎖。
// threadSafeMap implements ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} indexers Indexers indices Indices }
Indexer還有一個索引相關(guān)的內(nèi)容就暫時不展開講述。
Example代碼
------------- package main import ( "flag" "fmt" "path/filepath" "time" v1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" ) func main() { var err error var config *rest.Config var kubeconfig *string if home := homedir.HomeDir(); home != "">
以上示例代碼中程序啟動后會拉取一次Deployment數(shù)據(jù),并且拉取數(shù)據(jù)完成后從本地緩存中List一次default命名空間的Deployment資源并打印,然后每60秒Resync一次Deployment資源。
QA
為什么需要Resync?
在本周有同學(xué)提出一個,我看到這個問題后也感覺挺奇怪的,因?yàn)镽esync是從本地緩存的數(shù)據(jù)緩存到本地緩存(從開始到結(jié)束來說是這樣),為什么需要把數(shù)據(jù)拿出來又走一遍流程呢?當(dāng)時鉆牛角尖也是想不明白,后來換個角度想就知道了。
數(shù)據(jù)從API Server過來并且經(jīng)過處理后放到緩存中,但數(shù)據(jù)并不一定就可以正常處理,也就是說可能報(bào)錯了,而這個Resync相當(dāng)于一個重試的機(jī)制。
可以嘗試實(shí)踐一下: 部署有狀態(tài)服務(wù),存儲使用LocalPV(也可以換成自己熟悉的),這時候pod會由于存儲目錄不存在而啟動失敗. 然后在pod啟動失敗后再創(chuàng)建好對應(yīng)的目錄,過一會pod就啟動成功了。
看完上述內(nèi)容,你們掌握如何解析k8s中的Informer機(jī)制的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!