條件變量(conditional variable),和互斥鎖一樣,也是一個同步工具。我們常常會把條件變量與互斥鎖一起討論。實際上,條件變量是基于互斥鎖的,它必須有互斥鎖的支撐才能發(fā)揮作用。
黃岡ssl適用于網(wǎng)站、小程序/APP、API接口等需要進行數(shù)據(jù)傳輸應用場景,ssl證書未來市場廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場價格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:028-86922220(備注:SSL證書合作)期待與您的合作!
條件變量并不是被用來保護臨界區(qū)和共享資源的,它是用于協(xié)調(diào)想要訪問共享資源的那些線程的。當共享資源的狀態(tài)發(fā)生變化時,它可以被用來通知被互斥鎖阻塞的線程。
使用條件變量的最大優(yōu)勢就是在效率方面的提升。當共享資源的狀態(tài)不滿足條件的時候,想操作它的線程再也不用循環(huán)往復的做檢查了,只要等待通知就好了。
條件變量需要與互斥鎖配合使用。條件變量的初始化需要互斥鎖,并且它的方法有的也是基于互斥鎖的。
條件變量提供的方法有三個:
在利用條件變量等待通知的時候,需要在它基于的那個互斥鎖的保護下進行。
在進行單發(fā)通知或光爆通知的時候,需要在對應的互斥鎖解鎖之后再做操作。
創(chuàng)建條件變量
結合代碼理解上面的含義,先創(chuàng)建幾個變量:
var lock sync.RWMutex
sendCond := sync.NewCond(&lock)
recvCond := sync.NewCond(lock.RLocker())
條件變量的類型
lock是一個讀寫鎖,基于這把鎖,創(chuàng)建了2個代表條件變量的變量,這兩個變量的類型是*sync.Cond,是由sync.NewCond函數(shù)來初始化的。
初始化
與互斥鎖鎖不同,這里不是開箱即用的,只能使用sync.NewCond函數(shù)來創(chuàng)建它的指針值,這個函數(shù)需要一個sync.Locker類型的參數(shù)。
前面說過,條件變量是基于互斥鎖的,它必須有互斥鎖的支持才能夠起作用。因此,這里的參數(shù)是必須的,它也會參與到條件變量的方法實現(xiàn)中去。
sync.Locker接口
sync.Locker其實是一個接口,包含兩個方法Lock()和Unlock():
type Locker interface {
Lock()
Unlock()
}
sync.Mutex類型sync,RWMutex類型都擁有這兩個方法,不過都是指針方法。因此這兩個類型的指針類型才是sync.Locker接口的實現(xiàn)類型。
初始化的過程
在為sendCond初始化的時候,把lock變量的指針作為參數(shù)。這里lock變量的Lock方法和Unlock方法分別用于對其中寫鎖的鎖定和解鎖。這里與實現(xiàn)接口的兩個方法的名稱是對應的。
在為recvCond初始化的時候,需要的是lock變量的讀鎖,并且還得是sync.Locker接口類型,就是要實現(xiàn)了Lock和Unlock方法的讀鎖??墒莑ock變量中用于讀鎖的方法卻是RLock方法和RUnlock方法,這里名稱不對應了。不過有一個RLocker方法可以實現(xiàn)這一需求,下面是源碼里實現(xiàn)的部分,很簡單:
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
這里我有一些小疑惑,3個方法里面都是類型斷言吧。RLocker方法把原來的讀寫鎖類型轉成一個新的類型然后返回。后面的兩個方法,為了用新類型調(diào)用讀寫鎖類型里的方法,先進行類型斷言,轉成讀寫鎖原本的類型,然后調(diào)用它的方法。
使用條件變量
下面是截取的使用時的部分代碼:
lock.Lock()
for !isEmpty {
sendCond.Wait()
}
isEmpty = false
// 這里可以做寫入的操作
lock.Unlock()
recvCond.Signal()
上面是一個寫入的流程。之前的代碼定義了一個狀態(tài)變量isEmpty,只有狀態(tài)為空的時候,才允許寫入,寫入后把狀態(tài)設置為非空。
這里要先調(diào)用Lock方法,等待通知(wait)是要在互斥鎖的保護下進行的。
然后再操作完之后,先調(diào)用Unlock方法,再發(fā)送通知,發(fā)送通知的操作要在互斥鎖解鎖之后。
這里等待的出sendCond的信號,而最后發(fā)送的是recvCond的信號。在另一個讀取的流程里則正好相反。利用條件變量可以實現(xiàn)單向的通知,而這里要實現(xiàn)雙向的通知,就需要兩個條件變量。這是條件變量的基本使用原則。
上面把關鍵的代碼分析了一下,下面是完整的示例代碼:
package main
import (
"fmt"
"sync"
"time"
"flag"
)
var useCond bool
func init() {
flag.BoolVar(&useCond, "cond", false, "是否使用條件變量")
}
type msgBox struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}
func main() {
flag.Parse()
fmt.Println("是否開啟了條件變量保護:", useCond)
var lock sync.RWMutex
msgBox := msgBox{
isEmpty: true, // 默認值是false,狀態(tài)初始值應該為true
sendCond: sync.NewCond(&lock), // 不是開箱即用的,需要在使用前初始化
recvCond: sync.NewCond(lock.RLocker()),
}
done := make(chan struct{})
max := 5
// 寫操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < max; i++ {
time.Sleep(time.Millisecond * 200)
// 先進行保護
lock.Lock()
// 再等待通知
for useCond && !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msgBox.isEmpty = false
msg := fmt.Sprintf("第 %d 條消息", i)
msgBox.message = msg
fmt.Printf("發(fā)送消息[%d]: %s\n", i, msg)
// 先解鎖
lock.Unlock()
// 再發(fā)送通知
msgBox.recvCond.Signal()
}
}(max)
// 讀操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max; j++ {
time.Sleep(time.Millisecond * 500)
lock.RLock()
for useCond && msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msgBox.isEmpty = true
msg := msgBox.message
fmt.Printf("接收消息[%d]: %s\n", j, msg)
lock.RUnlock()
msgBox.sendCond.Signal()
}
}(max)
<-done
<-done
fmt.Println("Over")
}
代碼中條件變量的作用
在這個例子里,寫的時候要獲取到寫鎖,讀的時候要獲取到讀鎖,這個邏輯和之前互斥鎖是一樣的。但是只是獲取到鎖還不能做操作,這里還要再做一個限制,所以就用到了條件變量。
在這個例子里,寫操作和讀操作是需要成對出現(xiàn)的。寫完一次之后,依然能獲取到寫鎖,但是不能立刻寫。而是要等待讀操作把之前寫入的數(shù)據(jù)讀過之后,才能再次寫入,把之前的內(nèi)容覆蓋掉。讀操作也是一樣。這里就需要兩個goroutine之間傳遞信號了。
通過命令行參數(shù)分別在開啟/關閉條件變量的環(huán)境下運行,可以看到其中的作用:
go run main.go
go run main.go -cond
條件變量的Wait方法主要做了4件事:
先解鎖,在阻塞
在Wait方法里,必須要先解鎖,在阻塞當前goroutine。否則就違背了互斥鎖要成對出現(xiàn)的原則。并且當前goroutine在解鎖千就阻塞的話,當前goroutine就不可能在執(zhí)行解鎖了。即使不考慮原則,讓別的goroutine來解鎖,又會有重復解鎖可能。
使用for語句
并且Wait方法建議是放在一個for循環(huán)里的。這里似乎也是可以用if語句的。但是if語句只能檢查狀態(tài)一次,而for的話可以進行多次檢查。如果goroutine收到了通知而喚醒,但是此時檢查時發(fā)現(xiàn)狀態(tài)還是不對,那么就應該再次調(diào)用Wait方法。保險起見,在包裹條件變量的Wait方法總是應該使用for語句。
這2個方法都是用來發(fā)送通知的。Signal方法的通知只會喚醒一個goroutine,而Broadcast方法的通知會喚醒所有等待的goroutine。Wait方法會把當前的goroutine添加到通知隊列的隊尾,而Signal方法會從通知隊列的隊首開始查找可以被喚醒的goroutine。因此Signal方法喚醒的一般是最早等待的那個goroutine。
適用場景
這2個方法的行為決定他們的適用場景。確定只有一個goroutine在等待通知,或者值需要喚醒一個goroutine的時候,就使用Signal方法。否則,使用Broadcast方法總是沒錯的,Broadcast方法的適用場景更多。
通知的即時性
條件變量的通知具有即時性。如果發(fā)送通知的時候沒有goroutine在等待,那么該次通知就會被直接丟棄。之后再開始等待的goroutine需要等待之后的通知。
還是前面那個示例,稍微改了改,把讀寫鎖換成了互斥鎖,通知方法把Signal換成了Broadcast:
package main
import (
"fmt"
"sync"
"time"
)
var lock sync.Mutex
// 匿名結構體,定義并初始化賦值
// 嵌入式鎖(Embedded lock)的場景適合使用匿名結構體
var msgBox = struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}{
isEmpty: true,
sendCond: sync.NewCond(&lock),
recvCond: sync.NewCond(&lock),
}
// 用于設置消息的函數(shù)
func send(id, index int) {
lock.Lock()
for !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msg := fmt.Sprintf("msg: [%d-%d]", id, index)
msgBox.message = msg
fmt.Printf("發(fā)送消息[%d-%d]: %s\t", id, index, msg)
msgBox.isEmpty = false
lock.Unlock()
msgBox.recvCond.Broadcast()
}
// 用于讀取消息的函數(shù)
func recv(id, index int) {
lock.Lock()
for msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msg := msgBox.message
msgBox.message = ""
fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg)
msgBox.isEmpty = true
lock.Unlock()
msgBox.sendCond.Broadcast()
}
func main() {
done := make(chan struct{})
count := 5
// 啟動一個goroutine用于發(fā)送
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 100)
send(id, i)
}
}(0, count * 2)
// 啟動兩個goroutine用于接收
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 300)
recv(id, i)
}
}(1, count)
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 400)
recv(id, i)
}
}(2, count)
<- done
<- done
<- done
fmt.Println("Over")
}