之前在協(xié)調(diào)多個(gè)goroutine的時(shí)候,使用了通道?;径际前聪旅孢@樣來使用的:
我們提供的服務(wù)有:成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、浦口ssl等。為數(shù)千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的浦口網(wǎng)站制作公司
package main
import "fmt"
func main() {
done := make(chan struct{})
count := 5
for i := 0; i < count; i++ {
go func(i int) {
defer func() {
done <- struct{}{}
}()
fmt.Println(i)
}(i)
}
for j := 0; j < count; j++ {
<- done
}
fmt.Println("Over")
}
這里有一個(gè)問題,要保證主goroutine最后從通道接收元素的的次數(shù)需要與之前其他goroutine發(fā)送元素的次數(shù)相同。
其實(shí),在這種應(yīng)用場景下,可以選用另外一個(gè)同步工具,就是這里要講的sync包的WaitGroup類型。
sync.WaitGroup類型,它比通道更加適合實(shí)現(xiàn)這種一對多的goroutine協(xié)作流程。WaitGroup是開箱即用的,也是并發(fā)安全的。同時(shí),與之前提到的同步工具一樣,它一旦被真正的使用就不能被復(fù)制了。
WaitGroup擁有三個(gè)指針方法,可以想象該類型中有一個(gè)計(jì)數(shù)器,默認(rèn)值是0,下面的方法就是操作或判斷計(jì)數(shù)器:
Add(-1)
,可以在defer語句中調(diào)用它現(xiàn)在就用WaitGroup來改造開篇的程序:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup // 開箱即用,所以直接聲明就好了,沒必要用短變量聲明
// wg := sync.WaitGroup{} // 短變量聲明可以這么寫
count := 5
for i := 0; i < count; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
fmt.Println("Over")
}
改造后,在主goroutine最后等待退出的部分現(xiàn)在看著要美觀多了。這個(gè)就是WaitGroup典型的應(yīng)用場景了。
計(jì)數(shù)器不能小于0
在sync.WaitGroup類型值中計(jì)數(shù)器的值是不可以小于0的。一旦小于0會引發(fā)panic,不適當(dāng)?shù)恼{(diào)用Done方法和Add方法就有可能使它小于0而引發(fā)panic。
盡早增加計(jì)數(shù)器的值
如果在對它的Add方法的首次調(diào)用,與對它的Wait方法的調(diào)用是同時(shí)發(fā)起的。比如,在同時(shí)啟動(dòng)的兩個(gè)goroutine中,分別調(diào)用這兩個(gè)方法,那就就有可能會讓這里的Add方法拋出一個(gè)panic。并且這種情況不太容易,應(yīng)該予以重視。所以雖然WaitGroup值本身并不需要初始化,但是盡早的增加其計(jì)數(shù)器的值是非要必要的。
復(fù)用的情況
WaitGroup的值是可以被復(fù)用的,但需要保證其計(jì)數(shù)周期的完整性。這里的計(jì)數(shù)周期指的是這樣一個(gè)過程:該值中的計(jì)數(shù)器值由0變?yōu)榱四硞€(gè)正整數(shù),而后又經(jīng)過一系列的變化,最終由某個(gè)正整數(shù)又變回了0。這個(gè)過程可以被視為一個(gè)計(jì)數(shù)周期。在一個(gè)此類的生命周期中,它可以經(jīng)歷任意多個(gè)計(jì)數(shù)周期。但是,只有在它走完當(dāng)前的計(jì)數(shù)周期后,才能夠開始下一個(gè)計(jì)數(shù)周期。
也就是說,如果一個(gè)此類值的Wait方法在它的某個(gè)計(jì)數(shù)周期中被調(diào)用,那么就會立即阻塞當(dāng)前的goroutine,直至這個(gè)計(jì)數(shù)周期完成。在這種情況下,該值的下一個(gè)計(jì)數(shù)周期必須要等到這個(gè)Wait方法執(zhí)行結(jié)束之后,才能夠開始。
Wait方法是有一個(gè)執(zhí)行的過程的,如果在這個(gè)方法執(zhí)行期間,跨越了兩個(gè)計(jì)數(shù)周期,就會引發(fā)一個(gè)panic。比如,當(dāng)前的goroutine調(diào)用了Wait方法而阻塞了。另一個(gè)goroutine調(diào)用了Done方法使計(jì)數(shù)器變成了0。此時(shí)會喚醒之前阻塞的goroutine,并且去執(zhí)行Wait方法中其余的代碼(這里還在這行Wait方法,執(zhí)行的是源碼sync.Wait方法里的代碼,不是我們自己寫的程序的Wait之后的代碼)。在這個(gè)時(shí)候,又有一個(gè)goroutine調(diào)用了Add方法,使計(jì)數(shù)器的值又從0變?yōu)榱四硞€(gè)正整數(shù)。此時(shí)正在執(zhí)行的Wait方法就會立即拋出一個(gè)panic。
上面給了3種會引發(fā)panic的情況。關(guān)于后兩種情況,建議如下:
不要把增加計(jì)數(shù)器值的操作和調(diào)用Wait方法的代碼,放在不同的goroutine中執(zhí)行。
就是要杜絕對同一個(gè)WatiGroup值的兩種操作的并發(fā)執(zhí)行。
后面提到的兩種情況,不是每次都會發(fā)生,通常需要反復(fù)的實(shí)驗(yàn)才能夠引發(fā)panic的情況。雖然不是每次都發(fā)生,但是在長期運(yùn)行的過程中,這種情況是必然會出現(xiàn)的,應(yīng)該予以重視并且避免。
如果對復(fù)現(xiàn)這些異常情況感興趣,可以看一下sync代碼包中的waitgroup_test.go文件。其中的名稱以TestWaitGroupMisuse為前綴的測試函數(shù),很好的展示了這些異常情況發(fā)生的條件。
與sync.WaitGroup類型一樣,Sync.Once類型也屬于結(jié)構(gòu)體類型,同樣也是開箱即用和并發(fā)安全的。由于這個(gè)類型中包含了一個(gè)sync.Mutex類型的字段,所以復(fù)制改類型的值也會導(dǎo)致功能失效。
Do方法
Once類型的Do方法只接收一個(gè)參數(shù),參數(shù)的類型必須是func(),即無參數(shù)無返回的函數(shù)。該方法的功能并不是對每一種參數(shù)函數(shù)都只執(zhí)行一次,而是只執(zhí)行首次被調(diào)用時(shí)傳入的那個(gè)函數(shù),并且之后不會再執(zhí)行任何參數(shù)函數(shù)。所以,如果有多個(gè)需要執(zhí)行一次的函數(shù),應(yīng)該為它們每一個(gè)都分配一個(gè)sync.Once類型的值。
基本用法如下:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter uint32
var once sync.Once
once.Do(func() {
atomic.AddUint32(&counter, 1)
})
fmt.Println("counter:", counter)
// 這次調(diào)用不會被執(zhí)行
once.Do(func() {
atomic.AddUint32(&counter, 2)
})
fmt.Println("counter:", counter)
}
done字段
Once類型中還要一個(gè)名為done的uint32類型的字段。它的作用是記錄所屬值的Do方法被調(diào)用的次數(shù)。不過改字段的值只可能是0或1.一旦Do方法的首次調(diào)用完成,它的值就會從0變?yōu)?。
關(guān)于done的類型,其實(shí)用布爾類型就夠了,這里只所以用uint32類型的原因是它的操作必須是原子操作,只能使用原子操作支持的數(shù)據(jù)類型。
Do方法的實(shí)現(xiàn)方式
Do方法在一開始就會通過atomic.LoadUint32來獲取done字段的值,并且如果發(fā)現(xiàn)值為1就直接返回。這步只是初步保證了Do方法只會執(zhí)行首次調(diào)用是傳入的函數(shù)。
不過單憑上面的判斷是不夠的。如果兩個(gè)goroutine都調(diào)用了同一個(gè)新的Once值的Do方法,并且?guī)缀跬瑫r(shí)執(zhí)行到了其中的這個(gè)條件判斷代碼,那么它們就都會因判斷結(jié)果為false而繼續(xù)執(zhí)行Do方法中剩余的代碼。
基于上面的可能,在初步保證的判斷之后,Do方法會立即鎖定其所屬值中的那個(gè)sync.Mutex類型的m字段。然后,它會在臨界區(qū)中再次檢查done字段的值。此時(shí)done的值應(yīng)該仍然是0,并且已經(jīng)加鎖。此時(shí)才認(rèn)為是條件滿足,才會去調(diào)用參數(shù)函數(shù)。并且用原子操作把done的值變?yōu)?。
單例模式
如果熟悉設(shè)計(jì)模式中的單例模式的話,這個(gè)Do方法的實(shí)現(xiàn)方式,與單例模式有很多相似之處。都會先在臨界區(qū)之外判斷一次關(guān)鍵條件,若條件不滿足則立即返回。這通常被稱為快路徑,或者叫做快速失敗路徑。
如果條件滿足,那么到了臨界區(qū)中還要再對關(guān)鍵條件進(jìn)行一次判斷,這主要是為了更加嚴(yán)謹(jǐn)。這兩次條件判斷常被統(tǒng)稱為(跨臨界區(qū)的)雙重檢查。由于進(jìn)入臨界區(qū)前要加鎖,顯然會降低代碼的執(zhí)行速度,所以其中的第二次條件判斷,以及后續(xù)的操作就被稱為慢路徑或者常規(guī)路徑。
Do方法中的代碼不多,但它卻應(yīng)用了一個(gè)很經(jīng)典的編程范式。
一、由于Do方法只會在參數(shù)函數(shù)執(zhí)行結(jié)束之后把done字段的值變?yōu)?,因此,如果參數(shù)函數(shù)的執(zhí)行需要很長的時(shí)間或者根本就不會結(jié)束,那么就有可能會導(dǎo)致相關(guān)goroutine的同時(shí)阻塞。
比如,有多個(gè)goroutine并發(fā)的調(diào)用了同一個(gè)Once值的Do方法,并且傳入的函數(shù)都會一直執(zhí)行而不結(jié)束。那么,這些goroutine就都會因調(diào)用了這個(gè)Do方法而阻塞。此時(shí),那個(gè)搶先執(zhí)行了參數(shù)函數(shù)的goroutine之外,其他的goroutine都會被阻塞在該Once值的互斥鎖m的那行代碼上。
效果演示的示例代碼:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
once := sync.Once{} // 這里換短變量聲明
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// 這個(gè)函數(shù)會被執(zhí)行
once.Do(func() {
for i := 0; i < 10; i++ {
fmt.Printf("\r任務(wù)[1-%d]執(zhí)行中...", i)
time.Sleep(time.Millisecond * 400)
}
})
fmt.Printf("\n任務(wù)[1]執(zhí)行完畢\n")
}()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 300)
// 這句Do方法的調(diào)用會一直阻塞,知道上面的函數(shù)執(zhí)行完畢
// 然后Do方法里的函數(shù)不會執(zhí)行
once.Do(func() {
fmt.Println("任務(wù)[2]執(zhí)行中...")
})
// 上面Do方法阻塞結(jié)束后,直接會執(zhí)行下面的代碼
fmt.Println("任務(wù)[2]執(zhí)行完畢")
}()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 300)
once.Do(func() {
fmt.Println("任務(wù)[3]執(zhí)行中...")
})
fmt.Println("任務(wù)[3]執(zhí)行完畢")
}()
wg.Wait()
fmt.Println("Over")
}
二、Do方法在參數(shù)函數(shù)執(zhí)行結(jié)束后,對done字段的賦值用的是原子操作,并且這一操作是被掛載defer語句中的。因此,不論參數(shù)函數(shù)的執(zhí)行會以怎樣的方式結(jié)束,done字段的值都會變?yōu)?。
這樣就是說即時(shí)參數(shù)函數(shù)沒有執(zhí)行成功,比如引發(fā)了panic。也是無法使用同一個(gè)Once值重新執(zhí)行別的函數(shù)了。所以,如果需要為參數(shù)函數(shù)的執(zhí)行設(shè)定重試機(jī)制,就要考慮在適當(dāng)?shù)臅r(shí)候替換Once值。
參考下面的示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
once := sync.Once{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if p := recover(); p != nil {
fmt.Printf("PANIC: %v\n", p)
// 下面的語句會給once變量替換一個(gè)新的Once值,這樣下面的第二個(gè)任務(wù)還能被執(zhí)行
// once = sync.Once{}
}
}()
once.Do(func() {
fmt.Println("開始執(zhí)行參數(shù)函數(shù),緊接著會引發(fā)panic")
panic(fmt.Errorf("主動(dòng)引發(fā)了一個(gè)panic")) // panic之后就去調(diào)用defer了
fmt.Println("參數(shù)函數(shù)執(zhí)行完畢") // 這行不會執(zhí)行,后面的都不會執(zhí)行
})
fmt.Println("Do方法調(diào)用完畢") // 這行也不會執(zhí)行
}()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 500)
once.Do(func() {
fmt.Println("第二個(gè)任務(wù)執(zhí)行中...")
time.Sleep(time.Millisecond * 800)
fmt.Println("第二個(gè)任務(wù)執(zhí)行結(jié)束")
})
fmt.Println("第二個(gè)任務(wù)結(jié)束")
}()
wg.Wait()
fmt.Println("Over")
}
延遲一個(gè)昂貴的初始化步驟到有實(shí)際需求的時(shí)刻是一個(gè)很好的實(shí)踐。這也是sync.Once的一個(gè)使用場景。
下面是從書上改的示例代碼:
package main
import (
"fmt"
"sync"
)
var once sync.Once
var testmap map[string] int32
// 對testmap進(jìn)行初始化的函數(shù)
func loadTestmap() {
testmap = map[string] int32{
"k1": 1,
"k2": 2,
"k3": 3,
}
}
// 獲取testmap對應(yīng)key的值,如果沒有初始化,會先執(zhí)行初始化
// 書上說這個(gè)函數(shù)是并發(fā)安全的,這里的map初始化之后,內(nèi)容不會再變
func getKey(key string) int32 {
once.Do(loadTestmap)
// 最后的return這句可能不是并發(fā)安全的,不過線程安全的map不是這里的重點(diǎn)
// 假定這里的map在初始化之后只會被多個(gè)goroutine讀取,其內(nèi)容不會再改變
return testmap[key]
}
func main() {
fmt.Println(getKey("k1"))
}
這里不考慮map線程安全的問題,而且書上的例子這里的map只用來存放數(shù)據(jù),初始化之后不會對其內(nèi)容進(jìn)行修改。
這里主要是保證在變量初始化過程中的并發(fā)安全。以這種方式來使用sync.Once,可以避免變量在正確構(gòu)造之前就被其它goroutine分享。否則,在別的goroutine中可能會獲取到一個(gè)內(nèi)容不完整的變量。
sync代碼包的WaitGroup類型和Once類型都是非常易用的同步工具。它們都是開箱即用和并發(fā)安全的。
Once類型使用互斥鎖和原子操作實(shí)現(xiàn)了功能,而WatiGroup類型中只用到了原子操作。所以可以說,它們都是更高層次的同步工具。它們都基于基本的同步工具,實(shí)現(xiàn)了某種特定的功能。sync包中的其他高級同步工具,其實(shí)也都是這樣的。