真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Go并發(fā)非阻塞緩存

競(jìng)態(tài)檢測(cè)器

即使再仔細(xì)的檢查,仍然可能在并發(fā)上犯錯(cuò)。Go 的 runtime 提供了動(dòng)態(tài)分析工具:競(jìng)態(tài)檢測(cè)器(race detectotr)。
在下一節(jié)的示例中會(huì)用到競(jìng)態(tài)檢測(cè)器,所以在用之前,先了解一下這個(gè)工具。

我們提供的服務(wù)有:成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)、微信公眾號(hào)開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、鳳城ssl等。為成百上千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的鳳城網(wǎng)站制作公司

開啟競(jìng)態(tài)檢測(cè)器

簡(jiǎn)單地把 -race 命令行參數(shù)加到 go build、go run、go test 命令里即可使用該功能。它會(huì)讓編譯器為你的應(yīng)用或測(cè)試構(gòu)建一個(gè)修改后的版本,這個(gè)版本有額外的手法可以高效記錄在執(zhí)行時(shí)對(duì)共享變量的所有訪問,以及讀寫這些變量的 goroutine 標(biāo)識(shí)。除此之外,還會(huì)記錄所有的同步事件、包括 go 語句、通道操作、鎖的調(diào)用等。(完整的同步事件集合可以在語言規(guī)范中的 “The Go Memory Model” 文檔中找到。)

如何檢查到競(jìng)態(tài)

競(jìng)態(tài)檢測(cè)器會(huì)研究事件流,找到那些有問題的案例,即一個(gè) goroutine 寫入一個(gè)變量后,中間沒有任何同步的操作,就有另外一個(gè) goroutine 讀寫了該變量。這種情況表明有對(duì)共享變量的并發(fā)訪問,即數(shù)據(jù)競(jìng)態(tài)。工具會(huì)輸出一份報(bào)告,包括變量的標(biāo)識(shí)以及讀寫 goroutine 當(dāng)時(shí)的調(diào)用棧。通常情況下這些信息足以定位問題了,下一章的示例會(huì)應(yīng)用到實(shí)戰(zhàn)中。

哪些競(jìng)態(tài)可能查不到

競(jìng)態(tài)檢測(cè)器報(bào)告所有實(shí)際運(yùn)行了的數(shù)據(jù)競(jìng)態(tài)。但只能檢測(cè)到那些在運(yùn)行時(shí)發(fā)生的競(jìng)態(tài),無法用來保證肯定不發(fā)生競(jìng)態(tài)。所以為了保證效果,需要全部測(cè)試包含了并發(fā)調(diào)用的場(chǎng)景。

可以在生產(chǎn)環(huán)境開啟競(jìng)態(tài)檢測(cè)器

由于存在額外的記錄工作,帶競(jìng)態(tài)檢測(cè)功能的程序在執(zhí)行時(shí)需要更長(zhǎng)的時(shí)間和更多的內(nèi)存,但即使對(duì)于生成環(huán)境的任務(wù),這種額外開支也是可以接受的。對(duì)于那些偶發(fā)的競(jìng)態(tài)條件,使用競(jìng)態(tài)檢測(cè)器可以節(jié)省很多調(diào)試的時(shí)間。

示例:并發(fā)非阻塞緩存

創(chuàng)建一個(gè)并發(fā)非阻塞的緩存系統(tǒng),它能解決函數(shù)記憶(memoizing)的問題,即緩存函數(shù)的結(jié)果,達(dá)到多次調(diào)用但只須計(jì)算一次結(jié)果。這個(gè)問題在并發(fā)實(shí)戰(zhàn)中很常見但已有的庫(kù)不能很好地解決這個(gè)問題。這里的解決方案將會(huì)是并發(fā)安全的,并且要避免簡(jiǎn)單地對(duì)整個(gè)緩存使用單個(gè)鎖而帶來的鎖爭(zhēng)奪問題。

被緩存結(jié)果的函數(shù)

