真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

go channel原理及使用場(chǎng)景

轉(zhuǎn)載自:go channel原理及使用場(chǎng)景

創(chuàng)新互聯(lián)主要從事成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)南昌,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):028-86922220

源碼解析

type hchan struct {
	qcount   uint           // Channel 中的元素個(gè)數(shù)
	dataqsiz uint           // Channel 中的循環(huán)隊(duì)列的長(zhǎng)度
	buf      unsafe.Pointer // Channel 的緩沖區(qū)數(shù)據(jù)指針
	elemsize uint16 // 當(dāng)前 Channel 能夠收發(fā)的元素大小
	closed   uint32
	elemtype *_type // 當(dāng)前 Channel 能夠收發(fā)的元素類型
	sendx    uint   // Channel 的發(fā)送操作處理到的位置
	recvx    uint   // Channel 的接收操作處理到的位置
  recvq    waitq  // 當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)
	sendq    waitq  // 當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

創(chuàng)建channel

channel的初始化有2種,一種是沒有緩沖區(qū)的channel,一種是有緩沖區(qū)的channel。對(duì)應(yīng)的初始化之后hchan也是有區(qū)別的。

無(wú)緩沖區(qū)的channel,初始化的時(shí)候只為channel分配內(nèi)存,緩沖區(qū)dataqsiz的長(zhǎng)度為0

有緩沖的channel,初始化時(shí)會(huì)為channel和緩沖區(qū)分配內(nèi)存,dataqsiz長(zhǎng)度大于0

同時(shí)channel的元素大小和緩沖區(qū)的長(zhǎng)度都是有大小限制的

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

  // 如果內(nèi)存超了,或者分配的內(nèi)存大于channel最大分配內(nèi)存,或者分配的size小于0,直接Panic
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// 如果沒有緩沖區(qū),分配一段內(nèi)存
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 有緩沖時(shí),如果元素不包含指針類型,會(huì)為當(dāng)前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內(nèi)存空間
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 有緩沖區(qū),且元素包含指針類型,channel和buf數(shù)組各自分配內(nèi)存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

  // 元素大小,元素類型,循環(huán)數(shù)組長(zhǎng)度,更新到channel
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

發(fā)送數(shù)據(jù)(ch <- i)

  • 發(fā)送數(shù)據(jù)前會(huì)加鎖,防止多個(gè)線程并發(fā)修改數(shù)據(jù)。如果channel已經(jīng)關(guān)閉,直接Panic

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    	lock(&c.lock)
    
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("send on closed channel"))
    	}
    
  • 當(dāng)存在等待的接收者時(shí),通過(guò) runtime.send 直接將數(shù)據(jù)發(fā)送給阻塞的接收者

    當(dāng)channel的recvq隊(duì)列不為空,而且channel是沒有數(shù)據(jù)數(shù)據(jù)寫入的。這個(gè)時(shí)候如果有數(shù)據(jù)寫入,會(huì)直接把數(shù)據(jù)拷貝到接收者變量所在的內(nèi)存地址上。即使這是一個(gè)有緩沖的channel,當(dāng)有等待的接收者時(shí),也是直接給接收者,不會(huì)先保存到循環(huán)隊(duì)列

    // 如果目標(biāo) Channel 沒有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine,那么 runtime.chansend 會(huì)從接收隊(duì)列 recvq 中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù)
    if sg := c.recvq.dequeue(); sg != nil {
    		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    		return true
    	}
    
    // 
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    	if sg.elem != nil {
        // 調(diào)用 runtime.sendDirect 將發(fā)送的數(shù)據(jù)直接拷貝到 x = <-c 表達(dá)式中變量 x 所在的內(nèi)存地址上
    		sendDirect(c.elemtype, sg, ep)
    		sg.elem = nil
    	}
    	gp := sg.g
    	unlockf()
    	gp.param = unsafe.Pointer(sg)
      // 調(diào)用 runtime.goready 將等待接收數(shù)據(jù)的 Goroutine 標(biāo)記成可運(yùn)行狀態(tài) Grunnable 并把該 Goroutine 放到發(fā)送方所在的處理器的 runnext 上等待執(zhí)行,該處理器在下一次調(diào)度時(shí)會(huì)立刻喚醒數(shù)據(jù)的接收方;
      // 需要注意的是,發(fā)送數(shù)據(jù)的過(guò)程只是將接收方的 Goroutine 放到了處理器的 runnext 中,程序沒有立刻執(zhí)行該 Goroutine
    	goready(gp, skip+1)
    }
    
  • 當(dāng)緩沖區(qū)存在空余空間時(shí),將發(fā)送的數(shù)據(jù)寫入 Channel 的緩沖區(qū)

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    	...
      // 如果當(dāng)前元素?cái)?shù)小于循環(huán)隊(duì)列的長(zhǎng)度
    	if c.qcount < c.dataqsiz {
        // 使用 runtime.chanbuf 計(jì)算出下一個(gè)可以存儲(chǔ)數(shù)據(jù)的位置
    		qp := chanbuf(c, c.sendx)
        // 將發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)中
    		typedmemmove(c.elemtype, qp, ep)
        // 發(fā)送的位置索引+1
    		c.sendx++
        // 如果循環(huán)隊(duì)列滿了就從0開始
        // 因?yàn)檫@里的 buf 是一個(gè)循環(huán)數(shù)組,所以當(dāng) sendx 等于 dataqsiz 時(shí)會(huì)重新回到數(shù)組開始的位置
    		if c.sendx == c.dataqsiz {
    			c.sendx = 0
    		}
        // 增加當(dāng)前元素?cái)?shù)
    		c.qcount++
    		unlock(&c.lock)
    		return true
    	}
    	...
    }
    
  • 當(dāng)不存在緩沖區(qū)或者緩沖區(qū)已滿時(shí),等待其他 Goroutine 從 Channel 接收數(shù)據(jù)

    當(dāng)因?yàn)椴淮嬖诰彌_區(qū)或者緩沖區(qū)已滿無(wú)法寫入時(shí),會(huì)構(gòu)造sudog等待執(zhí)行的gorutine結(jié)構(gòu),放到hchan的等待隊(duì)列中,直到被喚醒,把數(shù)據(jù)放到緩沖區(qū)或者直接拷貝給接收者

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    	...
      // 使用 select 關(guān)鍵字可以向 Channel 非阻塞地發(fā)送消息
    	if !block {
    		unlock(&c.lock)
    		return false
    	}
    
      // 獲取發(fā)送數(shù)據(jù)使用的 Goroutine
    	gp := getg()
      //  獲取 runtime.sudog 結(jié)構(gòu)
    	mysg := acquireSudog()
      // 設(shè)置待發(fā)送數(shù)據(jù)的內(nèi)存地址
    	mysg.elem = ep
      // 設(shè)置發(fā)送數(shù)據(jù)的goroutine
    	mysg.g = gp
      mysg.isSelect = false
      // 設(shè)置發(fā)送的channel
    	mysg.c = c
      // 設(shè)置到goroutine的waiting上
    	gp.waiting = mysg
      // 加入到發(fā)送等待隊(duì)列
    	c.sendq.enqueue(mysg)
      // 阻塞等待喚醒
    	atomic.Store8(&gp.parkingOnChan, 1)
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    	KeepAlive(ep)
    
    	// someone woke us up.
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	closed := !mysg.success
    	gp.param = nil
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	mysg.c = nil
    	releaseSudog(mysg)
    	if closed {
    		if c.closed == 0 {
    			throw("chansend: spurious wakeup")
    		}
    		panic(plainError("send on closed channel"))
    	}
    	return true
    }
    

