這篇文章主要講解了“kube-proxy怎么使用”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“kube-proxy怎么使用”吧!
成都創(chuàng)新互聯(lián)專注于海南網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供海南營(yíng)銷型網(wǎng)站建設(shè),海南網(wǎng)站制作、海南網(wǎng)頁(yè)設(shè)計(jì)、海南網(wǎng)站官網(wǎng)定制、成都小程序開發(fā)服務(wù),打造海南網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供海南網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。
## 源碼目錄結(jié)構(gòu)分析
cmd/kube-proxy //負(fù)責(zé)kube-proxy的創(chuàng)建,啟動(dòng)的入口 . ├── app │ ├── conntrack.go //linux kernel的nf_conntrack-sysctl的interface定義,更多關(guān)于conntracker的定義請(qǐng)看https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt │ ├── options │ │ └── options.go //kube-proxy的參數(shù)定義ProxyServerConfig及相關(guān)方法 │ ├── server.go //ProxyServer結(jié)構(gòu)定義及其創(chuàng)建(NewProxyServerDefault)和運(yùn)行(Run)的方法。 │ └── server_test.go └── proxy.go //kube-proxy的main方法 pkg/proxy . ├── OWNERS ├── config │ ├── api.go //給proxy配置Service和Endpoint的Reflectors和Cache.Store │ ├── api_test.go │ ├── config.go //定義ServiceUpdate,EndpointUpdate結(jié)構(gòu)體以及ServiceConfigHandler,EndpointConfigHandler來(lái)處理Service和Endpoint的Update │ ├── config_test.go │ └── doc.go ├── doc.go ├── healthcheck //負(fù)責(zé)service listener和endpoint的health check,add/delete請(qǐng)求。 │ ├── api.go │ ├── doc.go │ ├── healthcheck.go │ ├── healthcheck_test.go │ ├── http.go │ ├── listener.go │ └── worker.go ├── iptables //proxy mode為iptables的實(shí)現(xiàn) │ ├── proxier.go │ └── proxier_test.go ├── types.go ├── userspace //proxy mode為userspace的實(shí)現(xiàn) │ ├── loadbalancer.go │ ├── port_allocator.go │ ├── port_allocator_test.go │ ├── proxier.go │ ├── proxier_test.go │ ├── proxysocket.go │ ├── rlimit.go │ ├── rlimit_windows.go │ ├── roundrobin.go │ ├── roundrobin_test.go │ └── udp_server.go └── winuserspace //windows OS時(shí),proxy mode為userspace的實(shí)現(xiàn) ├── loadbalancer.go ├── port_allocator.go ├── port_allocator_test.go ├── proxier.go ├── proxier_test.go ├── proxysocket.go ├── roundrobin.go ├── roundrobin_test.go └── udp_server.go
## 內(nèi)部實(shí)現(xiàn)模塊邏輯圖
## 源碼分析
### main