在做系統(tǒng)之前,先準(zhǔn)備一個(gè)將要被測(cè)試的函數(shù)。這里將使用下面的 httpGetBody 函數(shù)作為示例來演示函數(shù)記憶。調(diào)用 HTTP 請(qǐng)求相當(dāng)昂貴,所以我希望只在第一次請(qǐng)求的時(shí)候去發(fā)起請(qǐng)求,而之后都可以在緩存中找到結(jié)果直接返回:

func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

先保證能緩存這個(gè)函數(shù)的執(zhí)行結(jié)果,之后再使用更多個(gè)函數(shù)來測(cè)試和驗(yàn)證功能。

串行的版本

這是一個(gè)并發(fā)不安全的版本,不過把基本功能先實(shí)現(xiàn),并發(fā)安全的問題之后再進(jìn)行優(yōu)化:

// memo包提供了一個(gè)對(duì)類型 Func 并發(fā)不安全的函數(shù)記憶功能
package memo

// Memo 緩存了調(diào)用 Func 的結(jié)果
type Memo struct {
    f     Func
    cache map[string]result
}

// Func 是用于記憶的函數(shù)類型
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// 注意:并發(fā)不安全
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}

Memo 實(shí)例包含了被記憶的函數(shù) f (類型為Func),以及緩存,類型為一個(gè) key 為字符串,value 為 result 的 map。每個(gè) result 都是調(diào)用 f 產(chǎn)生的結(jié)果:一個(gè)值和一個(gè)錯(cuò)誤,在設(shè)計(jì)的推進(jìn)過程中會(huì)展示 Memo 的幾種變體,但所有變體都會(huì)遵守這些基本概念。

串行測(cè)試
下面的例子展示了如何使用 Memo。下面是完整的測(cè)試源碼文件,包括上一小節(jié)寫的被測(cè)試的函數(shù),以及一串 URL。每個(gè) URL 會(huì)發(fā)起兩次請(qǐng)求。對(duì)于每個(gè) URL,首先調(diào)用 Get,打印延時(shí)和返回的數(shù)據(jù)長(zhǎng)度:

package memo

import (
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "testing"
    "time"
)

func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

var urls = []string{
    "http://docscn.studygolang.com/",
    "https://studygolang.com/",
    "https://studygolang.com/pkgdoc",
    "https://github.com/adonovan/gopl.io/tree/master/ch9",
}

func TestSequential(t *testing.T) { // 串行
    m := New(httpGetBody)
    urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
    for _, url := range urls {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
    }
}

func TestConcurrent(t *testing.T) { // 并行
    m := New(httpGetBody)
    var n sync.WaitGroup
    urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
    n.Add(len(urls))
    for _, url := range urls {
        go func(url string) {
            defer n.Done()
            start := time.Now()
            value, err := m.Get(url)
            if err != nil {
                log.Print(err)
            }
            t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
        }(url)
    }
    n.Wait()
}

這里使用 testing 包系統(tǒng)的測(cè)試效果。上面有兩個(gè)測(cè)試函數(shù),先只用 TestSequential 進(jìn)行測(cè)試,串行的發(fā)起請(qǐng)求。從下面的測(cè)試結(jié)果看,每一個(gè) URL 第一次調(diào)用都會(huì)消耗一定的時(shí)間,但對(duì) URL 第二次的請(qǐng)求會(huì)立刻返回結(jié)果:

PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestSequential -v
=== RUN   TestSequential
http://docscn.studygolang.com/, 87.1978ms, 6612 bytes
https://studygolang.com/, 203.3312ms, 81819 bytes
https://studygolang.com/pkgdoc, 33.0053ms, 1261 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 1.4428937s, 61185 bytes
http://docscn.studygolang.com/, 0s, 6612 bytes
https://studygolang.com/, 0s, 81819 bytes
https://studygolang.com/pkgdoc, 0s, 1261 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 0s, 61185 bytes
--- PASS: TestSequential (1.81s)
PASS
ok      gopl/output/memo/memo1  2.063s
PS H:\Go\src\gopl\output\memo\memo1>

