本篇文章為大家展示了Node.js多進(jìn)程模型中怎么實(shí)現(xiàn)共享內(nèi)存,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
公司主營(yíng)業(yè)務(wù):成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。創(chuàng)新互聯(lián)公司推出江油免費(fèi)做網(wǎng)站回饋大家。
Node.js 由于其單線程模型的設(shè)計(jì),導(dǎo)致一個(gè)Node進(jìn)程(的主線程)只能利用一個(gè)CPU核心,然而現(xiàn)在的機(jī)器基本上都是多核的,這造成了嚴(yán)重的性能浪費(fèi)。通常來(lái)說(shuō),想要利用到多個(gè)核心一般有以下的方法:
編寫Node的C++插件擴(kuò)充線程池,并在JS代碼中將CPU耗時(shí)任務(wù)委托給其它線程處理。
使用worker_threads模塊提供的多線程模型(尚在實(shí)驗(yàn)階段)。
使用child_process 或者 cluster模塊提供的多進(jìn)程模型,每個(gè)進(jìn)程都是一個(gè)獨(dú)立的Node.js進(jìn)程。
從易用、代碼入侵性、穩(wěn)定性的角度來(lái)說(shuō),多進(jìn)程模型通常是首要的選擇?!就扑]學(xué)習(xí):《nodejs 教程》】
Node.js cluster 多進(jìn)程模型存在的問(wèn)題
在cluster模塊提供的多進(jìn)程模型中,每個(gè)Node進(jìn)程都是一個(gè)獨(dú)立且完整的應(yīng)用進(jìn)程,有自己的內(nèi)存空間,其它進(jìn)程無(wú)法訪問(wèn)。因此雖然在項(xiàng)目啟動(dòng)時(shí),所有Worker進(jìn)程具有一致的狀態(tài)和行為,但在之后的運(yùn)行中無(wú)法保證其狀態(tài)維持一致。
例如,項(xiàng)目啟動(dòng)時(shí)有兩個(gè)Worker進(jìn)程,進(jìn)程A和進(jìn)程B,兩個(gè)進(jìn)程都聲明了變量a=1。但之后項(xiàng)目接收到一個(gè)請(qǐng)求,Master進(jìn)程將其分派給進(jìn)程A來(lái)處理,這個(gè)請(qǐng)求將a的值變更為了2,那么此時(shí)進(jìn)程A的內(nèi)存空間中a=2,但是進(jìn)程B的內(nèi)存空間中a依舊是1。此時(shí)如果有個(gè)請(qǐng)求讀取a的值,Master進(jìn)程將這個(gè)請(qǐng)求分派給進(jìn)程A和進(jìn)程B時(shí)讀取到的結(jié)果是不一致的,這就出現(xiàn)了一致性問(wèn)題。
cluster模塊在設(shè)計(jì)時(shí)并沒(méi)有給出解決方案,而是要求Worker進(jìn)程是無(wú)狀態(tài)的,即程序員在寫代碼時(shí)不應(yīng)該允許在處理請(qǐng)求時(shí)修改內(nèi)存中的值,以此來(lái)保障所有Worker進(jìn)程的一致性。然而在實(shí)踐中總會(huì)有各種各樣的情況需要寫內(nèi)存,比如記錄用戶的登錄狀態(tài)等,在許多企業(yè)的實(shí)踐中,通常會(huì)把這些狀態(tài)數(shù)據(jù)記錄在外部,例如數(shù)據(jù)庫(kù)、redis、消息隊(duì)列、文件系統(tǒng)等,每次處理有狀態(tài)請(qǐng)求時(shí)會(huì)讀寫外部存儲(chǔ)空間。
這不失為一種有效的做法,然而這需要額外引入一個(gè)外部存儲(chǔ)空間,同時(shí)還要自行處理多進(jìn)程并發(fā)訪問(wèn)下的一致性問(wèn)題,自行維護(hù)數(shù)據(jù)的生命周期(因?yàn)镹ode進(jìn)程和維護(hù)在外部的數(shù)據(jù)并不是同步創(chuàng)建和銷毀的),以及在高并發(fā)訪問(wèn)情況下的IO性能瓶頸(如果是存儲(chǔ)在數(shù)據(jù)庫(kù)等非內(nèi)存環(huán)境中)。其實(shí)本質(zhì)上來(lái)說(shuō),我們只是需要一個(gè)可供多個(gè)進(jìn)程共享訪問(wèn)的空間罷了,并不需要持久化存儲(chǔ),這段空間的生命周期最好與Node進(jìn)程強(qiáng)綁定,這樣在使用時(shí)能省去不少麻煩。因此跨進(jìn)程的共享內(nèi)存就成了最適合在這種場(chǎng)景使用的方式。
Node.js 的共享內(nèi)存
很遺憾Node本身并未提供共享內(nèi)存的實(shí)現(xiàn),因此我們可以看看npm倉(cāng)庫(kù)中第三方庫(kù)的實(shí)現(xiàn)。這些庫(kù)有些是通過(guò)C++插件擴(kuò)充Node的函數(shù)實(shí)現(xiàn)的,有些是通過(guò)Node提供的IPC機(jī)制實(shí)現(xiàn)的,但很遺憾它們的實(shí)現(xiàn)都很簡(jiǎn)單,并未提供互斥訪問(wèn)、對(duì)象監(jiān)聽(tīng)等功能,這使得使用者必須自己小心維護(hù)這段共享內(nèi)存,否則就會(huì)導(dǎo)致時(shí)序問(wèn)題。
轉(zhuǎn)了一圈下來(lái)沒(méi)找到我想要的。。。那就算了,我自己寫一個(gè)。
首先我們必須理清楚到底需要個(gè)什么樣的共享內(nèi)存,我是根據(jù)我自身的需求出發(fā)(為了在項(xiàng)目中用它來(lái)存儲(chǔ)跨進(jìn)程訪問(wèn)的狀態(tài)數(shù)據(jù)),同時(shí)兼顧通用性,因此會(huì)首先考慮以下幾點(diǎn):
以JS對(duì)象為基本單位進(jìn)行讀寫訪問(wèn)。
能夠進(jìn)程間互斥訪問(wèn),一個(gè)進(jìn)程訪問(wèn)時(shí),其它進(jìn)程被阻塞。
能夠監(jiān)聽(tīng)共享內(nèi)存中的對(duì)象,當(dāng)對(duì)象發(fā)生變化的時(shí)候監(jiān)聽(tīng)的進(jìn)程能被通知到。
在滿足上述條件的前提下,實(shí)現(xiàn)方式盡可能簡(jiǎn)單。
可以發(fā)現(xiàn),其實(shí)我們并不需要操作系統(tǒng)層面的共享內(nèi)存,只需要能夠多個(gè)Node進(jìn)程能訪問(wèn)同一個(gè)對(duì)象就行了,那么就可以在Node本身提供的機(jī)制上實(shí)現(xiàn)。可以使用Master進(jìn)程的一段內(nèi)存空間作為共享內(nèi)存空間,Worker進(jìn)程通過(guò)IPC將讀寫請(qǐng)求委托給Master進(jìn)程,由Master進(jìn)程進(jìn)行讀寫,然后再通過(guò)IPC將結(jié)果返回給Worker進(jìn)程。
為了讓共享內(nèi)存的使用方式在Master進(jìn)程和Worker進(jìn)程中一致,我們可以將對(duì)共享內(nèi)存的操作抽離成一個(gè)接口,在Master進(jìn)程和Worker進(jìn)程中各自實(shí)現(xiàn)這個(gè)接口。類圖如下圖所示,用一個(gè)SharedMemory
類作為抽象接口,在server.js
入口文件中聲明該對(duì)象。其在Master進(jìn)程中實(shí)例化為Manager
對(duì)象,在Worker進(jìn)程中實(shí)例化為Worker
對(duì)象。Manager
對(duì)象來(lái)維護(hù)共享內(nèi)存,并處理對(duì)共享內(nèi)存的讀寫請(qǐng)求,而Worker
對(duì)象則將讀寫請(qǐng)求發(fā)送到Master進(jìn)程。
可以使用Manager
類中的一個(gè)屬性作為共享內(nèi)存對(duì)象,訪問(wèn)該對(duì)象的方式與訪問(wèn)普通JS對(duì)象的方式一致,然后再做一層封裝,只暴露get
、set
、remove
等基本操作,避免該屬性直接被修改。
由于Master進(jìn)程會(huì)優(yōu)先于所有Worker進(jìn)程創(chuàng)建,因此,可以在Master進(jìn)程中聲明共享內(nèi)存空間之后再創(chuàng)建Worker進(jìn)程,以此來(lái)保證每個(gè)Worker進(jìn)程創(chuàng)建后都可以立即訪問(wèn)共享內(nèi)存。
為了使用簡(jiǎn)單,我們可以將SharedMemory
設(shè)計(jì)成單例,這樣每個(gè)進(jìn)程中就只有一個(gè)實(shí)例,并可以在import
了SharedMemory
之后直接使用。
讀寫控制與IPC通信
首先實(shí)現(xiàn)對(duì)外接口SharedMemory
類,這里沒(méi)有使用讓Manager
和Worker
繼承SharedMemory
的方式,而是讓SharedMemory
在實(shí)例化的時(shí)候返回一個(gè)Manager
或Worker
的實(shí)例,從而實(shí)現(xiàn)自動(dòng)選擇子類。
在Node 16中
isPrimary
替代了isMaster
,這里為了兼容使用了兩種寫法。
// shared-memory.js class SharedMemory { constructor() { if (cluster.isMaster || cluster.isPrimary) { return new Manager(); } else { return new Worker(); } } }
Manager
負(fù)責(zé)管理共享內(nèi)存空間,我們直接在Manager
對(duì)象中增加__sharedMemory__
屬性,由于其本身也是JS對(duì)象,會(huì)被納入JS的垃圾回收管理中,因此我們不需要進(jìn)行內(nèi)存清理、數(shù)據(jù)遷移等操作,使得實(shí)現(xiàn)上非常簡(jiǎn)潔。之后在__sharedMemory__
之中定義set
、get
、remove
等標(biāo)準(zhǔn)操作來(lái)提供訪問(wèn)方式。
我們通過(guò)cluster.on('online', callback)
來(lái)監(jiān)聽(tīng)worker進(jìn)程的創(chuàng)建事件,并在創(chuàng)建后立即用worker.on('message', callback)
來(lái)監(jiān)聽(tīng)來(lái)自worker進(jìn)程的IPC通信,并把通信消息交給handle
函數(shù)處理。
handle
函數(shù)的職責(zé)是區(qū)分worker進(jìn)程是想進(jìn)行哪種操作,并取出操作的參數(shù)委托給對(duì)應(yīng)的set
、get
、remove
函數(shù)(注意不是__sharedMemory__
中的set
、get
、remove
)進(jìn)行處理,并將處理后的結(jié)果返還給worker進(jìn)程。
// manager.js const cluster = require('cluster'); class Manager { constructor() { this.__sharedMemory__ = { set(key, value) { this.memory[key] = value; }, get(key) { return this.memory[key]; }, remove(key) { delete this.memory[key]; }, memory: {}, }; // Listen the messages from worker processes. cluster.on('online', (worker) => { worker.on('message', (data) => { this.handle(data, worker); return false; }); }); } handle(data, target) { const args = data.value ? [data.key, data.value] : [data.key]; this[data.method](...args).then((value) => { const msg = { id: data.id, // workerId uuid: data.uuid, // communicationID value, }; target.send(msg); }); } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); resolve('OK'); }); } get(key) { return new Promise((resolve) => { resolve(this.__sharedMemory__.get(key)); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); resolve('OK'); }); } }
Worker
自對(duì)象創(chuàng)建開(kāi)始就使用process.on
監(jiān)聽(tīng)來(lái)自Master進(jìn)程的返回消息(畢竟不能等消息發(fā)送出去以后再監(jiān)聽(tīng)吧,那就來(lái)不及了)。至于__getCallbacks__
對(duì)象的作用一會(huì)兒再說(shuō)。此時(shí)Worker
對(duì)象便創(chuàng)建完成。
之后項(xiàng)目運(yùn)行到某個(gè)地方的時(shí)候,如果要訪問(wèn)共享內(nèi)存,就會(huì)調(diào)用Worker
的set
、get
、remove
函數(shù),它們又會(huì)調(diào)用handle
函數(shù)將消息通過(guò)process.send
發(fā)送到master進(jìn)程,同時(shí),將得到返回結(jié)果時(shí)要進(jìn)行的操作記錄在__getCallbacks__
中。當(dāng)結(jié)果返回時(shí),會(huì)被之前在process.on
中的函數(shù)監(jiān)聽(tīng)到,并從__getCallbacks__
中取出對(duì)應(yīng)的回調(diào)函數(shù),并執(zhí)行。
因?yàn)樵L問(wèn)共享內(nèi)存的過(guò)程中會(huì)經(jīng)過(guò)IPC,所以必定是異步操作,所以需要記錄回調(diào)函數(shù),不能實(shí)現(xiàn)成同步的方式,不然會(huì)阻塞原本的任務(wù)。
// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid'); class Worker { constructor() { this.__getCallbacks__ = {}; process.on('message', (data) => { const callback = this.__getCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } delete this.__getCallbacks__[data.uuid]; }); } set(key, value) { return new Promise((resolve) => { this.handle('set', key, value, () => { resolve(); }); }); } get(key) { return new Promise((resolve) => { this.handle('get', key, null, (value) => { resolve(value); }); }); } remove(key) { return new Promise((resolve) => { this.handle('remove', key, null, () => { resolve(); }); }); } handle(method, key, value, callback) { const uuid = uuid4(); // 每次通信的uuid process.send({ id: cluster.worker.id, method, uuid, key, value, }); this.__getCallbacks__[uuid] = callback; } }
一次共享內(nèi)存訪問(wèn)的完整流程是:調(diào)用Worker
的set
/get
/remove
函數(shù) -> 調(diào)用Worker
的handle
函數(shù),向master進(jìn)程通信并將回調(diào)函數(shù)記錄在__getCallbacks__
-> master進(jìn)程監(jiān)聽(tīng)到來(lái)自worker進(jìn)程的消息 -> 調(diào)用Manager
的handle
函數(shù) -> 調(diào)用Manager
的set
/get
/remove
函數(shù) -> 調(diào)用__sharedMemory__
的set
/get
/remove
函數(shù) -> 操作完成返回Manager
的set
/get
/remove
函數(shù) -> 操作完成返回handle
函數(shù) -> 向worker進(jìn)程發(fā)送通信消息 -> worker進(jìn)程監(jiān)聽(tīng)到來(lái)自master進(jìn)程的消息 -> 從__getCallbacks__
中取出回調(diào)函數(shù)并執(zhí)行。
互斥訪問(wèn)
到目前為止,我們已經(jīng)實(shí)現(xiàn)了讀寫共享內(nèi)存,但還沒(méi)有結(jié)束,目前的共享內(nèi)存是存在嚴(yán)重安全問(wèn)題的。因?yàn)檫@個(gè)共享內(nèi)存是可以所有進(jìn)程同時(shí)訪問(wèn)的,然而我們并沒(méi)有考慮并發(fā)訪問(wèn)時(shí)的時(shí)序問(wèn)題。我們來(lái)看下面這個(gè)例子:
時(shí)間 | 進(jìn)程A | 進(jìn)程B | 共享內(nèi)存中變量x的值 |
---|---|---|---|
t0 | 0 | ||
t1 | 讀取x(x=0) | 0 | |
t2 | x1=x+1(x1=1) | 讀取x(x=0) | 0 |
t3 | 將x1的值寫回x | x2=x+1(x2=1) | 1 |
t4 | 將x2的值寫回x | 1 |
進(jìn)程A和進(jìn)程B的目的都是將x的值加1,理想情況下最后x的值應(yīng)該是2,可是最后的結(jié)果卻是1。這是因?yàn)檫M(jìn)程B在t3時(shí)刻給x的值加1的時(shí)候,使用的是t2時(shí)刻讀取出來(lái)的x的值,但此時(shí)從全局角度來(lái)看,這個(gè)值已經(jīng)過(guò)期了,因?yàn)閠3時(shí)刻x最新的值已經(jīng)被進(jìn)程A寫為了1,可是進(jìn)程B無(wú)法知道進(jìn)程外部的變化,所以導(dǎo)致了t4時(shí)刻最后寫回的值又覆蓋掉了進(jìn)程A寫回的值,等于是進(jìn)程A的行為被覆蓋掉了。
在多線程、多進(jìn)程和分布式中并發(fā)情況下的數(shù)據(jù)一致性問(wèn)題是老大難問(wèn)題了,這里不再展開(kāi)討論。
為了解決上述問(wèn)題,我們必須實(shí)現(xiàn)進(jìn)程間互斥訪問(wèn)某個(gè)對(duì)象,來(lái)避免同時(shí)操作一個(gè)對(duì)象,從而使進(jìn)程可以進(jìn)行原子操作,所謂原子操作就是不可被打斷的一小段連續(xù)操作,為此需要引入鎖的概念。由于讀寫均以對(duì)象為基本單位,因此鎖的粒度設(shè)置為對(duì)象級(jí)別。在某一個(gè)進(jìn)程(的某一任務(wù))獲取了某個(gè)對(duì)象的鎖之后,其它要獲取鎖的進(jìn)程(的任務(wù))會(huì)被阻塞,直到鎖被歸還。而要進(jìn)行寫操作,則必須要先獲取對(duì)象的鎖。這樣在獲取到鎖直到鎖被釋放的這段時(shí)間里,該對(duì)象在共享內(nèi)存中的值不會(huì)被其它進(jìn)程修改,從而導(dǎo)致錯(cuò)誤。
在Manager
的__sharedMemory__
中加入locks
屬性,用來(lái)記錄哪個(gè)對(duì)象的鎖被拿走了,lockRequestQueues
屬性用來(lái)記錄被阻塞的任務(wù)(正在等待鎖的任務(wù))。并增加getLock
函數(shù)和releaseLock
函數(shù),用來(lái)申請(qǐng)和歸還鎖,以及handleLockRequest
函數(shù),用來(lái)使被阻塞的任務(wù)獲得鎖。在申請(qǐng)鎖時(shí),會(huì)先將回調(diào)函數(shù)記錄到lockRequestQueues
隊(duì)尾(因?yàn)榇藭r(shí)該對(duì)象的鎖可能已被拿走),然后再調(diào)用handleLockRequest
檢查當(dāng)前鎖是否被拿走,若鎖還在,則讓隊(duì)首的任務(wù)獲得鎖。歸還鎖時(shí),先將__sharedMemory__.locks
中對(duì)應(yīng)的記錄刪掉,然后再調(diào)用handleLockRequest
讓隊(duì)首的任務(wù)獲得鎖。
// manager.js const { v4: uuid4 } = require('uuid'); class Manager { constructor() { this.__sharedMemory__ = { ... locks: {}, lockRequestQueues: {}, }; } getLock(key) { return new Promise((resolve) => { this.__sharedMemory__.lockRequestQueues[key] = this.__sharedMemory__.lockRequestQueues[key] ?? []; this.__sharedMemory__.lockRequestQueues[key].push(resolve); this.handleLockRequest(key); }); } releaseLock(key, lockId) { return new Promise((resolve) => { if (lockId === this.__sharedMemory__.locks[key]) { delete this.__sharedMemory__.locks[key]; this.handleLockRequest(key); } resolve('OK'); }); } handleLockRequest(key) { return new Promise((resolve) => { if ( !this.__sharedMemory__.locks[key] && this.__sharedMemory__.lockRequestQueues[key]?.length > 0 ) { const callback = this.__sharedMemory__.lockRequestQueues[key].shift(); const lockId = uuid4(); this.__sharedMemory__.locks[key] = lockId; callback(lockId); } resolve(); }); } ... }
在Worker
中,則是增加getLock
和releaseLock
兩個(gè)函數(shù),行為與get
、set
類似,都是調(diào)用handle
函數(shù)。
// worker.js class Worker { getLock(key) { return new Promise((resolve) => { this.handle('getLock', key, null, (value) => { resolve(value); }); }); } releaseLock(key, lockId) { return new Promise((resolve) => { this.handle('releaseLock', key, lockId, (value) => { resolve(value); }); }); } ... }
監(jiān)聽(tīng)對(duì)象
有時(shí)候我們需要監(jiān)聽(tīng)某個(gè)對(duì)象值的變化,在單進(jìn)程N(yùn)ode應(yīng)用中這很容易做到,只需要重寫對(duì)象的set
屬性就可以了,然而在多進(jìn)程共享內(nèi)存中,對(duì)象和監(jiān)聽(tīng)者都不在一個(gè)進(jìn)程中,這只能依賴Manager
的實(shí)現(xiàn)。這里,我們選擇了經(jīng)典的觀察者模式來(lái)實(shí)現(xiàn)監(jiān)聽(tīng)共享內(nèi)存中的對(duì)象。
為此,我們先在__sharedMemory__
中加入listeners
屬性,用來(lái)記錄在對(duì)象值發(fā)生變化時(shí)監(jiān)聽(tīng)者注冊(cè)的回調(diào)函數(shù)。然后增加listen
函數(shù),其將監(jiān)聽(tīng)回調(diào)函數(shù)記錄到__sharedMemory__.listeners
中,這個(gè)監(jiān)聽(tīng)回調(diào)函數(shù)會(huì)將變化的值發(fā)送給對(duì)應(yīng)的worker進(jìn)程。最后,在set
和remove
函數(shù)返回前調(diào)用notifyListener
,將所有記錄在__sharedMemory__.listeners
中監(jiān)聽(tīng)該對(duì)象的所有函數(shù)取出并調(diào)用。
// manager.js class Manager { constructor() { this.__sharedMemory__ = { ... listeners: {}, }; } handle(data, target) { if (data.method === 'listen') { this.listen(data.key, (value) => { const msg = { isNotified: true, id: data.id, uuid: data.uuid, value, }; target.send(msg); }); } else { ... } } notifyListener(key) { const listeners = this.__sharedMemory__.listeners[key]; if (listeners?.length > 0) { Promise.all( listeners.map( (callback) => new Promise((resolve) => { callback(this.__sharedMemory__.get(key)); resolve(); }) ) ); } } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); this.notifyListener(key); resolve('OK'); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); this.notifyListener(key); resolve('OK'); }); } listen(key, callback) { if (typeof callback === 'function') { this.__sharedMemory__.listeners[key] = this.__sharedMemory__.listeners[key] ?? []; this.__sharedMemory__.listeners[key].push(callback); } else { throw new Error('a listener must have a callback.'); } } ... }
在Worker
中由于監(jiān)聽(tīng)操作與其它操作不一樣,它是一次注冊(cè)監(jiān)聽(tīng)回調(diào)函數(shù)之后對(duì)象的值每次變化都會(huì)被通知,因此需要在增加一個(gè)__getListenerCallbacks__
屬性用來(lái)記錄監(jiān)聽(tīng)操作的回調(diào)函數(shù),與__getCallbacks__
不同,它里面的函數(shù)在收到master的回信之后不會(huì)刪除。
// worker.js class Worker { constructor() { ... this.__getListenerCallbacks__ = {}; process.on('message', (data) => { if (data.isNotified) { const callback = this.__getListenerCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } } else { ... } }); } handle(method, key, value, callback) { ... if (method === 'listen') { this.__getListenerCallbacks__[uuid] = callback; } else { this.__getCallbacks__[uuid] = callback; } } listen(key, callback) { if (typeof callback === 'function') { this.handle('listen', key, null, callback); } else { throw new Error('a listener must have a callback.'); } } ... }
LRU緩存
有時(shí)候我們需要用用內(nèi)存作為緩存,但多進(jìn)程中各進(jìn)程的內(nèi)存空間獨(dú)立,不能共享,因此也需要用到共享內(nèi)存。但是如果用共享內(nèi)存中的一個(gè)對(duì)象作為緩存的話,由于每次IPC都需要傳輸整個(gè)緩存對(duì)象,會(huì)導(dǎo)致緩存對(duì)象不能太大(否則序列化和反序列化耗時(shí)太長(zhǎng)),而且由于寫緩存對(duì)象的操作需要加鎖,進(jìn)一步影響了性能,而原本我們使用緩存就是為了加快訪問(wèn)速度。其實(shí)在使用緩存的時(shí)候通常不會(huì)做復(fù)雜操作,大多數(shù)時(shí)候也不需要保障一致性,因此我們可以在Manager
再增加一個(gè)共享內(nèi)存__sharedLRUMemory__
,其為一個(gè)lru-cache
實(shí)例,并增加getLRU
、setLRU
、removeLRU
函數(shù),與set
、get
、remove
函數(shù)類似。
// manager.js const LRU = require('lru-cache'); class Manager { constructor() { ... this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 }; this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions); } getLRU(key) { return new Promise((resolve) => { resolve(this.__sharedLRUMemory__.get(key)); }); } setLRU(key, value) { return new Promise((resolve) => { this.__sharedLRUMemory__.set(key, value); resolve('OK'); }); } removeLRU(key) { return new Promise((resolve) => { this.__sharedLRUMemory__.del(key); resolve('OK'); }); } ... }
Worker
中也增加getLRU
、setLRU
、removeLRU
函數(shù)。
// worker.js class Worker { getLRU(key) { return new Promise((resolve) => { this.handle('getLRU', key, null, (value) => { resolve(value); }); }); } setLRU(key, value) { return new Promise((resolve) => { this.handle('setLRU', key, value, () => { resolve(); }); }); } removeLRU(key) { return new Promise((resolve) => { this.handle('removeLRU', key, null, () => { resolve(); }); }); } ... }
目前共享內(nèi)存的實(shí)現(xiàn)已發(fā)到npm倉(cāng)庫(kù)(文檔和源代碼在Github倉(cāng)庫(kù),歡迎pull request和報(bào)bug),可以直接通過(guò)npm安裝:
npm i cluster-shared-memory
下面的示例包含了基本使用方法:
const cluster = require('cluster'); // 引入模塊時(shí)會(huì)根據(jù)當(dāng)前進(jìn)程 master 進(jìn)程還是 worker 進(jìn)程自動(dòng)創(chuàng)建對(duì)應(yīng)的 SharedMemory 對(duì)象 require('cluster-shared-memory'); if (cluster.isMaster) { // 在 master 進(jìn)程中 fork 子進(jìn)程 for (let i = 0; i < 2; i++) { cluster.fork(); } } else { const sharedMemoryController = require('./src/shared-memory'); const obj = { name: 'Tom', age: 10, }; // 寫對(duì)象 await sharedMemoryController.set('myObj', obj); // 讀對(duì)象 const myObj = await sharedMemoryController.get('myObj'); // 互斥訪問(wèn)對(duì)象,首先獲得對(duì)象的鎖 const lockId = await sharedMemoryController.getLock('myObj'); const newObj = await sharedMemoryController.get('myObj'); newObj.age = newObj.age + 1; await sharedMemoryController.set('myObj', newObj); // 操作完之后釋放鎖 await sharedMemoryController.releaseLock('requestTimes', lockId); // 或者使用 mutex 函數(shù)自動(dòng)獲取和釋放鎖 await sharedMemoryController.mutex('myObj', async () => { const newObjM = await sharedMemoryController.get('myObj'); newObjM.age = newObjM.age + 1; await sharedMemoryController.set('myObj', newObjM); }); // 監(jiān)聽(tīng)對(duì)象 sharedMemoryController.listen('myObj', (value) => { console.log(`myObj: ${value}`); }); //寫LRU緩存 await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'}); // 讀對(duì)象 const cacheItem = await sharedMemoryController.getLRU('cacheItem'); }
這種實(shí)現(xiàn)目前尚有幾個(gè)缺點(diǎn):
不能使用PM2的自動(dòng)創(chuàng)建worker進(jìn)程的功能。
由于PM2會(huì)使用自己的
cluster
模塊的master進(jìn)程的實(shí)現(xiàn),而我們的共享內(nèi)存模塊需要在master進(jìn)程維護(hù)一個(gè)內(nèi)存空間,則不能使用PM2的實(shí)現(xiàn),因此不能使用PM2的自動(dòng)創(chuàng)建worker進(jìn)程的功能。
傳輸?shù)膶?duì)象必須可序列化,且不能太大。
如果使用者在獲取鎖之后忘記釋放,會(huì)導(dǎo)致其它進(jìn)程一直被阻塞,這要求程序員有良好的代碼習(xí)慣。
上述內(nèi)容就是Node.js多進(jìn)程模型中怎么實(shí)現(xiàn)共享內(nèi)存,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。