真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

kube-proxy怎么使用

這篇文章主要講解了“kube-proxy怎么使用”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“kube-proxy怎么使用”吧!

成都創(chuàng)新互聯(lián)專注于海南網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供海南營(yíng)銷型網(wǎng)站建設(shè),海南網(wǎng)站制作、海南網(wǎng)頁(yè)設(shè)計(jì)、海南網(wǎng)站官網(wǎng)定制、成都小程序開發(fā)服務(wù),打造海南網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供海南網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

##源碼目錄結(jié)構(gòu)分析

cmd/kube-proxy      //負(fù)責(zé)kube-proxy的創(chuàng)建,啟動(dòng)的入口
.
├── app
│   ├── conntrack.go    //linux kernel的nf_conntrack-sysctl的interface定義,更多關(guān)于conntracker的定義請(qǐng)看https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
│   ├── options
│   │   └── options.go    //kube-proxy的參數(shù)定義ProxyServerConfig及相關(guān)方法
│   ├── server.go    //ProxyServer結(jié)構(gòu)定義及其創(chuàng)建(NewProxyServerDefault)和運(yùn)行(Run)的方法。
│   └── server_test.go
└── proxy.go    //kube-proxy的main方法


pkg/proxy
.
├── OWNERS
├── config
│   ├── api.go    //給proxy配置Service和Endpoint的Reflectors和Cache.Store
│   ├── api_test.go
│   ├── config.go    //定義ServiceUpdate,EndpointUpdate結(jié)構(gòu)體以及ServiceConfigHandler,EndpointConfigHandler來(lái)處理Service和Endpoint的Update
│   ├── config_test.go
│   └── doc.go
├── doc.go
├── healthcheck    //負(fù)責(zé)service listener和endpoint的health check,add/delete請(qǐng)求。
│   ├── api.go
│   ├── doc.go
│   ├── healthcheck.go
│   ├── healthcheck_test.go
│   ├── http.go
│   ├── listener.go
│   └── worker.go
├── iptables    //proxy mode為iptables的實(shí)現(xiàn)
│   ├── proxier.go
│   └── proxier_test.go
├── types.go
├── userspace    //proxy mode為userspace的實(shí)現(xiàn)
│   ├── loadbalancer.go
│   ├── port_allocator.go
│   ├── port_allocator_test.go
│   ├── proxier.go
│   ├── proxier_test.go
│   ├── proxysocket.go
│   ├── rlimit.go
│   ├── rlimit_windows.go
│   ├── roundrobin.go
│   ├── roundrobin_test.go
│   └── udp_server.go
└── winuserspace    //windows OS時(shí),proxy mode為userspace的實(shí)現(xiàn)
    ├── loadbalancer.go
    ├── port_allocator.go
    ├── port_allocator_test.go
    ├── proxier.go
    ├── proxier_test.go
    ├── proxysocket.go
    ├── roundrobin.go
    ├── roundrobin_test.go
    └── udp_server.go

##內(nèi)部實(shí)現(xiàn)模塊邏輯圖 kube-proxy怎么使用

##源碼分析

###main kube-proxy的main入口在:cmd/kube-proxy/proxy.go:39