接收數(shù)據(jù)(<- ch)

  • 從一個(gè)空 Channel 接收數(shù)據(jù)

    goroutine會(huì)讓出使用權(quán),并阻塞等待

    	if c == nil {
    		if !block {
    			return
    		}
        // 讓出使用權(quán)
    		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    		throw("unreachable")
    	}
    
    	// 不獲取鎖的情況下,檢查失敗的非阻塞操作
    	if !block && empty(c) {
    		// 顯示未關(guān)閉,繼續(xù)返回false,因?yàn)閏hannel不會(huì)重新打開
    		if atomic.Load(&c.closed) == 0 {
    			return
    		}
    
    		if empty(c) {
    			// The channel is irreversibly closed and empty.
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
          // Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何數(shù)據(jù),那么會(huì)清除 ep 指針中的數(shù)據(jù)并立刻返回
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false
    		}
    	}
    
    	var t0 int64
    	if blockprofilerate > 0 {
    		t0 = cputicks()
    	}
    
    	lock(&c.lock)
    
    	if c.closed != 0 && c.qcount == 0 {
    		if raceenabled {
    			raceacquire(c.raceaddr())
    		}
    		unlock(&c.lock)
        // Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何數(shù)據(jù),那么會(huì)清除 ep 指針中的數(shù)據(jù)并立刻返回
    		if ep != nil {
    			typedmemclr(c.elemtype, ep)
    		}
    		return true, false
    	}
    
  • 當(dāng)存在等待的發(fā)送者時(shí),通過(guò) runtime.recv 從阻塞的發(fā)送者或者緩沖區(qū)中獲取數(shù)據(jù)

    如果是無(wú)緩沖的channel,當(dāng)有接收者進(jìn)來(lái)時(shí),會(huì)直接從阻塞的發(fā)送者拷貝數(shù)據(jù)

    如果是有緩沖的channel,當(dāng)有接收者進(jìn)來(lái)時(shí),會(huì)先從緩沖區(qū)拿數(shù)據(jù),接著等待的發(fā)送者會(huì)把數(shù)據(jù)拷貝到緩沖區(qū)

    注意這個(gè)時(shí)候并沒有直接去喚醒發(fā)送者,而是放到下次p的執(zhí)行隊(duì)列中中,下次調(diào)度時(shí)會(huì)喚醒發(fā)送者,發(fā)送者會(huì)做一些釋放資源的操作

    if sg := c.sendq.dequeue(); sg != nil {
    		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    		return true, true
    	}
    
    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    	if c.dataqsiz == 0 {
    		if raceenabled {
    			racesync(c, sg)
    		}
    		if ep != nil {
    			// 如果無(wú)緩存,直接從發(fā)送者拷貝數(shù)據(jù)
    			recvDirect(c.elemtype, sg, ep)
    		}
    	} else {
    		// 由于隊(duì)列已滿,接收數(shù)據(jù)的索引和發(fā)送數(shù)據(jù)的索引一致
        qp := chanbuf(c, c.recvx)
    		if raceenabled {
    			racenotify(c, c.recvx, nil)
    			racenotify(c, c.recvx, sg)
    		}
    		// 數(shù)據(jù)從隊(duì)列拷貝到目標(biāo)內(nèi)存地址
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)
    		}
    		// 數(shù)據(jù)從發(fā)送者拷貝到緩沖區(qū)
    		typedmemmove(c.elemtype, qp, sg.elem)
    		c.recvx++
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
    		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    	}
    	sg.elem = nil
    	gp := sg.g
    	unlockf()
    	gp.param = unsafe.Pointer(sg)
    	sg.success = true
    	if sg.releasetime != 0 {
    		sg.releasetime = cputicks()
    	}
      // 無(wú)論發(fā)生哪種情況,運(yùn)行時(shí)都會(huì)調(diào)用 runtime.goready 將當(dāng)前處理器的 runnext 設(shè)置成發(fā)送數(shù)據(jù)的 Goroutine,在調(diào)度器下一次調(diào)度時(shí)將阻塞的發(fā)送方喚醒。
    	goready(gp, skip+1)
    }
    
  • 當(dāng)緩沖區(qū)存在數(shù)據(jù)時(shí),從 Channel 的緩沖區(qū)中接收數(shù)據(jù)

    if c.qcount > 0 {
    		// 直接從隊(duì)列取數(shù)據(jù)
    		qp := chanbuf(c, c.recvx)
    		if raceenabled {
    			racenotify(c, c.recvx, nil)
    		}
      	// 放到目標(biāo)內(nèi)存
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)
    		}
      	// 清空隊(duì)列中對(duì)應(yīng)的元素
    		typedmemclr(c.elemtype, qp)
      	// 接收索引+1
    		c.recvx++
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
      	// 隊(duì)列元素-1
    		c.qcount--
    		unlock(&c.lock)
    		return true, true
    	}
    
  • 當(dāng)緩沖區(qū)中不存在數(shù)據(jù)時(shí),等待其他 Goroutine 向 Channel 發(fā)送數(shù)據(jù)

    if !block {
    		unlock(&c.lock)
    		return false, false
    	}
    
    	// no sender available: block on this channel.
    	gp := getg()
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    	// No stack splits between assigning elem and enqueuing mysg
    	// on gp.waiting where copystack can find it.
    	mysg.elem = ep
    	mysg.waitlink = nil
    	gp.waiting = mysg
    	mysg.g = gp
    	mysg.isSelect = false
    	mysg.c = c
    	gp.param = nil
    	c.recvq.enqueue(mysg)
    	// 阻塞等待,讓出使用權(quán)
    	atomic.Store8(&gp.parkingOnChan, 1)
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
    	// 喚醒之后清空sudog
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	success := mysg.success
    	gp.param = nil
    	mysg.c = nil
    	releaseSudog(mysg)
    	return true, success
    

