真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

本篇文章給大家分享的是有關(guān)Go語(yǔ)言中怎么調(diào)度循環(huán)源碼,小編覺(jué)得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話(huà)不多說(shuō),跟著小編一起來(lái)看看吧。

成都創(chuàng)新互聯(lián)主要從事成都網(wǎng)站制作、做網(wǎng)站、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)墨江,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專(zhuān)業(yè),歡迎來(lái)電咨詢(xún)建站服務(wù):13518219792

概述

提到"調(diào)度",我們首先想到的就是操作系統(tǒng)對(duì)進(jìn)程、線(xiàn)程的調(diào)度。操作系統(tǒng)調(diào)度器會(huì)將系統(tǒng)中的多個(gè)線(xiàn)程按照一定算法調(diào)度到物理CPU上去運(yùn)行。雖然線(xiàn)程比較輕量,但是在調(diào)度時(shí)也有比較大的額外開(kāi)銷(xiāo)。每個(gè)線(xiàn)程會(huì)都占用 1M 以上的內(nèi)存空間,線(xiàn)程切換和恢復(fù)寄存器中的內(nèi)容也需要向系統(tǒng)申請(qǐng)資源。

Go 語(yǔ)言的 Goroutine 可以看作對(duì) thread 加的一層抽象,它更輕量級(jí),不僅減少了上下文切換帶來(lái)的額外開(kāi)銷(xiāo),Goroutine 占用的資源也會(huì)更少。如創(chuàng)建一個(gè) Goroutine 的棧內(nèi)存消耗為 2 KB,而 thread 占用 1M 以上空間;thread 創(chuàng)建和銷(xiāo)毀是內(nèi)核級(jí)的,所以都會(huì)有巨大的消耗,而 Goroutine 由 Go runtime 負(fù)責(zé)管理的,創(chuàng)建和銷(xiāo)毀的消耗非常??;Goroutine 的切換成本也比 thread 要小得多。

G M P 模型

Go 的調(diào)度器使用三個(gè)結(jié)構(gòu)體來(lái)實(shí)現(xiàn) Goroutine 的調(diào)度:G M P。

G:代表一個(gè) Goroutine,每個(gè) Goroutine 都有自己獨(dú)立的棧存放當(dāng)前的運(yùn)行內(nèi)存及狀態(tài)。可以把一個(gè) G 當(dāng)做一個(gè)任務(wù),當(dāng) Goroutine 被調(diào)離 CPU 時(shí),調(diào)度器代碼負(fù)責(zé)把 CPU 寄存器的值保存在 G 對(duì)象的成員變量之中,當(dāng) Goroutine 被調(diào)度起來(lái)運(yùn)行時(shí),調(diào)度器代碼又負(fù)責(zé)把 G 對(duì)象的成員變量所保存的寄存器的值恢復(fù)到 CPU 的寄存器。

M:表示內(nèi)核線(xiàn)程,它本身就與一個(gè)內(nèi)核線(xiàn)程進(jìn)行綁定,每個(gè)工作線(xiàn)程都有唯一的一個(gè) M 結(jié)構(gòu)體的實(shí)例對(duì)象與之對(duì)應(yīng)。M 結(jié)構(gòu)體對(duì)象除了記錄著工作線(xiàn)程的諸如棧的起止位置、當(dāng)前正在執(zhí)行的Goroutine 以及是否空閑等等狀態(tài)信息之外,還通過(guò)指針維持著與 P 結(jié)構(gòu)體的實(shí)例對(duì)象之間的綁定關(guān)系。

P:代表一個(gè)虛擬的 Processor 處理器,它維護(hù)一個(gè)局部 Goroutine 可運(yùn)行 G 隊(duì)列,工作線(xiàn)程優(yōu)先使用自己的局部運(yùn)行隊(duì)列,只有必要時(shí)才會(huì)去訪(fǎng)問(wèn)全局運(yùn)行隊(duì)列,這大大減少了鎖沖突,提高了工作線(xiàn)程的并發(fā)性。每個(gè) G 要想真正運(yùn)行起來(lái),首先需要被分配一個(gè) P。

除了上面三個(gè)結(jié)構(gòu)體以外,還有一個(gè)存放所有Runnable 可運(yùn)行 Goroutine 的容器 schedt。每個(gè)Go程序中schedt結(jié)構(gòu)體只有一個(gè)實(shí)例對(duì)象,在代碼中是一個(gè)共享的全局變量,每個(gè)工作線(xiàn)程都可以訪(fǎng)問(wèn)它以及它所擁有的 Goroutine 運(yùn)行隊(duì)列。

下面是G、P、M以及schedt中的全局隊(duì)列的關(guān)系:

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

從上圖可以看出,每個(gè) m 都綁定了一個(gè) P,每個(gè) P 都有一個(gè)私有的本地 Goroutine 隊(duì)列,m對(duì)應(yīng)的線(xiàn)程從本地和全局 Goroutine 隊(duì)列中獲取 Goroutine 并運(yùn)行,綠色的 G 代表正在運(yùn)行的 G。

在默認(rèn)情況下,運(yùn)行時(shí)會(huì)將 GOMAXPROCS 設(shè)置成當(dāng)前機(jī)器的核數(shù),假設(shè)一個(gè)四核機(jī)器會(huì)創(chuàng)建四個(gè)活躍的操作系統(tǒng)線(xiàn)程,每一個(gè)線(xiàn)程都對(duì)應(yīng)一個(gè)運(yùn)行時(shí)中的 M。

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

詳解

結(jié)構(gòu)體

G M P 結(jié)構(gòu)體定義于src/runtime/runtime2.go

