一、前言
共享內(nèi)存廣泛用于Redis,Kafka,RabbitMQ 等高性能組件中,本文主要提供一個(gè)共享內(nèi)存在廣告埋點(diǎn)數(shù)據(jù)采集的實(shí)戰(zhàn)場(chǎng)景。
二、共享內(nèi)存原理
1、原理
在Linux中,每個(gè)進(jìn)程都有屬于自己的進(jìn)程控制塊(PCB)和地址空間(Addr Space),并且都有一個(gè)與之對(duì)應(yīng)的頁表,負(fù)責(zé)將進(jìn)程的虛擬地址與物理地址進(jìn)行映射,通過內(nèi)存管理單元(MMU)進(jìn)行管理。兩個(gè)不同的虛擬地址通過頁表映射到物理空間的同一區(qū)域,它們所指向的這塊區(qū)域即共享內(nèi)存。
當(dāng)兩個(gè)進(jìn)程通過頁表將虛擬地址映射到物理地址時(shí),在物理地址中有一塊共同的內(nèi)存區(qū),即共享內(nèi)存,這塊內(nèi)存可以被兩個(gè)進(jìn)程同時(shí)看到。這樣當(dāng)一個(gè)進(jìn)程進(jìn)行寫操作,另一個(gè)進(jìn)程讀操作就可以實(shí)現(xiàn)進(jìn)程間通信。但是,我們要確保一個(gè)進(jìn)程在寫的時(shí)候不能被讀,因此我們使用信號(hào)量來實(shí)現(xiàn)同步與互斥。
對(duì)于一個(gè)共享內(nèi)存,實(shí)現(xiàn)采用的是引用計(jì)數(shù)的原理,當(dāng)進(jìn)程脫離共享存儲(chǔ)區(qū)后,計(jì)數(shù)器減一,掛架成功時(shí),計(jì)數(shù)器加一,只有當(dāng)計(jì)數(shù)器變?yōu)榱銜r(shí),才能被刪除。當(dāng)進(jìn)程終止時(shí),它所附加的共享存儲(chǔ)區(qū)都會(huì)自動(dòng)脫離。
2、與傳統(tǒng)文件對(duì)比
共享內(nèi)存可以說是最有用的進(jìn)程間通信方式,也是最快的IPC形式, 因?yàn)檫M(jìn)程可以直接讀寫內(nèi)存,而不需要任何 數(shù)據(jù)的拷貝。對(duì)于像管道和消息隊(duì)列等通信方式,則需要在內(nèi)核和用戶空間進(jìn)行四次的數(shù)據(jù)拷貝 共享內(nèi)存則只拷貝兩次數(shù)據(jù): 一次從輸入文件到共享內(nèi)存區(qū),另一次從共享內(nèi)存區(qū)到輸出文件。
實(shí)際上,進(jìn)程之間在共享內(nèi) 存時(shí),并不總是讀寫少量數(shù)據(jù)后就解除映射,有新的通信時(shí),再重新建立共享內(nèi)存區(qū)域。而是保持共享區(qū)域,直 到通信完畢為止,這樣,數(shù)據(jù)內(nèi)容一直保存在共享內(nèi)存中,并沒有寫回文件。共享內(nèi)存中的內(nèi)容往往是在解除映 射時(shí)才寫回文件的。因此,采用共享內(nèi)存的通信方式效率是非常高的。
傳統(tǒng)文件
UNIX 訪問文件的傳統(tǒng)方法是用 open 打開它們,如果有多個(gè)進(jìn)程訪問同一個(gè)文件,則每一個(gè)進(jìn)程在自己的地址空間都包含有該文件的副本,這不必要地浪費(fèi)了存儲(chǔ)空間。
下圖說明了兩個(gè)進(jìn)程同時(shí)讀一個(gè)文件的同一頁的情形。系統(tǒng)要將該頁從磁盤讀到高速緩沖區(qū)中,每個(gè)進(jìn)程再執(zhí)行一個(gè)存儲(chǔ)器內(nèi)的復(fù)制操作將數(shù)據(jù)從高速緩沖區(qū)讀到自己的地址空間。
共享存儲(chǔ)映射
現(xiàn)在考慮另一種處理方法:進(jìn)程 A 和進(jìn)程 B 都將該頁映射到自己的地址空間,當(dāng)進(jìn)程 A 第一次訪問該頁中的數(shù)據(jù)時(shí), 它生成一個(gè)缺頁中斷。內(nèi)核此時(shí)讀入這一頁到內(nèi)存并更新頁表使之指向它。以后,當(dāng)進(jìn)程B訪問同一頁面而出現(xiàn)缺頁中斷時(shí),該頁已經(jīng)在內(nèi)存,內(nèi)核只需要將進(jìn)程 B 的頁表登記項(xiàng)指向次頁即可。
3、mmap()
(1)mmap()系統(tǒng)調(diào)用
mmap()系統(tǒng)調(diào)用使得進(jìn)程之間通過映射同一個(gè)普通文件實(shí)現(xiàn)共享內(nèi)存。普通文件被映射到進(jìn)程地址空間后,進(jìn)程可以向訪問普通內(nèi)存一樣對(duì)文件進(jìn)行訪問,不必再調(diào)用read(),write()等操作。
mmap()系統(tǒng)調(diào)用形式如下:
1void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )
mmap的作用是映射文件描述符fd指定文件的 [off,off + len]區(qū)域至調(diào)用進(jìn)程的[addr, addr + len]的內(nèi)存區(qū)域:
數(shù)fd為即將映射到進(jìn)程空間的文件描述字,一般由open()返回,同時(shí),fd可以指定為-1,此時(shí)須指定flags參數(shù)中的,MAP_ANON,表明進(jìn)行的是匿名映射(不涉及具體的文件名,避免了文件的創(chuàng)建及打開,很顯然只能用于具有親緣關(guān)系的進(jìn)程間通信)。
len是映射到調(diào)用進(jìn)程地址空間的字節(jié)數(shù),它從被映射文件開頭offset個(gè)字節(jié)開始算起。
prot 參數(shù)指定共享內(nèi)存的訪問權(quán)限??扇∪缦聨讉€(gè)值的或:PROT_READ(可讀) , PROT_WRITE (可寫), PROT_EXEC (可執(zhí)行), PROT_NONE(不可訪問)。
flags由以下幾個(gè)常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必選其一,而MAP_FIXED則不推薦使用。
offset參數(shù)一般設(shè)為0,表示從文件頭開始映射。
參數(shù)addr指定文件應(yīng)被映射到進(jìn)程空間的起始地址,一般被指定一個(gè)空指針,此時(shí)選擇起始地址的任務(wù)留給內(nèi)核來完成。函數(shù)的返回值為最后文件映射到進(jìn)程空間的地址,進(jìn)程可直接操作起始地址為該值的有效地址。
(2)mmap()返回地址的訪問
對(duì)mmap()返回地址的訪問,linux采用的是頁式管理機(jī)制。
對(duì)于用mmap()映射普通文件來說,進(jìn)程會(huì)在自己的地址空間新增一塊空間,空間大小由mmap()的len參數(shù)指定,注意,進(jìn)程并不一定能夠?qū)θ啃略隹臻g都能進(jìn)行有效訪問。
進(jìn)程能夠訪問的有效地址大小取決于文件被映射部分的大小。
簡單的說,能夠容納文件被映射部分大小的最少頁面?zhèn)€數(shù)決定了進(jìn)程從mmap()返回的地址開始,能夠有效訪問的地址空間大小。
超過這個(gè)空間大小,內(nèi)核會(huì)根據(jù)超過的嚴(yán)重程度返回發(fā)送不同的信號(hào)給進(jìn)程。可用如下圖示說明:
三、VCS 共享內(nèi)存采集實(shí)戰(zhàn)
VCS(vivo control system): 負(fù)責(zé)全網(wǎng)所有類型的監(jiān)控指標(biāo)采集,為上游運(yùn)維平臺(tái)提供底層命令通道能力和全網(wǎng)插件升級(jí)管控能力。
1、數(shù)據(jù)結(jié)構(gòu)
2、分區(qū)讀寫
為了要確保一個(gè)進(jìn)程在寫的時(shí)候不能被讀,我們使用idx來標(biāo)記可讀塊。
3、規(guī)則,指標(biāo)和值
下圖描述的是從連續(xù)內(nèi)存空間轉(zhuǎn)化成【規(guī)則,維度,值】語義的過程:
4、源碼分析
5、general.proto
通用監(jiān)控上報(bào)協(xié)議:
general.proto
syntax = “proto2”;
package general;
message Data {
map kv = 1;
}
message GeneralData {
optional string rule_id = 1;
repeated Data data = 2;
optional int64 count = 3;
optional int64 left_size = 4;
optional int32 version = 5;
}
6、constant.go 配置參數(shù)
| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預(yù)留長度 | magincNum2(4byte) | 4k protect |
package moni_shm
const (
OssShmId uint32 = 0x3eeff00
MagicNum1 uint32 = 0x650a218
MagicNum2 uint32 = 0x138a4f2
CreateShmLock = “/var/run/.oss_shm_lock”
OssMapOneAttrCnt = 1024 * 128 //1024 個(gè)規(guī)則
OssOneAttrEntryCnt = 128 //每個(gè)規(guī)則有128個(gè)指標(biāo)
EntrySz = 4
OssMapCnt = 2
OneAttrSz = OssOneAttrEntryCnt * EntrySz
OssMapSz = OssMapOneAttrCnt * OneAttrSz
OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4
defaultIntervalSec = 60
defaultTopic = “moni_general_shared_memory”
)
7、util.go 工具類
內(nèi)存清零工具和“整頁”分配:
cd package moni_shm
import (
“unsafe”
)
//取整分配
func align(actual, to uint64) uint64 {
return (actual + to - 1) / to * to
}
//連續(xù)空間清0
func zero(ptr uintptr, bts uint64) {
if 0 == bts {
return
}
const sz = 4096
var next uint64
cnt := 0
for ; next+sz 《= bts; { //按頁清零
arr := (*[sz]byte)(unsafe.Pointer(ptr))
for i := range *arr {
(*arr)[i] = 0
}
next += sz
ptr += uintptr(sz)
cnt++
}
if next == bts {
return
}
var i uintptr
for i = 0; i 《 uintptr(bts-next); i++ { //剩余空間清零
*(*byte)(unsafe.Pointer(ptr + i)) = 0
}
}
8、mgr.go 采集邏輯
共享內(nèi)存采集邏輯對(duì)應(yīng) “規(guī)則指標(biāo)和值”:
var (
_basePtr uintptr = 0
_shmUtil = NewShmUtil(OssShmId, OssAttrSz)
_intervalSec = defaultIntervalSec
_topic = defaultTopic
_on bool = false
)
func Stat(on bool) {
_on = on
}
func Start() {
go collect() //開始采集
}
func tryInitBaseptr() error {
var err error
if _basePtr == 0 {
_basePtr, err = _shmUtil.GetData() //獲取當(dāng)前共享內(nèi)存數(shù)據(jù)塊首地址
if nil != err {
logrus.Warnf(“init base ptr failed, retrying: %v”, err)
}
}
return err
}
func collect() {
var (
cost time.Duration
start time.Time
first = true
)
for {
if !first {
time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期對(duì)齊
}
first = false
start = time.Now()
if !_on {
cost = time.Since(start)
continue
}
if _basePtr == 0 {
if err := tryInitBaseptr(); nil != err {
cost = time.Since(start)
continue
}
}
d := collectOnce()
for _, v := range d {
moni_report.ProductReportData(*v)
}
cost = time.Since(start)
}
}
func collectOnce() []*moni_report.ReportData {
now := time.Now()
var ret []*moni_report.ReportData
data := make(map[uint32]*general.GeneralData)
d := SwitchAndFetch(_basePtr)
logrus.Infof(“sending %d data from shm”, len(d))
for _, v := range d {
ruleId := strconv.FormatUint(uint64(v[0]), 10)
dim := strconv.FormatUint(uint64(v[1]), 10)
value := strconv.FormatUint(uint64(v[2]), 10)
if _, ok := data[v[0]]; !ok {
data[v[0]] = &general.GeneralData{
RuleId: proto.String(ruleId),
Data: []*general.Data{},
}
}
data[v[0]].Data = append(data[v[0]].Data, &general.Data{
Kv: map[string]string{
dim: value,
“timestamp”: strconv.FormatInt(now.Unix()*1000, 10),
“ip”: viper.GetString(“host.inner_ip”),
},
})
}
logrus.Infof(“collect format shm data:%v”, data)
for _, v := range data {
bts, err := proto.Marshal(v)
if nil != err {
logrus.Errorf(“marshal shm data failed: %v”, err)
continue
}
ret = append(ret, &moni_report.ReportData{
DataBytes: bts,
Topic: _topic,
})
}
return ret
}
9、shmutil.go 共享內(nèi)存操作
每60秒根據(jù)idx值切換可讀區(qū),采集后上報(bào)后,清零,切換到下一區(qū)。
package moni_shm
import (
“fmt”
“l(fā)og”
“os”
“syscall”
“unsafe”
“github.com/sirupsen/logrus”
)
const (
IpcCreate = 00001000
)
var (
ErrNotCreated = fmt.Errorf(“shm not created”)
ErrCreateFailed = fmt.Errorf(“shm create failed”)
)
type shmOpt func(*ShmUtil)
func WithCreate(b bool) shmOpt {
return func(u *ShmUtil) {
u.create = b
}
}
/*共享內(nèi)存數(shù)據(jù)結(jié)構(gòu)
|1page mprotect|page align data|1page mprotect|
| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預(yù)留長度 | magincNum2(4byte) | 4k protect |
*/
type ShmUtil struct {
pageSz int
dataSz uint64
total uint64
shmKey uint32
create bool
base uintptr
data uintptr
}
func NewShmUtil(key uint32, sz uint64, cfgs 。。.shmOpt) *ShmUtil {
if key == 0 {
panic(“invalid shm key: 0”)
}
ret := &ShmUtil{
dataSz: sz,
shmKey: key,
}
ret.pageSz = os.Getpagesize() //獲取頁大小
ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按頁分配“包體”大小
ret.total = ret.dataSz + uint64(ret.pageSz)*2 // 總空間大小=包體大小 + 頭尾各2頁保護(hù)地址
for _, c := range cfgs {
c(ret)
}
return ret
}
func (s *ShmUtil) attachShm(flag int) error {
created := false
shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag)) //使用已存在的共享內(nèi)存,返回共享內(nèi)存標(biāo)識(shí)符
if 0 != errno {
return errno
}
if shmid 《 0 {
if !s.create { //不允創(chuàng)建,直接返回
return ErrNotCreated
}
shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate)) //新創(chuàng)建共享內(nèi)存
if 0 != errno {
return fmt.Errorf(“shm create: %v”, errno)
}
if shmid 《 0 {
return ErrCreateFailed
}
created = true
}
addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0) //掛接共享內(nèi)存到當(dāng)前進(jìn)程
if 0 != errno {
return fmt.Errorf(“shmat: %v”, errno)
}
if created {
zero(addr, s.total)//新創(chuàng)建的共享內(nèi)存,初始化共享內(nèi)存數(shù)據(jù)
}
s.base = addr //記錄共享內(nèi)存首地址 用于之后的釋放
s.data = s.base + uintptr(s.pageSz) //寫數(shù)據(jù)的起始地址
_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)
if 0 != errno { //鎖定共享內(nèi)存頭,鎖指定的內(nèi)存區(qū)間必須包含整個(gè)內(nèi)存頁(4K)
s.detach()
return fmt.Errorf(“mprotect head: %v”, errno)
}
_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //鎖指定共享內(nèi)存尾,區(qū)間開始的地址start必須是一個(gè)內(nèi)存頁的起始地址,并且區(qū)間長度len必須是頁大小的整數(shù)倍。
if 0 != errno {
s.detach()
return fmt.Errorf(“mprotect tail: %v”, errno)
}
return nil
}
func (s *ShmUtil) detach() { //進(jìn)程去關(guān)聯(lián)共享內(nèi)存
if 0 != s.base {
syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)
s.base = 0
s.data = 0
}
}
/*
獲取內(nèi)存并且返回?cái)?shù)據(jù)段起始位置
s.create 決定是否新申請(qǐng)共享內(nèi)存
*/
func (s *ShmUtil) GetData() (uintptr, error) {
if s.data != 0 {
return s.data, nil
}
if err := s.attachShm(0666); nil != err { //初始化共享內(nèi)存,并關(guān)聯(lián)到進(jìn)程
return 0, err
}
return s.data, nil
}
func SwitchAndFetch(ptr uintptr) [][3]uint32 { //從共享內(nèi)存讀取 [][3]uint32{ossid,key,value}
if ptr == 0 {
return nil
}
m1 := (*uint32)(unsafe.Pointer(ptr))
m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))
if MagicNum1 != *m1 || MagicNum2 != *m2 {
logrus.Errorf(“magic 1 in header: wrote:%v\tread:%v\n”, MagicNum1, *m1)
logrus.Errorf(“magic 2 in tail: wrote:%v\tread:%v\n”, MagicNum2, *m2)
return nil
}
idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切換塊標(biāo)志
old := *idx
*idx = 1 - *idx
ret := PartialRead(ptr, old) //讀取當(dāng)前idx塊數(shù)據(jù)
zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //讀完清0
return ret
}
//根據(jù)idx輪流讀數(shù)據(jù)區(qū)域
func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根據(jù)idx獲取塊起始地址
startPtr := ptr + 8 + uintptr(idx)*OssMapSz
ret := ReadOssMap(startPtr)
log.Printf(“result: %v\n”, ret)
return ret
}
func ReadOssMap(ptr uintptr) [][3]uint32 { //1個(gè)周期內(nèi)的指標(biāo)總?cè)萘繛?128*1024 = 128k = 13W
var ret [][3]uint32
var i uint32 = 0
for i = 0; i 《 OssMapOneAttrCnt; i++ { //1個(gè)周期最多支持1024個(gè)業(yè)務(wù)
for _, v := range ReadOneAttr(ptr) {
ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]
}
ptr += OneAttrSz // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4
}
return ret
}
func ReadOneAttr(ptr uintptr) [][2]uint32 {
var ret [][2]uint32
var i uint32 = 0
for i = 0; i 《 OssOneAttrEntryCnt; i++ { //目前默認(rèn)一個(gè)業(yè)務(wù)下最多有128單維度指標(biāo), OssOneAttrEntryCnt = 128
v := *(*uint32)(unsafe.Pointer(ptr))
if v != 0 {
ret = append(ret, [2]uint32{i, v}) // [keyID, value]
}
ptr += EntrySz // 4yte 讀取一個(gè)指標(biāo)
}
return ret
}
四、總結(jié)
本文通過共享內(nèi)存的原理和詳細(xì)分析了一個(gè)共享內(nèi)存在生產(chǎn)上的應(yīng)用場(chǎng)景,希望能為大家拋磚引玉。
編輯:hfy
-
Linux
+關(guān)注
關(guān)注
87文章
11329瀏覽量
209975 -
內(nèi)存管理
+關(guān)注
關(guān)注
0文章
168瀏覽量
14162 -
共享內(nèi)存
+關(guān)注
關(guān)注
0文章
16瀏覽量
8328
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論