前言
創(chuàng)新互聯(lián)公司成都網(wǎng)站建設(shè)按需定制設(shè)計,是成都網(wǎng)站推廣公司,為發(fā)電機回收提供網(wǎng)站建設(shè)服務(wù),有成熟的網(wǎng)站定制合作流程,提供網(wǎng)站定制設(shè)計服務(wù):原型圖制作、網(wǎng)站創(chuàng)意設(shè)計、前端HTML5制作、后臺程序開發(fā)等。成都網(wǎng)站建設(shè)熱線:13518219792
最近在寫項目,需要用到信號量等待一些資源完成,但是最多等待N毫秒。在看本文的正文之前,我們先來看下C語言里的實現(xiàn)方法。
在C語言里,有如下的API來實現(xiàn)帶超時的信號量等待:
SYNOPSIS #includeint pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
然后在查看golang的document后,發(fā)現(xiàn)golang里并沒有實現(xiàn)帶超時的信號量,官方文檔在這里。
原理
我的業(yè)務(wù)場景是這樣的:我有一個緩存字典,當(dāng)多個用戶請求1個不存在的key時,只有1個請求會穿透到后端,而所有用戶都要排隊等這個請求完成,或者超時返回。
怎么實現(xiàn)呢?其實稍微想一想cond的原理,就能模擬一個帶超時的cond出來。
在golang里,要同時實現(xiàn)”掛起等待”和”超時返回”,一般得用select case語法,一個case等待阻塞的資源,一個case等待一個timer,這一點是非常確定的。
原本阻塞的資源應(yīng)該通過條件變量的機制來實現(xiàn)完成通知,既然這里決定用select case,那么自然想到用channel來代替這個完成通知。
接下來的問題就是,很多請求者并發(fā)來獲取這個資源,但是資源還沒有準(zhǔn)備好,所以大家都要排隊并掛起,等待資源完成,并且當(dāng)資源完成后通知大家。
所以,這里很自然要為這個資源做一個隊列,每個請求者創(chuàng)建一個chan,并將chan放到隊列里,接著select case等待這個chan的通知。而另一端,資源完成后遍歷隊列,通知每個chan即可。
最后一個問題是,只有第一個請求者才能穿透請求到后端,而后續(xù)請求者不應(yīng)該穿透重復(fù)的請求,這可以通過判斷緩存里是否有這個key作為判定首次的條件,而標(biāo)記位init來判斷請求者是否應(yīng)該排隊。
我的場景
上面是思路,下面是我的業(yè)務(wù)場景實現(xiàn)。
func (cache *Cache) Get(key string, keyType int) *string { if keyType == KEY_TYPE_DOMAIN { key = "#" + key } else { key = "=" + key } cache.mutex.Lock() item, existed := cache.dict[key] if !existed { item = &cacheItem{} item.key = &key item.waitQueue = list.New() cache.dict[key] = item } cache.mutex.Unlock() conf := config.GetConfig() lastGet := getCurMs() item.mutex.Lock() item.lastGet = lastGet if item.init { // 已存在并且初始化 defer item.mutex.Unlock() return item.value } // 未初始化,排隊等待結(jié)果 wait := waitItem{} wait.wait_chan = make(chan *string, 1) item.waitQueue.PushBack(&wait) item.mutex.Unlock() // 新增key, 啟動goroutine獲取初始值 if !existed { go cache.initCacheItem(item, keyType) } timer := time.NewTimer(time.Duration(conf.Cache_waitTime) * time.Millisecond) var retval *string = nil // 等待初始化完成 select { case retval = <- wait.wait_chan: case <- timer.C: } return retval }
簡述一下整個過程:
在initCacheItem函數(shù)里,數(shù)據(jù)已獲取成功
// 一旦標(biāo)記為init, 后續(xù)請求將不再操作waitQueue item.mutex.Lock() item.value = newValue item.init = true item.expire = expire item.mutex.Unlock() // 喚醒所有排隊者 waitQueue := item.waitQueue for elem := waitQueue.Front(); elem != nil; elem = waitQueue.Front() { wait := elem.Value.(*waitItem) wait.wait_chan <- newValue waitQueue.Remove(elem) }
最后
這樣就實現(xiàn)了帶超時的條件變量效果,實際上我的場景是一個broadcast的cond例子,大家可以參照思路實現(xiàn)自己想要的效果,活學(xué)活用。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對創(chuàng)新互聯(lián)的支持。