G
type g struct { 
	// 當(dāng)前 Goroutine 的棧內(nèi)存范圍 [stack.lo, stack.hi)
	stack       stack 
	// 用于調(diào)度器搶占式調(diào)度  
	stackguard0 uintptr   

	_panic       *_panic  
	_defer       *_defer  
	// 當(dāng)前 Goroutine 占用的線(xiàn)程
	m            *m       
	// 存儲(chǔ) Goroutine 的調(diào)度相關(guān)的數(shù)據(jù)
	sched        gobuf 
	// Goroutine 的狀態(tài)
	atomicstatus uint32 
	// 搶占信號(hào)
	preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
	// 搶占時(shí)將狀態(tài)修改成 `_Gpreempted`
	preemptStop   bool // transition to _Gpreempted on preemption; otherwise, just deschedule
	// 在同步安全點(diǎn)收縮棧
	preemptShrink bool // shrink stack at synchronous safe point
	...
}

下面看看gobuf結(jié)構(gòu)體,主要在調(diào)度器保存或者恢復(fù)上下文的時(shí)候用到:

type gobuf struct {
	// 棧指針
	sp   uintptr
	// 程序計(jì)數(shù)器
	pc   uintptr
	// gobuf對(duì)應(yīng)的Goroutine
	g    guintptr 
	// 系統(tǒng)調(diào)用的返回值
	ret  sys.Uintreg
	...
}

在執(zhí)行過(guò)程中,G可能處于以下幾種狀態(tài):

const (
	//  剛剛被分配并且還沒(méi)有被初始化
	_Gidle = iota // 0 
	// 沒(méi)有執(zhí)行代碼,沒(méi)有棧的所有權(quán),存儲(chǔ)在運(yùn)行隊(duì)列中
	_Grunnable // 1 
	// 可以執(zhí)行代碼,擁有棧的所有權(quán),被賦予了內(nèi)核線(xiàn)程 M 和處理器 P
	_Grunning // 2 
	// 正在執(zhí)行系統(tǒng)調(diào)用,擁有棧的所有權(quán),沒(méi)有執(zhí)行用戶(hù)代碼,
	// 被賦予了內(nèi)核線(xiàn)程 M 但是不在運(yùn)行隊(duì)列上
	_Gsyscall // 3 
	// 由于運(yùn)行時(shí)而被阻塞,沒(méi)有執(zhí)行用戶(hù)代碼并且不在運(yùn)行隊(duì)列上,
	// 但是可能存在于 Channel 的等待隊(duì)列上
	_Gwaiting // 4  
	// 表示當(dāng)前goroutine沒(méi)有被使用,沒(méi)有執(zhí)行代碼,可能有分配的棧
	_Gdead // 6  
	// 棧正在被拷貝,沒(méi)有執(zhí)行代碼,不在運(yùn)行隊(duì)列上
	_Gcopystack // 8 
	// 由于搶占而被阻塞,沒(méi)有執(zhí)行用戶(hù)代碼并且不在運(yùn)行隊(duì)列上,等待喚醒
	_Gpreempted // 9 
	// GC 正在掃描??臻g,沒(méi)有執(zhí)行代碼,可以與其他狀態(tài)同時(shí)存在
	_Gscan          = 0x1000 
	...
)

上面的狀態(tài)看起來(lái)很多,但是實(shí)際上只需要關(guān)注下面幾種就好了:

  • 等待中:_ Gwaiting、_Gsyscall 和 _Gpreempted,這幾個(gè)狀態(tài)表示G沒(méi)有在執(zhí)行;

  • 可運(yùn)行:_Grunnable,表示G已經(jīng)準(zhǔn)備就緒,可以在線(xiàn)程運(yùn)行;

  • 運(yùn)行中:_Grunning,表示G正在運(yùn)行;

M

Go 語(yǔ)言并發(fā)模型中的 M 是操作系統(tǒng)線(xiàn)程,最多只會(huì)有 GOMAXPROCS 個(gè)活躍線(xiàn)程能夠正常運(yùn)行。

type m struct {
	// 持有調(diào)度棧的 Goroutine
	g0      *g       
	// 處理 signal 的 G
	gsignal       *g           
	// 線(xiàn)程本地存儲(chǔ) thread-local
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	// 當(dāng)前運(yùn)行的G
	curg          *g       // current running goroutine
	caughtsig     guintptr // goroutine running during fatal signal
	// 正在運(yùn)行代碼的P
	p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr
	// 之前使用的P
	oldp          puintptr  
	...
}
P

調(diào)度器中的處理器 P 是線(xiàn)程 M 和 G 的中間層,用于調(diào)度 G 在 M 上執(zhí)行。

type p struct {
	id          int32
	// p 的狀態(tài)
	status      uint32  
    // 調(diào)度器調(diào)用會(huì)+1
	schedtick   uint32     // incremented on every scheduler call
    // 系統(tǒng)調(diào)用會(huì)+1
	syscalltick uint32     // incremented on every system call
	// 對(duì)應(yīng)關(guān)聯(lián)的 M
	m           muintptr    
	mcache      *mcache
	pcache      pageCache 
	// defer 結(jié)構(gòu)池
	deferpool    [5][]*_defer  
	deferpoolbuf [5][32]*_defer  
	// 可運(yùn)行的 Goroutine 隊(duì)列,可無(wú)鎖訪(fǎng)問(wèn)
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// 緩存可立即執(zhí)行的 G
	runnext guintptr 
	// 可用的 G 列表,G 狀態(tài)等于 Gdead 
	gFree struct {
		gList
		n int32
	}
	...
}

下面看看P的幾個(gè)狀態(tài):

