眾所周知,go普通的map是不支持并發(fā)的,換而言之,不是線程(goroutine)安全的。博主是從golang 1.4開始使用的,那時候map的并發(fā)讀是沒有支持,但是并發(fā)寫會出現(xiàn)臟數(shù)據(jù)。golang 1.6之后,并發(fā)地讀寫會直接panic:
成都創(chuàng)新互聯(lián)主要為客戶提供服務(wù)項目涵蓋了網(wǎng)頁視覺設(shè)計、VI標(biāo)志設(shè)計、網(wǎng)絡(luò)營銷推廣、網(wǎng)站程序開發(fā)、HTML5響應(yīng)式網(wǎng)站建設(shè)公司、移動網(wǎng)站建設(shè)、微商城、網(wǎng)站托管及網(wǎng)頁維護、WEB系統(tǒng)開發(fā)、域名注冊、國內(nèi)外服務(wù)器租用、視頻、平面設(shè)計、SEO優(yōu)化排名。設(shè)計、前端、后端三個建站步驟的完善服務(wù)體系。一人跟蹤測試的建站服務(wù)標(biāo)準。已經(jīng)為展覽展示行業(yè)客戶提供了網(wǎng)站維護服務(wù)。
fatal error: concurrent map read and map write
package main
func main() {
m := make(map[int]int)
go func() {
for {
_ = m[1]
}
}()
go func() {
for {
m[2] = 2
}
}()
select {}
}
所以需要支持對map的并發(fā)讀寫時候,博主使用兩種方法:
golang 1.9之后,go 在sync包下引入了并發(fā)安全的map,也為博主提供了第三種方法。本文重點也在此,為了時效性,本文基于golang 1.10源碼進行分析。
type Map struct {
mu Mutex //互斥鎖,用于鎖定dirty map
read atomic.Value //優(yōu)先讀map,支持原子操作,注釋中有readOnly不是說read是只讀,而是它的結(jié)構(gòu)體。read實際上有寫的操作
dirty map[interface{}]*entry // dirty是一個當(dāng)前最新的map,允許讀寫
misses int // 主要記錄read讀取不到數(shù)據(jù)加鎖讀取read map以及dirty map的次數(shù),當(dāng)misses等于dirty的長度時,會將dirty復(fù)制到read
}
readOnly 主要用于存儲,通過原子操作存儲在Map.read中元素。
type readOnly struct {
m map[interface{}]*entry
amended bool // 如果數(shù)據(jù)在dirty中但沒有在read中,該值為true,作為修改標(biāo)識
}
type entry struct {
// nil: 表示為被刪除,調(diào)用Delete()可以將read map中的元素置為nil
// expunged: 也是表示被刪除,但是該鍵只在read而沒有在dirty中,這種情況出現(xiàn)在將read復(fù)制到dirty中,即復(fù)制的過程會先將nil標(biāo)記為expunged,然后不將其復(fù)制到dirty
// 其他: 表示存著真正的數(shù)據(jù)
p unsafe.Pointer // *interface{}
}
如果你接觸過大Java,那你一定對CocurrentHashMap利用鎖分段技術(shù)增加了鎖的數(shù)目,從而使?fàn)帄Z同一把鎖的線程的數(shù)目得到控制的原理記憶深刻。
那么Golang的sync.Map是否也是使用了相同的原理呢?sync.Map的原理很簡單,使用了空間換時間策略,通過冗余的兩個數(shù)據(jù)結(jié)構(gòu)(read、dirty),實現(xiàn)加鎖對性能的影響。
通過引入兩個map將讀寫分離到不同的map,其中read map提供并發(fā)讀和已存元素原子寫,而dirty map則負責(zé)讀寫。 這樣read map就可以在不加鎖的情況下進行并發(fā)讀取,當(dāng)read map中沒有讀取到值時,再加鎖進行后續(xù)讀取,并累加未命中數(shù),當(dāng)未命中數(shù)大于等于dirty map長度,將dirty map上升為read map。從之前的結(jié)構(gòu)體的定義可以發(fā)現(xiàn),雖然引入了兩個map,但是底層數(shù)據(jù)存儲的是指針,指向的是同一份值。
開始時sync.Map寫入數(shù)據(jù)
X=1
Y=2
Z=3
dirty map主要接受寫請求,read map沒有數(shù)據(jù),此時read map與dirty map數(shù)據(jù)如下圖。
讀取數(shù)據(jù)的時候從read map中讀取,此時read map并沒有數(shù)據(jù),miss記錄從read map讀取失敗的次數(shù),當(dāng)misses>=len(dirty map)時,將dirty map直接升級為read map,這里直接對dirty map進行地址拷貝并且dirty map被清空,misses置為0。此時read map與dirty map數(shù)據(jù)
現(xiàn)在有需求對Z元素進行修改Z=4,sync.Map會直接修改read map的元素。
新加元素K=5,新加的元素就需要操作dirty map了,如果misses達到閥值后dirty map直接升級為read map并且dirty map為空map(read的amended==false),則dirty map需要從read map復(fù)制數(shù)據(jù)。
升級后的效果如下。
如果需要刪除Z,需要分幾種情況:
一種read map存在該元素且read的amended==false:直接將read中的元素置為nil。
另一種為元素剛剛寫入dirty map且未升級為read map:直接調(diào)用golang內(nèi)置函數(shù)delete刪除dirty map的元素;
還有一種是read map和dirty map同時存在該元素:將read map中的元素置為nil,因為read map和dirty map 使用的均為元素地址,所以均被置為nil。
Load返回存儲在映射中的鍵值,如果沒有值,則返回nil。ok結(jié)果指示是否在映射中找到值。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 第一次檢測元素是否存在
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
// 為dirty map 加鎖
m.mu.Lock()
// 第二次檢測元素是否存在,主要防止在加鎖的過程中,dirty map轉(zhuǎn)換成read map,從而導(dǎo)致讀取不到數(shù)據(jù)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 從dirty map中獲取是為了應(yīng)對read map中不存在的新元素
e, ok = m.dirty[key]
// 不論元素是否存在,均需要記錄miss數(shù),以便dirty map升級為read map
m.missLocked()
}
// 解鎖
m.mu.Unlock()
}
// 元素不存在直接返回
if !ok {
return nil, false
}
return e.load()
}
dirty map升級為read map
func (m *Map) missLocked() {
// misses自增1
m.misses++
// 判斷dirty map是否可以升級為read map
if m.misses < len(m.dirty) {
return
}
// dirty map升級為read map
m.read.Store(readOnly{m: m.dirty})
// dirty map 清空
m.dirty = nil
// misses重置為0
m.misses = 0
}
元素取值
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
// 元素不存在或者被刪除,則直接返回
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
read map主要用于讀取,每次Load都先從read讀取,當(dāng)read中不存在且amended為true,就從dirty讀取數(shù)據(jù) 。無論dirty map中是否存在該元素,都會執(zhí)行missLocked函數(shù),該函數(shù)將misses+1,當(dāng)m.misses < len(m.dirty)
時,便會將dirty復(fù)制到read,此時再將dirty置為nil,misses=0。
設(shè)置Key=>Value。
func (m *Map) Store(key, value interface{}) {
// 如果read存在這個鍵,并且這個entry沒有被標(biāo)記刪除,嘗試直接寫入,寫入成功,則結(jié)束
// 第一次檢測
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// dirty map鎖
m.mu.Lock()
// 第二次檢測
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// unexpungelocc確保元素沒有被標(biāo)記為刪除
// 判斷元素被標(biāo)識為刪除
if e.unexpungeLocked() {
// 這個元素之前被刪除了,這意味著有一個非nil的dirty,這個元素不在里面.
m.dirty[key] = e
}
// 更新read map 元素值
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 此時read map沒有該元素,但是dirty map有該元素,并需修改dirty map元素值為最新值
e.storeLocked(&value)
} else {
// read.amended==false,說明dirty map為空,需要將read map 復(fù)制一份到dirty map
if !read.amended {
m.dirtyLocked()
// 設(shè)置read.amended==true,說明dirty map有數(shù)據(jù)
m.read.Store(readOnly{m: read.m, amended: true})
}
// 設(shè)置元素進入dirty map,此時dirty map擁有read map和最新設(shè)置的元素
m.dirty[key] = newEntry(value)
}
// 解鎖,有人認為鎖的范圍有點大,假設(shè)read map數(shù)據(jù)很大,那么執(zhí)行m.dirtyLocked()會耗費花時間較多,完全可以在操作dirty map時才加鎖,這樣的想法是不對的,因為m.dirtyLocked()中有寫入操作
m.mu.Unlock()
}
嘗試存儲元素。
func (e *entry) tryStore(i *interface{}) bool {
// 獲取對應(yīng)Key的元素,判斷是否標(biāo)識為刪除
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
// cas嘗試寫入新元素值
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
// 判斷是否標(biāo)識為刪除
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
unexpungelocc確保元素沒有被標(biāo)記為刪除。如果這個元素之前被刪除了,它必須在未解鎖前被添加到dirty map上。
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
從read map復(fù)制到dirty map。
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 如果標(biāo)記為nil或者expunged,則不復(fù)制到dirty map
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
如果對應(yīng)的元素存在,則返回該元素的值,如果不存在,則將元素寫入到sync.Map。如果已加載值,則加載結(jié)果為true;如果已存儲,則為false。
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// 不加鎖的情況下讀取read map
// 第一次檢測
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// 如果元素存在(是否標(biāo)識為刪除由tryLoadOrStore執(zhí)行處理),嘗試獲取該元素已存在的值或者將元素寫入
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
// 第二次檢測
// 以下邏輯參看Store
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
如果沒有刪除元素,tryLoadOrStore將自動加載或存儲一個值。如果刪除元素,tryLoadOrStore保持條目不變并返回ok= false。
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
// 元素標(biāo)識刪除,直接返回
if p == expunged {
return nil, false, false
}
// 存在該元素真實值,則直接返回原來的元素值
if p != nil {
return *(*interface{})(p), true, true
}
// 如果p為nil(此處的nil,并是不是指元素的值為nil,而是atomic.LoadPointer(&e.p)為nil,元素的nil在unsafe.Pointer是有值的),則更新該元素值
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
刪除元素,采用延遲刪除,當(dāng)read map存在元素時,將元素置為nil,只有在提升dirty的時候才清理刪除的數(shù),延遲刪除可以避免后續(xù)獲取刪除的元素時候需要加鎖。當(dāng)read map不存在元素時,直接刪除dirty map中的元素
func (m *Map) Delete(key interface{}) {
// 第一次檢測
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 第二次檢測
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 不論dirty map是否存在該元素,都會執(zhí)行刪除
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
// 如果在read中,則將其標(biāo)記為刪除(nil)
e.delete()
}
}
元素值置為nil
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
遍歷獲取sync.Map中所有的元素,使用的為快照方式,所以不一定是準確的。
func (m *Map) Range(f func(key, value interface{}) bool) {
// 第一檢測
read, _ := m.read.Load().(readOnly)
// read.amended=true,說明dirty map包含所有有效的元素(含新加,不含被刪除的),使用dirty map
if read.amended {
// 第二檢測
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
// 使用dirty map并且升級為read map
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
// 一貫原則,使用read map作為讀
for k, e := range read.m {
v, ok := e.load()
// 被刪除的不計入
if !ok {
continue
}
// 函數(shù)返回false,終止
if !f(k, v) {
break
}
}
}
經(jīng)過了上面的分析可以得到,sync.Map并不適合同時存在大量讀寫的場景,大量的寫會導(dǎo)致read map讀取不到數(shù)據(jù)從而加鎖進行進一步讀取,同時dirty map不斷升級為read map。 從而導(dǎo)致整體性能較低,特別是針對cache場景.針對append-only以及大量讀,少量寫場景使用sync.Map則相對比較合適。
sync.Map沒有提供獲取元素個數(shù)的Len()方法,不過可以通過Range()實現(xiàn)。
func Len(sm sync.Map) int {
lengh := 0
f := func(key, value interface{}) bool {
lengh++
return true
}
one:=lengh
lengh=0
sm.Range(f)
if one != lengh {
one = lengh
lengh=0
sm.Range(f)
if one