Go 有兩種并發(fā)編程風(fēng)格:
喀什ssl適用于網(wǎng)站、小程序/APP、API接口等需要進行數(shù)據(jù)傳輸應(yīng)用場景,ssl證書未來市場廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場價格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書合作)期待與您的合作!
這章講第一種 goroutine 和通道。
如果說 goroutine 是 Go 程序并發(fā)的執(zhí)行體,通道就是它們之間的連接。每一個通道是一個具體類型的導(dǎo)管,叫作通道的元素類型。
像 map 一樣,通道是一個使用 make 創(chuàng)建的數(shù)據(jù)結(jié)構(gòu)的應(yīng)用。當(dāng)復(fù)制或者作為參數(shù)傳遞到一個函數(shù)時,復(fù)制的是引用,這樣調(diào)用者和被調(diào)用者都引用了同一份數(shù)據(jù)結(jié)構(gòu)。和其他引用類型一樣,通道的零值是 nil。
通道有兩個主要操作:發(fā)送(send)和接收(receive),兩者統(tǒng)稱為通信。通道還支持第三個操作:關(guān)閉(close),它設(shè)置一個標(biāo)志位來指示值當(dāng)前已經(jīng)發(fā)送完畢。
使用簡單的 make 調(diào)用創(chuàng)建的通道叫無緩沖(unbuffered)通道,但 make 還可以接受第二個可選參數(shù),一個表示通道容量的整數(shù)。如果容量是0,創(chuàng)建的也是無緩沖通道。
使用無緩沖通道進行的通信導(dǎo)致發(fā)送和接收 goroutine 同步化。因此無緩沖通道也稱為同步通道。
通過通道發(fā)送消息有兩個重要的方面需要考慮:
當(dāng)事件沒有攜帶額外的信息時,它單純的目的是進行同步。和 map 實現(xiàn)的集合一樣,可以使用一個 struct{} 元素類型的通道來強調(diào)它,盡管通常使用 bool 或 int 類型的通道來做相同的事情。因為done <- 1
更簡短。書上講集合的時候,使用的是 bool類型,這里講事件同步,使用的是空結(jié)構(gòu)體。
通道可以用來連接 goroutine,這樣一個的輸出是另一個的輸入,這個叫管道(pipline)。
關(guān)閉通道
如果發(fā)送方知道沒有更多的數(shù)據(jù)要發(fā)送,告訴接收者所在的 goroutine 可以停止等待是很有用的。這可以通過調(diào)用內(nèi)置的 Close 函數(shù)來關(guān)閉通道:
ch2 := make(chan bool) // 創(chuàng)建通道 ch2
// 下面是關(guān)閉通道
close(ch2)
在通道關(guān)閉后,任何后續(xù)的發(fā)送操作將會導(dǎo)致應(yīng)用崩潰。當(dāng)關(guān)閉的通道被讀完(就是最后一個發(fā)送的值被接收)后,所有后續(xù)的接收操作都會立即返回,返回值是對應(yīng)類型的零值。
關(guān)閉通道還可以作為一個廣播機制,后面的章節(jié)會具體講。
檢查通道的關(guān)閉
沒有一個直接的方式來判斷是否通道已經(jīng)關(guān)閉,不過可以接收返回兩個參數(shù):接收到的元素,以及一個布爾值(通常是ok),返回 true 表示接收成功,返回 false 表示當(dāng)前的接收操作在一個關(guān)閉的并且讀完的通道上。這個方法檢查的也不是通道是否關(guān)閉了,而是通道里的值是否已經(jīng)取完了。只有關(guān)閉的通道,才能保證不會有新值進入,把里面的值都取完后,會返回 false 表示這次取到的是通道關(guān)閉后的零值,而不是原本就是一個值為零的數(shù)據(jù)。
另外,還提供了一個 range 循環(huán)語法可以在通道上迭代。這個語法更為方便接收在通道上所有發(fā)送的值,接收完最后一個值后結(jié)束循環(huán)。
垃圾回收
結(jié)束時,關(guān)閉沒一個通道不是必需的。只有在通知接收方 goroutine 所有的數(shù)據(jù)都發(fā)送完畢的時候才需要關(guān)閉通道。通道也可以通過垃圾回收器根據(jù)它是否可以訪問來決定是否回收它,而不是根據(jù)它是否關(guān)閉。
Go 還提供了單向通道類型,僅僅導(dǎo)出發(fā)送或接收操作。類型chan<- int
是一個只能發(fā)送的通道,允許發(fā)送單不允許接收。反之,類型<-chan int
是一個只能接收的通道,允許接收但是不能發(fā)送。這里像箭頭一樣的操作符相對于 chan 關(guān)鍵字的位置是一個幫助記憶的點。如果違反這里的接收或發(fā)送的原則,在編譯時會被檢查出來。
在函數(shù)定義時,指定了單向通道的類型。在函數(shù)調(diào)用時,依然是把正常定義的雙向通道類型傳值給函數(shù)的參數(shù)。函數(shù)的調(diào)用會隱式地將普通的通道類型轉(zhuǎn)化為要求的單向通道的類型。在任何賦值操作中將雙向通道轉(zhuǎn)換為單向通道都是允許的,但是反過來是不行的。一旦有一個單向通道,是沒有辦法通過它獲取到引用同一個數(shù)據(jù)結(jié)構(gòu)的雙向通道的類型的。
緩沖通道有一個元素隊列,隊列的最大長度在創(chuàng)建的時候通過 make 的容量參數(shù)來設(shè)置:
ch2 := make(chan string, 3)
通過調(diào)用內(nèi)置的 cap 函數(shù),可以獲取通道緩沖區(qū)的容量。這種需求不太常見。
通過調(diào)用內(nèi)置的 len 函數(shù),可以獲取通道內(nèi)的元素個數(shù)。不過在并發(fā)程序中這個信息會隨著檢索操作很快過時,所以它的價值很低,但是它在錯誤診斷和性能優(yōu)化的時候很有用。
這不是隊列
發(fā)送和接收操作可以在同一個 goroutine 中,但在真實的程序中通常由不同的 goroutine 執(zhí)行。因為語法簡單,新手有時候粗暴地將緩沖通道作為隊列在單個 goroutine 中使用,但是這是個錯誤的用法。通道和 goroutine 的調(diào)度深度關(guān)聯(lián),如果沒有另一個 goroutine 從通道進行接收,發(fā)送者(也許是整個程序)有被永久阻塞的風(fēng)險。如果僅僅需要一個簡單的隊列,使用切片創(chuàng)建一個就好了。
示例:并發(fā)請求最快的鏡像資源
下面的例子展示一個使用緩沖通道的應(yīng)用。它并發(fā)地向三個鏡像地址發(fā)請求,鏡像指相同但分布在不同地理區(qū)域的服務(wù)器。它將它們的響應(yīng)通過一個緩沖通道進行發(fā)送,然后只接收第一個返回的響應(yīng),因為它是最早到達的。所以 mirroredQuery 函數(shù)甚至在兩個比較慢的服務(wù)器還沒有響應(yīng)之前返回了一個結(jié)果。(偶然情況下,會出現(xiàn)像這個例子中的幾個 goroutine 同時在一個通道上并發(fā)發(fā)送,或者同時從一個通道接收的情況。):
func mirroredQuery() string {
responses := make(chan string, 3) // 有幾個鏡像,就要多大的容量,不能少
go func () { responses <- request("asia.gopl.io") }()
go func () { responses <- request("europe.gopl.io") }()
go func () { responses <- request("americas.gopl.io") }()
return <- responses // 返回最快一個獲取到的請求結(jié)果
}
func request(hostname string) (response string) { return "省略獲取返回的代碼" }
goroutine 泄露
在上面的示例中,如果使用的是無緩沖通道,兩個比較慢的 goroutine 將被卡住,因為在它們發(fā)送響應(yīng)結(jié)果到通道的時候沒有 goroutine 來接收。這個情況叫做 goroutine 泄漏。它屬于一個 bug。不像回收變量,泄漏的 goroutine 不會自動回收,所以要確保 goroutine 在不再需要的時候可以自動結(jié)束。
無緩沖和緩沖通道的選擇,緩沖通道容量大小的選擇,都會對程序的正確性產(chǎn)生影響。無緩沖通道提供強同步保障,因為每一次發(fā)送都需要和一次對應(yīng)的接收同步;對于緩沖通道,這些操作則是解耦的。如果知道要發(fā)送的值數(shù)量的上限,通常會創(chuàng)建一個容量是使用上限的緩沖通道,在接收第一個值前就完成所有的發(fā)送。在內(nèi)存無法提供緩沖容量的情況下,可能導(dǎo)致程序死鎖。
有時候需要在多個通道上接收,不能只從一個通道上接收,因為任何一個操作都會在完成前阻塞。所以需要多路復(fù)用那些操作過程,為了實現(xiàn)這個目的,需要一個 select 語句:
select {
case <-ch2:
// ...
case x := <-ch3:
// ...use x...
case ch4 <- y:
// ...
default:
// ...
}
上面展示的是 select 語句的通用形式。像 switch 語句一樣,它有一系列的情況和一個可選的默認(rèn)分支。每一個情況指定一次通信(在一些通道上進行發(fā)送或接收操作)和關(guān)聯(lián)的一段代碼塊。接收表達式操作可能出現(xiàn)在它本身上,像第一個情況,或者在一個短變量聲明中,像第二個情況。第二種形式可以讓你引用所接收的值。
select 一直等待,直到一次通信來告知有一些情況可以執(zhí)行。然后,它進行這次通信,執(zhí)行此情況所對應(yīng)的語句,其他的通信將不會發(fā)生。
下面是一個微妙的例子。通道 ch 的緩沖區(qū)大小為 1,它要么是空的,要么是滿的,因此只有在其中一個狀況下可以執(zhí)行,要么在 i 是偶數(shù)時發(fā)送,要么在 i 是奇數(shù)時接收。它總是輸出 0 2 4 6 8:
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
如果多個情況同時滿足,select 隨機選擇一個,這樣保證每一個通道有相同的機會被選中。在前一個例子中增加緩沖區(qū)的容量,會使輸出變得不可確定,因為當(dāng)緩沖既不空也不滿的情況,相當(dāng)于 select 語句在隨機做選擇。
有時候我們試圖在一個通道上發(fā)送或接收,但是不想在通道沒有準(zhǔn)備好的情況下被阻塞,非阻塞通信。這使用 select 語句也可以做到。select 可以有一個默認(rèn)情況,它用來指定在沒有其他的通信發(fā)生時可以立即執(zhí)行的動作。
下面的 select 語句嘗試從 abort 通道中接收一個值,如果沒有值,它什么也不做。這是一個非阻塞的接收操作。重復(fù)這個動作稱為對通道輪詢:
select {
case <-abort:
fmt.Println("Launch aborted!")
return
default:
// 不執(zhí)行任何操作
}
通道的零值是 nil。令人驚訝的是,nil 通道有時候很有用。因為在 nil 通道上發(fā)送和接收將永遠(yuǎn)阻塞。對于 select 語句中的情況,如果其通道是 nil,它將永遠(yuǎn)不會被選擇。可以用 nil 來開啟或禁用特性所對應(yīng)的情況,比如超時處理或者取消操作,響應(yīng)其他的輸入事件或者發(fā)送事件。
最后來一個示例的實戰(zhàn)。這里要構(gòu)建一個程序,根據(jù)命令行指定的輸入,報告一個或多個目錄的磁盤使用情況,類似 UNIX 的 du 命令。
大多數(shù)的工作由下面的 walkDir 函數(shù)完成,它使用 dirents 輔助函數(shù)來枚舉目錄中的條目:
// walkDir 遞歸地遍歷以 dir 為根目錄的整個文件樹
// 并在 fileSizes 上發(fā)送每個已找到的文件的大小
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
ioutil.ReadDir 函數(shù)返回一個 os.FileInfo 類型的切片,針對單個文件同樣的信息可以通過調(diào)用 os.Stat 函數(shù)來返回。對每一個子目錄,walkDir 遞歸調(diào)用它自己,對于每一個文件,walkDir 發(fā)送一條消息到 fileSizes 通道。消息是文件所占用的字節(jié)數(shù)。
下面的 main 函數(shù)使用兩個 goroutine。后臺 goroutine 調(diào)用 walkDir 遍歷命令行上指定的每一個目錄,最后關(guān)閉 fileSizes 通道。主 goroutine 計算從通道中接收的文件的大小的和,最后輸出總數(shù):
func main() {
// 確定初始目錄
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// 遍歷文件樹
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// 輸出結(jié)果
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.2f GB\n", nfiles, float64(nbytes)/(1<<30)) // 1<<30 就是 2**30 就是 1024*1024*1024
}
現(xiàn)在程序可以正常的工作。
如果程序可以匯報進度的話,會更加友好。如果僅僅只是把 printDiskUsage 調(diào)用移動到循環(huán)內(nèi)部,會有非常多的輸出。
下面的示例,修改了主 goroutine 中記錄結(jié)果的部分。不是在每次迭代中輸出,而是加了一個定時器,通過 select 定期輸出一次結(jié)果。另外還加上了 -v 參數(shù)來控制,可以選擇性的開啟這個功能。如果不開啟功能,那么 tick 通道的值就是 nil,它對應(yīng)的分支在select 中就永遠(yuǎn)是阻塞的。相當(dāng)于沒有開啟這個選項,很直觀的理解:
var verbose = flag.Bool("v", false, "周期性的輸出進度")
func main() {
// 確定初始目錄,沒變化
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// 遍歷文件樹,沒變化
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// 定期輸出結(jié)果
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes 關(guān)閉,則退出,相當(dāng)于原來的遍歷結(jié)束
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
因為這個版本有兩個通道需要接收 size、tick,所以無法使用 range 循環(huán)了。所以第一個 select 的分支需要通過第二個參數(shù) ok 來判斷通道是否已經(jīng)關(guān)閉。這個的 break 退出使用了標(biāo)簽,因為沒有標(biāo)簽的 break 只能跳出當(dāng)前的 select 這層,而這里是需要跳出外層的 for 循環(huán)。
這里的 flag 的解析也值得借鑒,非常簡單。首先是解析指定的參數(shù),這里是 -v 參數(shù)。多余的參數(shù)會通過 flag.Args() 返回一個字符串切片。調(diào)用的時候,必須把解析的參數(shù)放在前面:
PS H:\Go\src\gopl\ch8\du2> go run main.go -v E:\BaiduNetdiskDownload E:\XMPCache E:\Downloads
4 files 0.02 GB
41 files 2.16 GB
177 files 6.99 GB
567 files 46.66 GB
605 files 50.26 GB
PS H:\Go\src\gopl\ch8\du2>
還可以進一步提高效率,這里的 walkDir 也是可以并發(fā)調(diào)用從而充分利用磁盤系統(tǒng)的并行機制。這個版本使用了 sycn.WaitGroup 來為并發(fā)調(diào)用的 walkDir 計數(shù)。當(dāng)計數(shù)器為減為 0 的時候,關(guān)閉 fileSizes 通道:
var verbose = flag.Bool("v", false, "周期性的輸出進度")
func main() {
// 確定初始目錄,沒變化
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// 并行遍歷每一個文檔樹
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes) // 注意,多傳了一個參數(shù)
}
go func() {
n.Wait()
close(fileSizes)
}()
// 定期輸出結(jié)果,沒變化
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes 關(guān)閉,則退出,相當(dāng)于原來的遍歷結(jié)束
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.2f GB\n", nfiles, float64(nbytes)/(1<<30)) // 1<<30 就是 2**30 就是 1024*1024*1024
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) { // 注意,多了個參數(shù)
defer n.Done() // 記得退出時計數(shù)器要減1
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, n, fileSizes) // 注意,多了個參數(shù)
} else {
fileSizes <- entry.Size()
}
}
}
還需要限制一下并發(fā)數(shù),這里要修改一下 dirents 函數(shù)來使用計數(shù)信號量進行限制,防止同時打開太多的文件:
// 用于限制目錄并發(fā)數(shù)的計數(shù)信號量
var sema = make(chan struct{}, 20)
// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // 獲取令牌
defer func() { <-sema }() // 釋放令牌
entries, err := ioutil.ReadDir(dir) // 這個打開文件的操作需要限制并發(fā),在這句之前加上計數(shù)信號量,非常合適
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
現(xiàn)在這個版本的是最好的了。不過下面還會再增加一個取消的操作,這里的取消會用到廣播的機制。
一個 goroutine 無法直接終止另一個,因為這樣會讓所有的共享變量狀態(tài)處于不確定狀態(tài)。正確的做法是使用通道來傳遞一個信號,當(dāng) goroutine 接收到信號時,就終止自己。這里要討論的是如何同時取消多個 goroutine。
一個可選的做法是,給通道發(fā)送你要取消的 goroutine 同樣多的信號。但是如果一些 goroutine 已經(jīng)自己終止了,這樣計數(shù)就多了,就會在發(fā)送過程中卡住。如果某些 goroutine 還會自我繁殖,那么信號的數(shù)量又會太少。通常,任何時刻都很難知道有多少個 goroutine 正在工作。對于取消操作,這里需要一個可靠的機制在一個通道上廣播一個事件,這樣所有的 goroutine 就都能收到信號,而不用關(guān)心具體有多少個 goroutine。
當(dāng)一個通道關(guān)閉且已經(jīng)取完所有發(fā)送的值后,接下來的接收操作都會立刻返回,得到零值。就可以利用這個特性來創(chuàng)建一個廣播機制。第一步,創(chuàng)建一個取消通道,在它上面不發(fā)送任何的值,但是它的關(guān)閉表明程序需要停止它正在做的事情。
這節(jié),先講解取消廣播的實現(xiàn)。然后把這個功能加到上面的例子中去。
還要定義一個工具函數(shù) cancelled,在它被調(diào)用的時候檢測或輪詢取消狀態(tài):
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
接下來,創(chuàng)建一個讀取標(biāo)準(zhǔn)輸入的 goroutine,它通常連接到終端,當(dāng)用戶按回車后,這個 goroutine 通過關(guān)閉 done 通道來廣播取消事件:
// 當(dāng)檢測到輸入時,廣播取消
go func() {
os.Stdin.Read(make([]byte, 1)) // 讀一個字節(jié)
close(done)
}()
現(xiàn)在要讓所有的 goroutine 來響應(yīng)這個取消操作。在主 goroutine 中的 select 中,嘗試從 done 接收。如果接收到了,就需要進行取消操作,但是在結(jié)束之前,它必須耗盡 fileSizes 通道,丟棄它所有的值,知道通道關(guān)閉。這么做是為了保證所有的 walkDir 調(diào)用可以執(zhí)行完,不會卡在向 fileSizes 通道發(fā)送消息上:
for {
select {
case <-done:
// 耗盡 fileSizes,讓已經(jīng)創(chuàng)建的 goroutine 結(jié)束
for range fileSizes {
// 什么也不做
}
return
case siez, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += siez
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
walkDir 的 goroutine 在開始的時候輪詢?nèi)∠麪顟B(tài)。如果是取消的狀態(tài),就什么都不做立即返回。這樣在取消后創(chuàng)建的 goroutine 就會什么都不做而是立刻返回:
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
現(xiàn)在基本就避免了在取消后創(chuàng)建新的 goroutine。但是其他已經(jīng)創(chuàng)建的 goroutine 則會等待他們執(zhí)行完畢。要想更快的響應(yīng),就需要對程序邏輯進行侵入式的修改。要確保在取消事件之后沒有更多昂貴的操作發(fā)生。這就需要更新更多的代碼,但是通??梢酝ㄟ^在少量重要的地方檢察取消狀態(tài)來達到目的。在 dirents 中獲取信號量令牌的操作也可需要快速結(jié)束:
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // 獲取令牌
case <-done:
return nil // 取消
}
defer func() { <-sema }() // 釋放令牌
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
現(xiàn)在,當(dāng)取消事件發(fā)生時,已經(jīng)進入 dirents 函數(shù)的調(diào)用,如果已經(jīng)獲取到了令牌,則會執(zhí)行完畢,但是返回后,在地遞歸調(diào)用 walkDir 的時候就會快速退出。那些還沒獲取令牌的調(diào)用,此時在 select 中會因為從 done 通道中接收到取消的廣播而直接返回 nil。
期望的情況是,當(dāng)然是當(dāng)取消事件到來時 main 函數(shù)可以返回,然后程序隨之退出。如果發(fā)現(xiàn)在取消事件到來的時候 main 函數(shù)沒有返回,可以執(zhí)行一個 panic 調(diào)用。從崩潰的轉(zhuǎn)存儲信息中通常含有足夠的信息來幫助我們分析,發(fā)現(xiàn)哪些 goroutine 還沒有合適的取消。也可能是已經(jīng)取消了,但是需要的時間比較長??傊褂?panic 可以幫助查找原因。