const ( 
	// 表示P沒(méi)有運(yùn)行用戶(hù)代碼或者調(diào)度器 
	_Pidle = iota 
	// 被線(xiàn)程 M 持有,并且正在執(zhí)行用戶(hù)代碼或者調(diào)度器
	_Prunning 
	// 沒(méi)有執(zhí)行用戶(hù)代碼,當(dāng)前線(xiàn)程陷入系統(tǒng)調(diào)用
	_Psyscall
	// 被線(xiàn)程 M 持有,當(dāng)前處理器由于垃圾回收 STW 被停止
	_Pgcstop 
	// 當(dāng)前處理器已經(jīng)不被使用
	_Pdead
)
sched

sched 我們?cè)谏厦嬉蔡岬搅?,主要存放了調(diào)度器持有的全局資源,如空閑的 P 鏈表、 G 的全局隊(duì)列等。

type schedt struct {
	...
	lock mutex 
	// 空閑的 M 列表
	midle        muintptr  
	// 空閑的 M 列表數(shù)量
	nmidle       int32      
	// 下一個(gè)被創(chuàng)建的 M 的 id
	mnext        int64  
	// 能擁有的最大數(shù)量的 M  
	maxmcount    int32    
	// 空閑 p 鏈表
	pidle      puintptr // idle p's
	// 空閑 p 數(shù)量
	npidle     uint32
	// 處于 spinning 狀態(tài)的 M 的數(shù)量
	nmspinning uint32   
	// 全局 runnable G 隊(duì)列
	runq     gQueue
	runqsize int32  
	// 有效 dead G 的全局緩存.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	} 
	// sudog 結(jié)構(gòu)的集中緩存
	sudoglock  mutex
	sudogcache *sudog 
	// defer 結(jié)構(gòu)的池
	deferlock mutex
	deferpool [5]*_defer 
	...
}

從Go程序啟動(dòng)講起

這里還是借助dlv來(lái)進(jìn)行調(diào)試。有關(guān) dlv 如何斷點(diǎn)匯編的內(nèi)容我在這一篇:https://www.luozhiyun.com/archives/434 《詳解Go中內(nèi)存分配源碼實(shí)現(xiàn)》已經(jīng)有很詳細(xì)的介紹了,感興趣的可以去看看。需要注意的是這里有個(gè)坑,下面的例子是在Linux中進(jìn)行的。

首先我們寫(xiě)一個(gè)非常簡(jiǎn)單的例子:

package main

import "fmt"

func main() {
	fmt.Println("hello world")
}

然后進(jìn)行構(gòu)建:

go build main.go
dlv exec ./main

開(kāi)打程序后按步驟輸入下面的命令:

(dlv) r
Process restarted with PID 33191
(dlv) list
> _rt0_amd64_linux() /usr/local/go/src/runtime/rt0_linux_amd64.s:8 (PC: 0x4648c0)
Warning: debugging optimized function
Warning: listing may not match stale executable
     3: // license that can be found in the LICENSE file.
     4:
     5: #include "textflag.h"
     6:
     7: TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
=>   8:         JMP     _rt0_amd64(SB)
     9:
    10: TEXT _rt0_amd64_linux_lib(SB),NOSPLIT,$0
    11:         JMP     _rt0_amd64_lib(SB) 
(dlv) si
> _rt0_amd64() /usr/local/go/src/runtime/asm_amd64.s:15 (PC: 0x4613e0)
Warning: debugging optimized function
Warning: listing may not match stale executable
    10: // _rt0_amd64 is common startup code for most amd64 systems when using
    11: // internal linking. This is the entry point for the program from the
    12: // kernel for an ordinary -buildmode=exe program. The stack holds the
    13: // number of arguments and the C-style argv.
    14: TEXT _rt0_amd64(SB),NOSPLIT,$-8
=>  15:         MOVQ    0(SP), DI       // argc
    16:         LEAQ    8(SP), SI       // argv
    17:         JMP     runtime·rt0_go(SB)
    18:
    19: // main is common startup code for most amd64 systems when using
    20: // external linking. The C startup code will call the symbol "main"
(dlv)

通過(guò)上面的斷點(diǎn)可以知道在linux amd64系統(tǒng)的啟動(dòng)函數(shù)是在asm_amd64.s的runtime·rt0_go函數(shù)中。當(dāng)然,不同的平臺(tái)有不同的程序入口,感興趣的同學(xué)可以自行去了解。

下面我們看看runtime·rt0_go

TEXT runtime·rt0_go(SB),NOSPLIT,$0
	...
	// 初始化執(zhí)行文件的絕對(duì)路徑
	CALL	runtime·args(SB)
	// 初始化 CPU 個(gè)數(shù)和內(nèi)存頁(yè)大小
	CALL	runtime·osinit(SB)
	// 調(diào)度器初始化
	CALL	runtime·schedinit(SB) 
	// 創(chuàng)建一個(gè)新的 goroutine 來(lái)啟動(dòng)程序
	MOVQ	$runtime·mainPC(SB), AX		// entry
	// 新建一個(gè) goroutine,該 goroutine 綁定 runtime.main
	CALL	runtime·newproc(SB) 
	// 啟動(dòng)M,開(kāi)始調(diào)度goroutine
	CALL	runtime·mstart(SB)
	...

上面的CALL方法中:

schedinit進(jìn)行各種運(yùn)行時(shí)組件初始化工作,這包括我們的調(diào)度器與內(nèi)存分配器、回收器的初始化;

newproc負(fù)責(zé)根據(jù)主 G 入口地址創(chuàng)建可被運(yùn)行時(shí)調(diào)度的執(zhí)行單元;

mstart開(kāi)始啟動(dòng)調(diào)度器的調(diào)度循環(huán);

調(diào)度初始化 runtime.schedinit
func schedinit() {
	...
	_g_ := getg()
	...
	// 最大線(xiàn)程數(shù)10000
	sched.maxmcount = 10000 
	// M0 初始化
	mcommoninit(_g_.m, -1)
	...	  
    // 垃圾回收器初始化
	gcinit()

	sched.lastpoll = uint64(nanotime())
    // 通過(guò) CPU 核心數(shù)和 GOMAXPROCS 環(huán)境變量確定 P 的數(shù)量
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	// P 初始化
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
    ...
}

