ReplicationManager is the ReplicationController controller object. The separate name makes it easy to distinguish in code from the ReplicationController Resource API Object. The code below is the struct definition of ReplicationManager.
pkg/controller/replication/replication_controller.go:75

    // ReplicationManager is responsible for synchronizing ReplicationController objects stored
    // in the system with actual running pods.
    type ReplicationManager struct {
        kubeClient clientset.Interface
        podControl controller.PodControlInterface

        // internalPodInformer is used to hold a personal informer. If we're using
        // a normal shared informer, then the informer will be started for us. If
        // we have a personal informer, we must start it ourselves. If you start
        // the controller using NewReplicationManager(passing SharedInformer), this
        // will be null
        internalPodInformer cache.SharedIndexInformer

        // An rc is temporarily suspended after creating/deleting these many replicas.
        // It resumes normal action after observing the watch events for them.
        burstReplicas int
        // To allow injection of syncReplicationController for testing.
        syncHandler func(rcKey string) error

        // A TTLCache of pod creates/deletes each rc expects to see.
        expectations *controller.UIDTrackingControllerExpectations

        // A store of replication controllers, populated by the rcController
        rcStore cache.StoreToReplicationControllerLister
        // Watches changes to all replication controllers
        rcController *cache.Controller
        // A store of pods, populated by the podController
        podStore cache.StoreToPodLister
        // Watches changes to all pods
        podController cache.ControllerInterface
        // podStoreSynced returns true if the pod store has been synced at least once.
        // Added as a member to the struct to allow injection for testing.
        podStoreSynced func() bool

        lookupCache *controller.MatchingCache

        // Controllers that need to be synced
        queue workqueue.RateLimitingInterface

        // garbageCollectorEnabled denotes if the garbage collector is enabled. RC
        // manager behaves differently if GC is enabled.
        garbageCollectorEnabled bool
    }
The following fields deserve a closer look:
podControl: provides the interface for creating and deleting Pods.
burstReplicas: the maximum number of Pods that may be created or deleted concurrently in one batch.
syncHandler: the function that actually performs the replica sync.
expectations: maintains a cache of Pod UIDs for the expected state, and provides interfaces for correcting that cache.
rcStore: the Indexer of ReplicationController Resource objects; its data is supplied and maintained by rcController.
rcController: watches all ReplicationController Resources and writes the observed changes into rcStore.
podStore: the Indexer of Pods; its data is supplied and maintained by podController.
podController: watches all Pod Resources and writes the observed changes into podStore.
queue: holds the RCs waiting to be synced; it is a rate-limited queue.
lookupCache: a cache of Pod-to-RC matching information, used to speed up lookups.
Readers of my earlier post on the internals and source code of the Kubernetes ResourceQuota Controller may remember that it also covered how the controller manager starts the ResourceQuotaController. ReplicationController is started the same way. When kube-controller-manager calls newControllerInitializers to initialize its controllers, startReplicationController is registered there, and that function is what starts the ReplicationController controller.
cmd/kube-controller-manager/app/controllermanager.go:224

    func newControllerInitializers() map[string]InitFunc {
        controllers := map[string]InitFunc{}
        controllers["endpoint"] = startEndpointController
        controllers["replicationcontroller"] = startReplicationController
        controllers["podgc"] = startPodGCController
        controllers["resourcequota"] = startResourceQuotaController
        controllers["namespace"] = startNamespaceController
        controllers["serviceaccount"] = startServiceAccountController
        controllers["garbagecollector"] = startGarbageCollectorController
        controllers["daemonset"] = startDaemonSetController
        controllers["job"] = startJobController
        controllers["deployment"] = startDeploymentController
        controllers["replicaset"] = startReplicaSetController
        controllers["horizontalpodautoscaling"] = startHPAController
        controllers["disruption"] = startDisruptionController
        controllers["statefuleset"] = startStatefulSetController
        controllers["cronjob"] = startCronJobController
        controllers["certificatesigningrequests"] = startCSRController

        return controllers
    }
Following the code into startReplicationController, the logic is simple: it starts a goroutine that calls replicationcontroller.NewReplicationManager to create a ReplicationManager, then invokes its Run method to begin work.
cmd/kube-controller-manager/app/core.go:55

    func startReplicationController(ctx ControllerContext) (bool, error) {
        go replicationcontroller.NewReplicationManager(
            ctx.InformerFactory.Pods().Informer(),
            ctx.ClientBuilder.ClientOrDie("replication-controller"),
            ResyncPeriod(&ctx.Options),
            replicationcontroller.BurstReplicas,
            int(ctx.Options.LookupCacheSizeForRC),
            ctx.Options.EnableGarbageCollector,
        ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
        return true, nil
    }
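The register-then-start pattern described above can be pictured with a small standalone sketch. It is not the kube-controller-manager code: ControllerContext here is a hypothetical one-field stand-in, and startDemoController and startAll are names invented for illustration. Only the shape of InitFunc and the map-based registry follow the article.

```go
package main

import "fmt"

// ControllerContext is a hypothetical stand-in for the real context type.
type ControllerContext struct {
	Workers int
}

// InitFunc has the same shape as in the article: it starts one controller
// and reports whether the controller was started.
type InitFunc func(ctx ControllerContext) (bool, error)

// startDemoController is an invented start function. A real one would
// launch the controller's Run method in a goroutine before returning.
func startDemoController(ctx ControllerContext) (bool, error) {
	return true, nil
}

// newControllerInitializers registers start functions by controller name.
func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["demo"] = startDemoController
	return controllers
}

// startAll (hypothetical) invokes every registered initializer and returns
// the names of the controllers that reported being started.
func startAll(ctx ControllerContext, inits map[string]InitFunc) ([]string, error) {
	started := []string{}
	for name, initFn := range inits {
		ok, err := initFn(ctx)
		if err != nil {
			return started, fmt.Errorf("starting %s: %w", name, err)
		}
		if ok {
			started = append(started, name)
		}
	}
	return started, nil
}

func main() {
	started, err := startAll(ControllerContext{Workers: 5}, newControllerInitializers())
	fmt.Println(started, err)
}
```

Keeping the start functions in a name-keyed map means adding a controller is a one-line registration, which matches how startReplicationController appears in the listing.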
As shown above, controller-manager creates a ReplicationManager object through NewReplicationManager, and that object is the ReplicationController controller.
pkg/controller/replication/replication_controller.go:122

    // NewReplicationManager creates a replication manager
    func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(glog.Infof)
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
        return newReplicationManager(
            eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
            podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
    }

pkg/controller/replication/replication_controller.go:132

    // newReplicationManager configures a replication manager with the specified event recorder
    func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
        if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
            metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
        }

        rm := &ReplicationManager{
            kubeClient: kubeClient,
            podControl: controller.RealPodControl{
                KubeClient: kubeClient,
                Recorder:   eventRecorder,
            },
            burstReplicas: burstReplicas,
            expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
            queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
            garbageCollectorEnabled: garbageCollectorEnabled,
        }

        rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
            &cache.ListWatch{
                ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
                    return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
                },
                WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
                    return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
                },
            },
            &v1.ReplicationController{},
            // TODO: Can we have much longer period here?
            FullControllerResyncPeriod,
            cache.ResourceEventHandlerFuncs{
                AddFunc:    rm.enqueueController,
                UpdateFunc: rm.updateRC,
                // This will enter the sync loop and no-op, because the controller has been deleted from the store.
                // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
                // way of achieving this is by performing a `stop` operation on the controller.
                DeleteFunc: rm.enqueueController,
            },
            cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
        )

        podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: rm.addPod,
            // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
            // the most frequent pod update is status, and the associated rc will only list from local storage, so
            // it should be ok.
            UpdateFunc: rm.updatePod,
            DeleteFunc: rm.deletePod,
        })
        rm.podStore.Indexer = podInformer.GetIndexer()
        rm.podController = podInformer.GetController()

        rm.syncHandler = rm.syncReplicationController
        rm.podStoreSynced = rm.podController.HasSynced
        rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
        return rm
    }
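newReplicationManager mainly configures the ReplicationManager, for example:
It configures queue through workqueue.NewNamedRateLimitingQueue.
It configures expectations through controller.NewUIDTrackingControllerExpectations.
It configures rcStore, podStore, rcController, and podController.
It sets syncHandler to rm.syncReplicationController. This one is important enough to call out on its own: as shown later, syncReplicationController is the method that does the core work, and the automatic maintenance of replicas is carried out by it.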
With the ReplicationManager created, it is time to get to work. The Run method is the starting point: it begins watching and syncing.
pkg/controller/replication/replication_controller.go:217

    // Run begins watching and syncing.
    func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        glog.Infof("Starting RC Manager")
        go rm.rcController.Run(stopCh)
        go rm.podController.Run(stopCh)
        for i := 0; i < workers; i++ {
            go wait.Until(rm.worker, time.Second, stopCh)
        }

        if rm.internalPodInformer != nil {
            go rm.internalPodInformer.Run(stopCh)
        }

        <-stopCh
        glog.Infof("Shutting down RC Manager")
        rm.queue.ShutDown()
    }
watching
go rm.rcController.Run(stopCh) watches all rc.
go rm.podController.Run(stopCh) watches all pods.
syncing
Run starts `workers` goroutines.
Each goroutine runs rm.worker through wait.Until, which invokes it again after a 1s pause whenever it returns. rm.worker itself loops, taking an rc from the queue and calling syncHandler to sync it.
Each goroutine ends only once stopCh signals.
Below is the Run method implementation used by rcController and podController. Its job is to carry out the watch on rc / pod.
pkg/client/cache/controller.go:84

    // Run begins processing items, and will continue until a value is sent down stopCh.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *Controller) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )

        c.reflectorMutex.Lock()
        c.reflector = r
        c.reflectorMutex.Unlock()

        r.RunUntil(stopCh)

        wait.Until(c.processLoop, time.Second, stopCh)
    }
The key part of sync lives in ReplicationManager's worker method, shown below.
pkg/controller/replication/replication_controller.go:488

    // worker runs a worker thread that just dequeues items, processes them, and marks them done.
    // It enforces that the syncHandler is never invoked concurrently with the same key.
    func (rm *ReplicationManager) worker() {
        workFunc := func() bool {
            key, quit := rm.queue.Get()
            if quit {
                return true
            }
            defer rm.queue.Done(key)

            err := rm.syncHandler(key.(string))
            if err == nil {
                rm.queue.Forget(key)
                return false
            }

            rm.queue.AddRateLimited(key)
            utilruntime.HandleError(err)
            return false
        }
        for {
            if quit := workFunc(); quit {
                glog.Infof("replication controller worker shutting down")
                return
            }
        }
    }
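The main logic in worker is:
Get an rc key from rm's rate-limited queue.
Call syncHandler to sync that rc. On success the key is forgotten by the queue; on error it is re-added with rate limiting.
In newReplicationManager, the assignment rm.syncHandler = rm.syncReplicationController registers syncReplicationController as the syncHandler, so the logic for syncing an rc lives in syncReplicationController.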
pkg/controller/replication/replication_controller.go:639

    // syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
    // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
    // concurrently with the same key.
    func (rm *ReplicationManager) syncReplicationController(key string) error {
        trace := util.NewTrace("syncReplicationController: " + key)
        defer trace.LogIfLong(250 * time.Millisecond)

        startTime := time.Now()
        defer func() {
            glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
        }()

        if !rm.podStoreSynced() {
            // Sleep so we give the pod reflector goroutine a chance to run.
            time.Sleep(PodStoreSyncedPollPeriod)
            glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
            rm.queue.Add(key)
            return nil
        }

        obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
        if !exists {
            glog.Infof("Replication Controller has been deleted %v", key)
            rm.expectations.DeleteExpectations(key)
            return nil
        }
        if err != nil {
            return err
        }
        rc := *obj.(*v1.ReplicationController)

        trace.Step("ReplicationController restored")
        rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
        trace.Step("Expectations restored")

        // NOTE: filteredPods are pointing to objects from cache - if you need to
        // modify them, you need to copy it first.
        // TODO: Do the List and Filter in a single pass, or use an index.
        var filteredPods []*v1.Pod
        if rm.garbageCollectorEnabled {
            // list all pods to include the pods that don't match the rc's selector
            // anymore but has the stale controller ref.
            pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
            if err != nil {
                glog.Errorf("Error getting pods for rc %q: %v", key, err)
                rm.queue.Add(key)
                return err
            }
            cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
            matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
            // Adopt pods only if this replication controller is not going to be deleted.
            if rc.DeletionTimestamp == nil {
                for _, pod := range matchesNeedsController {
                    err := cm.AdoptPod(pod)
                    // continue to next pod if adoption fails.
                    if err != nil {
                        // If the pod no longer exists, don't even log the error.
                        if !errors.IsNotFound(err) {
                            utilruntime.HandleError(err)
                        }
                    } else {
                        matchesAndControlled = append(matchesAndControlled, pod)
                    }
                }
            }
            filteredPods = matchesAndControlled
            // remove the controllerRef for the pods that no longer have matching labels
            var errlist []error
            for _, pod := range controlledDoesNotMatch {
                err := cm.ReleasePod(pod)
                if err != nil {
                    errlist = append(errlist, err)
                }
            }
            if len(errlist) != 0 {
                aggregate := utilerrors.NewAggregate(errlist)
                // push the RC into work queue again. We need to try to free the
                // pods again otherwise they will stuck with the stale
                // controllerRef.
                rm.queue.Add(key)
                return aggregate
            }
        } else {
            pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
            if err != nil {
                glog.Errorf("Error getting pods for rc %q: %v", key, err)
                rm.queue.Add(key)
                return err
            }
            filteredPods = controller.FilterActivePods(pods)
        }

        var manageReplicasErr error
        if rcNeedsSync && rc.DeletionTimestamp == nil {
            manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
        }
        trace.Step("manageReplicas done")

        newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

        // Always updates status as pods come up or die.
        if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
            // Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
            return err
        }

        return manageReplicasErr
    }
The main logic of syncReplicationController is:
1. If podStore has not yet been synced at least once, re-add the rc's key to the queue to wait for podStore to sync, and return. Otherwise continue.
2. Look up the rc object in rcStore by the rc's key. If it does not exist, the rc has been deleted: remove the rc from expectations by key and return. If it exists, continue.
3. Check the add and del counts in expectations, and whether more than 5min have passed since the last timestamp, to decide whether this rc needs a sync.
4. If GC is enabled, list all pods in the rc's namespace from podStore, then use the matchesAndControlled pods plus the matchesNeedsController pods that were adopted successfully as the filteredPods to sync; pods in controlledDoesNotMatch are released. If GC is not enabled, list the pods in that namespace that match rc.Spec.Selector and use the Active ones as filteredPods. (To understand matchesAndControlled and matchesNeedsController, see the PodControllerRefManager.Classify function defined at pkg/controller/controller_ref_manager.go:57.)
5. If step 3 found that the rc needs a sync and its DeletionTimestamp is nil, call manageReplicas so that the number of active pods managed by the rc matches the desired count.
6. After manageReplicas, immediately recompute the rc's status, updating Conditions, Replicas, FullyLabeledReplicas, ReadyReplicas, and AvailableReplicas.
7. Call updateReplicationControllerStatus, which uses the kube-apiserver API to update the rc's status to the newly computed one, and return.
In the syncReplicationController flow described above, a key step is the call to manageReplicas in step 5. It is responsible for repairing the rc's replicas (add or delete).
pkg/controller/replication/replication_controller.go:516

    // manageReplicas checks and updates replicas for the given replication controller.
    // Does NOT modify.
    func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
        diff := len(filteredPods) - int(*(rc.Spec.Replicas))
        rcKey, err := controller.KeyFunc(rc)
        if err != nil {
            return err
        }
        if diff == 0 {
            return nil
        }

        if diff < 0 {
            diff *= -1
            if diff > rm.burstReplicas {
                diff = rm.burstReplicas
            }
            // TODO: Track UIDs of creates just like deletes. The problem currently
            // is we'd need to wait on the result of a create to record the pod's
            // UID, which would require locking *across* the create, which will turn
            // into a performance bottleneck. We should generate a UID for the pod
            // beforehand and store it via ExpectCreations.
            errCh := make(chan error, diff)
            rm.expectations.ExpectCreations(rcKey, diff)
            var wg sync.WaitGroup
            wg.Add(diff)
            glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
            for i := 0; i < diff; i++ {
                go func() {
                    defer wg.Done()
                    var err error
                    if rm.garbageCollectorEnabled {
                        var trueVar = true
                        controllerRef := &metav1.OwnerReference{
                            APIVersion: getRCKind().GroupVersion().String(),
                            Kind:       getRCKind().Kind,
                            Name:       rc.Name,
                            UID:        rc.UID,
                            Controller: &trueVar,
                        }
                        err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
                    } else {
                        err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
                    }
                    if err != nil {
                        // Decrement the expected number of creates because the informer won't observe this pod
                        glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
                        rm.expectations.CreationObserved(rcKey)
                        errCh <- err
                        utilruntime.HandleError(err)
                    }
                }()
            }
            wg.Wait()

            select {
            case err := <-errCh:
                // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
                if err != nil {
                    return err
                }
            default:
            }

            return nil
        }

        if diff > rm.burstReplicas {
            diff = rm.burstReplicas
        }
        glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
        // No need to sort pods if we are about to delete all of them
        if *(rc.Spec.Replicas) != 0 {
            // Sort the pods in the order such that not-ready < ready, unscheduled
            // < scheduled, and pending < running. This ensures that we delete pods
            // in the earlier stages whenever possible.
            sort.Sort(controller.ActivePods(filteredPods))
        }
        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
        // deleted, so we know to record their expectations exactly once either
        // when we see it as an update of the deletion timestamp, or as a delete.
        // Note that if the labels on a pod/rc change in a way that the pod gets
        // orphaned, the rs will only wake up after the expectations have
        // expired even if other pods are deleted.
        deletedPodKeys := []string{}
        for i := 0; i < diff; i++ {
            deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
        }
        // We use pod namespace/name as a UID to wait for deletions, so if the
        // labels on a pod/rc change in a way that the pod gets orphaned, the
        // rc will only wake up after the expectation has expired.
        errCh := make(chan error, diff)
        rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
        var wg sync.WaitGroup
        wg.Add(diff)
        for i := 0; i < diff; i++ {
            go func(ix int) {
                defer wg.Done()
                if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(filteredPods[ix])
                    glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
                    rm.expectations.DeletionObserved(rcKey, podKey)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }(i)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }

        return nil
    }
The main logic of the manageReplicas code above is:
First compute diff, the difference between the number of Pods in filteredPods and the desired number defined in rc.Spec.Replicas.
If diff is 0, the current state already matches the desired state, so return immediately.
If diff is negative, there are too few Active Pods, and the following steps run:
Compare |diff| with burstReplicas and cap it, so that at most burstReplicas pods are created in this round.
Call expectations.ExpectCreations to set add in expectations to |diff|, meaning |diff| new pods are to be created to reach the desired state.
Use sync.WaitGroup to start |diff| goroutines. Each one calls podControl.CreatePods (or CreatePodsWithControllerRef when GC is enabled) to create one pod from the rc's spec Template in the rc's namespace.
Once all goroutines have finished, return err if one or more pod creations failed (only the first error received is returned); otherwise return nil.
If diff is positive, there are more Active Pods than desired, and the following steps run:
Compare |diff| with burstReplicas and cap it, so that at most burstReplicas pods are deleted in this round.
Sort the pods in filteredPods so that not-ready < ready, unscheduled < scheduled, and pending < running, which makes pods in earlier stages the first to be deleted. The sort is skipped when rc.Spec.Replicas is 0, since all pods are going to be deleted anyway.
After sorting, pick the first |diff| pods as the Pods to delete.
Call expectations.ExpectDeletions to set del in expectations to |diff|, meaning |diff| pods are to be deleted to reach the desired state.
Use sync.WaitGroup to start |diff| goroutines. Each one calls podControl.DeletePod to delete one of the Pods chosen for deletion.
Once all goroutines have finished, return err if one or more pod deletions failed (only the first error received is returned); otherwise return nil.