小編給大家分享一下golang中如何實(shí)現(xiàn)并發(fā)安全Map以及分段鎖,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)新互聯(lián)公司:成立與2013年為各行業(yè)開(kāi)拓出企業(yè)自己的“網(wǎng)站建設(shè)”服務(wù),為上千余家公司企業(yè)提供了專業(yè)的成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)和網(wǎng)站推廣服務(wù), 按需制作網(wǎng)站由設(shè)計(jì)師親自精心設(shè)計(jì),設(shè)計(jì)的效果完全按照客戶的要求,并適當(dāng)?shù)奶岢龊侠淼慕ㄗh,擁有的視覺(jué)效果,策劃師分析客戶的同行競(jìng)爭(zhēng)對(duì)手,根據(jù)客戶的實(shí)際情況給出合理的網(wǎng)站構(gòu)架,制作客戶同行業(yè)具有領(lǐng)先地位的。分?jǐn)噫i
type SimpleCache struct { mu sync.RWMutex items map[interface{}]*simpleItem }
在日常開(kāi)發(fā)中, 上述這種數(shù)據(jù)結(jié)構(gòu)肯定不少見(jiàn),因?yàn)間olang的原生map是非并發(fā)安全的,所以為了保證map的并發(fā)安全,最簡(jiǎn)單的方式就是給map加鎖。
之前使用過(guò)兩個(gè)本地內(nèi)存緩存的開(kāi)源庫(kù), gcache, cache2go,其中存儲(chǔ)緩存對(duì)象的結(jié)構(gòu)都是這樣,對(duì)于輕量級(jí)的緩存庫(kù),為了設(shè)計(jì)簡(jiǎn)潔(包含清理過(guò)期對(duì)象等 ) 再加上當(dāng)需要緩存大量數(shù)據(jù)時(shí)有redis,memcache等明星項(xiàng)目解決。 但是如果拋開(kāi)這些因素遇到真正數(shù)量巨大的數(shù)據(jù)量時(shí),直接對(duì)一個(gè)map加鎖,當(dāng)map中的值越來(lái)越多,訪問(wèn)map的請(qǐng)求越來(lái)越多,大家都競(jìng)爭(zhēng)這一把鎖顯得并發(fā)訪問(wèn)控制變重。 在go1.9引入sync.Map 之前,比較流行的做法就是使用分段鎖,顧名思義就是將鎖分段,將鎖的粒度變小,將存儲(chǔ)的對(duì)象分散到各個(gè)分片中,每個(gè)分片由一把鎖控制,這樣使得當(dāng)需要對(duì)在A分片上的數(shù)據(jù)進(jìn)行讀寫(xiě)時(shí)不會(huì)影響B(tài)分片的讀寫(xiě)。
分段鎖的實(shí)現(xiàn)
// Map 分片 type ConcurrentMap []*ConcurrentMapShared // 每一個(gè)Map 是一個(gè)加鎖的并發(fā)安全Map type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex // 各個(gè)分片Map各自的鎖 }
主流的分段鎖,即通過(guò)hash取模的方式找到當(dāng)前訪問(wèn)的key處于哪一個(gè)分片之上,再對(duì)該分片進(jìn)行加鎖之后再讀寫(xiě)。分片定位時(shí),常用有BKDR, FNV32等hash算法得到key的hash值。
func New() ConcurrentMap { // SHARD_COUNT 默認(rèn)32個(gè)分片 m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{ items: make(map[string]interface{}), } } return m }
在初始化好分片后, 對(duì)分片上的數(shù)據(jù)進(jìn)行讀寫(xiě)時(shí)就需要用hash取模進(jìn)行分段定位來(lái)確認(rèn)即將要讀寫(xiě)的分片。
獲取段定位
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } // FNV hash func fnv32(key string) uint32 { hash := uint32(2166136261) const prime32 = uint32(16777619) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash }
之后對(duì)于map的GET SET 就簡(jiǎn)單順利成章的完成
Set And Get
func (m ConcurrentMap) Set(key string, value interface{}) { shard := m.GetShard(key) // 段定位找到分片 shard.Lock() // 分片上鎖 shard.items[key] = value // 分片操作 shard.Unlock() // 分片解鎖 } func (m ConcurrentMap) Get(key string) (interface{}, bool) { shard := m.GetShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok }
由此一個(gè)分段鎖Map就實(shí)現(xiàn)了, 但是比起普通的Map, 常用到的方法比如獲取所有key, 獲取所有Val 操作是要比原生Map復(fù)雜的,因?yàn)橐闅v每一個(gè)分片的每一個(gè)數(shù)據(jù), 好在golang的并發(fā)特性使得解決這類(lèi)問(wèn)題變得非常簡(jiǎn)單
Keys
// 統(tǒng)計(jì)當(dāng)前分段map中item的個(gè)數(shù) func (m ConcurrentMap) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { shard := m[i] shard.RLock() count += len(shard.items) shard.RUnlock() } return count } // 獲取所有的key func (m ConcurrentMap) Keys() []string { count := m.Count() ch := make(chan string, count) // 每一個(gè)分片啟動(dòng)一個(gè)協(xié)程 遍歷key go func() { wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) for _, shard := range m { go func(shard *ConcurrentMapShared) { defer wg.Done() shard.RLock() // 每個(gè)分片中的key遍歷后都寫(xiě)入統(tǒng)計(jì)用的channel for key := range shard.items { ch <- key } shard.RUnlock() }(shard) } wg.Wait() close(ch) }() keys := make([]string, count) // 統(tǒng)計(jì)各個(gè)協(xié)程并發(fā)讀取Map分片的key for k := range ch { keys = append(keys, k) } return keys }
這里寫(xiě)了一個(gè)benchMark來(lái)對(duì)該分段鎖Map和原生的Map加鎖方式進(jìn)行壓測(cè), 場(chǎng)景為將一萬(wàn)個(gè)不重復(fù)的鍵值對(duì)同時(shí)以100萬(wàn)次寫(xiě)和100萬(wàn)次讀,分別進(jìn)行5次壓測(cè), 如下壓測(cè)代碼
func BenchmarkMapShared(b *testing.B) { num := 10000 testCase := genNoRepetTestCase(num) // 10000個(gè)不重復(fù)的鍵值對(duì) m := New() for _, v := range testCase { m.Set(v.Key, v.Val) } b.ResetTimer() for i := 0; i < 5; i++ { b.Run(strconv.Itoa(i), func(b *testing.B) { b.N = 1000000 wg := sync.WaitGroup{} wg.Add(b.N * 2) for i := 0; i < b.N; i++ { e := testCase[rand.Intn(num)] go func(key string, val interface{}) { m.Set(key, val) wg.Done() }(e.Key, e.Val) go func(key string) { _, _ = m.Get(key) wg.Done() }(e.Key) } wg.Wait() }) } }
原生Map加鎖壓測(cè)結(jié)果
分段鎖壓測(cè)結(jié)果
可以看出在將鎖的粒度細(xì)化后再面對(duì)大量需要控制并發(fā)安全的訪問(wèn)時(shí),分段鎖Map的耗時(shí)比原生Map加鎖要快3倍有余
Sync.Map
go1.9之后加入了支持并發(fā)安全的Map sync.Map, sync.Map 通過(guò)一份只使用原子操作的數(shù)據(jù)和一份冗余了只讀數(shù)據(jù)的加鎖數(shù)據(jù)實(shí)現(xiàn)一定程度上的讀寫(xiě)分離,使得大多數(shù)讀操作和更新操作是原子操作,寫(xiě)入新數(shù)據(jù)才加鎖的方式來(lái)提升性能。以下是 sync.Map源碼剖析, 結(jié)構(gòu)體中的注釋都會(huì)在具體實(shí)現(xiàn)代碼中提示相呼應(yīng)
type Map struct { // 保護(hù)dirty的鎖 mu Mutex // 只讀數(shù)據(jù)(修改采用原子操作) read atomic.Value // 包含只讀中所有數(shù)據(jù)(冗余),寫(xiě)入新數(shù)據(jù)時(shí)也在dirty中操作 dirty map[interface{}]*entry // 當(dāng)原子操作訪問(wèn)只讀read時(shí)找不到數(shù)據(jù)時(shí)會(huì)去dirty中尋找,此時(shí)misses+1,dirty及作為存儲(chǔ)新寫(xiě)入的數(shù)據(jù),又冗余了只讀結(jié)構(gòu)中的數(shù)據(jù),所以當(dāng)misses > dirty 的長(zhǎng)度時(shí), 會(huì)將dirty升級(jí)為read,同時(shí)將老的dirty置nil misses int } // Map struct 中的 read 就是readOnly 的指針 type readOnly struct { // 基礎(chǔ)Map m map[interface{}]*entry // 用于表示當(dāng)前dirty中是否有read中不存在的數(shù)據(jù), 在寫(xiě)入數(shù)據(jù)時(shí), 如果發(fā)現(xiàn)dirty中沒(méi)有新數(shù)據(jù)且dirty為nil時(shí),會(huì)將read中未被刪除的數(shù)據(jù)拷貝一份冗余到dirty中, 過(guò)程與Map struct中的 misses相呼應(yīng) amended bool } // 數(shù)據(jù)項(xiàng) type entry struct { p unsafe.Pointer } // 用于標(biāo)記數(shù)據(jù)項(xiàng)已被刪除(主要保證數(shù)據(jù)冗余時(shí)的并發(fā)安全) // 上述Map結(jié)構(gòu)中說(shuō)到有一個(gè)將read數(shù)據(jù)拷貝冗余至dirty的過(guò)程, 因?yàn)閯h除數(shù)據(jù)項(xiàng)是將*entry置nil, 為了避免冗余過(guò)程中因并發(fā)問(wèn)題導(dǎo)致*entry改變而影響到拷貝后的dirty正確性,所以sync.Map使用expunged來(lái)標(biāo)記entry是否被刪除 var expunged = unsafe.Pointer(new(interface{}))
在下面sync.Map具體實(shí)現(xiàn)中將會(huì)看到很多“雙檢查”代碼,因?yàn)橥ㄟ^(guò)原子操作獲取的值可能在進(jìn)行其他非原子操作過(guò)程中已改變,所以再非原子操作后需要使用之前原子操作獲取的值需要再次進(jìn)行原子操作獲取。
compareAndSwap 交換并比較, 用于在多線程編程中實(shí)現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時(shí)改寫(xiě)某一數(shù)據(jù)時(shí)導(dǎo)致數(shù)據(jù)不一致問(wèn)題。
sync.Map Write
func (m *Map) Store(key, value interface{}) { // 先不上鎖,而是從只讀數(shù)據(jù)中按key讀取, 如果已存在以compareAndSwap操作進(jìn)行覆蓋(update) read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() // 雙檢查獲取read read, _ = m.read.Load().(readOnly) // 如果data在read中,更新entry if e, ok := read.m[key]; ok { // 如果原子操作讀到的數(shù)據(jù)是被標(biāo)記刪除的, 則視為新數(shù)據(jù)寫(xiě)入dirty if e.unexpungeLocked() { m.dirty[key] = e } // 原子操作寫(xiě)新數(shù)據(jù) e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { // 原子操作寫(xiě)新數(shù)據(jù) e.storeLocked(&value) } else { // 新數(shù)據(jù) // 當(dāng)dirty中沒(méi)有新數(shù)據(jù)時(shí),將read中數(shù)據(jù)冗余到dirty if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() } func (e *entry) tryStore(i *interface{}) bool { p := atomic.LoadPointer(&e.p) if p == expunged { return false } for { if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } p = atomic.LoadPointer(&e.p) if p == expunged { return false } } } // 在dirty中沒(méi)有比read多出的新數(shù)據(jù)時(shí)觸發(fā)冗余 func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { // 檢查entry是否被刪除, 被刪除的數(shù)據(jù)不冗余 if !e.tryExpungeLocked() { m.dirty[k] = e } } } func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { // 將被刪除(置nil)的數(shù)據(jù)以cas原子操作標(biāo)記為expunged(防止因并發(fā)情況下其他操作導(dǎo)致冗余進(jìn)dirty的數(shù)據(jù)不正確) if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
sync.Map Read
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 只讀數(shù)據(jù)中沒(méi)有,并且dirty有比read多的數(shù)據(jù),加鎖在dirty中找 if !ok && read.amended { m.mu.Lock() // 雙檢查, 因?yàn)樯湘i之前的語(yǔ)句是非原子性的 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 只讀中沒(méi)有讀取到的次數(shù)+1 e, ok = m.dirty[key] // 檢查是否達(dá)到觸發(fā)dirty升級(jí)read的條件 m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } // atomic.Load 但被標(biāo)記為刪除的會(huì)返回nil return e.load() } func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
sync.Map DELETE
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 只讀中不存在需要到dirty中去刪除 if !ok && read.amended { m.mu.Lock() // 雙檢查, 因?yàn)樯湘i之前的語(yǔ)句是非原子性的 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key) } m.mu.Unlock() } if ok { e.delete() } } func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
同樣以剛剛壓測(cè)原生加鎖Map和分段鎖的方式來(lái)壓測(cè)sync.Map
壓測(cè)平均下來(lái)sync.Map和分段鎖差別不大,但是比起分段鎖, sync.Map則將鎖的粒度更加的細(xì)小到對(duì)數(shù)據(jù)的狀態(tài)上,使得大多數(shù)據(jù)可以無(wú)鎖化操作, 同時(shí)比分段鎖擁有更好的拓展性,因?yàn)榉侄捂i使用前總是要定一個(gè)分片數(shù)量, 在做擴(kuò)容或者縮小時(shí)很麻煩, 但要達(dá)到sync.Map這種性能既好又能動(dòng)態(tài)擴(kuò)容的程度,代碼就相對(duì)復(fù)雜很多。
還有注意在使用sync.Map時(shí)切忌不要將其拷貝, go源碼中有對(duì)sync.Map注釋到” A Map must not be copied after first use.”因?yàn)楫?dāng)sync.Map被拷貝之后, Map類(lèi)型的dirty還是那個(gè)map 但是read 和 鎖卻不是之前的read和鎖(都不在一個(gè)世界你拿什么保護(hù)我), 所以必然導(dǎo)致并發(fā)不安全(為了寫(xiě)博我把sync.Map代碼復(fù)制出來(lái)一份把私有成員改成可外部訪問(wèn)的打印指針)
以上是“golang中如何實(shí)現(xiàn)并發(fā)安全Map以及分段鎖”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)成都網(wǎng)站設(shè)計(jì)公司行業(yè)資訊頻道!
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。