schedinit函數(shù)會(huì)將 maxmcount 設(shè)置成10000,這也就是一個(gè) Go 語(yǔ)言程序能夠創(chuàng)建的最大線(xiàn)程數(shù)。然后調(diào)用 mcommoninit 對(duì) M0 進(jìn)行初始化,通過(guò) CPU 核心數(shù)和 GOMAXPROCS 環(huán)境變量確定 P 的數(shù)量之后就會(huì)調(diào)用 procresize 函數(shù)對(duì) P 進(jìn)行初始化。

M0 初始化
func mcommoninit(mp *m, id int64) {
	_g_ := getg()
	...
	lock(&sched.lock)
	// 如果傳入id小于0,那么id則從mReserveID獲取,初次從mReserveID獲取id為0
	if id >= 0 {
		mp.id = id
	} else {
		mp.id = mReserveID()
	}
	//random初始化,用于竊取 G
	mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
	mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
	if mp.fastrand[0]|mp.fastrand[1] == 0 {
		mp.fastrand[1] = 1
	}
	// 創(chuàng)建用于信號(hào)處理的gsignal,只是簡(jiǎn)單的從堆上分配一個(gè)g結(jié)構(gòu)體對(duì)象,然后把棧設(shè)置好就返回了
	mpreinit(mp)
	if mp.gsignal != nil {
		mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
	}

	// 把 M 掛入全局鏈表allm之中
	mp.alllink = allm
	...
}

這里傳入的 id 是-1,初次調(diào)用會(huì)將 id 設(shè)置為 0,這里并未對(duì)m0做什么關(guān)于調(diào)度相關(guān)的初始化,所以可以簡(jiǎn)單的認(rèn)為這個(gè)函數(shù)只是把m0放入全局鏈表allm之中就返回了。

P 初始化

runtime.procresize

var allp       []*p 

func procresize(nprocs int32) *p {
	// 獲取先前的 P 個(gè)數(shù)
	old := gomaxprocs
	// 更新統(tǒng)計(jì)信息
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now
	// 根據(jù) runtime.MAXGOPROCS 調(diào)整 p 的數(shù)量,因?yàn)?nbsp;runtime.MAXGOPROCS 用戶(hù)可以自行設(shè)定
	if nprocs > int32(len(allp)) { 
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p, nprocs) 
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}
 
	// 初始化新的 P
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		// 為空,則申請(qǐng)新的 P 對(duì)象
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}

	_g_ := getg()
	// P 不為空,并且 id 小于 nprocs ,那么可以繼續(xù)使用當(dāng)前 P
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// continue to use the current P
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	} else { 
		// 釋放當(dāng)前 P,因?yàn)橐咽?
		if _g_.m.p != 0 { 
			_g_.m.p.ptr().m = 0
		}
		_g_.m.p = 0
		p := allp[0]
		p.m = 0
		p.status = _Pidle
		// P0 綁定到當(dāng)前的 M0
		acquirep(p) 
	}
	// 從未使用的 P 釋放資源
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy() 
		// 不能釋放 p 本身,因?yàn)樗赡茉?nbsp;m 進(jìn)入系統(tǒng)調(diào)用時(shí)被引用
	}
	// 釋放完 P 之后重置allp的長(zhǎng)度
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		unlock(&allpLock)
	}
	var runnablePs *p
	// 將沒(méi)有本地任務(wù)的 P 放到空閑鏈表中
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		// 當(dāng)前正在使用的 P 略過(guò)
		if _g_.m.p.ptr() == p {
			continue
		}
		// 設(shè)置狀態(tài)為 _Pidle 
		p.status = _Pidle
		// P 的任務(wù)列表是否為空
		if runqempty(p) {
			// 放入到空閑列表中
			pidleput(p)
		} else {
			// 獲取空閑 M 綁定到 P 上
			p.m.set(mget())
            // 
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	return runnablePs
}

procresize方法的執(zhí)行過(guò)程如下:

  1. allp 是全局變量 P 的資源池,如果 allp 的切片中的處理器數(shù)量少于期望數(shù)量,會(huì)對(duì)切片進(jìn)行擴(kuò)容;

  2. 擴(kuò)容的時(shí)候會(huì)使用 new 申請(qǐng)一個(gè)新的 P ,然后使用 init 初始化,需要注意的是初始化的 P 的 id 就是傳入的 i 的值,狀態(tài)為 _Pgcstop;

  3. 然后通過(guò) _g_.m.p 獲取 M0,如果 M0 已與有效的 P 綁定上,則將 被綁定的 P 的狀態(tài)修改為 _Prunning。否則獲取 allp[0] 作為 P0 調(diào)用 runtime.acquirep 與 M0 進(jìn)行綁定;

  4. 超過(guò)處理器個(gè)數(shù)的 P 通過(guò)p.destroy釋放資源,p.destroy會(huì)將與 P 相關(guān)的資源釋放,并將 P 狀態(tài)設(shè)置為 _Pdead;

  5. 通過(guò)截?cái)喔淖內(nèi)肿兞?allp 的長(zhǎng)度保證與期望處理器數(shù)量相等;

  6. 遍歷 allp 檢查 P 的是否處于空閑狀態(tài),是的話(huà)放入到空閑列表中;

P.init

func (pp *p) init(id int32) {
	// 設(shè)置id
	pp.id = id
	// 設(shè)置狀態(tài)為 _Pgcstop
	pp.status = _Pgcstop
	// 與 sudog 相關(guān)
	pp.sudogcache = pp.sudogbuf[:0]
	for i := range pp.deferpool {
		pp.deferpool[i] = pp.deferpoolbuf[i][:0]
	}
	pp.wbBuf.reset()
	// mcache 初始化
	if pp.mcache == nil {
		if id == 0 {
			if mcache0 == nil {
				throw("missing mcache?")
			} 
			pp.mcache = mcache0
		} else {
			pp.mcache = allocmcache()
		}
	}
	...
	lockInit(&pp.timersLock, lockRankTimers)
}

