這篇文章將為大家詳細講解有關(guān)Go語言基于信號搶占式調(diào)度的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)公司專注于奎屯網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供奎屯營銷型網(wǎng)站建設(shè),奎屯網(wǎng)站制作、奎屯網(wǎng)頁設(shè)計、奎屯網(wǎng)站官網(wǎng)定制、小程序開發(fā)服務(wù),打造奎屯網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供奎屯網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
在 Go 的 1.14 版本之前搶占試調(diào)度都是基于協(xié)作的,需要自己主動的讓出執(zhí)行,但是這樣是無法處理一些無法被搶占的邊緣情況。例如:for 循環(huán)或者垃圾回收長時間占用線程,這些問題中的一部分直到 1.14 才被基于信號的搶占式調(diào)度解決。
下面我們通過一個例子來驗證一下1.14 版本和 1.13 版本之間的搶占差異:
package main import ( "fmt" "os" "runtime" "runtime/trace" "sync" ) func main() { runtime.GOMAXPROCS(1) f, _ := os.Create("trace.output") defer f.Close() _ = trace.Start(f) defer trace.Stop() var wg sync.WaitGroup for i := 0; i < 30; i++ { wg.Add(1) go func() { defer wg.Done() t := 0 for i:=0;i<1e8;i++ { t+=2 } fmt.Println("total:", t) }() } wg.Wait() }
這個例子中會通過 go trace 來進行執(zhí)行過程的調(diào)用跟蹤。在代碼中指定 runtime.GOMAXPROCS(1)
設(shè)置最大的可同時使用的 CPU 核數(shù)為1,只用一個 P(處理器),這樣就確保是單處理器的場景。然后調(diào)用一個 for 循環(huán)開啟 10 個 goroutines 來執(zhí)行 func 函數(shù),這是一個純計算且耗時的函數(shù),防止 goroutines 空閑讓出執(zhí)行。
下面我們編譯程序分析 trace 輸出:
$ go build -gcflags "-N -l" main.go -N表示禁用優(yōu)化 -l禁用內(nèi)聯(lián) $ ./main
然后我們獲取到 trace.output 文件后進行可視化展示:
$ go tool trace -http=":6060" ./trace.output
從上面的這個圖可以看出:
因為我們限定了只有一個 P,所以在 PROCS 這一欄里面只有一個 Proc0;
我們在 for 循環(huán)里面啟動了 30 個 goroutines ,所以我們可以數(shù)一下 Proc0 里面的顏色框框,剛好30 個;
30 個 goroutines 在 Proc0 里面是串行執(zhí)行的,一個執(zhí)行完再執(zhí)行另一個,沒有進行搶占;
隨便點擊一個 goroutines 的詳情欄可以看到 Wall Duration 為 0.23s 左右,表示這個 goroutines 持續(xù)執(zhí)行了 0.23s,總共 10 個 goroutines 執(zhí)行時間是 7s 左右;
切入調(diào)用棧 Start Stack Trace 是 main.main.func1:20,在代碼上面是 func 函數(shù)執(zhí)行頭: go func()
;
切走調(diào)用棧 End Stack Trace 是 main.main.func1:26,在代碼上是 func 函數(shù)最后執(zhí)行打?。?code>fmt.Println("total:", t);
從上面的 trace 分析可以知道,Go 的協(xié)作式調(diào)度對 calcSum 函數(shù)是毫無作用的,一旦執(zhí)行開始,只能等執(zhí)行結(jié)束。每個 goroutine 耗費了 0.23s 這么長的時間,也無法搶占它的執(zhí)行權(quán)。
在 Go 1.14 之后引入了基于信號的搶占式調(diào)度,從上面的圖可以看到 Proc0 這一欄中密密麻麻都是 goroutines 在切換時的調(diào)用情況,不會再出現(xiàn) goroutines 一旦執(zhí)行開始,只能等執(zhí)行結(jié)束這種情況。
上面跑動的時間是 4s 左右這個情況可以忽略,因為我是在兩臺配置不同的機器上跑的(主要是我閑麻煩要找兩臺一樣的機器)。
下面我們拉近了看一下明細情況:
通過這個明細可以看出:
這個 goroutine 運行了 0.025s 就讓出執(zhí)行了;
切入調(diào)用棧 Start Stack Trace 是 main.main.func1:21,和上面一樣;
切走調(diào)用棧 End Stack Trace 是 runtime.asyncPreempt:50 ,這個函數(shù)是收到搶占信號時執(zhí)行的函數(shù),從這個地方也能明確的知道,被異步搶占了;
runtime/signal_unix.go
程序啟動時,在runtime.sighandler
中注冊 SIGURG
信號的處理函數(shù)runtime.doSigPreempt
。
initsig
func initsig(preinit bool) { // 預(yù)初始化 if !preinit { signalsOK = true } //遍歷信號數(shù)組 for i := uint32(0); i < _NSIG; i++ { t := &sigtable[i] //略過信號:SIGKILL、SIGSTOP、SIGTSTP、SIGCONT、SIGTTIN、SIGTTOU if t.flags == 0 || t.flags&_SigDefault != 0 { continue } ... setsig(i, funcPC(sighandler)) } }
在 initsig 函數(shù)里面會遍歷所有的信號量,然后調(diào)用 setsig 函數(shù)進行注冊。我們可以查看 sigtable 這個全局變量看看有什么信息:
var sigtable = [...]sigTabT{ /* 0 */ {0, "SIGNONE: no trap"}, /* 1 */ {_SigNotify + _SigKill, "SIGHUP: terminal line hangup"}, /* 2 */ {_SigNotify + _SigKill, "SIGINT: interrupt"}, /* 3 */ {_SigNotify + _SigThrow, "SIGQUIT: quit"}, /* 4 */ {_SigThrow + _SigUnblock, "SIGILL: illegal instruction"}, /* 5 */ {_SigThrow + _SigUnblock, "SIGTRAP: trace trap"}, /* 6 */ {_SigNotify + _SigThrow, "SIGABRT: abort"}, /* 7 */ {_SigPanic + _SigUnblock, "SIGBUS: bus error"}, /* 8 */ {_SigPanic + _SigUnblock, "SIGFPE: floating-point exception"}, /* 9 */ {0, "SIGKILL: kill"}, /* 10 */ {_SigNotify, "SIGUSR1: user-defined signal 1"}, /* 11 */ {_SigPanic + _SigUnblock, "SIGSEGV: segmentation violation"}, /* 12 */ {_SigNotify, "SIGUSR2: user-defined signal 2"}, /* 13 */ {_SigNotify, "SIGPIPE: write to broken pipe"}, /* 14 */ {_SigNotify, "SIGALRM: alarm clock"}, /* 15 */ {_SigNotify + _SigKill, "SIGTERM: termination"}, /* 16 */ {_SigThrow + _SigUnblock, "SIGSTKFLT: stack fault"}, /* 17 */ {_SigNotify + _SigUnblock + _SigIgn, "SIGCHLD: child status has changed"}, /* 18 */ {_SigNotify + _SigDefault + _SigIgn, "SIGCONT: continue"}, /* 19 */ {0, "SIGSTOP: stop, unblockable"}, /* 20 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTSTP: keyboard stop"}, /* 21 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTIN: background read from tty"}, /* 22 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTOU: background write to tty"}, /* 23 */ {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"}, /* 24 */ {_SigNotify, "SIGXCPU: cpu limit exceeded"}, /* 25 */ {_SigNotify, "SIGXFSZ: file size limit exceeded"}, /* 26 */ {_SigNotify, "SIGVTALRM: virtual alarm clock"}, /* 27 */ {_SigNotify + _SigUnblock, "SIGPROF: profiling alarm clock"}, /* 28 */ {_SigNotify + _SigIgn, "SIGWINCH: window size change"}, /* 29 */ {_SigNotify, "SIGIO: i/o now possible"}, /* 30 */ {_SigNotify, "SIGPWR: power failure restart"}, /* 31 */ {_SigThrow, "SIGSYS: bad system call"}, /* 32 */ {_SigSetStack + _SigUnblock, "signal 32"}, /* SIGCANCEL; see issue 6997 */ /* 33 */ {_SigSetStack + _SigUnblock, "signal 33"}, /* SIGSETXID; see issues 3871, 9400, 12498 */ ... }
具體的信號含義可以看這個介紹:Unix信號 https://zh.wikipedia.org/wiki/Unix信號。需要注意的是,搶占信號在這里是 _SigNotify + _SigIgn
如下:
{_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"}
下面我們看一下 setsig 函數(shù),這個函數(shù)是在 runtime/os_linux.go
文件里面:
setsig
func setsig(i uint32, fn uintptr) { var sa sigactiont sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART sigfillset(&sa.sa_mask) ... if fn == funcPC(sighandler) { // CGO 相關(guān) if iscgo { fn = funcPC(cgoSigtramp) } else { // 替換為調(diào)用 sigtramp fn = funcPC(sigtramp) } } sa.sa_handler = fn sigaction(i, &sa, nil) }
這里需要注意的是,當 fn 等于 sighandler 的時候,調(diào)用的函數(shù)會被替換成 sigtramp。sigaction 函數(shù)在 Linux 下會調(diào)用系統(tǒng)調(diào)用函數(shù) sys_signal 以及 sys_rt_sigaction 實現(xiàn)安裝信號。
到了這里是信號發(fā)生的時候進行信號的處理,原本應(yīng)該是在發(fā)送搶占信號之后,但是這里我先順著安裝信號往下先講了。大家可以跳到發(fā)送搶占信號后再回來。
上面分析可以看到當 fn 等于 sighandler 的時候,調(diào)用的函數(shù)會被替換成 sigtramp,sigtramp是匯編實現(xiàn),下面我們看看。
src/runtime/sys_linux_amd64.s
:
TEXT runtime·sigtramp(SB),NOSPLIT,$72 ... // We don't save mxcsr or the x87 control word because sigtrampgo doesn't // modify them. MOVQ DX, ctx-56(SP) MOVQ SI, info-64(SP) MOVQ DI, signum-72(SP) MOVQ $runtime·sigtrampgo(SB), AX CALL AX ... RET
這里會被調(diào)用說明信號已經(jīng)發(fā)送響應(yīng)了,runtime·sigtramp
會進行信號的處理。runtime·sigtramp
會繼續(xù)調(diào)用 runtime·sigtrampgo
。
這個函數(shù)在 runtime/signal_unix.go
文件中:
sigtrampgo&sighandler
func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) { if sigfwdgo(sig, info, ctx) { return } c := &sigctxt{info, ctx} g := sigFetchG(c) ... sighandler(sig, info, ctx, g) setg(g) if setStack { restoreGsignalStack(&gsignalStack) } } func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) { _g_ := getg() c := &sigctxt{info, ctxt} ... // 如果是一個搶占信號 if sig == sigPreempt && debug.asyncpreemptoff == 0 { // 處理搶占信號 doSigPreempt(gp, c) } ... }
sighandler 方法里面做了很多其他信號的處理工作,我們只關(guān)心搶占部分的代碼,這里最終會通過 doSigPreempt 方法執(zhí)行搶占。
這個函數(shù)在 runtime/signal_unix.go
文件中:
doSigPreempt
func doSigPreempt(gp *g, ctxt *sigctxt) { // 檢查此 G 是否要被搶占并且可以安全地搶占 if wantAsyncPreempt(gp) { // 檢查是否能安全的進行搶占 if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok { // 修改寄存器,并執(zhí)行搶占調(diào)用 ctxt.pushCall(funcPC(asyncPreempt), newpc) } } // 更新一下?lián)屨枷嚓P(guān)字段 atomic.Xadd(&gp.m.preemptGen, 1) atomic.Store(&gp.m.signalPending, 0) }
函數(shù)會處理搶占信號,獲取當前的 SP 和 PC 寄存器并調(diào)用 ctxt.pushCall
修改寄存器,并調(diào)用 runtime/preempt.go
的 asyncPreempt 函數(shù)。
// 保存用戶態(tài)寄存器后調(diào)用asyncPreempt2 func asyncPreempt()
asyncPreempt 的匯編代碼在 src/runtime/preempt_amd64.s
中,該函數(shù)會保存用戶態(tài)寄存器后調(diào)用 runtime/preempt.go
的 asyncPreempt2 函數(shù)中:
asyncPreempt2
func asyncPreempt2() { gp := getg() gp.asyncSafePoint = true // 該 G 是否可以被搶占 if gp.preemptStop { mcall(preemptPark) } else { // 讓 G 放棄當前在 M 上的執(zhí)行權(quán)利,將 G 放入全局隊列等待后續(xù)調(diào)度 mcall(gopreempt_m) } gp.asyncSafePoint = false }
該函數(shù)會獲取當前 G ,然后判斷 G 的 preemptStop 值,preemptStop 會在調(diào)用 runtime/preempt.go
的 suspendG 函數(shù)的時候?qū)?_Grunning
狀態(tài)的 Goroutine 標記成可以被搶占 gp.preemptStop = true
,表示該 G 可以被搶占。
下面我們看一下執(zhí)行搶占任務(wù)會調(diào)用的 runtime/proc.go
的 preemptPark函數(shù):
preemptPark
func preemptPark(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } gp.waitreason = waitReasonPreempted casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted) // 使當前 m 放棄 g,讓出線程 dropg() // 修改當前 Goroutine 的狀態(tài)到 _Gpreempted casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // 并繼續(xù)執(zhí)行調(diào)度 schedule() }
preemptPark 會修改當前 Goroutine 的狀態(tài)到 _Gpreempted
,調(diào)用 dropg 讓出線程,最后調(diào)用 schedule 函數(shù)繼續(xù)執(zhí)行其他 Goroutine 的任務(wù)循環(huán)調(diào)度。
gopreempt_m
gopreempt_m 方法比起搶占更像是主動讓權(quán),然后重新加入到執(zhí)行隊列中等待調(diào)度。
func gopreempt_m(gp *g) { goschedImpl(gp) } func goschedImpl(gp *g) { status := readgstatus(gp) ... // 更新狀態(tài)為 _Grunnable casgstatus(gp, _Grunning, _Grunnable) // 使當前 m 放棄 g,讓出線程 dropg() lock(&sched.lock) // 重新加入到全局執(zhí)行隊列中 globrunqput(gp) unlock(&sched.lock) // 并繼續(xù)執(zhí)行調(diào)度 schedule() }
搶占信號的發(fā)送是由 preemptM 進行的。
這個函數(shù)在runtime/signal_unix.go
文件中:
preemptM
const sigPreempt = _SIGURG func preemptM(mp *m) { ... if atomic.Cas(&mp.signalPending, 0, 1) { // preemptM 向 M 發(fā)送搶占請求。 // 接收到該請求后,如果正在運行的 G 或 P 被標記為搶占,并且 Goroutine 處于異步安全點, // 它將搶占 Goroutine。 signalM(mp, sigPreempt) } }
preemptM 這個函數(shù)會調(diào)用 signalM 將在初始化的安裝的 _SIGURG
信號發(fā)送到指定的 M 上。
使用 preemptM 發(fā)送搶占信號的地方主要有下面幾個:
Go 后臺監(jiān)控 runtime.sysmon 檢測超時發(fā)送搶占信號;
Go GC 棧掃描發(fā)送搶占信號;
Go GC STW 的時候調(diào)用 preemptall 搶占所有 P,讓其暫停;
系統(tǒng)監(jiān)控 runtime.sysmon
會在循環(huán)中調(diào)用 runtime.retake
搶占處于運行或者系統(tǒng)調(diào)用中的處理器,該函數(shù)會遍歷運行時的全局處理器。
系統(tǒng)監(jiān)控通過在循環(huán)中搶占主要是為了避免 G 占用 M 的時間過長造成饑餓。
runtime.retake
主要分為兩部分:
調(diào)用 preemptone 搶占當前處理器;
調(diào)用 handoffp 讓出處理器的使用權(quán);
搶占當前處理器
func retake(now int64) uint32 { n := 0 lock(&allpLock) // 遍歷 allp 數(shù)組 for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // 調(diào)度次數(shù) t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) // 處理器上次調(diào)度時間 pd.schedwhen = now // 搶占 G 的執(zhí)行,如果上一次觸發(fā)調(diào)度的時間已經(jīng)過去了 10ms } else if pd.schedwhen+forcePreemptNS <= now { preemptone(_p_) sysretake = true } } ... } unlock(&allpLock) return uint32(n) }
這一過程會獲取當前 P 的狀態(tài),如果處于 _Prunning
或者 _Psyscall
狀態(tài)時,并且上一次觸發(fā)調(diào)度的時間已經(jīng)過去了 10ms,那么會調(diào)用 preemptone 進行搶占信號的發(fā)送,preemptone 在上面我們已經(jīng)講過了,這里就不再復(fù)述。
調(diào)用 handoffp 讓出處理器的使用權(quán)
func retake(now int64) uint32 { n := 0 lock(&allpLock) // 遍歷 allp 數(shù)組 for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status sysretake := false ... if s == _Psyscall { // 系統(tǒng)調(diào)用的次數(shù) t := int64(_p_.syscalltick) if !sysretake && int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) // 系統(tǒng)調(diào)用的時間 pd.syscallwhen = now continue } if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } unlock(&allpLock) incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { n++ _p_.syscalltick++ // 讓出處理器的使用權(quán) handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) }
這一過程會判斷 P 的狀態(tài)如果處于 _Psyscall
狀態(tài)時,會進行一個判斷,有一個不滿足則調(diào)用 handoffp 讓出 P 的使用權(quán):
runqempty(_p_)
:判斷 P 的任務(wù)隊列是否為空;
atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle)
:nmspinning 表示正在竊取 G 的數(shù)量,npidle 表示空閑 P 的數(shù)量,判斷是否存在空閑 P 和正在進行調(diào)度竊取 G 的 P;
pd.syscallwhen+10*1000*1000 > now
:判斷是否系統(tǒng)調(diào)用時間超過了 10ms ;
GC 相關(guān)的內(nèi)容可以看這篇:《Go語言GC實現(xiàn)原理及源碼分析 https://www.luozhiyun.com/archives/475》。Go 在 GC 時對 GC Root 進行標記的時候會掃描 G 的棧,掃描之前會調(diào)用 suspendG 掛起 G 的執(zhí)行才進行掃描,掃描完畢之后再次調(diào)用 resumeG 恢復(fù)執(zhí)行。
該函數(shù)在:runtime/mgcmark.go
:
markroot
func markroot(gcw *gcWork, i uint32) { ... switch { ... // 掃描各個 G 的棧 default: // 獲取需要掃描的 G var gp *g if baseStacks <= i && i < end { gp = allgs[i-baseStacks] } else { throw("markroot: bad index") } ... // 轉(zhuǎn)交給g0進行掃描 systemstack(func() { ... // 掛起 G,讓對應(yīng)的 G 停止運行 stopped := suspendG(gp) if stopped.dead { gp.gcscandone = true return } if gp.gcscandone { throw("g already scanned") } // 掃描g的棧 scanstack(gp, gcw) gp.gcscandone = true // 恢復(fù)該 G 的執(zhí)行 resumeG(stopped) }) } }
markroot 在掃描棧之前會切換到 G0 轉(zhuǎn)交給g0進行掃描,然后調(diào)用 suspendG 會判斷 G 的運行狀態(tài),如果該 G 處于 運行狀態(tài) _Grunning
,那么會設(shè)置 preemptStop 為 true 并發(fā)送搶占信號。
該函數(shù)在:runtime/preempt.go
:
suspendG
func suspendG(gp *g) suspendGState { ... const yieldDelay = 10 * 1000 var nextPreemptM int64 for i := 0; ; i++ { switch s := readgstatus(gp); s { ... case _Grunning: if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen { break } if !castogscanstatus(gp, _Grunning, _Gscanrunning) { break } // 設(shè)置搶占字段 gp.preemptStop = true gp.preempt = true gp.stackguard0 = stackPreempt asyncM2 := gp.m asyncGen2 := atomic.Load(&asyncM2.preemptGen) // asyncM 與 asyncGen 標記的是循環(huán)里 上次搶占的信息,用來校驗不能重復(fù)搶占 needAsync := asyncM != asyncM2 || asyncGen != asyncGen2 asyncM = asyncM2 asyncGen = asyncGen2 casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning) if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync { now := nanotime() // 限制搶占的頻率 if now >= nextPreemptM { nextPreemptM = now + yieldDelay/2 // 執(zhí)行搶占信號發(fā)送 preemptM(asyncM) } } } ... } }
對于 suspendG 函數(shù)我只截取出了 G 在 _Grunning
狀態(tài)下的處理情況。該狀態(tài)下會將 preemptStop 設(shè)置為 true,也是唯一一個地方設(shè)置為 true 的地方。preemptStop 和搶占信號的執(zhí)行有關(guān),忘記的同學可以翻到上面的 asyncPreempt2 函數(shù)中。
Go GC STW 是通過 stopTheWorldWithSema 函數(shù)來執(zhí)行的,該函數(shù)在 runtime/proc.go
:
stopTheWorldWithSema
func stopTheWorldWithSema() { _g_ := getg() lock(&sched.lock) sched.stopwait = gomaxprocs // 標記 gcwaiting,調(diào)度時看見此標記會進入等待 atomic.Store(&sched.gcwaiting, 1) // 發(fā)送搶占信號 preemptall() // 暫停當前 P _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. ... wait := sched.stopwait > 0 unlock(&sched.lock) if wait { for { // 等待 100 us if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } // 再次進行發(fā)送搶占信號 preemptall() } } ... }
stopTheWorldWithSema 函數(shù)會調(diào)用 preemptall 對所有的 P 發(fā)送搶占信號。
preemptall 函數(shù)的文件位置在 runtime/proc.go
:
preemptall
func preemptall() bool { res := false // 遍歷所有的 P for _, _p_ := range allp { if _p_.status != _Prunning { continue } // 對正在運行的 P 發(fā)送搶占信號 if preemptone(_p_) { res = true } } return res }
preemptall 調(diào)用的 preemptone 會將 P 對應(yīng)的 M 中正在執(zhí)行的 G 并標記為正在執(zhí)行搶占;最后會調(diào)用 preemptM 向 M 發(fā)送搶占信號。
該函數(shù)的文件位置在 runtime/proc.go
:
preemptone
func preemptone(_p_ *p) bool { // 獲取 P 對應(yīng)的 M mp := _p_.m.ptr() if mp == nil || mp == getg().m { return false } // 獲取 M 正在執(zhí)行的 G gp := mp.curg if gp == nil || gp == mp.g0 { return false } // 將 G 標記為搶占 gp.preempt = true // 在棧擴張的時候會檢測是否被搶占 gp.stackguard0 = stackPreempt // 請求該 P 的異步搶占 if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true preemptM(mp) } return true }
具體的邏輯:
程序啟動時,在注冊 _SIGURG
信號的處理函數(shù) runtime.doSigPreempt
;
此時有一個 M1 通過 signalM 函數(shù)向 M2 發(fā)送中斷信號 _SIGURG
;
M2 收到信號,操作系統(tǒng)中斷其執(zhí)行代碼,并切換到信號處理函數(shù)runtime.doSigPreempt
;
M2 調(diào)用 runtime.asyncPreempt
修改執(zhí)行的上下文,重新進入調(diào)度循環(huán)進而調(diào)度其他 G;
關(guān)于Go語言基于信號搶占式調(diào)度的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。