默認(rèn)在測(cè)試成功的時(shí)候不打印這類日志,不過可以加上 -v 參數(shù)在成功時(shí)也打印測(cè)試日志。

并行測(cè)試
這次測(cè)試中所有的 Get 都是串行的。因?yàn)?HTTP 請(qǐng)求通過并發(fā)來改善的空間很大,所以這次使用 TestConcurrent 進(jìn)行測(cè)試,讓所有的請(qǐng)求并發(fā)進(jìn)行。這個(gè)測(cè)試要使用 sync.WaitGroup 等待所有的請(qǐng)求完成后再返回結(jié)果。
這次的測(cè)試結(jié)果基本上都是緩存無效的情況,不過偶爾還會(huì)出現(xiàn)無法正常運(yùn)行的情況。除了緩存無效,可能還會(huì)有緩存命中后返回錯(cuò)誤結(jié)果,甚至崩潰:

PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestConcurrent -v
=== RUN   TestConcurrent
http://docscn.studygolang.com/, 92.9972ms, 6612 bytes
http://docscn.studygolang.com/, 98.9889ms, 6612 bytes
https://studygolang.com/pkgdoc, 204.8383ms, 1261 bytes
https://studygolang.com/pkgdoc, 205.8387ms, 1261 bytes
https://studygolang.com/, 234.1566ms, 81819 bytes
https://studygolang.com/, 235.1749ms, 81819 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 1.5041445s, 61184 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 2.1051443s, 61184 bytes
--- PASS: TestConcurrent (2.11s)
PASS
ok      gopl/output/memo/memo1  2.346s
PS H:\Go\src\gopl\output\memo\memo1>

加上競(jìng)態(tài)檢測(cè)器進(jìn)行并行測(cè)試
更糟糕的是,多數(shù)時(shí)候這樣都能正常運(yùn)行,所以甚至很難注意到這樣并發(fā)調(diào)用是有問題的。但是如果加上 -race 標(biāo)志后再運(yùn)行,那么競(jìng)態(tài)檢測(cè)器就會(huì)輸出如下的報(bào)告:

PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestConcurrent -v -race
=== RUN   TestConcurrent
==================
WARNING: DATA RACE
Write at 0x00c000062cf0 by goroutine 11:
  runtime.mapassign_faststr()
      D:/Go/src/runtime/map_faststr.go:190 +0x0
  gopl/output/memo/memo1.(*Memo).Get()
      H:/Go/src/gopl/output/memo/memo1/memo.go:27 +0x1d8
  gopl/output/memo/memo1.TestConcurrent.func1()
      H:/Go/src/gopl/output/memo/memo1/memo_test.go:57 +0xc0

Previous write at 0x00c000062cf0 by goroutine 7:
  runtime.mapassign_faststr()
      D:/Go/src/runtime/map_faststr.go:190 +0x0
  gopl/output/memo/memo1.(*Memo).Get()
      H:/Go/src/gopl/output/memo/memo1/memo.go:27 +0x1d8
  gopl/output/memo/memo1.TestConcurrent.func1()
      H:/Go/src/gopl/output/memo/memo1/memo_test.go:57 +0xc0
...
FAIL    gopl/output/memo/memo1  2.883s

這里就是因?yàn)閮蓚€(gè) goroutine 在沒使用同步的情況下更新了 Memo.cache 這個(gè) map。因?yàn)檎麄€(gè) Get 并不是并發(fā)安全的,它存在數(shù)據(jù)競(jìng)態(tài):

// 注意:并發(fā)不安全
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}

所以,接下來就是要改進(jìn),實(shí)現(xiàn)并發(fā)安全。

使用互斥鎖

讓緩存并發(fā)安全最簡(jiǎn)單的方法就是用一個(gè)基于監(jiān)控的同步機(jī)制。需要給 Memo 加一個(gè)互斥量,并在 Get 開始就獲取互斥鎖,在返回前釋放互斥鎖,這樣就可以讓 cache 相關(guān)的操作發(fā)生在臨界區(qū)域內(nèi)了:

// Memo 緩存了調(diào)用 Func 的結(jié)果
type Memo struct {
    f     Func
    mu    sync.Mutex // 保護(hù) cache
    cache map[string]result
}

// Get 是并發(fā)安全的
func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}

加上鎖之后,再運(yùn)行并發(fā)測(cè)試函數(shù),競(jìng)態(tài)檢測(cè)器不報(bào)警了。但是這次的修改后,之前對(duì)性能的優(yōu)化就失效了。由于每次調(diào)用 Memo.f 時(shí)都上鎖,所以現(xiàn)在的 Get 方法運(yùn)行的使用實(shí)際又是串行的了。這里需要一個(gè)非阻塞的緩存,一個(gè)不會(huì)把他需要記憶的函數(shù)串行運(yùn)行的緩存。
調(diào)用 Get 是不需要鎖保護(hù)的。調(diào)用 Get 的判斷依據(jù)是之前的獲取 map 的 key,這個(gè)操作需要加鎖。調(diào)用 Get 返回后,需要把返回結(jié)果更新到 map 中去,這個(gè)操作也需要加鎖。在 map 查詢結(jié)束后,先釋放鎖。不加鎖的情況下調(diào)用 Get。等到結(jié)果返回需要更新 map 的時(shí)候,再加鎖更新 map。具體修改如下:

func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}

現(xiàn)在,可以安全的并行運(yùn)行了,但是緩存又失效了。某些URL被獲取了兩次。修改一下測(cè)試源碼文件的被測(cè)試函數(shù) httpGetBody,在開頭輸出一行日志,可以觀察到每個(gè)URL被調(diào)用的次數(shù):

func httpGetBody(url string) (interface{}, error) {
    log.Printf("httpGetBody: %s", url) // 輸出哪些 url 被函數(shù)調(diào)用了,從緩存獲取結(jié)果時(shí)不會(huì)有這個(gè)輸出
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

修改之后,可以用最初的串行版本再測(cè)試一下。那個(gè)版本是一定用到緩存的效果的。而現(xiàn)在的版本,在并發(fā)的情況下無法用上緩存。
在幾個(gè) goroutine 幾乎同時(shí)調(diào)用的 Get 來獲取同一個(gè) URL 時(shí),每個(gè) goroutine 都首先查詢緩存,發(fā)現(xiàn)緩存中沒有需要的數(shù)據(jù),然后就都去執(zhí)行 Get 來獲取結(jié)果,最后又都用獲得的結(jié)果來更新 map,其中一個(gè)結(jié)果會(huì)被另外一個(gè)覆蓋。
在理想的情況下,應(yīng)該要避免這種額外的處理。這個(gè)功能有時(shí)稱為重復(fù)抑制(duplicate suppression)。

重復(fù)抑制

下面這個(gè)版本,map 的每個(gè)元素是一個(gè)指向 entry 結(jié)構(gòu)的指針。除了與之前一樣包含一個(gè)已經(jīng)記住的函數(shù) f 調(diào)用結(jié)果之外,每個(gè) entry 還新加一個(gè)通道 ready。在設(shè)置了 entry 和 result 字段后,通道會(huì)關(guān)閉,正在等待的 goroutine 會(huì)收到廣播,然后就可以從 entry 字段讀取結(jié)果了:

// memo包提供了一個(gè)對(duì)類型 Func 并發(fā)安全的函數(shù)記憶功能
// 并發(fā)、重復(fù)抑制、非阻塞的緩存
package memo

import "sync"

// Func 是用于記憶的函數(shù)類型
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // res 準(zhǔn)備好之后會(huì)被關(guān)閉
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // 保護(hù) cache
    cache map[string]*entry
}

// Get 是并發(fā)安全的
func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // 對(duì) key 的第一次訪問,這個(gè) goroutine 負(fù)責(zé)獲取數(shù)據(jù)和廣播數(shù)據(jù)準(zhǔn)備好了的消息
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key)
        close(e.ready) // 廣播數(shù)據(jù)已經(jīng)準(zhǔn)備好的消息
    } else {
        // 對(duì)這個(gè) key 的重復(fù)訪問
        memo.mu.Unlock()
        <-e.ready // 等待數(shù)據(jù)準(zhǔn)備完畢
    }
    return e.res.value, e.res.err
}