這里會(huì)初始化一些 P 的字段值,如設(shè)置 id、status、sudogcache、mcache、lock相關(guān) 。

初始化 sudogcache 這個(gè)字段存的是 sudog 的集合與 Channel 相關(guān),可以看這里:多圖詳解Go中的Channel源碼 https://www.luozhiyun.com/archives/427。

每個(gè) P 中會(huì)保存相應(yīng)的 mcache ,能快速的進(jìn)行分配微對(duì)象和小對(duì)象的分配,具體的可以看這里:詳解Go中內(nèi)存分配源碼實(shí)現(xiàn) https://www.luozhiyun.com/archives/434。

下面再來(lái)看看 runtime.acquirep 是如何將 P 與 M 綁定的:

runtime.acquirep

func acquirep(_p_ *p) { 
	wirep(_p_)
	...
}

func wirep(_p_ *p) {
	_g_ := getg()

	...
	// 將 P 與 M 相互綁定
	_g_.m.p.set(_p_)
	_p_.m.set(_g_.m)
	// 設(shè)置 P 狀態(tài)為 _Prunning
	_p_.status = _Prunning
}

這個(gè)方法十分簡(jiǎn)單,就不解釋了。下面再看看 runtime.pidleput將 P 放入空閑列表:

func pidleput(_p_ *p) {
	// 如果 P 運(yùn)行隊(duì)列不為空,那么不能放入空閑列表
	if !runqempty(_p_) {
		throw("pidleput: P has non-empty run queue")
	}
	// 將 P 與 pidle 列表關(guān)聯(lián)
	_p_.link = sched.pidle
	sched.pidle.set(_p_)
	atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}
G 初始化

從匯編可以知道執(zhí)行完runtime·schedinit后就會(huì)執(zhí)行 runtime.newproc是創(chuàng)建G的入口。

runtime.newproc

func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	// 獲取當(dāng)前的 G 
	gp := getg()
	// 獲取調(diào)用者的程序計(jì)數(shù)器 PC
	pc := getcallerpc() 
	systemstack(func() {
		// 獲取新的 G 結(jié)構(gòu)體
		newg := newproc1(fn, argp, siz, gp, pc)
		_p_ := getg().m.p.ptr()
        // 將 G 加入到 P 的運(yùn)行隊(duì)列
		runqput(_p_, newg, true)
		// mainStarted 為 True 表示主M已經(jīng)啟動(dòng)
		if mainStarted {
			// 喚醒新的  P 執(zhí)行 G
			wakep()
		}
	})
}

runtime.newproc會(huì)獲取 當(dāng)前 G 以及調(diào)用方的程序計(jì)數(shù)器,然后調(diào)用 newproc1 獲取新的 G 結(jié)構(gòu)體;然后將 G 放入到 P 的 runnext 字段中。

runtime.newproc1

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	// 加鎖,禁止 G 的 M 被搶占
	acquirem() // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7 

	_p_ := _g_.m.p.ptr()
	// 從 P 的空閑列表 gFree 查找空閑 G
	newg := gfget(_p_)
	if newg == nil {
		// 創(chuàng)建一個(gè)棧大小為 2K 大小的 G
		newg = malg(_StackMin)
		// CAS 改變 G 狀態(tài)為 _Gdead
		casgstatus(newg, _Gidle, _Gdead)
		// 將 G 加入到全局 allgs 列表中
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	...
	// 計(jì)算運(yùn)行空間大小
	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	...
	if narg > 0 {
		// 從 argp 參數(shù)開(kāi)始的位置,復(fù)制 narg 個(gè)字節(jié)到 spArg(參數(shù)拷貝)
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
		...
	}
	// 清理、創(chuàng)建并初始化的 G
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	// 將 G 狀態(tài)CAS為 _Grunnable 狀態(tài)
	casgstatus(newg, _Gdead, _Grunnable) 
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	...
	// 釋放鎖,對(duì)應(yīng)上面 acquirem
	releasem(_g_.m)

	return newg
}

newproc1函數(shù)比較長(zhǎng),下面總結(jié)一下主要做了哪幾件事:

  1. 從 P 的空閑列表 gFree 查找空閑 G;

  2. 如果獲取不到 G ,那么調(diào)用 malg 創(chuàng)建創(chuàng)建一個(gè)新的 G ,需要注意的是 _StackMin 為2048,表示創(chuàng)建的 G 的棧上內(nèi)存占用為2K。然后 CAS 改變 G 狀態(tài)為 _Gdead,并加入到全局 allgs 列表中;

  3. 根據(jù)要執(zhí)行函數(shù)的入口地址和參數(shù),初始化執(zhí)行棧的 SP 和參數(shù)的入棧位置,調(diào)用 memmove 進(jìn)行參數(shù)拷貝;

  4. 清理、創(chuàng)建并初始化的 G,將 G 狀態(tài)CAS為 _Grunnable 狀態(tài),返回;

下面看看 runtime.gfget是如何查找 G:

runtime.gfget