kube-proxy的main入口在:cmd/kube-proxy/proxy.go:39
func main() { //創(chuàng)建kube-proxy的默認(rèn)config對(duì)象 config := options.NewProxyConfig() //用kube-proxy命令行的參數(shù)替換默認(rèn)參數(shù) config.AddFlags(pflag.CommandLine) flag.InitFlags() logs.InitLogs() defer logs.FlushLogs() verflag.PrintAndExitIfRequested() //根據(jù)config創(chuàng)建ProxyServer s, err := app.NewProxyServerDefault(config) if err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } //執(zhí)行Run方法讓kube-proxy開始干活了 if err = s.Run(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }
main方法中,我們重點(diǎn)關(guān)注app.NewProxyServerDefault(config)創(chuàng)建ProxyServer和Run方法。
### 創(chuàng)建ProxyServer

NewProxyServerDefault負(fù)責(zé)根據(jù)提供的config參數(shù)創(chuàng)建一個(gè)新的ProxyServer對(duì)象,其代碼比較長(zhǎng),邏輯相對(duì)復(fù)雜,下面會(huì)挑重點(diǎn)說(shuō)一下。
cmd/kube-proxy/app/server.go:131 func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) { ... // Create a iptables utils. execer := exec.New() if runtime.GOOS == "windows" { netshInterface = utilnetsh.New(execer) } else { dbus = utildbus.New() iptInterface = utiliptables.New(execer, dbus, protocol) } ... //設(shè)置OOM_SCORE_ADJ var oomAdjuster *oom.OOMAdjuster if config.OOMScoreAdj != nil { oomAdjuster = oom.NewOOMAdjuster() if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*config.OOMScoreAdj)); err != nil { glog.V(2).Info(err) } } ... // Create a Kube Client ... // 創(chuàng)建event Broadcaster和event recorder hostname := nodeutil.GetHostname(config.HostnameOverride) eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "kube-proxy", Host: hostname}) //定義proxier和endpointsHandler,分別用于處理services和endpoints的update event。 var proxier proxy.ProxyProvider var endpointsHandler proxyconfig.EndpointsConfigHandler //從config中獲取proxy mode proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) // proxy mode為iptables場(chǎng)景 if proxyMode == proxyModeIPTables { glog.V(0).Info("Using iptables Proxier.") if config.IPTablesMasqueradeBit == nil { // IPTablesMasqueradeBit must be specified or defaulted. 
return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config") } //調(diào)用pkg/proxy/iptables/proxier.go:222中的iptables.NewProxier來(lái)創(chuàng)建proxier,賦值給前面定義的proxier和endpointsHandler,表示由該proxier同時(shí)負(fù)責(zé)service和endpoint的event處理。 proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname)) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierIPTables endpointsHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") userspace.CleanupLeftovers(iptInterface) } // proxy mode為userspace場(chǎng)景 else { glog.V(0).Info("Using userspace Proxier.") // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // our config.EndpointsConfigHandler. 
loadBalancer := userspace.NewLoadBalancerRR() // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer var proxierUserspace proxy.ProxyProvider // windows OS場(chǎng)景下,調(diào)用pkg/proxy/winuserspace/proxier.go:146的winuserspace.NewProxier來(lái)創(chuàng)建proxier。 if runtime.GOOS == "windows" { proxierUserspace, err = winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), netshInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), // TODO @pires replace below with default values, if applicable config.IPTablesSyncPeriod.Duration, config.UDPIdleTimeout.Duration, ) } // linux OS場(chǎng)景下,調(diào)用pkg/proxy/userspace/proxier.go:143的userspace.NewProxier來(lái)創(chuàng)建proxier。 else { proxierUserspace, err = userspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), iptInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.UDPIdleTimeout.Duration, ) } if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierUserspace // Remove artifacts from the pure-iptables Proxier, if not on Windows. if runtime.GOOS != "windows" { glog.V(0).Info("Tearing down pure-iptables proxy rules.") iptables.CleanupLeftovers(iptInterface) } } // Add iptables reload function, if not on Windows. if runtime.GOOS != "windows" { iptInterface.AddReloadFunc(proxier.Sync) } // Create configs (i.e. 
Watches for Services and Endpoints) // 創(chuàng)建serviceConfig負(fù)責(zé)service的watchforUpdates serviceConfig := proxyconfig.NewServiceConfig() //給serviceConfig注冊(cè)proxier,既添加對(duì)應(yīng)的listener用來(lái)處理service update時(shí)邏輯。 serviceConfig.RegisterHandler(proxier) // 創(chuàng)建endpointsConfig負(fù)責(zé)endpoint的watchforUpdates endpointsConfig := proxyconfig.NewEndpointsConfig() //給endpointsConfig注冊(cè)endpointsHandler,既添加對(duì)應(yīng)的listener用來(lái)處理endpoint update時(shí)的邏輯。 endpointsConfig.RegisterHandler(endpointsHandler) //NewSourceAPI creates config source that watches for changes to the services and endpoints. //NewSourceAPI通過(guò)ListWatch apiserver的Service和endpoint,并周期性的維護(hù)serviceStore和endpointStore的更新 proxyconfig.NewSourceAPI( client.Core().RESTClient(), config.ConfigSyncPeriod, serviceConfig.Channel("api"), //Service Update Channel endpointsConfig.Channel("api"), //endpoint update channel ) ... //把前面創(chuàng)建的對(duì)象作為參數(shù),構(gòu)造出ProxyServer對(duì)象。 return NewProxyServer(client, config, iptInterface, proxier, eventBroadcaster, recorder, conntracker, proxyMode) }
NewProxyServerDefault中的核心邏輯我都已經(jīng)在上述代碼中添加了注釋,其中有幾個(gè)地方需要我們?cè)偕钊敫M(jìn)去看看:proxyconfig.NewServiceConfig,proxyconfig.NewEndpointsConfig,serviceConfig.RegisterHandler,endpointsConfig.RegisterHandler,proxyconfig.NewSourceAPI。
#### proxyconfig.NewServiceConfig