關(guān)于這里的 map 是否包含某個(gè)元素的判斷,之前都是返回兩個(gè)值,通過ok來判斷。之前的示例中,map的元素是結(jié)構(gòu)體,由于結(jié)構(gòu)體類型的零值不是nil,通過ok來判斷比較好。這里的元素類型是結(jié)構(gòu)體指針,當(dāng)然可以繼續(xù)使用ok來判斷。不過現(xiàn)在是指針類型了,零值是nil也不會(huì)和非零值的情況搞混,所以也可以直接通過nil來判斷。
現(xiàn)在調(diào)用 Get 會(huì)獲取鎖,然后去 map 中查詢,如果沒有找到,就直接分配并插入一個(gè)新的值,然后釋放鎖。之后其他 goroutine 來查詢的時(shí)候,會(huì)發(fā)現(xiàn)值存在,那么就直接獲取到 map 的值,然后釋放鎖。
map 里的值并不是 Get 返回的數(shù)據(jù),而是數(shù)據(jù)是否準(zhǔn)備好的通道,和存放數(shù)據(jù)的字段。此時(shí)數(shù)據(jù)可能還沒準(zhǔn)備好,數(shù)據(jù)是否準(zhǔn)備好,可以從 ready 通道進(jìn)行判斷。對(duì) ready 通道的讀取操作,會(huì)在數(shù)據(jù)沒有準(zhǔn)備好的時(shí)候一直阻塞。一旦數(shù)據(jù)準(zhǔn)備好了,就會(huì)關(guān)閉 ready 通道,所有從 ready 通道的讀取操作就會(huì)立刻返回。這是利用通道進(jìn)行廣播的方式。所以查詢 map 后獲取值的步驟就是先讀取 ready 通道等待,一旦通道的讀取返回,就表示數(shù)據(jù)已經(jīng)準(zhǔn)備好了,此時(shí)就可以去讀取字段 res 里的內(nèi)容并返回。
注意,entry 中的變量 e.res.value 和 e.res.err 被多個(gè) goroutine 共享。創(chuàng)建 entry 的 goroutine 會(huì)對(duì)這兩個(gè)變量的值進(jìn)行設(shè)置,其他 goroutine 在收到數(shù)據(jù)準(zhǔn)備完畢的廣播后才會(huì)開始讀取這兩個(gè)變量。盡管被多個(gè) goroutine 訪問,但是此處不需要加鎖。ready 通道的關(guān)閉先于其他 goroutine 收到廣播事件,所以第一個(gè) goroutine 對(duì)變量的寫入也先于后續(xù)多個(gè) goroutine 的讀取事件。這種情況下數(shù)據(jù)競(jìng)態(tài)不存在。
到此,并發(fā)、重復(fù)抑制、非阻塞緩存就完成了。

另一種實(shí)現(xiàn)-使用監(jiān)控goroutine

上面的示例是使用一個(gè)互斥量來保護(hù) map 變量的并發(fā)安全。下面是另一種設(shè)計(jì),讓 map 變量限制在一個(gè)監(jiān)控goroutine 中。
首先是類型聲明,New 函數(shù)在創(chuàng)建實(shí)例并返回的同時(shí),還會(huì)啟動(dòng)一個(gè) server 方法。該方法會(huì)集中處理所有的 Get 調(diào)用。我們?cè)讷@取實(shí)例后,依然是調(diào)用 Get 來獲取結(jié)果:

// memo包提供了一個(gè)對(duì)類型 Func 并發(fā)安全的函數(shù)記憶功能
// 并發(fā)、重復(fù)抑制、非阻塞的緩存
// 通過監(jiān)控 goroutine 來實(shí)現(xiàn)并發(fā)安全
package memo