關(guān)閉channel

  • 當(dāng) Channel 是一個(gè)空指針或者已經(jīng)被關(guān)閉時(shí),Go 語(yǔ)言運(yùn)行時(shí)都會(huì)直接崩潰并拋出異常

    func closechan(c *hchan) {
    	if c == nil {
    		panic(plainError("close of nil channel"))
    	}
    
    	lock(&c.lock)
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))
    	}
    
  • recvqsendq 兩個(gè)隊(duì)列中的數(shù)據(jù)加入到 Goroutine 列表 gList 中,與此同時(shí)該函數(shù)會(huì)清除所有 runtime.sudog 上未被處理的元素

    c.closed = 1
    
    	var glist gList
    
    	// release all readers
    	for {
    		sg := c.recvq.dequeue()
    		if sg == nil {
    			break
    		}
    		if sg.elem != nil {
    			typedmemclr(c.elemtype, sg.elem)
    			sg.elem = nil
    		}
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    
    	// release all writers (they will panic)
    	for {
    		sg := c.sendq.dequeue()
    		if sg == nil {
    			break
    		}
    		sg.elem = nil
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    	unlock(&c.lock)
    
    	// 為所有被阻塞的 Goroutine 調(diào)用 runtime.goready 觸發(fā)調(diào)度。
    	for !glist.empty() {
    		gp := glist.pop()
    		gp.schedlink = 0
    		goready(gp, 3)
    	}
    

使用場(chǎng)景

報(bào)錯(cuò)情形

  • 往一個(gè)關(guān)閉的channel發(fā)送數(shù)據(jù)會(huì)報(bào)錯(cuò):panic: send on closed channel
  • 關(guān)閉一個(gè)nil的chan會(huì)報(bào)錯(cuò):panic: close of nil channel
  • 關(guān)閉一個(gè)已經(jīng)關(guān)閉的channel報(bào)錯(cuò):panic: close of closed channel

1、一個(gè)經(jīng)典的算法題

有4個(gè)goroutine,編號(hào)為1、2、3、4。每秒鐘會(huì)有一個(gè)goroutine打印出自己的編號(hào),要求寫一個(gè)程序,讓輸出的編號(hào)總是按照1、2、3、4、1、2、3、4...的順序打印出來(lái)

package main

import (
	"fmt"
	"time"
)

func main() {
  // 4個(gè)channel
	chs := make([]chan int, 4)
	for i, _ := range chs {
		chs[i] = make(chan int)
    // 開4個(gè)協(xié)程
		go func(i int) {
			for {
        // 獲取當(dāng)前channel值并打印
				v := <-chs[i]
				fmt.Println(v + 1)
				time.Sleep(time.Second)
        // 把下一個(gè)值寫入下一個(gè)channel,等待下一次消費(fèi)
				chs[(i+1)%4] <- (v + 1) % 4
			}

		}(i)
	}

  // 往第一個(gè)塞入0
	chs[0] <- 0
	select {}
}

2、限流器

package main

import (
	"fmt"
	"time"
)

func main() {
	// 每次處理3個(gè)請(qǐng)求
	chLimit := make(chan struct{}, 3)
	for i := 0; i < 20; i++ {
		chLimit <- struct{}{}
		go func(i int) {
			fmt.Println("下游服務(wù)處理邏輯...", i)
			time.Sleep(time.Second * 3)
			<-chLimit
		}(i)
	}
	time.Sleep(30 * time.Second)
}

如果覺得sleep太丑太暴力,可以用waitGroup控制結(jié)束時(shí)機(jī)

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	// 每次處理3個(gè)請(qǐng)求
	chLimit := make(chan struct{}, 3)
	for i := 0; i < 20; i++ {
		chLimit <- struct{}{}
		wg.Add(1)
		go func(i int) {
			fmt.Println("下游服務(wù)處理邏輯...", i)
			time.Sleep(time.Second * 3)
			<-chLimit
			wg.Done()
		}(i)
	}
	wg.Wait()
}

3、優(yōu)雅退出

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	var closing = make(chan struct{})
	var closed = make(chan struct{})

	go func() {
		for {
			select {
			case <-closing:
				return
			default:
				fmt.Println("業(yè)務(wù)邏輯...")
				time.Sleep(1 * time.Second)
			}
		}
	}()

	termChan := make(chan os.Signal)
  // 監(jiān)聽退出信號(hào)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

  // 退出中
	close(closing)

	// 退出之前清理一下
	go doCleanup(closed)

	select {
	case <-closed:
	case <-time.After(time.Second):
		log.Println("清理超時(shí)不等了")
	}

	log.Println("優(yōu)雅退出")
}

