真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

golang開發(fā):go并發(fā)的建議(完)

上次說了一下Go語言布道師 Dave Cheney對(duì)Go并發(fā)的建議,個(gè)人覺得最重要的一條,這次主要想說一下這個(gè)。
8.3. Never start a goroutine without knowning when it will stop(永遠(yuǎn)不要在不知道何時(shí)停止的情況下啟動(dòng) goroutine)

創(chuàng)新互聯(lián)建站網(wǎng)站建設(shè)服務(wù)商,為中小企業(yè)提供網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)服務(wù),網(wǎng)站設(shè)計(jì),網(wǎng)站托管維護(hù)等一站式綜合服務(wù)型公司,專業(yè)打造企業(yè)形象網(wǎng)站,讓您在眾多競(jìng)爭(zhēng)對(duì)手中脫穎而出創(chuàng)新互聯(lián)建站

我們的需求

我這邊當(dāng)時(shí)有個(gè)需求是這樣的,我們有個(gè)考試系統(tǒng)的,每次學(xué)員答完試卷去檢查一下這次交卷是否是這次考試的最后一份試卷,如果是最后一份試卷的話,需要計(jì)算這次考試的總成績(jī),生成考試的學(xué)習(xí)報(bào)告,當(dāng)然了,如果不是最后一份試卷的話啥也不干。
生成試卷和報(bào)告是必須要生成的,不能出現(xiàn)考完試了沒有總成績(jī)和總報(bào)告。
接到這個(gè)需求的時(shí)候,我首先想到的是使用golang的goroutine去異步算出成績(jī)生成報(bào)告。然后寫代碼就是這樣的。

go createReport()

這不剛好是8.3 永遠(yuǎn)不要這樣寫的建議么?
然后覺得應(yīng)該寫一個(gè)管理goroutine異步執(zhí)行任務(wù)的類庫(kù),創(chuàng)建執(zhí)行銷毀都由這個(gè)管理工具去執(zhí)行。準(zhǔn)備寫的時(shí)候發(fā)現(xiàn)B站的代碼里有一個(gè)這樣的類庫(kù),異步執(zhí)行的類庫(kù)。

B站的類庫(kù)

B站代碼里面異步任務(wù)是這個(gè)文件
openbilibili-go-common-master/library/sync/pipeline/fanout/fanout.go

var (
	// ErrFull chan full.
	ErrFull   = errors.New("fanout: chan full")
	stats     = prom.BusinessInfoCount
	traceTags = []trace.Tag{
		trace.Tag{Key: trace.TagSpanKind, Value: "background"},
		trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
	}
)

type options struct {
	worker int
	buffer int
}

// Option fanout option
type Option func(*options)

// Worker specifies the worker of fanout
func Worker(n int) Option {
	if n <= 0 {
		panic("fanout: worker should > 0")
	}
	return func(o *options) {
		o.worker = n
	}
}

// Buffer specifies the buffer of fanout
func Buffer(n int) Option {
	if n <= 0 {
		panic("fanout: buffer should > 0")
	}
	return func(o *options) {
		o.buffer = n
	}
}

type item struct {
	f   func(c context.Context)
	ctx context.Context
}

// Fanout async consume data from chan.
type Fanout struct {
	name    string
	ch      chan item
	options *options
	waiter  sync.WaitGroup

	ctx    context.Context
	cancel func()
}

// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
	if name == "" {
		name = "fanout"
	}
	o := &options{
		worker: 1,
		buffer: 1024,
	}
	for _, op := range opts {
		op(o)
	}
	c := &Fanout{
		ch:      make(chan item, o.buffer),
		name:    name,
		options: o,
	}
	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.waiter.Add(o.worker)
	for i := 0; i < o.worker; i++ {
		go c.proc()
	}
	return c
}

func (c *Fanout) proc() {
	defer c.waiter.Done()
	for {
		select {
		case t := <-c.ch:
			wrapFunc(t.f)(t.ctx)
			stats.State(c.name+"_channel", int64(len(c.ch)))
		case <-c.ctx.Done():
			return
		}
	}
}

