本篇內(nèi)容主要講解“kubernetes中etcd增刪改查的具體實(shí)現(xiàn)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“kubernetes中etcd增刪改查的具體實(shí)現(xiàn)”吧!
創(chuàng)新互聯(lián)建站是一家專注于成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)與策劃設(shè)計(jì),卓尼網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設(shè)十余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:卓尼等地區(qū)。卓尼做網(wǎng)站價格咨詢:028-86922220
kubernetes中基于etcd實(shí)現(xiàn)集中的數(shù)據(jù)存儲,今天來學(xué)習(xí)下基于etcd如何實(shí)現(xiàn)數(shù)據(jù)讀取一致性、更新一致性、事務(wù)的具體實(shí)現(xiàn)
在k8s中有部分?jǐn)?shù)據(jù)的存儲是需要經(jīng)過處理之后才能存儲的,比如secret這種加密的數(shù)據(jù),既然要存儲就至少包含兩個操作,加密存儲,解密讀取,transformer就是為了完成該操作而實(shí)現(xiàn)的,其在進(jìn)行etcd數(shù)據(jù)存儲的時候回對數(shù)據(jù)進(jìn)行加密,而在讀取的時候,則會進(jìn)行解密
在etcd中進(jìn)行修改(增刪改)操作的時候,都會遞增revision,而在k8s中也通過該值來作為k8s資源的ResourceVersion,該機(jī)制也是實(shí)現(xiàn)watch的關(guān)鍵機(jī)制,在操作etcd解碼從etcd獲取的數(shù)據(jù)的時候,會通過versioner組件來為資源動態(tài)的修改該值
將數(shù)據(jù)從etcd中讀取后,數(shù)據(jù)本身就是一個字節(jié)數(shù)組,如何將對應(yīng)的數(shù)據(jù)轉(zhuǎn)換成我們真正的運(yùn)行時對象呢?還記得我們之前的scheme與codec么,在這里我們知道對應(yīng)的數(shù)據(jù)編碼格式,也知道資源對象的類型,則通過codec、字節(jié)數(shù)組、目標(biāo)類型,我們就可以完成對應(yīng)數(shù)據(jù)的反射
etcd中的數(shù)據(jù)寫入是基于leader單點(diǎn)寫入和集群quorum機(jī)制實(shí)現(xiàn)的,并不是一個強(qiáng)一致性的數(shù)據(jù)寫入,則如果如果我們訪問的節(jié)點(diǎn)不存在quorum的半數(shù)節(jié)點(diǎn)內(nèi),則可能造成短暫的數(shù)據(jù)不一致,針對一些強(qiáng)一致的場景,我們可以通過其revision機(jī)制來進(jìn)行數(shù)據(jù)的讀取, 保證我們讀取到更新之后的數(shù)據(jù)
// 省略非核心代碼 func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { // 獲取key getResp, err := s.client.KV.Get(ctx, key, s.getOps...) // 檢測當(dāng)前版本,是否達(dá)到最小版本的 if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { return err } // 執(zhí)行數(shù)據(jù)轉(zhuǎn)換 data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key)) if err != nil { return storage.NewInternalError(err.Error()) } // 解碼數(shù)據(jù) return decode(s.codec, s.versioner, data, out, kv.ModRevision) }
創(chuàng)建一個接口數(shù)據(jù)則會首先進(jìn)行資源對象的檢查,避免重復(fù)創(chuàng)建對象,此時會先通過資源對象的version字段來進(jìn)行初步檢查,然后在利用etcd的事務(wù)機(jī)制來保證資源創(chuàng)建的原子性操作
// 省略非核心代碼 func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion should not be set on objects to be created") } if err := s.versioner.PrepareObjectForStorage(obj); err != nil { return fmt.Errorf("PrepareObjectForStorage failed: %v", err) } // 將數(shù)據(jù)編碼 data, err := runtime.Encode(s.codec, obj) if err != nil { return err } // 轉(zhuǎn)換數(shù)據(jù) newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key)) if err != nil { return storage.NewInternalError(err.Error()) } startTime := time.Now() // 事務(wù)操作 txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), // 如果之前不存在 這里是利用的etcd的ModRevision即修改版本為0, 寓意著對應(yīng)的key不存在 ).Then( clientv3.OpPut(key, string(newData), opts...), // put修改數(shù)據(jù) ).Commit() metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { return err } if !txnResp.Succeeded { return storage.NewKeyExistsError(key, 0) } if out != nil { // 獲取對應(yīng)的Revision putResp := txnResp.Responses[0].GetResponsePut() return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } return nil } func notFound(key string) clientv3.Cmp { return clientv3.Compare(clientv3.ModRevision(key), "=", 0) }
刪除接口主要是通過CAS和事務(wù)機(jī)制來共同實(shí)現(xiàn),確保在etcd不發(fā)生異常的情況,即使并發(fā)對同個資源來進(jìn)行刪除操作也能保證至少有一個節(jié)點(diǎn)成功
// 省略非核心代碼 func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error { startTime := time.Now() // 獲取當(dāng)前的key的數(shù)據(jù) getResp, err := s.client.KV.Get(ctx, key) for { // 獲取當(dāng)前的狀態(tài) origState, err := s.getState(getResp, key, v, false) if err != nil { return err } txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等于當(dāng)前狀態(tài),就嘗試刪除 ).Then( clientv3.OpDelete(key), // 刪除 ).Else( clientv3.OpGet(key), // 獲取 ).Commit() if !txnResp.Succeeded { // 獲取最新的數(shù)據(jù)重試事務(wù)操作 getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) continue } // 將最后一個版本的數(shù)據(jù)解碼到out里面,然后返回 return decode(s.codec, s.versioner, origState.data, out, origState.rev) } }
更新接口實(shí)現(xiàn)上與刪除接口并無本質(zhì)上的差別,但是如果多個節(jié)點(diǎn)同時進(jìn)行更新,CAS并發(fā)操作必然會有一個節(jié)點(diǎn)成功,當(dāng)發(fā)現(xiàn)已經(jīng)有節(jié)點(diǎn)操作成功,則當(dāng)前節(jié)點(diǎn)其實(shí)并不需要再做過多的操作,直接返回即可
// 省略非核心代碼 func (s *store) GuaranteedUpdate( ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error { // 獲取當(dāng)前key的最新數(shù)據(jù) getCurrentState := func() (*objState, error) { startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, s.getOps...) metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) if err != nil { return nil, err } return s.getState(getResp, key, v, ignoreNotFound) } // 獲取當(dāng)前數(shù)據(jù) var origState *objState var mustCheckData bool if len(suggestion) == 1 && suggestion[0] != nil { // 如果提供了建議的數(shù)據(jù),則會使用, origState, err = s.getStateFromObject(suggestion[0]) if err != nil { return err } //但是需要檢測數(shù)據(jù) mustCheckData = true } else { // 嘗試重新獲取數(shù)據(jù) origState, err = getCurrentState() if err != nil { return err } } transformContext := authenticatedDataString(key) for { // 檢查對象是否已經(jīng)更新, 主要是通過檢測uuid/revision來實(shí)現(xiàn) if err := preconditions.Check(key, origState.obj); err != nil { // If our data is already up to date, return the error if !mustCheckData { return err } // 如果檢查數(shù)據(jù)一致性錯誤,則需要重新獲取 origState, err = getCurrentState() if err != nil { return err } mustCheckData = false // Retry continue } // 刪除當(dāng)前的版本數(shù)據(jù)revision ret, ttl, err := s.updateState(origState, tryUpdate) if err != nil { // If our data is already up to date, return the error if !mustCheckData { return err } // It's possible we were working with stale data // Actually fetch origState, err = getCurrentState() if err != nil { return err } mustCheckData = false // Retry continue } // 編碼數(shù)據(jù) data, err := runtime.Encode(s.codec, ret) if err != nil { return err } if !origState.stale && bytes.Equal(data, origState.data) { // 如果我們發(fā)現(xiàn)我們當(dāng)前的數(shù)據(jù)與獲取到的數(shù)據(jù)一致,則會直接跳過 if mustCheckData { origState, err = getCurrentState() if err != nil { return err } mustCheckData = false if !bytes.Equal(data, origState.data) { // original data changed, restart loop continue } } if !origState.stale { // 直接返回數(shù)據(jù) return decode(s.codec, s.versioner, origState.data, out, origState.rev) } } // 磚漢數(shù)據(jù) newData, err := s.transformer.TransformToStorage(data, transformContext) if err != nil { return storage.NewInternalError(err.Error()) } opts, err := s.ttlOpts(ctx, int64(ttl)) if err != nil { return err } trace.Step("Transaction prepared") startTime := time.Now() // 事務(wù)更新數(shù)據(jù) txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( clientv3.OpPut(key, string(newData), opts...), ).Else( clientv3.OpGet(key), ).Commit() metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime) if err != nil { return err } trace.Step("Transaction committed") if !txnResp.Succeeded { // 重新獲取數(shù)據(jù) getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key) origState, err = s.getState(getResp, key, v, ignoreNotFound) if err != nil { return err } trace.Step("Retry value restored") mustCheckData = false continue } // 獲取put響應(yīng) putResp := txnResp.Responses[0].GetResponsePut() return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } }
到此,相信大家對“kubernetes中etcd增刪改查的具體實(shí)現(xiàn)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!