func main() {
	//創(chuàng)建kube-proxy的默認(rèn)config對(duì)象
	config := options.NewProxyConfig()
	//用kube-proxy命令行的參數(shù)替換默認(rèn)參數(shù)
	config.AddFlags(pflag.CommandLine)

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()
	
	//根據(jù)config創(chuàng)建ProxyServer
	s, err := app.NewProxyServerDefault(config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}

	//執(zhí)行Run方法讓kube-proxy開始干活了
	if err = s.Run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

main方法中,我們重點(diǎn)關(guān)注app.NewProxyServerDefault(config)創(chuàng)建ProxyServer和Run方法。

###創(chuàng)建ProxyServer NewProxyServerDefault負(fù)責(zé)根據(jù)提供的config參數(shù)創(chuàng)建一個(gè)新的ProxyServer對(duì)象,其代碼比較長(zhǎng),邏輯相對(duì)復(fù)雜,下面會(huì)挑重點(diǎn)說(shuō)一下。

cmd/kube-proxy/app/server.go:131

func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) {
	...

	// Create a iptables utils.
	execer := exec.New()

	if runtime.GOOS == "windows" {
		netshInterface = utilnetsh.New(execer)
	} else {
		dbus = utildbus.New()
		iptInterface = utiliptables.New(execer, dbus, protocol)
	}

	...
	//設(shè)置OOM_SCORE_ADJ
	var oomAdjuster *oom.OOMAdjuster
	if config.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*config.OOMScoreAdj)); err != nil {
			glog.V(2).Info(err)
		}
	}
	
	...
	
	// Create a Kube Client
	...

	// 創(chuàng)建event Broadcaster和event recorder
	hostname := nodeutil.GetHostname(config.HostnameOverride)
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "kube-proxy", Host: hostname})

	//定義proxier和endpointsHandler,分別用于處理services和endpoints的update event。
	var proxier proxy.ProxyProvider
	var endpointsHandler proxyconfig.EndpointsConfigHandler

	//從config中獲取proxy mode
	proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
	
	// proxy mode為iptables場(chǎng)景
	if proxyMode == proxyModeIPTables {
		glog.V(0).Info("Using iptables Proxier.")
		if config.IPTablesMasqueradeBit == nil {
			// IPTablesMasqueradeBit must be specified or defaulted.
			return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
		}
		
		//調(diào)用pkg/proxy/iptables/proxier.go:222中的iptables.NewProxier來(lái)創(chuàng)建proxier,賦值給前面定義的proxier和endpointsHandler,表示由該proxier同時(shí)負(fù)責(zé)service和endpoint的event處理。
		proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
		if err != nil {
			glog.Fatalf("Unable to create proxier: %v", err)
		}
		proxier = proxierIPTables
		endpointsHandler = proxierIPTables
		// No turning back. Remove artifacts that might still exist from the userspace Proxier.
		glog.V(0).Info("Tearing down userspace rules.")
		userspace.CleanupLeftovers(iptInterface)
	} 
	// proxy mode為userspace場(chǎng)景
	else {
		glog.V(0).Info("Using userspace Proxier.")
		// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
		// our config.EndpointsConfigHandler.
		loadBalancer := userspace.NewLoadBalancerRR()
		// set EndpointsConfigHandler to our loadBalancer
		endpointsHandler = loadBalancer

		var proxierUserspace proxy.ProxyProvider
		
		// windows OS場(chǎng)景下,調(diào)用pkg/proxy/winuserspace/proxier.go:146的winuserspace.NewProxier來(lái)創(chuàng)建proxier。
		if runtime.GOOS == "windows" {
			proxierUserspace, err = winuserspace.NewProxier(
				loadBalancer,
				net.ParseIP(config.BindAddress),
				netshInterface,
				*utilnet.ParsePortRangeOrDie(config.PortRange),
				// TODO @pires replace below with default values, if applicable
				config.IPTablesSyncPeriod.Duration,
				config.UDPIdleTimeout.Duration,
			)
		} 
		
		// linux OS場(chǎng)景下,調(diào)用pkg/proxy/userspace/proxier.go:143的userspace.NewProxier來(lái)創(chuàng)建proxier。
		else {
			proxierUserspace, err = userspace.NewProxier(
				loadBalancer,
				net.ParseIP(config.BindAddress),
				iptInterface,
				*utilnet.ParsePortRangeOrDie(config.PortRange),
				config.IPTablesSyncPeriod.Duration,
				config.IPTablesMinSyncPeriod.Duration,
				config.UDPIdleTimeout.Duration,
			)
		}
		if err != nil {
			glog.Fatalf("Unable to create proxier: %v", err)
		}
		proxier = proxierUserspace
		// Remove artifacts from the pure-iptables Proxier, if not on Windows.
		if runtime.GOOS != "windows" {
			glog.V(0).Info("Tearing down pure-iptables proxy rules.")
			iptables.CleanupLeftovers(iptInterface)
		}
	}

	// Add iptables reload function, if not on Windows.
	if runtime.GOOS != "windows" {
		iptInterface.AddReloadFunc(proxier.Sync)
	}

	// Create configs (i.e. Watches for Services and Endpoints)
	// 創(chuàng)建serviceConfig負(fù)責(zé)service的watchforUpdates
	serviceConfig := proxyconfig.NewServiceConfig()
	
	//給serviceConfig注冊(cè)proxier,既添加對(duì)應(yīng)的listener用來(lái)處理service update時(shí)邏輯。
	serviceConfig.RegisterHandler(proxier)

	// 創(chuàng)建endpointsConfig負(fù)責(zé)endpoint的watchforUpdates
	endpointsConfig := proxyconfig.NewEndpointsConfig()
	
	//給endpointsConfig注冊(cè)endpointsHandler,既添加對(duì)應(yīng)的listener用來(lái)處理endpoint update時(shí)的邏輯。
	endpointsConfig.RegisterHandler(endpointsHandler)

	//NewSourceAPI creates config source that watches for changes to the services and endpoints.
	//NewSourceAPI通過(guò)ListWatch apiserver的Service和endpoint,并周期性的維護(hù)serviceStore和endpointStore的更新
	proxyconfig.NewSourceAPI(
		client.Core().RESTClient(),
		config.ConfigSyncPeriod,
		serviceConfig.Channel("api"), //Service Update Channel
		endpointsConfig.Channel("api"),  //endpoint update channel
	)

	...

	//把前面創(chuàng)建的對(duì)象作為參數(shù),構(gòu)造出ProxyServer對(duì)象。
	return NewProxyServer(client, config, iptInterface, proxier, eventBroadcaster, recorder, conntracker, proxyMode)
}