func gfget(_p_ *p) *g {
	retry:
		// 如果 P 的空閑列表 gFree 為空,sched 的的空閑列表 gFree 不為空
		if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
			lock(&sched.gFree.lock) 
			// 從sched 的 gFree 列表中移動(dòng) 32 個(gè)到 P 的 gFree 中
			for _p_.gFree.n < 32 { 
				gp := sched.gFree.stack.pop()
				if gp == nil {
					gp = sched.gFree.noStack.pop()
					if gp == nil {
						break
					}
				}
				sched.gFree.n--
				_p_.gFree.push(gp)
				_p_.gFree.n++
			}
			unlock(&sched.gFree.lock)
			goto retry
		}
		// 此時(shí)如果 gFree 列表還是為空,返回空 
		gp := _p_.gFree.pop()
		if gp == nil {
			return nil
		}
		...
		return gp
}
  1. 當(dāng) P 的空閑列表 gFree 為空時(shí)會(huì)從 sched 持有的空閑列表 gFree 轉(zhuǎn)移32個(gè) G 到當(dāng)前的 P 的空閑列表上;

  2. 然后從 P 的 gFree 列表頭返回一個(gè) G;

當(dāng) newproc 運(yùn)行完 newproc1 后會(huì)調(diào)用 runtime.runqput將 G 放入到運(yùn)行列表中:

runtime.runqput

func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrand()%2 == 0 {
		next = false
	} 
	if next {
	retryNext:
	// 將 G 放入到 runnext 中作為下一個(gè)處理器執(zhí)行的任務(wù)
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		} 
		// 將原來(lái) runnext 的 G 放入到運(yùn)行隊(duì)列中
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&_p_.runqhead)  
	t := _p_.runqtail
	// 放入到 P 本地運(yùn)行隊(duì)列中
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1)  
		return
	}
	// P 本地隊(duì)列放不下了,放入到全局的運(yùn)行隊(duì)列中
	if runqputslow(_p_, gp, h, t) {
		return
	} 
	goto retry
}
  1. runtime.runqput會(huì)根據(jù) next 來(lái)判斷是否要將 G 放入到 runnext 中;

  2. next 為 false 的時(shí)候會(huì)將傳入的 G 嘗試放入到本地隊(duì)列中,本地隊(duì)列時(shí)一個(gè)大小為256的環(huán)形鏈表,如果放不下了則調(diào)用 runqputslow函數(shù)將 G 放入到全局隊(duì)列的 runq 中。

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

調(diào)度循環(huán)

我們繼續(xù)回到runtime·rt0_go中,在初始化工作完成后,會(huì)調(diào)用runtime·mstart開(kāi)始調(diào)度 G

TEXT runtime·rt0_go(SB),NOSPLIT,$0
	...
	// 初始化執(zhí)行文件的絕對(duì)路徑
	CALL	runtime·args(SB)
	// 初始化 CPU 個(gè)數(shù)和內(nèi)存頁(yè)大小
	CALL	runtime·osinit(SB)
	// 調(diào)度器初始化
	CALL	runtime·schedinit(SB) 
	// 創(chuàng)建一個(gè)新的 goroutine 來(lái)啟動(dòng)程序
	MOVQ	$runtime·mainPC(SB), AX		// entry
	// 新建一個(gè) goroutine,該 goroutine 綁定 runtime.main
	CALL	runtime·newproc(SB) 
	// 啟動(dòng)M,開(kāi)始調(diào)度goroutine
	CALL	runtime·mstart(SB)
	...

runtime·mstart會(huì)調(diào)用到runtime·mstart1會(huì)初始化 M0 并調(diào)用runtime.schedule進(jìn)入調(diào)度循環(huán)。

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

func mstart1() {
	_g_ := getg()

	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	} 
	// 一旦調(diào)用 schedule 就不會(huì)返回,所以需要保存一下棧幀
	save(getcallerpc(), getcallersp())
	asminit()
	minit() 
	// 設(shè)置信號(hào) handler
	if _g_.m == &m0 {
		mstartm0()
	}
	// 執(zhí)行啟動(dòng)函數(shù)
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}
	// 如果當(dāng)前 m 并非 m0,則要求綁定 p
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	// 開(kāi)始調(diào)度
	schedule()
}

mstart1保存調(diào)度信息后,會(huì)調(diào)用schedule進(jìn)入調(diào)度循環(huán),尋找一個(gè)可執(zhí)行的 G 并執(zhí)行。下面看看schedule執(zhí)行函數(shù)。

schedule
func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	} 
	... 
top:
	pp := _g_.m.p.ptr()
	pp.preempt = false
	// GC 等待
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	// 不等于0,說(shuō)明在安全點(diǎn)
	if pp.runSafePointFn != 0 {
		runSafePointFn()
	}

	// 如果在 spinning ,那么運(yùn)行隊(duì)列應(yīng)該為空,
	if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}
	// 運(yùn)行 P 上準(zhǔn)備就緒的 Timer
	checkTimers(pp, 0)

	var gp *g
	var inheritTime bool 
	...
	if gp == nil { 
		// 為了公平,每調(diào)用 schedule 函數(shù) 61 次就要從全局可運(yùn)行 G 隊(duì)列中獲取
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			// 從全局隊(duì)列獲取1個(gè) G
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	// 從 P 本地獲取 G 任務(wù)
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr()) 
	}
	// 運(yùn)行到這里表示從本地運(yùn)行隊(duì)列和全局運(yùn)行隊(duì)列都沒(méi)有找到需要運(yùn)行的 G
	if gp == nil {
		// 阻塞地查找可用 G
		gp, inheritTime = findrunnable() // blocks until work is available
	}
	...
	// 執(zhí)行 G 任務(wù)函數(shù)
	execute(gp, inheritTime)
}