func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
	res = func(ctx context.Context) {
		defer func() {
			if r := recover(); r != nil {
				buf := make([]byte, 64*1024)
				buf = buf[:runtime.Stack(buf, false)]
				log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)
			}
		}()
		f(ctx)
		if tr, ok := trace.FromContext(ctx); ok {
			tr.Finish(nil)
		}
	}
	return
}

// Do save a callback func.
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) {
	if f == nil || c.ctx.Err() != nil {
		return c.ctx.Err()
	}
	nakeCtx := metadata.WithContext(ctx)
	if tr, ok := trace.FromContext(ctx); ok {
		tr = tr.Fork("", "Fanout:Do").SetTag(traceTags...)
		nakeCtx = trace.NewContext(nakeCtx, tr)
	}
	select {
	case c.ch <- item{f: f, ctx: nakeCtx}:
	default:
		err = ErrFull
	}
	stats.State(c.name+"_channel", int64(len(c.ch)))
	return
}

// Close close fanout
func (c *Fanout) Close() error {
	if err := c.ctx.Err(); err != nil {
		return err
	}
	c.cancel()
	c.waiter.Wait()
	return nil
}

使用方法
	ca := New("cache", Worker(100), Buffer(1024))
	var run bool
	ca.Do(context.Background(), func(c context.Context) {
		run = true
	})

主要分析一下這個(gè)類庫(kù),以后自己寫或者使用的時(shí)候就能得心應(yīng)手了,而且這個(gè)類庫(kù)也算是創(chuàng)建goroutine,通過channel通信的經(jīng)典寫法吧

1.New方法調(diào)用的時(shí)候,會(huì)創(chuàng)建buffer個(gè)ch channel,worker個(gè)goroutine.由于ch是空的,worker個(gè)goroutine會(huì)阻塞住,一直等待有程序往ch里面寫入數(shù)據(jù)
2.Do函數(shù)一但被調(diào)用,會(huì)傳入異步任務(wù)的func,func就會(huì)寫入到ch里面了,goroutine就可以從ch里面讀取到數(shù)據(jù),并且執(zhí)行這個(gè)數(shù)據(jù)里面的func
踐行了這個(gè)原則
不要通過共享內(nèi)存來通信,要通過通信來共享內(nèi)存

有個(gè)需要注意的點(diǎn),就Do函數(shù)在執(zhí)行代碼是這樣的

代碼里面可以看到在c.ch 寫入數(shù)據(jù)的時(shí)候,如果超過c.ch的長(zhǎng)度(測(cè)試代碼里面是1024)就報(bào)錯(cuò)返回了,這樣就不能保證每個(gè)異步任務(wù)都能穩(wěn)定執(zhí)行了,這樣的結(jié)果就是,如果程序處理慢或者異步任務(wù)數(shù)量比較多的話(超過1024),異步任務(wù)就無法完成。當(dāng)然了,我們也可以修改代碼改成等待ch的里面數(shù)據(jù)被goroutine處理的小于1024了,也會(huì)執(zhí)行,這樣就變成一個(gè)不可控的程序了,如果有3000個(gè)異步任務(wù)沒人知道執(zhí)行完成需要多長(zhǎng)時(shí)間,然后我們程序如果重啟的話,是等待它完成重啟還是強(qiáng)制重啟,等待完成不知道需要等待多長(zhǎng)時(shí)間,強(qiáng)制重啟就無法保證任務(wù)能夠全部完成。

最終方案

為了一定能夠在任何異常情況算出分?jǐn)?shù)和生成報(bào)告,最后使用消息隊(duì)列做了這件事,發(fā)送完成答卷的消息,接收到完成答卷的消息之后算出分?jǐn)?shù)生成報(bào)告。做完之后雖然保證了可靠性,但是覺得自己發(fā)消息自己收消息確實(shí)也很別扭。
不知道其他童鞋有沒有更好的更合理的方案。


網(wǎng)頁標(biāo)題:golang開發(fā):go并發(fā)的建議(完)
當(dāng)前路徑:http://weahome.cn/article/dsoiccc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部