NewProxyServerDefault中的核心邏輯我都已經(jīng)在上述代碼中添加了注釋,其中有幾個(gè)地方需要我們?cè)偕钊敫M(jìn)去看看:proxyconfig.NewServiceConfig,proxyconfig.NewEndpointsConfig,serviceConfig.RegisterHandler,endpointsConfig.RegisterHandler,proxyconfig.NewSourceAPI。

####proxyconfig.NewServiceConfig 我們對(duì)ServiceConfig的代碼分析一遍,EndpointsConfig的代碼則類似。

pkg/proxy/config/config.go:192
func NewServiceConfig() *ServiceConfig {
	// 創(chuàng)建updates channel
	updates := make(chan struct{}, 1)
	
	// 構(gòu)建serviceStore對(duì)象
	store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
	mux := config.NewMux(store)
	
	// 新建Broadcaster,在后續(xù)的serviceConfig.RegisterHandler會(huì)注冊(cè)該Broadcaster的listener。
	bcaster := config.NewBroadcaster()
	
	//啟動(dòng)協(xié)程,馬上開始watch updates channel
	go watchForUpdates(bcaster, store, updates)
	
	return &ServiceConfig{mux, bcaster, store}
}

下面我們?cè)俑M(jìn)watchForUpdates去看看。

pkg/proxy/config/config.go:292
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
	for true {
		<-updates
		bcaster.Notify(accessor.MergedState())
	}
}

watchForUpdates就是一直在watch updates channel,如果有數(shù)據(jù),則立刻由該Broadcaster Notify到注冊(cè)的listeners。 Notify的代碼如下,可見,它負(fù)責(zé)將數(shù)據(jù)通知給所有的listener,并調(diào)用各個(gè)listener的OnUpdate方法。

pkg/util/config/config.go:133
// Notify notifies all listeners.
func (b *Broadcaster) Notify(instance interface{}) {
	b.listenerLock.RLock()
	listeners := b.listeners
	b.listenerLock.RUnlock()
	for _, listener := range listeners {
		listener.OnUpdate(instance)
	}
}

func (f ListenerFunc) OnUpdate(instance interface{}) {
	f(instance)
}

####serviceConfig.RegisterHandler 上面分析的proxyconfig.NewServiceConfig負(fù)責(zé)創(chuàng)建ServiceConfig,開始watch updates channel了,當(dāng)從channel中取到值的時(shí)候,Broadcaster就會(huì)通知listener進(jìn)行處理。serviceConfig.RegisterHandler正是負(fù)責(zé)給Broadcaster注冊(cè)listener的,其代碼如下。

pkg/proxy/config/config.go:205

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
	//給ServiceConfig的Broadcaster注冊(cè)listener。
	c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
		glog.V(3).Infof("Calling handler.OnServiceUpdate()")
		handler.OnServiceUpdate(instance.([]api.Service))
	}))
}

上面分析proxyconfig.NewServiceConfig時(shí)可知,當(dāng)從updates channel中取到值的時(shí)候,最終會(huì)調(diào)用對(duì)應(yīng)的ListenerFunc(instance)進(jìn)行處理,在這里,也就是調(diào)用:

func(instance interface{}) {
		glog.V(3).Infof("Calling handler.OnServiceUpdate()")
		handler.OnServiceUpdate(instance.([]api.Service))
	}

即調(diào)用到handler.OnServiceUpdate。每種proxymode對(duì)應(yīng)的proxier都有對(duì)應(yīng)的handler.OnServiceUpdate接口實(shí)現(xiàn),我們以iptables mode為例,看看handler.OnServiceUpdate的實(shí)現(xiàn):