在這個(gè)函數(shù)中,我們只關(guān)注調(diào)度有關(guān)的代碼。從上面的代碼可以知道主要是從下面幾個(gè)方向去尋找可用的 G:

  1. 為了保證公平,當(dāng)全局運(yùn)行隊(duì)列中有待執(zhí)行的 G 時(shí),通過(guò)對(duì) schedtick 取模 61 ,表示調(diào)度器每調(diào)度 61 次的時(shí)候,都會(huì)嘗試從全局隊(duì)列里取出待運(yùn)行的 G 來(lái)運(yùn)行;

  2. 調(diào)用 runqget 從 P 本地的運(yùn)行隊(duì)列中查找待執(zhí)行的 G;

  3. 如果前兩種方法都沒(méi)有找到 G ,會(huì)通過(guò) findrunnable 函數(shù)去其他 P 里面去“偷”一些 G 來(lái)執(zhí)行,如果“偷”不到,就阻塞直到有可運(yùn)行的 G;

全局隊(duì)列獲取 G
func globrunqget(_p_ *p, max int32) *g {
	// 如果全局隊(duì)列中沒(méi)有 G 直接返回
	if sched.runqsize == 0 {
		return nil
	}
	// 計(jì)算 n 的個(gè)數(shù)
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	// n 的最大個(gè)數(shù)
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}

	sched.runqsize -= n
	// 拿到全局隊(duì)列隊(duì)頭 G
	gp := sched.runq.pop()
	n--
	// 將其余 n-1 個(gè) G 從全局隊(duì)列放入本地隊(duì)列
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(_p_, gp1, false)
	}
	return gp
}

globrunqget 會(huì)從全局 runq 隊(duì)列中獲取 n 個(gè) G ,其中第一個(gè) G 用于執(zhí)行,n-1 個(gè) G 從全局隊(duì)列放入本地隊(duì)列。

本地隊(duì)列獲取 G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// 如果 runnext 不為空,直接獲取返回
	for {
		next := _p_.runnext
		if next == 0 {
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}
	// 從本地隊(duì)列頭指針遍歷本地隊(duì)列
	for {
		h := atomic.LoadAcq(&_p_.runqhead)  
		t := _p_.runqtail
		// 表示本地隊(duì)列為空
		if t == h {
			return nil, false
		}
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

本地隊(duì)列的獲取會(huì)先從 P 的 runnext 字段中獲取,如果不為空則直接返回。如果 runnext 為空,那么從本地隊(duì)列頭指針遍歷本地隊(duì)列,本地隊(duì)列是一個(gè)環(huán)形隊(duì)列,方便復(fù)用。

任務(wù)竊取 G

任務(wù)竊取方法 findrunnable 非常的復(fù)雜,足足有300行之多,我們慢慢來(lái)分析:

func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	// 如果在 GC,則休眠當(dāng)前 M,直到復(fù)始后回到 top
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	// 運(yùn)行到安全點(diǎn)
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}

	now, pollUntil, _ := checkTimers(_p_, 0)
	...
	// 從本地 P 的可運(yùn)行隊(duì)列獲取 G
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// 從全局的可運(yùn)行隊(duì)列獲取 G
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	} 
	// 從I/O輪詢(xún)器獲取 G
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		// 嘗試從netpoller獲取Glist
		if list := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			//將其余隊(duì)列放入 P 的可運(yùn)行G隊(duì)列
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}
	...
	if !_g_.m.spinning {
		// 設(shè)置 spinning ,表示正在竊取 G
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	// 開(kāi)始竊取
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			// 如果 i>2 表示如果其他 P 運(yùn)行隊(duì)列中沒(méi)有 G ,將要從其他隊(duì)列的 runnext 中獲取
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			// 隨機(jī)獲取一個(gè) P
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}
			// 從其他 P 的運(yùn)行隊(duì)列中獲取一般的 G 到當(dāng)前隊(duì)列中
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}

			// 如果運(yùn)行隊(duì)列中沒(méi)有 G,那么從 timers 中獲取可執(zhí)行的定時(shí)器
			if i > 2 || (i > 1 && shouldStealTimers(p2)) {
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
				if w != 0 && (pollUntil == 0 || w < pollUntil) {
					pollUntil = w
				}
				if ran {
					if gp, inheritTime := runqget(_p_); gp != nil {
						return gp, inheritTime
					}
					ranTimer = true
				}
			}
		}
	}
	if ranTimer {
		goto top
	}

stop: 
	// 處于 GC 階段的話(huà),獲取執(zhí)行GC標(biāo)記任務(wù)的G
	if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
		_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
		gp := _p_.gcBgMarkWorker.ptr()
		//將本地 P 的 GC 標(biāo)記專(zhuān)用 G 職位 Grunnable
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false
	}

	...
	// 放棄當(dāng)前的 P 之前,對(duì) allp 做一個(gè)快照
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	// 進(jìn)入了 gc,回到頂部并阻塞
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	// 全局隊(duì)列中又發(fā)現(xiàn)了任務(wù)
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	// 將 p 放入 idle 空閑鏈表
	pidleput(_p_)
	unlock(&sched.lock)
 
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		// M 即將睡眠,狀態(tài)不再是 spinning
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}
 
	// 休眠之前再次檢查全局 P 列表
	//遍歷全局 P 列表的 P,并檢查他們的可運(yùn)行G隊(duì)列
	for _, _p_ := range allpSnapshot {
		// 如果這時(shí)本地隊(duì)列不空
		if !runqempty(_p_) {
			lock(&sched.lock)
			// 重新獲取 P
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				// M 綁定 P
				acquirep(_p_)
				if wasSpinning {
					// spinning 重新切換為 true
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				// 這時(shí)候是有 work 的,回到頂部尋找 G
				goto top
			}
			break
		}
	}
 
	// 休眠前再次檢查 GC work
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
		lock(&sched.lock)
		_p_ = pidleget()
		if _p_ != nil && _p_.gcBgMarkWorker == 0 {
			pidleput(_p_)
			_p_ = nil
		}
		unlock(&sched.lock)
		if _p_ != nil {
			acquirep(_p_)
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			// Go back to idle GC check.
			goto stop
		}
	}

	// poll network
	// 休眠前再次檢查 poll 網(wǎng)絡(luò)
	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		...
		lock(&sched.lock)
		_p_ = pidleget()
		unlock(&sched.lock)
		if _p_ == nil {
			injectglist(&list)
		} else {
			acquirep(_p_)
			if !list.empty() {
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			goto top
		}
	} else if pollUntil != 0 && netpollinited() {
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
			netpollBreak()
		}
	}
	// 休眠當(dāng)前 M
	stopm()
	goto top
}