// Func 是用于記憶的函數(shù)類型
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // res 準(zhǔn)備好之后會(huì)被關(guān)閉
}

// Func、result、entry 的聲明和之前一致

// request 是一條請(qǐng)求消息
type request struct {
    key      string        // 需要 Func 運(yùn)行的參數(shù)
    response chan<- result // 每個(gè)客戶端接收結(jié)果的通道
}

type Memo struct{ requests chan request }

func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)} // 創(chuàng)建實(shí)例
    go memo.server(f)                           // 啟動(dòng)服務(wù)端 goroutine
    return memo                                 // 返回實(shí)例,供客戶端調(diào)用
}

可以先往后看客戶端和服務(wù)端的處理邏輯,在回過來看這里聲明的數(shù)據(jù)類型已經(jīng)通道的作用。

客戶端
現(xiàn)在 Get 就需要要給監(jiān)控 goroutine 的通道發(fā)送請(qǐng)求和一個(gè)接收返回結(jié)果的通道。服務(wù)端會(huì)在收到處理請(qǐng)求后進(jìn)行處理,之后再通過客戶端發(fā)來的通道返回結(jié)果。而客戶端發(fā)送請(qǐng)求之后,只需要從自己創(chuàng)建的這個(gè)通道中接收,直到接收到數(shù)據(jù)后,再返回即可:

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <- response
    return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }

客戶端使用完之后,可以調(diào)用 Close 方法關(guān)閉發(fā)送請(qǐng)求的通道。

服務(wù)端
上面的 Get 相當(dāng)于一個(gè)客戶端,還需要一個(gè)服務(wù)端來處理 Get 發(fā)來的請(qǐng)求:

func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests { // 一次處理收到的請(qǐng)求
        e := cache[req.key]
        if e == nil {
            // 對(duì)這個(gè) key 的第一次請(qǐng)求
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // 調(diào)用 f(key)
        }
        // 無論是否第一次請(qǐng)求,最后要回復(fù)結(jié)果,都有等待 ready 通道返回后,再去讀取結(jié)果
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // 執(zhí)行函數(shù)
    e.res.value, e.res.err = f(key)
    // 發(fā)送廣播通知,數(shù)據(jù)已經(jīng)準(zhǔn)備好了
    close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
    // 等待數(shù)據(jù)準(zhǔn)備完畢
    <-e.ready
    // 向客戶端發(fā)送結(jié)果
    response <- e.res
}

變量 cache 被限制在監(jiān)控 goroutine 中,就是上面的 server 方法。監(jiān)控 goroutine 從 requests 的通道中讀取請(qǐng)求,直到這個(gè)通道被關(guān)閉。對(duì)于每個(gè)請(qǐng)求,先查詢緩存,如果沒有找到就插入一個(gè)新的 entry。
這里 call 和 deliver 方法需要在獨(dú)立的 goroutine 中運(yùn)行,以確保監(jiān)控 goroutine 內(nèi)持續(xù)處理新請(qǐng)求。

完整示例代碼
下面貼上這個(gè)實(shí)現(xiàn)方式的完整代碼:

// memo包提供了一個(gè)對(duì)類型 Func 并發(fā)安全的函數(shù)記憶功能
// 并發(fā)、重復(fù)抑制、非阻塞的緩存
// 通過監(jiān)控 goroutine 來實(shí)現(xiàn)并發(fā)安全
package memo

// Func 是用于記憶的函數(shù)類型
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // res 準(zhǔn)備好之后會(huì)被關(guān)閉
}

// Func、result、entry 的聲明和之前一致

// request 是一條請(qǐng)求消息
type request struct {
    key      string        // 需要 Func 運(yùn)行的參數(shù)
    response chan<- result // 每個(gè)客戶端接收結(jié)果的通道
}

type Memo struct{ requests chan request }

func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)} // 創(chuàng)建實(shí)例
    go memo.server(f)                           // 啟動(dòng)服務(wù)端 goroutine
    return memo                                 // 返回實(shí)例,供客戶端調(diào)用
}

