轉(zhuǎn)載自:go channel原理及使用場(chǎng)景
創(chuàng)新互聯(lián)主要從事成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)南昌,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):028-86922220
type hchan struct {
qcount uint // Channel 中的元素個(gè)數(shù)
dataqsiz uint // Channel 中的循環(huán)隊(duì)列的長(zhǎng)度
buf unsafe.Pointer // Channel 的緩沖區(qū)數(shù)據(jù)指針
elemsize uint16 // 當(dāng)前 Channel 能夠收發(fā)的元素大小
closed uint32
elemtype *_type // 當(dāng)前 Channel 能夠收發(fā)的元素類型
sendx uint // Channel 的發(fā)送操作處理到的位置
recvx uint // Channel 的接收操作處理到的位置
recvq waitq // 當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)
sendq waitq // 當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
channel的初始化有2種,一種是沒有緩沖區(qū)的channel,一種是有緩沖區(qū)的channel。對(duì)應(yīng)的初始化之后hchan也是有區(qū)別的。
無(wú)緩沖區(qū)的channel,初始化的時(shí)候只為channel分配內(nèi)存,緩沖區(qū)dataqsiz的長(zhǎng)度為0
有緩沖的channel,初始化時(shí)會(huì)為channel和緩沖區(qū)分配內(nèi)存,dataqsiz長(zhǎng)度大于0
同時(shí)channel的元素大小和緩沖區(qū)的長(zhǎng)度都是有大小限制的
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 如果內(nèi)存超了,或者分配的內(nèi)存大于channel最大分配內(nèi)存,或者分配的size小于0,直接Panic
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 如果沒有緩沖區(qū),分配一段內(nèi)存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 有緩沖時(shí),如果元素不包含指針類型,會(huì)為當(dāng)前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內(nèi)存空間
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 有緩沖區(qū),且元素包含指針類型,channel和buf數(shù)組各自分配內(nèi)存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小,元素類型,循環(huán)數(shù)組長(zhǎng)度,更新到channel
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
發(fā)送數(shù)據(jù)前會(huì)加鎖,防止多個(gè)線程并發(fā)修改數(shù)據(jù)。如果channel已經(jīng)關(guān)閉,直接Panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
當(dāng)存在等待的接收者時(shí),通過(guò) runtime.send
直接將數(shù)據(jù)發(fā)送給阻塞的接收者
當(dāng)channel的recvq隊(duì)列不為空,而且channel是沒有數(shù)據(jù)數(shù)據(jù)寫入的。這個(gè)時(shí)候如果有數(shù)據(jù)寫入,會(huì)直接把數(shù)據(jù)拷貝到接收者變量所在的內(nèi)存地址上。即使這是一個(gè)有緩沖的channel,當(dāng)有等待的接收者時(shí),也是直接給接收者,不會(huì)先保存到循環(huán)隊(duì)列
// 如果目標(biāo) Channel 沒有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine,那么 runtime.chansend 會(huì)從接收隊(duì)列 recvq 中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
// 調(diào)用 runtime.sendDirect 將發(fā)送的數(shù)據(jù)直接拷貝到 x = <-c 表達(dá)式中變量 x 所在的內(nèi)存地址上
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
// 調(diào)用 runtime.goready 將等待接收數(shù)據(jù)的 Goroutine 標(biāo)記成可運(yùn)行狀態(tài) Grunnable 并把該 Goroutine 放到發(fā)送方所在的處理器的 runnext 上等待執(zhí)行,該處理器在下一次調(diào)度時(shí)會(huì)立刻喚醒數(shù)據(jù)的接收方;
// 需要注意的是,發(fā)送數(shù)據(jù)的過(guò)程只是將接收方的 Goroutine 放到了處理器的 runnext 中,程序沒有立刻執(zhí)行該 Goroutine
goready(gp, skip+1)
}
當(dāng)緩沖區(qū)存在空余空間時(shí),將發(fā)送的數(shù)據(jù)寫入 Channel 的緩沖區(qū)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 如果當(dāng)前元素?cái)?shù)小于循環(huán)隊(duì)列的長(zhǎng)度
if c.qcount < c.dataqsiz {
// 使用 runtime.chanbuf 計(jì)算出下一個(gè)可以存儲(chǔ)數(shù)據(jù)的位置
qp := chanbuf(c, c.sendx)
// 將發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)中
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送的位置索引+1
c.sendx++
// 如果循環(huán)隊(duì)列滿了就從0開始
// 因?yàn)檫@里的 buf 是一個(gè)循環(huán)數(shù)組,所以當(dāng) sendx 等于 dataqsiz 時(shí)會(huì)重新回到數(shù)組開始的位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 增加當(dāng)前元素?cái)?shù)
c.qcount++
unlock(&c.lock)
return true
}
...
}
當(dāng)不存在緩沖區(qū)或者緩沖區(qū)已滿時(shí),等待其他 Goroutine 從 Channel 接收數(shù)據(jù)
當(dāng)因?yàn)椴淮嬖诰彌_區(qū)或者緩沖區(qū)已滿無(wú)法寫入時(shí),會(huì)構(gòu)造sudog等待執(zhí)行的gorutine結(jié)構(gòu),放到hchan的等待隊(duì)列中,直到被喚醒,把數(shù)據(jù)放到緩沖區(qū)或者直接拷貝給接收者
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 使用 select 關(guān)鍵字可以向 Channel 非阻塞地發(fā)送消息
if !block {
unlock(&c.lock)
return false
}
// 獲取發(fā)送數(shù)據(jù)使用的 Goroutine
gp := getg()
// 獲取 runtime.sudog 結(jié)構(gòu)
mysg := acquireSudog()
// 設(shè)置待發(fā)送數(shù)據(jù)的內(nèi)存地址
mysg.elem = ep
// 設(shè)置發(fā)送數(shù)據(jù)的goroutine
mysg.g = gp
mysg.isSelect = false
// 設(shè)置發(fā)送的channel
mysg.c = c
// 設(shè)置到goroutine的waiting上
gp.waiting = mysg
// 加入到發(fā)送等待隊(duì)列
c.sendq.enqueue(mysg)
// 阻塞等待喚醒
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
從一個(gè)空 Channel 接收數(shù)據(jù)
goroutine會(huì)讓出使用權(quán),并阻塞等待
if c == nil {
if !block {
return
}
// 讓出使用權(quán)
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 不獲取鎖的情況下,檢查失敗的非阻塞操作
if !block && empty(c) {
// 顯示未關(guān)閉,繼續(xù)返回false,因?yàn)閏hannel不會(huì)重新打開
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
// Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何數(shù)據(jù),那么會(huì)清除 ep 指針中的數(shù)據(jù)并立刻返回
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何數(shù)據(jù),那么會(huì)清除 ep 指針中的數(shù)據(jù)并立刻返回
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
當(dāng)存在等待的發(fā)送者時(shí),通過(guò) runtime.recv
從阻塞的發(fā)送者或者緩沖區(qū)中獲取數(shù)據(jù)
如果是無(wú)緩沖的channel,當(dāng)有接收者進(jìn)來(lái)時(shí),會(huì)直接從阻塞的發(fā)送者拷貝數(shù)據(jù)
如果是有緩沖的channel,當(dāng)有接收者進(jìn)來(lái)時(shí),會(huì)先從緩沖區(qū)拿數(shù)據(jù),接著等待的發(fā)送者會(huì)把數(shù)據(jù)拷貝到緩沖區(qū)
注意這個(gè)時(shí)候并沒有直接去喚醒發(fā)送者,而是放到下次p的執(zhí)行隊(duì)列中中,下次調(diào)度時(shí)會(huì)喚醒發(fā)送者,發(fā)送者會(huì)做一些釋放資源的操作
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// 如果無(wú)緩存,直接從發(fā)送者拷貝數(shù)據(jù)
recvDirect(c.elemtype, sg, ep)
}
} else {
// 由于隊(duì)列已滿,接收數(shù)據(jù)的索引和發(fā)送數(shù)據(jù)的索引一致
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// 數(shù)據(jù)從隊(duì)列拷貝到目標(biāo)內(nèi)存地址
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 數(shù)據(jù)從發(fā)送者拷貝到緩沖區(qū)
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 無(wú)論發(fā)生哪種情況,運(yùn)行時(shí)都會(huì)調(diào)用 runtime.goready 將當(dāng)前處理器的 runnext 設(shè)置成發(fā)送數(shù)據(jù)的 Goroutine,在調(diào)度器下一次調(diào)度時(shí)將阻塞的發(fā)送方喚醒。
goready(gp, skip+1)
}
當(dāng)緩沖區(qū)存在數(shù)據(jù)時(shí),從 Channel 的緩沖區(qū)中接收數(shù)據(jù)
if c.qcount > 0 {
// 直接從隊(duì)列取數(shù)據(jù)
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 放到目標(biāo)內(nèi)存
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空隊(duì)列中對(duì)應(yīng)的元素
typedmemclr(c.elemtype, qp)
// 接收索引+1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 隊(duì)列元素-1
c.qcount--
unlock(&c.lock)
return true, true
}
當(dāng)緩沖區(qū)中不存在數(shù)據(jù)時(shí),等待其他 Goroutine 向 Channel 發(fā)送數(shù)據(jù)
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// 阻塞等待,讓出使用權(quán)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 喚醒之后清空sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
當(dāng) Channel 是一個(gè)空指針或者已經(jīng)被關(guān)閉時(shí),Go 語(yǔ)言運(yùn)行時(shí)都會(huì)直接崩潰并拋出異常
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
將 recvq
和 sendq
兩個(gè)隊(duì)列中的數(shù)據(jù)加入到 Goroutine 列表 gList
中,與此同時(shí)該函數(shù)會(huì)清除所有 runtime.sudog
上未被處理的元素
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 為所有被阻塞的 Goroutine 調(diào)用 runtime.goready 觸發(fā)調(diào)度。
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
有4個(gè)goroutine,編號(hào)為1、2、3、4。每秒鐘會(huì)有一個(gè)goroutine打印出自己的編號(hào),要求寫一個(gè)程序,讓輸出的編號(hào)總是按照1、2、3、4、1、2、3、4...的順序打印出來(lái)
package main
import (
"fmt"
"time"
)
func main() {
// 4個(gè)channel
chs := make([]chan int, 4)
for i, _ := range chs {
chs[i] = make(chan int)
// 開4個(gè)協(xié)程
go func(i int) {
for {
// 獲取當(dāng)前channel值并打印
v := <-chs[i]
fmt.Println(v + 1)
time.Sleep(time.Second)
// 把下一個(gè)值寫入下一個(gè)channel,等待下一次消費(fèi)
chs[(i+1)%4] <- (v + 1) % 4
}
}(i)
}
// 往第一個(gè)塞入0
chs[0] <- 0
select {}
}
package main
import (
"fmt"
"time"
)
func main() {
// 每次處理3個(gè)請(qǐng)求
chLimit := make(chan struct{}, 3)
for i := 0; i < 20; i++ {
chLimit <- struct{}{}
go func(i int) {
fmt.Println("下游服務(wù)處理邏輯...", i)
time.Sleep(time.Second * 3)
<-chLimit
}(i)
}
time.Sleep(30 * time.Second)
}
如果覺得sleep太丑太暴力,可以用waitGroup控制結(jié)束時(shí)機(jī)
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
// 每次處理3個(gè)請(qǐng)求
chLimit := make(chan struct{}, 3)
for i := 0; i < 20; i++ {
chLimit <- struct{}{}
wg.Add(1)
go func(i int) {
fmt.Println("下游服務(wù)處理邏輯...", i)
time.Sleep(time.Second * 3)
<-chLimit
wg.Done()
}(i)
}
wg.Wait()
}
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
for {
select {
case <-closing:
return
default:
fmt.Println("業(yè)務(wù)邏輯...")
time.Sleep(1 * time.Second)
}
}
}()
termChan := make(chan os.Signal)
// 監(jiān)聽退出信號(hào)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
// 退出中
close(closing)
// 退出之前清理一下
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
log.Println("清理超時(shí)不等了")
}
log.Println("優(yōu)雅退出")
}
func doCleanup(closed chan struct{}) {
time.Sleep(time.Minute)
// 清理完后退出
close(closed)
}
初始化一個(gè)緩沖區(qū)為1的channel,放入元素代表一把鎖,誰(shuí)獲取到這個(gè)元素就代表獲取了這把鎖,釋放鎖的時(shí)候再把這個(gè)元素放回channel
package main
import (
"log"
"time"
)
type Mutex struct {
ch chan struct{}
}
// 初始化鎖
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 加鎖,阻塞獲取
func (m *Mutex) Lock() {
<- m.ch
}
// 釋放鎖
func (m *Mutex) Unlock() {
select {
// 成功寫入channel代表釋放成功
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 嘗試獲取鎖
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
// 成功獲取鎖關(guān)閉定時(shí)器
timer.Stop()
return true
case <-timer.C:
}
// 獲取鎖超時(shí)
return false
}
// 是否上鎖
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
log.Printf("locked v %v\n", ok)
ok = m.TryLock()
log.Printf("locked v %v\n", ok)
go func() {
time.Sleep(5*time.Second)
m.Unlock()
}()
ok = m.LockTimeout(10*time.Second)
log.Printf("LockTimeout v %v\n", ok)
}
參考:
極刻時(shí)間《go 并發(fā)編程實(shí)戰(zhàn)》