本篇文章給大家分享的是有關(guān)怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn),小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來(lái)看看吧。
成都創(chuàng)新互聯(lián)專注于治多企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),成都做商城網(wǎng)站。治多網(wǎng)站建設(shè)公司,為治多等地區(qū)提供建站服務(wù)。全流程按需網(wǎng)站開發(fā),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
共享內(nèi)存廣泛用于redis,Kafka,RabbitMQ 等高性能組件中,本文主要提供一個(gè)共享內(nèi)存在廣告埋點(diǎn)數(shù)據(jù)采集的實(shí)戰(zhàn)場(chǎng)景。
在Linux中,每個(gè)進(jìn)程都有屬于自己的進(jìn)程控制塊(PCB)和地址空間(Addr Space),并且都有一個(gè)與之對(duì)應(yīng)的頁(yè)表,負(fù)責(zé)將進(jìn)程的虛擬地址與物理地址進(jìn)行映射,通過內(nèi)存管理單元(MMU)進(jìn)行管理。兩個(gè)不同的虛擬地址通過頁(yè)表映射到物理空間的同一區(qū)域,它們所指向的這塊區(qū)域即共享內(nèi)存。
當(dāng)兩個(gè)進(jìn)程通過頁(yè)表將虛擬地址映射到物理地址時(shí),在物理地址中有一塊共同的內(nèi)存區(qū),即共享內(nèi)存,這塊內(nèi)存可以被兩個(gè)進(jìn)程同時(shí)看到。這樣當(dāng)一個(gè)進(jìn)程進(jìn)行寫操作,另一個(gè)進(jìn)程讀操作就可以實(shí)現(xiàn)進(jìn)程間通信。但是,我們要確保一個(gè)進(jìn)程在寫的時(shí)候不能被讀,因此我們使用信號(hào)量來(lái)實(shí)現(xiàn)同步與互斥。
對(duì)于一個(gè)共享內(nèi)存,實(shí)現(xiàn)采用的是引用計(jì)數(shù)的原理,當(dāng)進(jìn)程脫離共享存儲(chǔ)區(qū)后,計(jì)數(shù)器減一,掛架成功時(shí),計(jì)數(shù)器加一,只有當(dāng)計(jì)數(shù)器變?yōu)榱銜r(shí),才能被刪除。當(dāng)進(jìn)程終止時(shí),它所附加的共享存儲(chǔ)區(qū)都會(huì)自動(dòng)脫離。
共享內(nèi)存可以說是最有用的進(jìn)程間通信方式,也是最快的IPC形式, 因?yàn)檫M(jìn)程可以直接讀寫內(nèi)存,而不需要任何 數(shù)據(jù)的拷貝。對(duì)于像管道和消息隊(duì)列等通信方式,則需要在內(nèi)核和用戶空間進(jìn)行四次的數(shù)據(jù)拷貝 共享內(nèi)存則只拷貝兩次數(shù)據(jù): 一次從輸入文件到共享內(nèi)存區(qū),另一次從共享內(nèi)存區(qū)到輸出文件。
實(shí)際上,進(jìn)程之間在共享內(nèi) 存時(shí),并不總是讀寫少量數(shù)據(jù)后就解除映射,有新的通信時(shí),再重新建立共享內(nèi)存區(qū)域。而是保持共享區(qū)域,直 到通信完畢為止,這樣,數(shù)據(jù)內(nèi)容一直保存在共享內(nèi)存中,并沒有寫回文件。共享內(nèi)存中的內(nèi)容往往是在解除映 射時(shí)才寫回文件的。因此,采用共享內(nèi)存的通信方式效率是非常高的。
傳統(tǒng)文件
UNIX 訪問文件的傳統(tǒng)方法是用 open 打開它們,如果有多個(gè)進(jìn)程訪問同一個(gè)文件,則每一個(gè)進(jìn)程在自己的地址空間都包含有該文件的副本,這不必要地浪費(fèi)了存儲(chǔ)空間。
下圖說明了兩個(gè)進(jìn)程同時(shí)讀一個(gè)文件的同一頁(yè)的情形。系統(tǒng)要將該頁(yè)從磁盤讀到高速緩沖區(qū)中,每個(gè)進(jìn)程再執(zhí)行一個(gè)存儲(chǔ)器內(nèi)的復(fù)制操作將數(shù)據(jù)從高速緩沖區(qū)讀到自己的地址空間。
共享存儲(chǔ)映射
現(xiàn)在考慮另一種處理方法:進(jìn)程 A 和進(jìn)程 B 都將該頁(yè)映射到自己的地址空間,當(dāng)進(jìn)程 A 第一次訪問該頁(yè)中的數(shù)據(jù)時(shí), 它生成一個(gè)缺頁(yè)中斷。內(nèi)核此時(shí)讀入這一頁(yè)到內(nèi)存并更新頁(yè)表使之指向它。以后,當(dāng)進(jìn)程B訪問同一頁(yè)面而出現(xiàn)缺頁(yè)中斷時(shí),該頁(yè)已經(jīng)在內(nèi)存,內(nèi)核只需要將進(jìn)程 B 的頁(yè)表登記項(xiàng)指向次頁(yè)即可。
(1)mmap()系統(tǒng)調(diào)用
mmap()系統(tǒng)調(diào)用使得進(jìn)程之間通過映射同一個(gè)普通文件實(shí)現(xiàn)共享內(nèi)存。普通文件被映射到進(jìn)程地址空間后,進(jìn)程可以向訪問普通內(nèi)存一樣對(duì)文件進(jìn)行訪問,不必再調(diào)用read(),write()等操作。
mmap()系統(tǒng)調(diào)用形式如下:
void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )
mmap的作用是映射文件描述符fd指定文件的 [off,off + len]區(qū)域至調(diào)用進(jìn)程的[addr, addr + len]的內(nèi)存區(qū)域:
數(shù)fd為即將映射到進(jìn)程空間的文件描述字,一般由open()返回,同時(shí),fd可以指定為-1,此時(shí)須指定flags參數(shù)中的,MAP_ANON,表明進(jìn)行的是匿名映射(不涉及具體的文件名,避免了文件的創(chuàng)建及打開,很顯然只能用于具有親緣關(guān)系的進(jìn)程間通信)。
len是映射到調(diào)用進(jìn)程地址空間的字節(jié)數(shù),它從被映射文件開頭offset個(gè)字節(jié)開始算起。
prot 參數(shù)指定共享內(nèi)存的訪問權(quán)限??扇∪缦聨讉€(gè)值的或:PROT_READ(可讀) , PROT_WRITE (可寫), PROT_EXEC (可執(zhí)行), PROT_NONE(不可訪問)。
flags由以下幾個(gè)常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必選其一,而MAP_FIXED則不推薦使用。
offset參數(shù)一般設(shè)為0,表示從文件頭開始映射。
參數(shù)addr指定文件應(yīng)被映射到進(jìn)程空間的起始地址,一般被指定一個(gè)空指針,此時(shí)選擇起始地址的任務(wù)留給內(nèi)核來(lái)完成。函數(shù)的返回值為最后文件映射到進(jìn)程空間的地址,進(jìn)程可直接操作起始地址為該值的有效地址。
(2)mmap()返回地址的訪問
對(duì)mmap()返回地址的訪問,linux采用的是頁(yè)式管理機(jī)制。
對(duì)于用mmap()映射普通文件來(lái)說,進(jìn)程會(huì)在自己的地址空間新增一塊空間,空間大小由mmap()的len參數(shù)指定,注意,進(jìn)程并不一定能夠?qū)θ啃略隹臻g都能進(jìn)行有效訪問。
進(jìn)程能夠訪問的有效地址大小取決于文件被映射部分的大小。
簡(jiǎn)單的說,能夠容納文件被映射部分大小的最少頁(yè)面?zhèn)€數(shù)決定了進(jìn)程從mmap()返回的地址開始,能夠有效訪問的地址空間大小。
超過這個(gè)空間大小,內(nèi)核會(huì)根據(jù)超過的嚴(yán)重程度返回發(fā)送不同的信號(hào)給進(jìn)程??捎萌缦聢D示說明:
為了要確保一個(gè)進(jìn)程在寫的時(shí)候不能被讀,我們使用idx來(lái)標(biāo)記可讀塊。
下圖描述的是從連續(xù)內(nèi)存空間轉(zhuǎn)化成【規(guī)則,維度,值】語(yǔ)義的過程:
通用監(jiān)控上報(bào)協(xié)議:
general.proto syntax = "proto2"; package general; message Data { mapkv = 1; } message GeneralData { optional string rule_id = 1; repeated Data data = 2; optional int64 count = 3; optional int64 left_size = 4; optional int32 version = 5; }
| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預(yù)留長(zhǎng)度 | magincNum2(4byte) | 4k protect |
package moni_shm const ( OssShmId uint32 = 0x3eeff00 MagicNum1 uint32 = 0x650a218 MagicNum2 uint32 = 0x138a4f2 CreateShmLock = "/var/run/.oss_shm_lock" OssMapOneAttrCnt = 1024 * 128 //1024 個(gè)規(guī)則 OssOneAttrEntryCnt = 128 //每個(gè)規(guī)則有128個(gè)指標(biāo) EntrySz = 4 OssMapCnt = 2 OneAttrSz = OssOneAttrEntryCnt * EntrySz OssMapSz = OssMapOneAttrCnt * OneAttrSz OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4 defaultIntervalSec = 60 defaultTopic = "moni_general_shared_memory" )
內(nèi)存清零工具和"整頁(yè)"分配:
cd package moni_shm import ( "unsafe" ) //取整分配 func align(actual, to uint64) uint64 { return (actual + to - 1) / to * to } //連續(xù)空間清0 func zero(ptr uintptr, bts uint64) { if 0 == bts { return } const sz = 4096 var next uint64 cnt := 0 for ; next+sz <= bts; { //按頁(yè)清零 arr := (*[sz]byte)(unsafe.Pointer(ptr)) for i := range *arr { (*arr)[i] = 0 } next += sz ptr += uintptr(sz) cnt++ } if next == bts { return } var i uintptr for i = 0; i < uintptr(bts-next); i++ { //剩余空間清零 *(*byte)(unsafe.Pointer(ptr + i)) = 0 } }
共享內(nèi)存采集邏輯對(duì)應(yīng) “規(guī)則指標(biāo)和值”:
var ( _basePtr uintptr = 0 _shmUtil = NewShmUtil(OssShmId, OssAttrSz) _intervalSec = defaultIntervalSec _topic = defaultTopic _on bool = false ) func Stat(on bool) { _on = on } func Start() { go collect() //開始采集 } func tryInitBaseptr() error { var err error if _basePtr == 0 { _basePtr, err = _shmUtil.GetData() //獲取當(dāng)前共享內(nèi)存數(shù)據(jù)塊首地址 if nil != err { logrus.Warnf("init base ptr failed, retrying: %v", err) } } return err } func collect() { var ( cost time.Duration start time.Time first = true ) for { if !first { time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期對(duì)齊 } first = false start = time.Now() if !_on { cost = time.Since(start) continue } if _basePtr == 0 { if err := tryInitBaseptr(); nil != err { cost = time.Since(start) continue } } d := collectOnce() for _, v := range d { moni_report.ProductReportData(*v) } cost = time.Since(start) } } func collectOnce() []*moni_report.ReportData { now := time.Now() var ret []*moni_report.ReportData data := make(map[uint32]*general.GeneralData) d := SwitchAndFetch(_basePtr) logrus.Infof("sending %d data from shm", len(d)) for _, v := range d { ruleId := strconv.FormatUint(uint64(v[0]), 10) dim := strconv.FormatUint(uint64(v[1]), 10) value := strconv.FormatUint(uint64(v[2]), 10) if _, ok := data[v[0]]; !ok { data[v[0]] = &general.GeneralData{ RuleId: proto.String(ruleId), Data: []*general.Data{}, } } data[v[0]].Data = append(data[v[0]].Data, &general.Data{ Kv: map[string]string{ dim: value, "timestamp": strconv.FormatInt(now.Unix()*1000, 10), "ip": viper.GetString("host.inner_ip"), }, }) } logrus.Infof("collect format shm data:%v", data) for _, v := range data { bts, err := proto.Marshal(v) if nil != err { logrus.Errorf("marshal shm data failed: %v", err) continue } ret = append(ret, &moni_report.ReportData{ DataBytes: bts, Topic: _topic, }) } return ret }
每60秒根據(jù)idx值切換可讀區(qū),采集后上報(bào)后,清零,切換到下一區(qū)。
package moni_shm import ( "fmt" "log" "os" "syscall" "unsafe" "github.com/sirupsen/logrus" ) const ( IpcCreate = 00001000 ) var ( ErrNotCreated = fmt.Errorf("shm not created") ErrCreateFailed = fmt.Errorf("shm create failed") ) type shmOpt func(*ShmUtil) func WithCreate(b bool) shmOpt { return func(u *ShmUtil) { u.create = b } } /*共享內(nèi)存數(shù)據(jù)結(jié)構(gòu) |1page mprotect|page align data|1page mprotect| | 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預(yù)留長(zhǎng)度 | magincNum2(4byte) | 4k protect | */ type ShmUtil struct { pageSz int dataSz uint64 total uint64 shmKey uint32 create bool base uintptr data uintptr } func NewShmUtil(key uint32, sz uint64, cfgs ...shmOpt) *ShmUtil { if key == 0 { panic("invalid shm key: 0") } ret := &ShmUtil{ dataSz: sz, shmKey: key, } ret.pageSz = os.Getpagesize() //獲取頁(yè)大小 ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按頁(yè)分配“包體”大小 ret.total = ret.dataSz + uint64(ret.pageSz)*2 // 總空間大小=包體大小 + 頭尾各2頁(yè)保護(hù)地址 for _, c := range cfgs { c(ret) } return ret } func (s *ShmUtil) attachShm(flag int) error { created := false shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag)) //使用已存在的共享內(nèi)存,返回共享內(nèi)存標(biāo)識(shí)符 if 0 != errno { return errno } if shmid < 0 { if !s.create { //不允創(chuàng)建,直接返回 return ErrNotCreated } shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate)) //新創(chuàng)建共享內(nèi)存 if 0 != errno { return fmt.Errorf("shm create: %v", errno) } if shmid < 0 { return ErrCreateFailed } created = true } addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0) //掛接共享內(nèi)存到當(dāng)前進(jìn)程 if 0 != errno { return fmt.Errorf("shmat: %v", errno) } if created { zero(addr, s.total)//新創(chuàng)建的共享內(nèi)存,初始化共享內(nèi)存數(shù)據(jù) } s.base = addr //記錄共享內(nèi)存首地址 用于之后的釋放 s.data = s.base + uintptr(s.pageSz) //寫數(shù)據(jù)的起始地址 _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0) if 0 != errno { //鎖定共享內(nèi)存頭,鎖指定的內(nèi)存區(qū)間必須包含整個(gè)內(nèi)存頁(yè)(4K) s.detach() return fmt.Errorf("mprotect head: %v", errno) } _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //鎖指定共享內(nèi)存尾,區(qū)間開始的地址start必須是一個(gè)內(nèi)存頁(yè)的起始地址,并且區(qū)間長(zhǎng)度len必須是頁(yè)大小的整數(shù)倍。 if 0 != errno { s.detach() return fmt.Errorf("mprotect tail: %v", errno) } return nil } func (s *ShmUtil) detach() { //進(jìn)程去關(guān)聯(lián)共享內(nèi)存 if 0 != s.base { syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0) s.base = 0 s.data = 0 } } /* 獲取內(nèi)存并且返回?cái)?shù)據(jù)段起始位置 s.create 決定是否新申請(qǐng)共享內(nèi)存 */ func (s *ShmUtil) GetData() (uintptr, error) { if s.data != 0 { return s.data, nil } if err := s.attachShm(0666); nil != err { //初始化共享內(nèi)存,并關(guān)聯(lián)到進(jìn)程 return 0, err } return s.data, nil } func SwitchAndFetch(ptr uintptr) [][3]uint32 { //從共享內(nèi)存讀取 [][3]uint32{ossid,key,value} if ptr == 0 { return nil } m1 := (*uint32)(unsafe.Pointer(ptr)) m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64)) if MagicNum1 != *m1 || MagicNum2 != *m2 { logrus.Errorf("magic 1 in header: wrote:%v\tread:%v\n", MagicNum1, *m1) logrus.Errorf("magic 2 in tail: wrote:%v\tread:%v\n", MagicNum2, *m2) return nil } idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切換塊標(biāo)志 old := *idx *idx = 1 - *idx ret := PartialRead(ptr, old) //讀取當(dāng)前idx塊數(shù)據(jù) zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //讀完清0 return ret } //根據(jù)idx輪流讀數(shù)據(jù)區(qū)域 func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根據(jù)idx獲取塊起始地址 startPtr := ptr + 8 + uintptr(idx)*OssMapSz ret := ReadOssMap(startPtr) log.Printf("result: %v\n", ret) return ret } func ReadOssMap(ptr uintptr) [][3]uint32 { //1個(gè)周期內(nèi)的指標(biāo)總?cè)萘繛?nbsp;128*1024 = 128k = 13W var ret [][3]uint32 var i uint32 = 0 for i = 0; i < OssMapOneAttrCnt; i++ { //1個(gè)周期最多支持1024個(gè)業(yè)務(wù) for _, v := range ReadOneAttr(ptr) { ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value] } ptr += OneAttrSz // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4 } return ret } func ReadOneAttr(ptr uintptr) [][2]uint32 { var ret [][2]uint32 var i uint32 = 0 for i = 0; i < OssOneAttrEntryCnt; i++ { //目前默認(rèn)一個(gè)業(yè)務(wù)下最多有128單維度指標(biāo), OssOneAttrEntryCnt = 128 v := *(*uint32)(unsafe.Pointer(ptr)) if v != 0 { ret = append(ret, [2]uint32{i, v}) // [keyID, value] } ptr += EntrySz // 4yte 讀取一個(gè)指標(biāo) } return ret }
以上就是怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn),小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。