func (memo *Memo) Close() { close(memo.requests) }

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <-response
    return res.value, res.err
}

func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests { // 一次處理收到的請(qǐng)求
        e := cache[req.key]
        if e == nil {
            // 對(duì)這個(gè) key 的第一次請(qǐng)求
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // 調(diào)用 f(key)
        }
        // 無論是否第一次請(qǐng)求,最后要回復(fù)結(jié)果,都有等待 ready 通道返回后,再去讀取結(jié)果
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // 執(zhí)行函數(shù)
    e.res.value, e.res.err = f(key)
    // 發(fā)送廣播通知,數(shù)據(jù)已經(jīng)準(zhǔn)備好了
    close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
    // 等待數(shù)據(jù)準(zhǔn)備完畢
    <-e.ready
    // 向客戶端發(fā)送結(jié)果
    response <- e.res
}

針對(duì)上面的 memo 包的測(cè)試代碼:

package memo

import (
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "testing"
    "time"
)

func httpGetBody(url string) (interface{}, error) {
    log.Printf("httpGetBody: %s", url)
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

var urls = []string{ // 換一批慢一些的網(wǎng)站,加載時(shí)間1s左右的國(guó)外資源
    "https://github.com/adonovan/gopl.io/tree/master/ch9",
    "https://www.djangoproject.com/",
    "https://getbootstrap.com/",
    "https://www.python.org/",
}

func TestSequential(t *testing.T) { // 串行
    m := New(httpGetBody)
    defer m.Close()
    urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
    for _, url := range urls {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
    }
}

func TestConcurrent(t *testing.T) { // 并行
    m := New(httpGetBody)
    defer m.Close()
    var n sync.WaitGroup
    urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
    n.Add(len(urls))
    for _, url := range urls {
        go func(url string) {
            defer n.Done()
            start := time.Now()
            value, err := m.Get(url)
            if err != nil {
                log.Print(err)
            }
            t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
        }(url)
    }
    n.Wait()
}

小結(jié)

這里的例子展示了可以使用兩種方案來構(gòu)建并發(fā)結(jié)構(gòu):

  • 共享變量并上鎖
  • 通信順序進(jìn)程(communicating sequential process)

第一種是大家普遍認(rèn)知的,也是Java或者C++等語言中的多線程開發(fā)。
第二種是 Go 語言特有的,也是 Go 語言推薦的。下面是一句推薦的原話:

Do not communicate by sharing memory; instead, share memory by communicating.
Go 箴言:“不要通過共享內(nèi)存來通信,而應(yīng)該通過通信來共享內(nèi)存”。

在給定的情況下也許很難判定哪種方案更好,不過了解他們還是有價(jià)值的。有時(shí)候從一種方案切換到另外一種方案能讓代碼更簡(jiǎn)單。

CSP并發(fā)模型
CSP 是 Communicating Sequential Process 的簡(jiǎn)稱,中文可以叫做通信順序進(jìn)程,是一種并發(fā)編程模型。
CSP 模型由并發(fā)執(zhí)行的實(shí)體(線程或者進(jìn)程)所組成,實(shí)體之間通過發(fā)送消息進(jìn)行通信,這里發(fā)送消息時(shí)使用的就是通道(channel)。CSP 模型的關(guān)鍵是關(guān)注 channel,而不關(guān)注發(fā)送消息的實(shí)體。Go 語言就是借用 CSP 模型的一些概念為之實(shí)現(xiàn)并發(fā)進(jìn)行理論支持。Go 語言并沒有完全實(shí)現(xiàn) CSP 模型的所有理論,僅僅是借用了 process 和 channel 這兩個(gè)概念。process 在 Go 語言上的表現(xiàn)就是 goroutine 是實(shí)際并發(fā)執(zhí)行的實(shí)體,每個(gè)實(shí)體之間通過 channel 通訊來實(shí)現(xiàn)數(shù)據(jù)共享。


分享標(biāo)題:Go并發(fā)非阻塞緩存
文章分享:http://weahome.cn/article/pjjcji.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部