我們對(duì)ServiceConfig的代碼分析一遍,EndpointsConfig的代碼則類似。
pkg/proxy/config/config.go:192 func NewServiceConfig() *ServiceConfig { // 創(chuàng)建updates channel updates := make(chan struct{}, 1) // 構(gòu)建serviceStore對(duì)象 store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} mux := config.NewMux(store) // 新建Broadcaster,在后續(xù)的serviceConfig.RegisterHandler會(huì)注冊(cè)該Broadcaster的listener。 bcaster := config.NewBroadcaster() //啟動(dòng)協(xié)程,馬上開始watch updates channel go watchForUpdates(bcaster, store, updates) return &ServiceConfig{mux, bcaster, store} }
下面我們?cè)俑M(jìn)watchForUpdates去看看。
pkg/proxy/config/config.go:292 func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { for true { <-updates bcaster.Notify(accessor.MergedState()) } }
watchForUpdates就是一直在watch updates channel,如果有數(shù)據(jù),則立刻由該Broadcaster Notify到注冊(cè)的listeners。 Notify的代碼如下,可見,它負(fù)責(zé)將數(shù)據(jù)通知給所有的listener,并調(diào)用各個(gè)listener的OnUpdate方法。
pkg/util/config/config.go:133 // Notify notifies all listeners. func (b *Broadcaster) Notify(instance interface{}) { b.listenerLock.RLock() listeners := b.listeners b.listenerLock.RUnlock() for _, listener := range listeners { listener.OnUpdate(instance) } } func (f ListenerFunc) OnUpdate(instance interface{}) { f(instance) }
#### serviceConfig.RegisterHandler