func doCleanup(closed chan struct{}) {
	time.Sleep(time.Minute)
  // 清理完后退出
	close(closed)
}

4、實(shí)現(xiàn)互斥鎖

初始化一個(gè)緩沖區(qū)為1的channel,放入元素代表一把鎖,誰(shuí)獲取到這個(gè)元素就代表獲取了這把鎖,釋放鎖的時(shí)候再把這個(gè)元素放回channel

package main

import (
	"log"
	"time"
)

type Mutex struct {
	ch chan struct{}
}

// 初始化鎖
func NewMutex() *Mutex {
	mu := &Mutex{make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}

// 加鎖,阻塞獲取
func (m *Mutex) Lock()  {
	<- m.ch
}

// 釋放鎖
func (m *Mutex) Unlock()  {
	select {
    // 成功寫入channel代表釋放成功
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}

// 嘗試獲取鎖
func (m *Mutex) TryLock() bool {
	select {
	case <-m.ch:
		return true
	default:

	}
	return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)

	select {
	case <-m.ch:
    // 成功獲取鎖關(guān)閉定時(shí)器
		timer.Stop()
		return true
	case <-timer.C:

	}
  // 獲取鎖超時(shí)
	return false
}

// 是否上鎖
func (m *Mutex) IsLocked() bool {
	return len(m.ch) == 0
}


func main()  {
	m := NewMutex()
	ok := m.TryLock()
	log.Printf("locked v %v\n", ok)
	ok = m.TryLock()
	log.Printf("locked v %v\n", ok)

	go func() {
		time.Sleep(5*time.Second)
		m.Unlock()
	}()

	ok = m.LockTimeout(10*time.Second)
	log.Printf("LockTimeout v %v\n", ok)
}

參考:

極刻時(shí)間《go 并發(fā)編程實(shí)戰(zhàn)》


文章標(biāo)題:go channel原理及使用場(chǎng)景
分享路徑:http://weahome.cn/article/dsoicdd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部