pkg/proxy/iptables/proxier.go:428
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
	...

	proxier.syncProxyRules()
	proxier.deleteServiceConnections(staleUDPServices.List())

}

因此,最終關(guān)鍵的邏輯都轉(zhuǎn)向了proxier.syncProxyRules(),從我們上面給出的內(nèi)部模塊交互圖也能看得出來(lái)。對(duì)于proxier.syncProxyRules(),我們放到后面來(lái)詳細(xì)討論,現(xiàn)在你只要知道proxier.syncProxyRules()負(fù)責(zé)將proxy中緩存的service/endpoint同步更新到iptables中生成對(duì)應(yīng)Chain和NAT Rules。

####proxyconfig.NewEndpointsConfig endpointsConfig的邏輯和serviceConfig的類似,在這里只給出對(duì)應(yīng)代碼,不再跟進(jìn)分析。

pkg/proxy/config/config.go:84

func NewEndpointsConfig() *EndpointsConfig {
	// The updates channel is used to send interrupts to the Endpoints handler.
	// It's buffered because we never want to block for as long as there is a
	// pending interrupt, but don't want to drop them if the handler is doing
	// work.
	updates := make(chan struct{}, 1)
	store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
	mux := config.NewMux(store)
	bcaster := config.NewBroadcaster()
	go watchForUpdates(bcaster, store, updates)
	return &EndpointsConfig{mux, bcaster, store}
}

####endpointsConfig.RegisterHandler

pkg/proxy/config/config.go:97

func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
	c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
		glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
		handler.OnEndpointsUpdate(instance.([]api.Endpoints))
	}))
}

####proxyconfig.NewSourceAPI

proxyconfig.NewSourceAPI是很關(guān)鍵的,它負(fù)責(zé)給service updates channel和endpoint updates channel配置數(shù)據(jù)源,它是通過(guò)周期性的List和Watch kube-apiserver中的all service and endpoint來(lái)提供數(shù)據(jù)的,發(fā)給對(duì)應(yīng)的channel。默認(rèn)的List周期是15min,可通過(guò)--config-sync-period修改。下面來(lái)看其具體代碼:

func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
	servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
	cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()

	endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
	cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
}

// NewServiceStore creates an undelta store that expands updates to the store into
// ServiceUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
	fn := func(objs []interface{}) {
		var services []api.Service
		for _, o := range objs {
			services = append(services, *(o.(*api.Service)))
		}
		ch <- ServiceUpdate{Op: SET, Services: services}
	}
	if store == nil {
		store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}
	return &cache.UndeltaStore{
		Store:    store,
		PushFunc: fn,
	}
}

// NewEndpointsStore creates an undelta store that expands updates to the store into
// EndpointsUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store {
	fn := func(objs []interface{}) {
		var endpoints []api.Endpoints
		for _, o := range objs {
			endpoints = append(endpoints, *(o.(*api.Endpoints)))
		}
		ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
	}
	if store == nil {
		store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}
	return &cache.UndeltaStore{
		Store:    store,
		PushFunc: fn,
	}
}

代碼很簡(jiǎn)單,不需要過(guò)多解釋。

###執(zhí)行Run開始工作 創(chuàng)建完P(guān)roxyServer后,就執(zhí)行Run方法開始工作了,它主要負(fù)責(zé)周期性(default 30s)的同步proxy中的services/endpionts到iptables中生成對(duì)應(yīng)Chain and NAT Rules。