這個(gè)函數(shù)需要注意一下,工作線(xiàn)程M的自旋狀態(tài)(spinning)。工作線(xiàn)程在從其它工作線(xiàn)程的本地運(yùn)行隊(duì)列中盜取 G 時(shí)的狀態(tài)稱(chēng)為自旋狀態(tài)。有關(guān)netpoller的知識(shí)可以到這里看:詳解Go語(yǔ)言I/O多路復(fù)用netpoller模型 https://www.luozhiyun.com/archives/439。

下面我們看一下 findrunnable 做了什么:

  1. 首先檢查是是否正在進(jìn)行 GC,如果是則暫止當(dāng)前的 M 并阻塞休眠;

  2. 從本地運(yùn)行隊(duì)列、全局運(yùn)行隊(duì)列中查找 G;

  3. 從網(wǎng)絡(luò)輪詢(xún)器中查找是否有 G 等待運(yùn)行;

  4. 將 spinning 設(shè)置為 true 表示開(kāi)始竊取 G。竊取過(guò)程用了兩個(gè)嵌套for循環(huán),內(nèi)層循環(huán)遍歷 allp 中的所有 P ,查看其運(yùn)行隊(duì)列是否有 G,如果有,則取其一半到當(dāng)前工作線(xiàn)程的運(yùn)行隊(duì)列,然后從 findrunnable 返回,如果沒(méi)有則繼續(xù)遍歷下一個(gè) P 。需要注意的是,遍歷 allp 時(shí)是從隨機(jī)位置上的 P 開(kāi)始,防止每次遍歷時(shí)使用同樣的順序訪(fǎng)問(wèn)allp中的元素;

  5. 所有的可能性都嘗試過(guò)了,在準(zhǔn)備休眠 M 之前,還要進(jìn)行額外的檢查;

  6. 首先檢查此時(shí)是否是 GC mark 階段,如果是,則直接返回 mark 階段的 G;

  7. 休眠之前再次檢查全局 P 列表,遍歷全局 P 列表的 P,并檢查他們的可運(yùn)行G隊(duì)列;

  8. 還需要再檢查是否有 GC mark 的 G 出現(xiàn),如果有,獲取 P 并回到第一步,重新執(zhí)行偷取工作;

  9. 再檢查是否存在 poll 網(wǎng)絡(luò)的 G,如果有,則直接返回;

  10. 什么都沒(méi)找到,那么休眠當(dāng)前的 M ;

任務(wù)執(zhí)行

schedule 運(yùn)行到到這里表示終于找到了可以運(yùn)行的 G:

func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	// 將 G 綁定到當(dāng)前 M 上
	_g_.m.curg = gp
	gp.m = _g_.m
	// 將 g 正式切換為 _Grunning 狀態(tài)
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	// 搶占信號(hào)
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		// 調(diào)度器調(diào)度次數(shù)增加 1
		_g_.m.p.ptr().schedtick++
	} 
	... 
    // gogo 完成從 g0 到 gp 真正的切換
	gogo(&gp.sched)
}

當(dāng)開(kāi)始執(zhí)行 execute 后,G 會(huì)被切換到 _Grunning 狀態(tài),并將 M 和 G 進(jìn)行綁定,最終調(diào)用 runtime.gogo 開(kāi)始執(zhí)行。

runtime.gogo 中會(huì)從 runtime.gobuf 中取出 runtime.goexit 的程序計(jì)數(shù)器和待執(zhí)行函數(shù)的程序計(jì)數(shù)器,然后跳轉(zhuǎn)到 runtime.goexit 中并執(zhí)行:

TEXT runtime·goexit(SB),NOSPLIT,$0-0
	CALL	runtime·goexit1(SB)
	
func goexit1() {
    // 調(diào)用goexit0函數(shù) 
	mcall(goexit0)
}

goexit1 通過(guò) mcall 完成 goexit0 的調(diào)用 :

func goexit0(gp *g) {
	_g_ := getg()
	// 設(shè)置當(dāng)前 G 狀態(tài)為 _Gdead
	casgstatus(gp, _Grunning, _Gdead) 
	// 清理 G
	gp.m = nil
	...
	gp.writebuf = nil
	gp.waitreason = 0
	gp.param = nil
	gp.labels = nil
	gp.timer = nil
 
	// 解綁 M 和 G
	dropg() 
	...
	// 將 G 扔進(jìn) gfree 鏈表中等待復(fù)用
	gfput(_g_.m.p.ptr(), gp)
	// 再次進(jìn)行調(diào)度
	schedule()
}

goexit0 會(huì)對(duì) G 進(jìn)行復(fù)位操作,解綁 M 和 G 的關(guān)聯(lián)關(guān)系,將其 放入 gfree 鏈表中等待其他的 go 語(yǔ)句創(chuàng)建新的 g。在最后,goexit0 會(huì)重新調(diào)用 schedule觸發(fā)新一輪的調(diào)度。

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

總結(jié)

下面用一張圖大致總結(jié)一下調(diào)度過(guò)程:

Go語(yǔ)言中怎么調(diào)度循環(huán)源碼

以上就是Go語(yǔ)言中怎么調(diào)度循環(huán)源碼,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見(jiàn)到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


文章名稱(chēng):Go語(yǔ)言中怎么調(diào)度循環(huán)源碼
網(wǎng)頁(yè)地址:http://weahome.cn/article/ijigep.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部