上面分析的proxyconfig.NewServiceConfig負(fù)責(zé)創(chuàng)建ServiceConfig,開始watch updates channel了,當(dāng)從channel中取到值的時(shí)候,Broadcaster就會(huì)通知listener進(jìn)行處理。serviceConfig.RegisterHandler正是負(fù)責(zé)給Broadcaster注冊(cè)listener的,其代碼如下。
pkg/proxy/config/config.go:205 func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { //給ServiceConfig的Broadcaster注冊(cè)listener。 c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnServiceUpdate()") handler.OnServiceUpdate(instance.([]api.Service)) })) }
上面分析proxyconfig.NewServiceConfig時(shí)可知,當(dāng)從updates channel中取到值的時(shí)候,最終會(huì)調(diào)用對(duì)應(yīng)的ListenerFunc(instance)進(jìn)行處理,在這里,也就是調(diào)用:
func(instance interface{}) { glog.V(3).Infof("Calling handler.OnServiceUpdate()") handler.OnServiceUpdate(instance.([]api.Service)) }
即調(diào)用到handler.OnServiceUpdate。每種proxy mode對(duì)應(yīng)的proxier都有對(duì)應(yīng)的handler.OnServiceUpdate接口實(shí)現(xiàn),我們以iptables mode為例,看看handler.OnServiceUpdate的實(shí)現(xiàn):
pkg/proxy/iptables/proxier.go:428 func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { ... proxier.syncProxyRules() proxier.deleteServiceConnections(staleUDPServices.List()) }
因此,最終關(guān)鍵的邏輯都轉(zhuǎn)向了proxier.syncProxyRules(),從我們上面給出的內(nèi)部模塊交互圖也能看得出來(lái)。對(duì)于proxier.syncProxyRules(),我們放到后面來(lái)詳細(xì)討論,現(xiàn)在你只要知道proxier.syncProxyRules()負(fù)責(zé)將proxy中緩存的service/endpoint同步更新到iptables中生成對(duì)應(yīng)Chain和NAT Rules。
#### proxyconfig.NewEndpointsConfig

endpointsConfig的邏輯和serviceConfig的類似,在這里只給出對(duì)應(yīng)代碼,不再跟進(jìn)分析。
pkg/proxy/config/config.go:84 func NewEndpointsConfig() *EndpointsConfig { // The updates channel is used to send interrupts to the Endpoints handler. // It's buffered because we never want to block for as long as there is a // pending interrupt, but don't want to drop them if the handler is doing // work. updates := make(chan struct{}, 1) store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) return &EndpointsConfig{mux, bcaster, store} }
#### endpointsConfig.RegisterHandler
pkg/proxy/config/config.go:97 func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") handler.OnEndpointsUpdate(instance.([]api.Endpoints)) })) }
#### proxyconfig.NewSourceAPI
proxyconfig.NewSourceAPI是很關(guān)鍵的,它負(fù)責(zé)給service updates channel和endpoint updates channel配置數(shù)據(jù)源,它是通過(guò)周期性的List和Watch kube-apiserver中的all service and endpoint來(lái)提供數(shù)據(jù)的,發(fā)給對(duì)應(yīng)的channel。默認(rèn)的List周期是15min,可通過(guò) `--config-sync-period` 參數(shù)修改。下面來(lái)看其具體代碼:
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run() endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run() } // NewServiceStore creates an undelta store that expands updates to the store into // ServiceUpdate events on the channel. If no store is passed, a default store will // be initialized. Allows reuse of a cache store across multiple components. func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store { fn := func(objs []interface{}) { var services []api.Service for _, o := range objs { services = append(services, *(o.(*api.Service))) } ch <- ServiceUpdate{Op: SET, Services: services} } if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } return &cache.UndeltaStore{ Store: store, PushFunc: fn, } } // NewEndpointsStore creates an undelta store that expands updates to the store into // EndpointsUpdate events on the channel. If no store is passed, a default store will // be initialized. Allows reuse of a cache store across multiple components. func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store { fn := func(objs []interface{}) { var endpoints []api.Endpoints for _, o := range objs { endpoints = append(endpoints, *(o.(*api.Endpoints))) } ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints} } if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } return &cache.UndeltaStore{ Store: store, PushFunc: fn, } }
代碼很簡(jiǎn)單,不需要過(guò)多解釋。
### 執(zhí)行Run開始工作

創(chuàng)建完P(guān)roxyServer后,就執(zhí)行Run方法開始工作了,它主要負(fù)責(zé)周期性(default 30s)的同步proxy中的services/endpoints到iptables中生成對(duì)應(yīng)Chain and NAT Rules。
cmd/kube-proxy/app/server.go:308 func (s *ProxyServer) Run() error { ... // Start up a webserver if requested if s.Config.HealthzPort > 0 { http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s", s.ProxyMode) }) configz.InstallHandler(http.DefaultServeMux) go wait.Until(func() { err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(int(s.Config.HealthzPort)), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } }, 5*time.Second, wait.NeverStop) } ... // Just loop forever for now... s.Proxier.SyncLoop() return nil }
Run方法關(guān)鍵代碼很簡(jiǎn)單,就是執(zhí)行對(duì)應(yīng)proxier的SyncLoop()。我們還是以iptables mode為例,看看它是如何實(shí)現(xiàn)SyncLoop()的:
pkg/proxy/iptables/proxier.go:416 // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { t := time.NewTicker(proxier.syncPeriod) defer t.Stop() for { <-t.C glog.V(6).Infof("Periodic sync") proxier.Sync() } }
SyncLoop中,通過(guò)設(shè)置定時(shí)器,默認(rèn)每30s會(huì)執(zhí)行一次proxier.Sync(),可以通過(guò) `--iptables-sync-period` 參數(shù)修改默認(rèn)時(shí)間。那我們繼續(xù)跟進(jìn)Sync()的代碼:
pkg/proxy/iptables/proxier.go:409 // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.syncProxyRules() }
可見,最終還是調(diào)用proxier.syncProxyRules()。前一節(jié)中創(chuàng)建ProxyServer的分析也是一樣,最終watch到service/endpoint有更新時(shí),都會(huì)調(diào)用到proxier.syncProxyRules()。那下面我們就來(lái)看看proxier.syncProxyRules()的代碼。
### proxier.syncProxyRules
下面的proxier.syncProxyRules代碼是iptables mode對(duì)應(yīng)的實(shí)現(xiàn)。userspace mode的代碼我就不貼了。
pkg/proxy/iptables/proxier.go:791 // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held func (proxier *Proxier) syncProxyRules() { if proxier.throttle != nil { proxier.throttle.Accept() } start := time.Now() defer func() { glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain. { tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} for _, table := range tablesNeedServicesChain { if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil { glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err) return } } tableChainsNeedJumpServices := []struct { table utiliptables.Table chain utiliptables.Chain }{ {utiliptables.TableFilter, utiliptables.ChainOutput}, {utiliptables.TableNAT, utiliptables.ChainOutput}, {utiliptables.TableNAT, utiliptables.ChainPrerouting}, } comment := "kubernetes service portals" args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)} for _, tc := range tableChainsNeedJumpServices { if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err) return } } } // Create and link the kube postrouting chain. 
{ if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil { glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err) return } comment := "kubernetes postrouting rules" args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)} if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err) return } } // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore existingFilterChains := make(map[utiliptables.Chain]string) iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableFilter) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw) } existingNATChains := make(map[utiliptables.Chain]string) iptablesSaveRaw, err = proxier.iptables.Save(utiliptables.TableNAT) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) } filterChains := bytes.NewBuffer(nil) filterRules := bytes.NewBuffer(nil) natChains := bytes.NewBuffer(nil) natRules := bytes.NewBuffer(nil) // Write table headers. writeLine(filterChains, "*filter") writeLine(natChains, "*nat") // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). 
if chain, ok := existingFilterChains[kubeServicesChain]; ok { writeLine(filterChains, chain) } else { writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeServicesChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeNodePortsChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) } if chain, ok := existingNATChains[kubePostroutingChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain)) } if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) } // Install the kubernetes-specific postrouting rules. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. writeLine(natRules, []string{ "-A", string(kubePostroutingChain), "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, "-m", "mark", "--mark", proxier.masqueradeMark, "-j", "MASQUERADE", }...) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. writeLine(natRules, []string{ "-A", string(KubeMarkMasqChain), "-j", "MARK", "--set-xmark", proxier.masqueradeMark, }...) // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set // Accumulate the set of local ports that we will be holding open once this update is complete replacementPortsMap := map[localPort]closeable{} // Build rules for each service. 
for svcName, svcInfo := range proxier.serviceMap { protocol := strings.ToLower(string(svcInfo.protocol)) // Create the per-service chain, retaining counters if possible. svcChain := servicePortChainName(svcName, protocol) if chain, ok := existingNATChains[svcChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(svcChain)) } activeNATChains[svcChain] = true svcXlbChain := serviceLBChainName(svcName, protocol) if svcInfo.onlyNodeLocalEndpoints { // Only for services with the externalTraffic annotation set to OnlyLocal // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { writeLine(natChains, lbChain) } else { writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain)) } activeNATChains[svcXlbChain] = true } else if activeNATChains[svcXlbChain] { // Cleanup the previously created XLB chain for this service delete(activeNATChains, svcXlbChain) } // Capture the clusterIP. args := []string{ "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "--dport", fmt.Sprintf("%d", svcInfo.port), } if proxier.masqueradeAll { writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if len(proxier.clusterCIDR) > 0 { writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) } writeLine(natRules, append(args, "-j", string(svcChain))...) // Capture externalIPs. for _, externalIP := range svcInfo.externalIPs { // If the "external" IP happens to be an IP that is local to this // machine, hold the local port open so no other process can open it // (because the socket might open but it would never work). 
if local, err := isLocalIP(externalIP); err != nil { glog.Errorf("can't determine if IP is local, assuming not: %v", err) } else if local { lp := localPort{ desc: "externalIP for " + svcName.String(), ip: externalIP, port: svcInfo.port, protocol: protocol, } if proxier.portsMap[lp] != nil { glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err) continue } replacementPortsMap[lp] = socket } } // We're holding the port, so it's OK to install iptables rules. args := []string{ "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", externalIP), "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets to external IPs. writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. // This rule roughly translates to "all traffic from off-machine". // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } // Capture load-balancer ingress. 
for _, ingress := range svcInfo.loadBalancerStatus.Ingress { if ingress.IP != "" { // create service firewall chain fwChain := serviceFirewallChainName(svcName, protocol) if chain, ok := existingNATChains[fwChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(fwChain)) } activeNATChains[fwChain] = true // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. args := []string{ "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", ingress.IP), "--dport", fmt.Sprintf("%d", svcInfo.port), } // jump to service firewall chain writeLine(natRules, append(args, "-j", string(fwChain))...) args = []string{ "-A", string(fwChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), } // Each source match rule in the FW chain may jump to either the SVC or the XLB chain chosenChain := svcXlbChain // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if !svcInfo.onlyNodeLocalEndpoints { writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } if len(svcInfo.loadBalancerSourceRanges) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain writeLine(natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.loadBalancerSourceRanges { writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...) 
// ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { allowFromNode = true } } // generally, ip route rule was added to intercept request to loadbalancer vip from the // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) } } // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...) } } // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. if svcInfo.nodePort != 0 { // Hold the local port open so no other process can open it // (because the socket might open but it would never work). lp := localPort{ desc: "nodePort for " + svcName.String(), ip: "", port: svcInfo.nodePort, protocol: protocol, } if proxier.portsMap[lp] != nil { glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } if lp.protocol == "udp" { proxier.clearUdpConntrackForPort(lp.port) } replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install iptables rules. args := []string{ "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", svcName.String(), "-m", protocol, "-p", protocol, "--dport", fmt.Sprintf("%d", svcInfo.nodePort), } if !svcInfo.onlyNodeLocalEndpoints { // Nodeports need SNAT, unless they're local. 
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. writeLine(natRules, append(args, "-j", string(svcChain))...) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). writeLine(natRules, append(args, "-j", string(svcXlbChain))...) } } // If the service has no endpoints then reject packets. if len(proxier.endpointsMap[svcName]) == 0 { writeLine(filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "--dport", fmt.Sprintf("%d", svcInfo.port), "-j", "REJECT", ) continue } // Generate the per-endpoint chains. We do this in multiple passes so we // can group rules together. // These two slices parallel each other - keep in sync endpoints := make([]*endpointsInfo, 0) endpointChains := make([]utiliptables.Chain, 0) for _, ep := range proxier.endpointsMap[svcName] { endpoints = append(endpoints, ep) endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(endpointChain)) } activeNATChains[endpointChain] = true } // First write session affinity rules, if applicable. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { for _, endpointChain := range endpointChains { writeLine(natRules, "-A", string(svcChain), "-m", "comment", "--comment", svcName.String(), "-m", "recent", "--name", string(endpointChain), "--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap", "-j", string(endpointChain)) } } // Now write loadbalancing & DNAT rules. 
n := len(endpointChains) for i, endpointChain := range endpointChains { // Balancing rules in the per-service chain. args := []string{ "-A", string(svcChain), "-m", "comment", "--comment", svcName.String(), } if i < (n - 1) { // Each rule is a probabilistic match. args = append(args, "-m", "statistic", "--mode", "random", "--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i))) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) writeLine(natRules, args...) // Rules in the per-endpoint chain. args = []string{ "-A", string(endpointChain), "-m", "comment", "--comment", svcName.String(), } // Handle traffic that loops back to the originator with SNAT. writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip) writeLine(natRules, args...) } // The logic below this applies only if this service is marked as OnlyLocal if !svcInfo.onlyNodeLocalEndpoints { continue } // Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation // TODO - This logic may be combinable with the block above that creates the svc balancer chain localEndpoints := make([]*endpointsInfo, 0) localEndpointChains := make([]utiliptables.Chain, 0) for i := range endpointChains { if endpoints[i].localEndpoint { // These slices parallel each other; must be kept in sync localEndpoints = append(localEndpoints, endpoints[i]) localEndpointChains = append(localEndpointChains, endpointChains[i]) } } // First rule in the chain redirects all pod -> external vip traffic to the // Service's ClusterIP instead. 
This happens whether or not we have local // endpoints; only if clusterCIDR is specified if len(proxier.clusterCIDR) > 0 { args = []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`), "-s", proxier.clusterCIDR, "-j", string(svcChain), } writeLine(natRules, args...) } numLocalEndpoints := len(localEndpointChains) if numLocalEndpoints == 0 { // Blackhole all traffic since there are no local endpoints args := []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()), "-j", string(KubeMarkDropChain), } writeLine(natRules, args...) } else { // Setup probability filter rules only over local endpoints for i, endpointChain := range localEndpointChains { // Balancing rules in the per-service chain. args := []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()), } if i < (numLocalEndpoints - 1) { // Each rule is a probabilistic match. args = append(args, "-m", "statistic", "--mode", "random", "--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i))) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) writeLine(natRules, args...) } } } // Delete chains no longer in use. for chain := range existingNATChains { if !activeNATChains[chain] { chainString := string(chain) if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") { // Ignore chains that aren't ours. continue } // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. Then we can remove the // chain. 
writeLine(natChains, existingNATChains[chain]) writeLine(natRules, "-X", chainString) } } // Finally, tail-call to the nodeports chain. This needs to be after all // other service portal rules. writeLine(natRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(kubeNodePortsChain)) // Write the end-of-table markers. writeLine(filterRules, "COMMIT") writeLine(natRules, "COMMIT") // Sync rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. filterLines := append(filterChains.Bytes(), filterRules.Bytes()...) natLines := append(natChains.Bytes(), natRules.Bytes()...) lines := append(filterLines, natLines...) glog.V(3).Infof("Restoring iptables rules: %s", lines) err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines) // Revert new local ports. revertPorts(replacementPortsMap, proxier.portsMap) return } // Close old local ports and save new ones. for k, v := range proxier.portsMap { if replacementPortsMap[k] == nil { v.Close() } } proxier.portsMap = replacementPortsMap }
感謝各位的閱讀,以上就是“kube-proxy怎么使用”的全部?jī)?nèi)容了。經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì) kube-proxy 的使用方式有了更深刻的體會(huì),具體的使用效果還需要大家在實(shí)踐中驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!