cmd/kube-proxy/app/server.go:308
func (s *ProxyServer) Run() error {
	...

	// Start up a webserver if requested
	if s.Config.HealthzPort > 0 {
		http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintf(w, "%s", s.ProxyMode)
		})
		configz.InstallHandler(http.DefaultServeMux)
		go wait.Until(func() {
			err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(int(s.Config.HealthzPort)), nil)
			if err != nil {
				glog.Errorf("Starting health server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	...

	// Just loop forever for now...
	s.Proxier.SyncLoop()
	return nil
}

Run方法關(guān)鍵代碼很簡(jiǎn)單,就是執(zhí)行對(duì)應(yīng)proxier的SyncLoop()。我們還是以iptables mode為例,看看它是如何實(shí)現(xiàn)SyncLoop()的:

pkg/proxy/iptables/proxier.go:416
// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
func (proxier *Proxier) SyncLoop() {
	t := time.NewTicker(proxier.syncPeriod)
	defer t.Stop()
	for {
		<-t.C
		glog.V(6).Infof("Periodic sync")
		proxier.Sync()
	}
}

SyncLoop中,通過(guò)設(shè)置定時(shí)器,默認(rèn)每30s會(huì)執(zhí)行一次proxier.Sync(),可以通過(guò)--iptables-sync-period修改默認(rèn)時(shí)間。那我們繼續(xù)跟進(jìn)Sync()的代碼:

pkg/proxy/iptables/proxier.go:409
// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	proxier.syncProxyRules()
}

可見,最終還是調(diào)用proxier.syncProxyRules()。前一節(jié)中創(chuàng)建ProxyServer的分析也是一樣,最終watch到service/endpoint有更新時(shí),都會(huì)調(diào)用到proxier.syncProxyRules()。那下面我們就來(lái)看看proxier.syncProxyRules()的代碼。

###proxier.syncProxyRules

下面的proxier.syncProxyRules代碼是iptables mode對(duì)應(yīng)的實(shí)現(xiàn)。userspace mode的代碼我就不貼了。

pkg/proxy/iptables/proxier.go:791
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
	if proxier.throttle != nil {
		proxier.throttle.Accept()
	}
	start := time.Now()
	defer func() {
		glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
	}()
	// don't sync rules till we've received services and endpoints
	if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
		glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
		return
	}
	glog.V(3).Infof("Syncing iptables rules")

	// Create and link the kube services chain.
	{
		tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
		for _, table := range tablesNeedServicesChain {
			if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil {
				glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err)
				return
			}
		}

		tableChainsNeedJumpServices := []struct {
			table utiliptables.Table
			chain utiliptables.Chain
		}{
			{utiliptables.TableFilter, utiliptables.ChainOutput},
			{utiliptables.TableNAT, utiliptables.ChainOutput},
			{utiliptables.TableNAT, utiliptables.ChainPrerouting},
		}
		comment := "kubernetes service portals"
		args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
		for _, tc := range tableChainsNeedJumpServices {
			if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
				glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
				return
			}
		}
	}

	// Create and link the kube postrouting chain.
	{
		if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil {
			glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err)
			return
		}

		comment := "kubernetes postrouting rules"
		args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)}
		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
			glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err)
			return
		}
	}

	// Get iptables-save output so we can check for existing chains and rules.
	// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
	existingFilterChains := make(map[utiliptables.Chain]string)
	iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableFilter)
	if err != nil { // if we failed to get any rules
		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
	} else { // otherwise parse the output
		existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw)
	}

	existingNATChains := make(map[utiliptables.Chain]string)
	iptablesSaveRaw, err = proxier.iptables.Save(utiliptables.TableNAT)
	if err != nil { // if we failed to get any rules
		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
	} else { // otherwise parse the output
		existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
	}

	filterChains := bytes.NewBuffer(nil)
	filterRules := bytes.NewBuffer(nil)
	natChains := bytes.NewBuffer(nil)
	natRules := bytes.NewBuffer(nil)

	// Write table headers.
	writeLine(filterChains, "*filter")
	writeLine(natChains, "*nat")

	// Make sure we keep stats for the top-level chains, if they existed
	// (which most should have because we created them above).
	if chain, ok := existingFilterChains[kubeServicesChain]; ok {
		writeLine(filterChains, chain)
	} else {
		writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain))
	}
	if chain, ok := existingNATChains[kubeServicesChain]; ok {
		writeLine(natChains, chain)
	} else {
		writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain))
	}
	if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
		writeLine(natChains, chain)
	} else {
		writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain))
	}
	if chain, ok := existingNATChains[kubePostroutingChain]; ok {
		writeLine(natChains, chain)
	} else {
		writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
	}
	if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
		writeLine(natChains, chain)
	} else {
		writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
	}

	// Install the kubernetes-specific postrouting rules. We use a whole chain for
	// this so that it is easier to flush and change, for example if the mark
	// value should ever change.
	writeLine(natRules, []string{
		"-A", string(kubePostroutingChain),
		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
		"-m", "mark", "--mark", proxier.masqueradeMark,
		"-j", "MASQUERADE",
	}...)

	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
	// this so that it is easier to flush and change, for example if the mark
	// value should ever change.
	writeLine(natRules, []string{
		"-A", string(KubeMarkMasqChain),
		"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
	}...)

	// Accumulate NAT chains to keep.
	activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set

	// Accumulate the set of local ports that we will be holding open once this update is complete
	replacementPortsMap := map[localPort]closeable{}

	// Build rules for each service.
	for svcName, svcInfo := range proxier.serviceMap {
		protocol := strings.ToLower(string(svcInfo.protocol))

		// Create the per-service chain, retaining counters if possible.
		svcChain := servicePortChainName(svcName, protocol)
		if chain, ok := existingNATChains[svcChain]; ok {
			writeLine(natChains, chain)
		} else {
			writeLine(natChains, utiliptables.MakeChainLine(svcChain))
		}
		activeNATChains[svcChain] = true

		svcXlbChain := serviceLBChainName(svcName, protocol)
		if svcInfo.onlyNodeLocalEndpoints {
			// Only for services with the externalTraffic annotation set to OnlyLocal
			// create the per-service LB chain, retaining counters if possible.
			if lbChain, ok := existingNATChains[svcXlbChain]; ok {
				writeLine(natChains, lbChain)
			} else {
				writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain))
			}
			activeNATChains[svcXlbChain] = true
		} else if activeNATChains[svcXlbChain] {
			// Cleanup the previously created XLB chain for this service
			delete(activeNATChains, svcXlbChain)
		}

		// Capture the clusterIP.
		args := []string{
			"-A", string(kubeServicesChain),
			"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
			"-m", protocol, "-p", protocol,
			"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
			"--dport", fmt.Sprintf("%d", svcInfo.port),
		}
		if proxier.masqueradeAll {
			writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
		}
		if len(proxier.clusterCIDR) > 0 {
			writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
		}
		writeLine(natRules, append(args, "-j", string(svcChain))...)

		// Capture externalIPs.
		for _, externalIP := range svcInfo.externalIPs {
			// If the "external" IP happens to be an IP that is local to this
			// machine, hold the local port open so no other process can open it
			// (because the socket might open but it would never work).
			if local, err := isLocalIP(externalIP); err != nil {
				glog.Errorf("can't determine if IP is local, assuming not: %v", err)
			} else if local {
				lp := localPort{
					desc:     "externalIP for " + svcName.String(),
					ip:       externalIP,
					port:     svcInfo.port,
					protocol: protocol,
				}
				if proxier.portsMap[lp] != nil {
					glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
					replacementPortsMap[lp] = proxier.portsMap[lp]
				} else {
					socket, err := proxier.portMapper.OpenLocalPort(&lp)
					if err != nil {
						glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err)
						continue
					}
					replacementPortsMap[lp] = socket
				}
			} // We're holding the port, so it's OK to install iptables rules.
			args := []string{
				"-A", string(kubeServicesChain),
				"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()),
				"-m", protocol, "-p", protocol,
				"-d", fmt.Sprintf("%s/32", externalIP),
				"--dport", fmt.Sprintf("%d", svcInfo.port),
			}
			// We have to SNAT packets to external IPs.
			writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)

			// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
			// nor from a local process to be forwarded to the service.
			// This rule roughly translates to "all traffic from off-machine".
			// This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
			externalTrafficOnlyArgs := append(args,
				"-m", "physdev", "!", "--physdev-is-in",
				"-m", "addrtype", "!", "--src-type", "LOCAL")
			writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
			dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
			// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
			// This covers cases like GCE load-balancers which get added to the local routing table.
			writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
		}

		// Capture load-balancer ingress.
		for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
			if ingress.IP != "" {
				// create service firewall chain
				fwChain := serviceFirewallChainName(svcName, protocol)
				if chain, ok := existingNATChains[fwChain]; ok {
					writeLine(natChains, chain)
				} else {
					writeLine(natChains, utiliptables.MakeChainLine(fwChain))
				}
				activeNATChains[fwChain] = true
				// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
				// This currently works for loadbalancers that preserves source ips.
				// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.

				args := []string{
					"-A", string(kubeServicesChain),
					"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
					"-m", protocol, "-p", protocol,
					"-d", fmt.Sprintf("%s/32", ingress.IP),
					"--dport", fmt.Sprintf("%d", svcInfo.port),
				}
				// jump to service firewall chain
				writeLine(natRules, append(args, "-j", string(fwChain))...)

				args = []string{
					"-A", string(fwChain),
					"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
				}

				// Each source match rule in the FW chain may jump to either the SVC or the XLB chain
				chosenChain := svcXlbChain
				// If we are proxying globally, we need to masquerade in case we cross nodes.
				// If we are proxying only locally, we can retain the source IP.
				if !svcInfo.onlyNodeLocalEndpoints {
					writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
					chosenChain = svcChain
				}

				if len(svcInfo.loadBalancerSourceRanges) == 0 {
					// allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
					writeLine(natRules, append(args, "-j", string(chosenChain))...)
				} else {
					// firewall filter based on each source range
					allowFromNode := false
					for _, src := range svcInfo.loadBalancerSourceRanges {
						writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...)
						// ignore error because it has been validated
						_, cidr, _ := net.ParseCIDR(src)
						if cidr.Contains(proxier.nodeIP) {
							allowFromNode = true
						}
					}
					// generally, ip route rule was added to intercept request to loadbalancer vip from the
					// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
					// Need to add the following rule to allow request on host.
					if allowFromNode {
						writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
					}
				}

				// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
				// It means the packet cannot go thru the firewall, then mark it for DROP
				writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...)
			}
		}

		// Capture nodeports.  If we had more than 2 rules it might be
		// worthwhile to make a new per-service chain for nodeport rules, but
		// with just 2 rules it ends up being a waste and a cognitive burden.
		if svcInfo.nodePort != 0 {
			// Hold the local port open so no other process can open it
			// (because the socket might open but it would never work).
			lp := localPort{
				desc:     "nodePort for " + svcName.String(),
				ip:       "",
				port:     svcInfo.nodePort,
				protocol: protocol,
			}
			if proxier.portsMap[lp] != nil {
				glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
				replacementPortsMap[lp] = proxier.portsMap[lp]
			} else {
				socket, err := proxier.portMapper.OpenLocalPort(&lp)
				if err != nil {
					glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
					continue
				}
				if lp.protocol == "udp" {
					proxier.clearUdpConntrackForPort(lp.port)
				}
				replacementPortsMap[lp] = socket
			} // We're holding the port, so it's OK to install iptables rules.

			args := []string{
				"-A", string(kubeNodePortsChain),
				"-m", "comment", "--comment", svcName.String(),
				"-m", protocol, "-p", protocol,
				"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
			}
			if !svcInfo.onlyNodeLocalEndpoints {
				// Nodeports need SNAT, unless they're local.
				writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
				// Jump to the service chain.
				writeLine(natRules, append(args, "-j", string(svcChain))...)
			} else {
				// TODO: Make all nodePorts jump to the firewall chain.
				// Currently we only create it for loadbalancers (#33586).
				writeLine(natRules, append(args, "-j", string(svcXlbChain))...)
			}
		}

		// If the service has no endpoints then reject packets.
		if len(proxier.endpointsMap[svcName]) == 0 {
			writeLine(filterRules,
				"-A", string(kubeServicesChain),
				"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
				"-m", protocol, "-p", protocol,
				"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
				"--dport", fmt.Sprintf("%d", svcInfo.port),
				"-j", "REJECT",
			)
			continue
		}

		// Generate the per-endpoint chains.  We do this in multiple passes so we
		// can group rules together.
		// These two slices parallel each other - keep in sync
		endpoints := make([]*endpointsInfo, 0)
		endpointChains := make([]utiliptables.Chain, 0)
		for _, ep := range proxier.endpointsMap[svcName] {
			endpoints = append(endpoints, ep)
			endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip)
			endpointChains = append(endpointChains, endpointChain)

			// Create the endpoint chain, retaining counters if possible.
			if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
				writeLine(natChains, chain)
			} else {
				writeLine(natChains, utiliptables.MakeChainLine(endpointChain))
			}
			activeNATChains[endpointChain] = true
		}

		// First write session affinity rules, if applicable.
		if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
			for _, endpointChain := range endpointChains {
				writeLine(natRules,
					"-A", string(svcChain),
					"-m", "comment", "--comment", svcName.String(),
					"-m", "recent", "--name", string(endpointChain),
					"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap",
					"-j", string(endpointChain))
			}
		}

		// Now write loadbalancing & DNAT rules.
		n := len(endpointChains)
		for i, endpointChain := range endpointChains {
			// Balancing rules in the per-service chain.
			args := []string{
				"-A", string(svcChain),
				"-m", "comment", "--comment", svcName.String(),
			}
			if i < (n - 1) {
				// Each rule is a probabilistic match.
				args = append(args,
					"-m", "statistic",
					"--mode", "random",
					"--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)))
			}
			// The final (or only if n == 1) rule is a guaranteed match.
			args = append(args, "-j", string(endpointChain))
			writeLine(natRules, args...)

			// Rules in the per-endpoint chain.
			args = []string{
				"-A", string(endpointChain),
				"-m", "comment", "--comment", svcName.String(),
			}
			// Handle traffic that loops back to the originator with SNAT.
			writeLine(natRules, append(args,
				"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]),
				"-j", string(KubeMarkMasqChain))...)
			// Update client-affinity lists.
			if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
				args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
			}
			// DNAT to final destination.
			args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip)
			writeLine(natRules, args...)
		}

		// The logic below this applies only if this service is marked as OnlyLocal
		if !svcInfo.onlyNodeLocalEndpoints {
			continue
		}

		// Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation
		// TODO - This logic may be combinable with the block above that creates the svc balancer chain
		localEndpoints := make([]*endpointsInfo, 0)
		localEndpointChains := make([]utiliptables.Chain, 0)
		for i := range endpointChains {
			if endpoints[i].localEndpoint {
				// These slices parallel each other; must be kept in sync
				localEndpoints = append(localEndpoints, endpoints[i])
				localEndpointChains = append(localEndpointChains, endpointChains[i])
			}
		}
		// First rule in the chain redirects all pod -> external vip traffic to the
		// Service's ClusterIP instead. This happens whether or not we have local
		// endpoints; only if clusterCIDR is specified
		if len(proxier.clusterCIDR) > 0 {
			args = []string{
				"-A", string(svcXlbChain),
				"-m", "comment", "--comment",
				fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`),
				"-s", proxier.clusterCIDR,
				"-j", string(svcChain),
			}
			writeLine(natRules, args...)
		}

		numLocalEndpoints := len(localEndpointChains)
		if numLocalEndpoints == 0 {
			// Blackhole all traffic since there are no local endpoints
			args := []string{
				"-A", string(svcXlbChain),
				"-m", "comment", "--comment",
				fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()),
				"-j",
				string(KubeMarkDropChain),
			}
			writeLine(natRules, args...)
		} else {
			// Setup probability filter rules only over local endpoints
			for i, endpointChain := range localEndpointChains {
				// Balancing rules in the per-service chain.
				args := []string{
					"-A", string(svcXlbChain),
					"-m", "comment", "--comment",
					fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()),
				}
				if i < (numLocalEndpoints - 1) {
					// Each rule is a probabilistic match.
					args = append(args,
						"-m", "statistic",
						"--mode", "random",
						"--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i)))
				}
				// The final (or only if n == 1) rule is a guaranteed match.
				args = append(args, "-j", string(endpointChain))
				writeLine(natRules, args...)
			}
		}
	}

	// Delete chains no longer in use.
	for chain := range existingNATChains {
		if !activeNATChains[chain] {
			chainString := string(chain)
			if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
				// Ignore chains that aren't ours.
				continue
			}
			// We must (as per iptables) write a chain-line for it, which has
			// the nice effect of flushing the chain.  Then we can remove the
			// chain.
			writeLine(natChains, existingNATChains[chain])
			writeLine(natRules, "-X", chainString)
		}
	}

	// Finally, tail-call to the nodeports chain.  This needs to be after all
	// other service portal rules.
	writeLine(natRules,
		"-A", string(kubeServicesChain),
		"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
		"-m", "addrtype", "--dst-type", "LOCAL",
		"-j", string(kubeNodePortsChain))

	// Write the end-of-table markers.
	writeLine(filterRules, "COMMIT")
	writeLine(natRules, "COMMIT")

	// Sync rules.
	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
	filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
	natLines := append(natChains.Bytes(), natRules.Bytes()...)
	lines := append(filterLines, natLines...)

	glog.V(3).Infof("Restoring iptables rules: %s", lines)
	err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	if err != nil {
		glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines)
		// Revert new local ports.
		revertPorts(replacementPortsMap, proxier.portsMap)
		return
	}

	// Close old local ports and save new ones.
	for k, v := range proxier.portsMap {
		if replacementPortsMap[k] == nil {
			v.Close()
		}
	}
	proxier.portsMap = replacementPortsMap
}

感謝各位的閱讀,以上就是“kube-proxy怎么使用”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)kube-proxy怎么使用這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!


網(wǎng)站題目:kube-proxy怎么使用
本文網(wǎng)址:http://weahome.cn/article/ipooig.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部