This article explains what pilot-agent does and walks through the parts of its source code that implement it.
The Istio source used here is release 1.5.
Sidecar injection adds two containers to the pod: istio-init and istio-proxy. pilot-agent is the entry point that starts istio-proxy, and kubectl shows the command it was started with:
```text
[root@localhost ~]# kubectl exec -it details-v1-6c9f8bcbcb-shltm -c istio-proxy -- ps -efww
UID        PID  PPID  C STIME TTY          TIME CMD
istio-p+     1     0  0 08:52 ?        00:00:13 /usr/local/bin/pilot-agent proxy sidecar --domain default.svc.cluster.local --configPath /etc/istio/proxy --binaryPath /usr/local/bin/envoy --serviceCluster details.default --drainDuration 45s --parentShutdownDuration 1m0s --discoveryAddress istiod.istio-system.svc:15012 --zipkinAddress zipkin.istio-system:9411 --proxyLogLevel=warning --proxyComponentLogLevel=misc:error --connectTimeout 10s --proxyAdminPort 15000 --concurrency 2 --controlPlaneAuthPolicy NONE --DNSRefreshRate 300s --statusPort 15020 --trust-domain=cluster.local --controlPlaneBootstrap=false
istio-p+    18     1  0 08:52 ?        00:01:11 /usr/local/bin/envoy -c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60 --service-cluster details.default --service-node sidecar~172.20.0.14~details-v1-6c9f8bcbcb-shltm.default~default.svc.cluster.local --max-obj-name-len 189 --local-address-ip-version v4 --log-format [Envoy (Epoch 0)] [%Y-%m-%d %T.%e][%t][%l][%n] %v -l warning --component-log-level misc:error --concurrency 2
```
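Besides starting istio-proxy, pilot-agent is also responsible for:
Generating Envoy's bootstrap configuration file;
Health checking;
Watching certificates for changes and hot-restarting Envoy so that new certificates are loaded;
Supervising Envoy and restarting it when it exits abnormally;
Telling Envoy to shut down gracefully.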
```go
proxyCmd = &cobra.Command{
	Use:   "proxy",
	Short: "Envoy proxy agent",
	FParseErrWhitelist: cobra.FParseErrWhitelist{
		UnknownFlags: true,
	},
	RunE: func(c *cobra.Command, args []string) error {
		...
		// Start from the default proxy configuration
		proxyConfig := mesh.DefaultProxyConfig()

		// set all flags
		proxyConfig.CustomConfigFile = customConfigFile
		proxyConfig.ProxyBootstrapTemplatePath = templateFile
		proxyConfig.ConfigPath = configPath
		proxyConfig.BinaryPath = binaryPath
		proxyConfig.ServiceCluster = serviceCluster
		proxyConfig.DrainDuration = types.DurationProto(drainDuration)
		proxyConfig.ParentShutdownDuration = types.DurationProto(parentShutdownDuration)
		proxyConfig.DiscoveryAddress = discoveryAddress
		proxyConfig.ConnectTimeout = types.DurationProto(connectTimeout)
		proxyConfig.StatsdUdpAddress = statsdUDPAddress
		...
		ctx, cancel := context.WithCancel(context.Background())

		// Start the status server
		if statusPort > 0 {
			localHostAddr := localHostIPv4
			if proxyIPv6 {
				localHostAddr = localHostIPv6
			}
			prober := kubeAppProberNameVar.Get()
			// Health probing
			statusServer, err := status.NewServer(status.Config{
				LocalHostAddr: localHostAddr,
				AdminPort:     proxyAdminPort,
				// Set via the --statusPort 15020 flag
				StatusPort:     statusPort,
				KubeAppProbers: prober,
				NodeType:       role.Type,
			})
			if err != nil {
				cancel()
				return err
			}
			go waitForCompletion(ctx, statusServer.Run)
		}
		...
		// Build the Proxy instance: configuration, startup arguments, etc.
		envoyProxy := envoy.NewProxy(envoy.ProxyConfig{
			Config:              proxyConfig,
			Node:                role.ServiceNode(),
			LogLevel:            proxyLogLevel,
			ComponentLogLevel:   proxyComponentLogLevel,
			PilotSubjectAltName: pilotSAN,
			MixerSubjectAltName: mixerSAN,
			NodeIPs:             role.IPAddresses,
			DNSRefreshRate:      dnsRefreshRate,
			PodName:             podName,
			PodNamespace:        podNamespace,
			PodIP:               podIP,
			SDSUDSPath:          sdsUDSPath,
			SDSTokenPath:        sdsTokenPath,
			STSPort:             stsPort,
			ControlPlaneAuth:    controlPlaneAuthEnabled,
			DisableReportCalls:  disableInternalTelemetry,
			OutlierLogPath:      outlierLogPath,
			PilotCertProvider:   pilotCertProvider,
		})

		// Build the agent, which implements the Agent interface
		agent := envoy.NewAgent(envoyProxy, features.TerminationDrainDuration())

		if nodeAgentSDSEnabled {
			tlsCertsToWatch = []string{}
		}

		// Build the watcher
		watcher := envoy.NewWatcher(tlsCertsToWatch, agent.Restart)
		// Start the watcher
		go watcher.Run(ctx)

		// Graceful shutdown
		go cmd.WaitSignalFunc(cancel)

		// Start the agent
		return agent.Run(ctx)
	},
}
```
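The execution flow breaks down roughly into these steps:
Load the default proxy configuration and override it with the command-line flags;
Start the status server for health checks;
Build the Proxy instance (configuration, startup arguments), then the agent instance;
Build the watcher instance and start it;
Start a goroutine that listens for signals, for graceful shutdown;
Start the agent.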
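To make the ordering concrete, here is a minimal Go sketch of the same skeleton under simplifying assumptions: the status server, watcher and agent are hypothetical placeholders (not Istio types), and a `stop` channel stands in for the OS signal. It shows the pattern the steps rely on: one cancellable context shared by every long-running goroutine, with the agent blocking the main goroutine until shutdown.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// recorder collects log lines from several goroutines safely.
type recorder struct {
	mu    sync.Mutex
	lines []string
}

func (r *recorder) log(s string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.lines = append(r.lines, s)
}

// startup follows the order of the steps above with placeholder components
// (hypothetical, not Istio's): every long-running goroutine shares one context.
func startup(stop <-chan struct{}, log func(string)) error {
	// Step 1 (building proxyConfig) and step 3 (building proxy/agent) are omitted.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(2)
	// Step 2: status server placeholder, runs until ctx is cancelled.
	go func() { defer wg.Done(); <-ctx.Done(); log("status server stopped") }()
	// Step 4: watcher placeholder.
	go func() { defer wg.Done(); <-ctx.Done(); log("watcher stopped") }()
	// Step 5: stands in for WaitSignalFunc; `stop` replaces the OS signal.
	go func() { <-stop; cancel() }()

	// Step 6: the agent blocks the main goroutine until shutdown.
	<-ctx.Done()
	log("agent stopped")
	wg.Wait()
	return nil
}

func main() {
	r := &recorder{}
	stop := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stop) // simulate SIGTERM
	}()
	err := startup(stop, r.log)
	fmt.Println(err, len(r.lines))
}
```

Because every component waits on the same `ctx.Done()`, a single `cancel()` from the signal goroutine is enough to stop all of them, which is the role `cancel` plays in `RunE`.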
```text
kubectl exec -it details-v1-6c9f8bcbcb-shltm -c istio-proxy -- /usr/local/bin/pilot-agent proxy --help
Envoy proxy agent

Usage:
  pilot-agent proxy [flags]

Flags:
      --binaryPath string                 Path to the proxy binary (default "/usr/local/bin/envoy")
      --concurrency int                   number of worker threads to run
      --configPath string                 Path to the generated configuration file directory (default "/etc/istio/proxy")
      --connectTimeout duration           Connection timeout used by Envoy for supporting services (default 1s)
      --controlPlaneAuthPolicy string     Control Plane Authentication Policy (default "NONE")
      --controlPlaneBootstrap             Process bootstrap provided via templateFile to be used by control plane components. (default true)
      --customConfigFile string           Path to the custom configuration file
      --datadogAgentAddress string        Address of the Datadog Agent
      --disableInternalTelemetry          Disable internal telemetry
      --discoveryAddress string           Address of the discovery service exposing xDS (e.g. istio-pilot:8080) (default "istio-pilot:15010")
      --dnsRefreshRate string             The dns_refresh_rate for bootstrap STRICT_DNS clusters (default "300s")
      --domain string                     DNS domain suffix. If not provided uses ${POD_NAMESPACE}.svc.cluster.local
      --drainDuration duration            The time in seconds that Envoy will drain connections during a hot restart (default 45s)
      --envoyAccessLogService string      Settings of an Envoy gRPC Access Log Service API implementation
      --envoyMetricsService string        Settings of an Envoy gRPC Metrics Service API implementation
  -h, --help                              help for proxy
      --id string                         Proxy unique ID. If not provided uses ${POD_NAME}.${POD_NAMESPACE} from environment variables
      --ip string                         Proxy IP address. If not provided uses ${INSTANCE_IP} environment variable.
      --lightstepAccessToken string       Access Token for LightStep Satellite pool
      --lightstepAddress string           Address of the LightStep Satellite pool
      --lightstepCacertPath string        Path to the trusted cacert used to authenticate the pool
      --lightstepSecure                   Should connection to the LightStep Satellite pool be secure
      --mixerIdentity string              The identity used as the suffix for mixer's spiffe SAN. This would only be used by pilot all other proxy would get this value from pilot
      --outlierLogPath string             The log path for outlier detection
      --parentShutdownDuration duration   The time in seconds that Envoy will wait before shutting down the parent process during a hot restart (default 1m0s)
      --pilotIdentity string              The identity used as the suffix for pilot's spiffe SAN
      --proxyAdminPort uint16             Port on which Envoy should listen for administrative commands (default 15000)
      --proxyComponentLogLevel string     The component log level used to start the Envoy proxy (default "misc:error")
      --proxyLogLevel string              The log level used to start the Envoy proxy (choose from {trace, debug, info, warning, error, critical, off}) (default "warning")
      --serviceCluster string             Service cluster (default "istio-proxy")
      --serviceregistry string            Select the platform for service registry, options are {Kubernetes, Consul, Mock} (default "Kubernetes")
      --statsdUdpAddress string           IP Address and Port of a statsd UDP listener (e.g. 10.75.241.127:9125)
      --statusPort uint16                 HTTP Port on which to serve pilot agent status. If zero, agent status will not be provided.
      --stsPort int                       HTTP Port on which to serve Security Token Service (STS). If zero, STS service will not be provided.
      --templateFile string               Go template bootstrap config
      --tokenManagerPlugin string         Token provider specific plugin name. (default "GoogleTokenExchange")
      --trust-domain string               The domain to use for identities
      --zipkinAddress string              Address of the Zipkin service (e.g. zipkin:9411)
```
The output above also shows what each proxy flag means and its default value.
```go
func DefaultProxyConfig() meshconfig.ProxyConfig {
	return meshconfig.ProxyConfig{
		ConfigPath:             constants.ConfigPathDir,
		BinaryPath:             constants.BinaryPathFilename,
		ServiceCluster:         constants.ServiceClusterName,
		DrainDuration:          types.DurationProto(45 * time.Second),
		ParentShutdownDuration: types.DurationProto(60 * time.Second),
		DiscoveryAddress:       constants.DiscoveryPlainAddress,
		ConnectTimeout:         types.DurationProto(1 * time.Second),
		StatsdUdpAddress:       "",
		EnvoyMetricsService:    &meshconfig.RemoteService{Address: ""},
		EnvoyAccessLogService:  &meshconfig.RemoteService{Address: ""},
		ProxyAdminPort:         15000,
		ControlPlaneAuthPolicy: meshconfig.AuthenticationPolicy_NONE,
		CustomConfigFile:       "",
		Concurrency:            0,
		StatNameLength:         189,
		Tracing:                nil,
	}
}
```
All default startup parameters are set in the DefaultProxyConfig method. The defaults are:
ConfigPath:/etc/istio/proxy
BinaryPath:/usr/local/bin/envoy
ServiceCluster:istio-proxy
DrainDuration:45s
ParentShutdownDuration:60s
DiscoveryAddress:istio-pilot:15010
ConnectTimeout:1s
StatsdUdpAddress:""
EnvoyMetricsService:&meshconfig.RemoteService{Address: ""}
EnvoyAccessLogService:&meshconfig.RemoteService{Address: ""}
ProxyAdminPort:15000
ControlPlaneAuthPolicy:0 (AuthenticationPolicy_NONE)
CustomConfigFile:""
Concurrency:0
StatNameLength:189
Tracing:nil
Initializing the status server:
```go
func NewServer(config Config) (*Server, error) {
	s := &Server{
		statusPort: config.StatusPort,
		ready: &ready.Probe{
			LocalHostAddr: config.LocalHostAddr,
			AdminPort:     config.AdminPort,
			NodeType:      config.NodeType,
		},
	}
	...
	return s, nil
}
```
After initialization, a goroutine is started to call the statusServer's Run method:
```go
go waitForCompletion(ctx, statusServer.Run)

func (s *Server) Run(ctx context.Context) {
	log.Infof("Opening status port %d\n", s.statusPort)

	mux := http.NewServeMux()

	// Add the handler for ready probes.
	// Register the probe handler for /healthz/ready
	mux.HandleFunc(readyPath, s.handleReadyProbe)
	mux.HandleFunc(quitPath, s.handleQuit)
	// Application health checks
	mux.HandleFunc("/app-health/", s.handleAppProbe)

	// The port is set via the --statusPort 15020 flag
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
	if err != nil {
		log.Errorf("Error listening on status port: %v", err.Error())
		return
	}
	...
	defer l.Close()

	// Serve HTTP in a separate goroutine
	go func() {
		if err := http.Serve(l, mux); err != nil {
			log.Errora(err)
			notifyExit()
		}
	}()

	<-ctx.Done()
	log.Info("Status server has successfully terminated")
}
```
Run listens on port 15020 and serves HTTP from a separate goroutine. Requests to /healthz/ready are handled by handleReadyProbe, which queries Envoy's admin port 15000 to decide whether Envoy is ready to accept traffic.
Before the watcher is set up, NewAgent creates the agent instance:
```go
func NewAgent(proxy Proxy, terminationDrainDuration time.Duration) Agent {
	return &agent{
		proxy: proxy,
		// Channel carrying the exit status of each Envoy process the agent starts
		statusCh: make(chan exitStatus),
		// Set of active epochs
		activeEpochs: map[int]chan error{},
		// 5s by default
		terminationDrainDuration: terminationDrainDuration,
		// Current epoch
		currentEpoch: -1,
	}
}
```
Then the watcher instance is built:
```go
// Build the watcher instance
watcher := envoy.NewWatcher(tlsCertsToWatch, agent.Restart)

type watcher struct {
	// List of certificate files to watch
	certs []string
	// Envoy restart function
	updates func(interface{})
}

func NewWatcher(certs []string, updates func(interface{})) Watcher {
	return &watcher{
		certs:   certs,
		updates: updates,
	}
}
```
The watcher has only two fields: certs, the list of certificate files to watch, and updates, the Envoy restart function. When a certificate file changes, updates is called to reload Envoy.
Starting the watcher:
```go
go watcher.Run(ctx)

func (w *watcher) Run(ctx context.Context) {
	// Start Envoy
	w.SendConfig()

	// Watch for certificate changes
	go watchCerts(ctx, w.certs, watchFileEvents, defaultMinDelay, w.SendConfig)

	<-ctx.Done()
	log.Info("Watcher has successfully terminated")
}
```
The watcher's Run method first calls SendConfig to start Envoy, then starts a goroutine that watches the certificates for changes.
```go
func (w *watcher) SendConfig() {
	h := sha256.New()
	generateCertHash(h, w.certs)
	w.updates(h.Sum(nil))
}
```
SendConfig computes a hash over the current set of certificates and passes it to updates. updates is the function passed to NewWatcher, which here is the agent's Restart method:
```go
func (a *agent) Restart(config interface{}) {
	a.restartMutex.Lock()
	defer a.restartMutex.Unlock()

	a.mutex.Lock()
	// Check whether the incoming config has changed
	if reflect.DeepEqual(a.currentConfig, config) {
		// Same configuration - nothing to do.
		a.mutex.Unlock()
		return
	}
	// Whether any epoch is currently active
	hasActiveEpoch := len(a.activeEpochs) > 0
	// The current epoch
	activeEpoch := a.currentEpoch

	// The config changed, so increment the epoch
	epoch := a.currentEpoch + 1
	log.Infof("Received new config, creating new Envoy epoch %d", epoch)

	// Record the new config and epoch
	a.currentEpoch = epoch
	a.currentConfig = config

	// Used to abort this epoch
	abortCh := make(chan error, 1)
	// Register the new epoch's abortCh, used during shutdown
	a.activeEpochs[a.currentEpoch] = abortCh

	a.mutex.Unlock()

	if hasActiveEpoch {
		a.waitUntilLive(activeEpoch)
	}

	// Start Envoy; the result is sent to the statusCh channel
	go a.runWait(config, epoch, abortCh)
}
```
Restart checks whether the incoming configuration equals the current one. If it differs, Restart records the new configuration and epoch, then calls runWait to start Envoy; runWait sends the result to the statusCh channel:
```go
func (a *agent) runWait(config interface{}, epoch int, abortCh <-chan error) {
	log.Infof("Epoch %d starting", epoch)
	// Start Envoy and block until it exits
	err := a.proxy.Run(config, epoch, abortCh)
	// Delete the config file generated for this epoch
	a.proxy.Cleanup(epoch)
	a.statusCh <- exitStatus{epoch: epoch, err: err}
}
```
As noted above, Envoy is started from runWait. The proxy's Run method renders the template into the /etc/istio/proxy/envoy-rev0.json configuration file and then starts Envoy directly through the exec package.
```go
func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
	var fname string
	// If a custom config file is given, use it as-is;
	// otherwise generate the bootstrap config from the template
	if len(e.Config.CustomConfigFile) > 0 {
		fname = e.Config.CustomConfigFile
	} else {
		out, err := bootstrap.New(bootstrap.Config{
			Node:                e.Node,
			DNSRefreshRate:      e.DNSRefreshRate,
			Proxy:               &e.Config,
			PilotSubjectAltName: e.PilotSubjectAltName,
			MixerSubjectAltName: e.MixerSubjectAltName,
			LocalEnv:            os.Environ(),
			NodeIPs:             e.NodeIPs,
			PodName:             e.PodName,
			PodNamespace:        e.PodNamespace,
			PodIP:               e.PodIP,
			SDSUDSPath:          e.SDSUDSPath,
			SDSTokenPath:        e.SDSTokenPath,
			STSPort:             e.STSPort,
			ControlPlaneAuth:    e.ControlPlaneAuth,
			DisableReportCalls:  e.DisableReportCalls,
			OutlierLogPath:      e.OutlierLogPath,
			PilotCertProvider:   e.PilotCertProvider,
		}).CreateFileForEpoch(epoch)
		if err != nil {
			log.Errora("Failed to generate bootstrap config: ", err)
			os.Exit(1) // Prevent infinite loop attempting to write the file, let k8s/systemd report
		}
		fname = out
	}

	// Build the command-line arguments
	args := e.args(fname, epoch, istioBootstrapOverrideVar.Get())
	log.Infof("Envoy command: %v", args)

	// Start Envoy directly with the exec package
	cmd := exec.Command(e.Config.BinaryPath, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		return err
	}
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	// Wait on both abort and done: stop Envoy on request, or report how it exited
	select {
	// Used during shutdown, covered later
	case err := <-abort:
		log.Warnf("Aborting epoch %d", epoch)
		if errKill := cmd.Process.Kill(); errKill != nil {
			log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
		}
		return err
	case err := <-done:
		return err
	}
}
```
Run calls CreateFileForEpoch, which reads the template /var/lib/istio/envoy/envoy_bootstrap_tmpl.json, generates /etc/istio/proxy/envoy-rev0.json and returns its path. It then builds Envoy's command-line arguments with args and launches Envoy with exec.Command, where BinaryPath is /usr/local/bin/envoy.
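Finally, cmd.Wait runs in a separate goroutine and its result is delivered through the done channel, which becomes Run's return value. runWait receives that value and sends it to the statusCh channel.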
The agent's Run method listens for values on the statusCh channel:
```go
agent.Run(ctx)

func (a *agent) Run(ctx context.Context) error {
	log.Info("Starting proxy agent")
	for {
		select {
		// An Envoy process (epoch) has exited
		case status := <-a.statusCh:
			a.mutex.Lock()
			if status.err != nil {
				if status.err.Error() == errOutOfMemory {
					log.Warnf("Envoy may have been out of memory killed. Check memory usage and limits.")
				}
				log.Errorf("Epoch %d exited with error: %v", status.epoch, status.err)
			} else {
				// Normal exit
				log.Infof("Epoch %d exited normally", status.epoch)
			}

			// Remove the exited epoch from the active set
			delete(a.activeEpochs, status.epoch)

			active := len(a.activeEpochs)
			a.mutex.Unlock()

			if active == 0 {
				log.Infof("No more active epochs, terminating")
				return nil
			}
			...
		}
	}
}
```
pilot-agent starts a goroutine that runs WaitSignalFunc, which listens for syscall.SIGINT and syscall.SIGTERM and then calls the context's cancel function to trigger a graceful shutdown:
```go
func WaitSignalFunc(cancel func()) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	cancel()
	_ = log.Sync()
}
```
Once cancel is called, the ctx.Done() case in the select of the agent's Run method fires immediately, and terminate is called:
```go
func (a *agent) Run(ctx context.Context) error {
	for {
		select {
		// An Envoy process (epoch) has exited
		case status := <-a.statusCh:
			...
		case <-ctx.Done():
			a.terminate()
			log.Info("Agent has successfully terminated")
			return nil
		}
	}
}

func (a *agent) terminate() {
	log.Infof("Agent draining Proxy")
	e := a.proxy.Drain()
	if e != nil {
		log.Warnf("Error in invoking drain listeners endpoint %v", e)
	}
	log.Infof("Graceful termination period is %v, starting...", a.terminationDrainDuration)
	// Sleep for terminationDrainDuration (5s by default)
	time.Sleep(a.terminationDrainDuration)
	log.Infof("Graceful termination period complete, terminating remaining proxies.")
	a.abortAll()
}
```
terminate first asks Envoy to drain its listeners, then sleeps for terminationDrainDuration (5s by default), and finally calls abortAll to abort every active epoch.
```go
var errAbort = errors.New("epoch aborted")

func (a *agent) abortAll() {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	for epoch, abortCh := range a.activeEpochs {
		log.Warnf("Aborting epoch %d...", epoch)
		abortCh <- errAbort
	}
	log.Warnf("Aborted all epochs")
}
```
abortAll sends one value on the abortCh channel of every active epoch. Any epoch still waiting for its cmd to return then kills the Envoy process directly:
```go
func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
	...
	// Wait on both abort and done: stop Envoy on request, or report how it exited
	select {
	// Triggered by abortAll during shutdown
	case err := <-abort:
		log.Warnf("Aborting epoch %d", epoch)
		if errKill := cmd.Process.Kill(); errKill != nil {
			log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
		}
		return err
	case err := <-done:
		return err
	}
}
```