很多人都想知道單線程的 Node.js 怎么能與多線程后端競(jìng)爭(zhēng)。考慮到其所謂的單線程特性,許多大公司選擇 Node 作為其后端似乎違反直覺。要想知道原因,必須理解其單線程的真正含義?!疽曨l教程推薦:nodejs視頻教程 】
十余年的新源網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。營(yíng)銷型網(wǎng)站建設(shè)的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整新源建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)公司從事“新源網(wǎng)站設(shè)計(jì)”,“新源網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。JavaScript 的設(shè)計(jì)非常適合在網(wǎng)上做比較簡(jiǎn)單的事情,比如驗(yàn)證表單,或者說創(chuàng)建彩虹色的鼠標(biāo)軌跡。 在2009年,Node.js的創(chuàng)始人 Ryan Dahl使開發(fā)人員可以用該語言編寫后端代碼。
通常支持多線程的后端語言具有各種機(jī)制,用于在線程和其他面向線程的功能之間同步數(shù)據(jù)。要向 JavaScript 添加對(duì)此類功能的支持,需要修改整個(gè)語言,這不是 Dahl 的目標(biāo)。為了讓純 JavaScript 支持多線程,他必須想一個(gè)變通方法。接下來讓我們探索一下其中的奧秘……
Node.js 是如何工作的Node.js 使用兩種線程:event loop處理的主線程和 worker pool中的幾個(gè)輔助線程。
事件循環(huán)是一種機(jī)制,它采用回調(diào)(函數(shù))并注冊(cè)它們,準(zhǔn)備在將來的某個(gè)時(shí)刻執(zhí)行。它與相關(guān)的 JavaScript 代碼在同一個(gè)線程中運(yùn)行。當(dāng) JavaScript 操作阻塞線程時(shí),事件循環(huán)也會(huì)被阻止。
工作池是一種執(zhí)行模型,它產(chǎn)生并處理單獨(dú)的線程,然后同步執(zhí)行任務(wù),并將結(jié)果返回到事件循環(huán)。事件循環(huán)使用返回的結(jié)果執(zhí)行提供的回調(diào)。
簡(jiǎn)而言之,它負(fù)責(zé)異步 I/O操作 —— 主要是與系統(tǒng)磁盤和網(wǎng)絡(luò)的交互。它主要由諸如 fs
(I/O 密集)或 crypto
(CPU 密集)等模塊使用。工作池用 libuv 實(shí)現(xiàn),當(dāng) Node 需要在 JavaScript 和 C++ 之間進(jìn)行內(nèi)部通信時(shí),會(huì)導(dǎo)致輕微的延遲,但這幾乎不可察覺。
基于這兩種機(jī)制,我們可以編寫如下代碼:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => { if (err) { return null; } console.log(content.toString()); });
前面提到的 fs
模塊告訴工作池使用其中一個(gè)線程來讀取文件的內(nèi)容,并在完成后通知事件循環(huán)。然后事件循環(huán)獲取提供的回調(diào)函數(shù),并用文件的內(nèi)容執(zhí)行它。
以上是非阻塞代碼的示例,我們不必同步等待某事的發(fā)生。只需告訴工作池去讀取文件,并用結(jié)果去調(diào)用提供的函數(shù)即可。由于工作池有自己的線程,因此事件循環(huán)可以在讀取文件時(shí)繼續(xù)正常執(zhí)行。
在不需要同步執(zhí)行某些復(fù)雜操作時(shí),這一切都相安無事:任何運(yùn)行時(shí)間太長(zhǎng)的函數(shù)都會(huì)阻塞線程。如果應(yīng)用程序中有大量這類功能,就可能會(huì)明顯降低服務(wù)器的吞吐量,甚至完全凍結(jié)它。在這種情況下,無法繼續(xù)將工作委派給工作池。
在需要對(duì)數(shù)據(jù)進(jìn)行復(fù)雜的計(jì)算時(shí)(如AI、機(jī)器學(xué)習(xí)或大數(shù)據(jù))無法真正有效地使用 Node.js,因?yàn)椴僮髯枞酥鳎ㄇ椅ㄒ唬┚€程,使服務(wù)器無響應(yīng)。在 Node.js v10.5.0 發(fā)布之前就是這種情況,在這一版本增加了對(duì)多線程的支持。
簡(jiǎn)介:worker_threadsworker_threads
模塊允許我們創(chuàng)建功能齊全的多線程 Node.js 程序。
thread worker 是在單獨(dú)的線程中生成的一段代碼(通常從文件中取出)。
注意,術(shù)語 thread worker,worker和 thread經(jīng)?;Q使用,他們都指的是同一件事。
要想使用 thread worker,必須導(dǎo)入 worker_threads
模塊。讓我們先寫一個(gè)函數(shù)來幫助我們生成這些thread worker,然后再討論它們的屬性。
type WorkerCallback = (err: any, result?: any) => any; export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) { const worker = new Worker(path, { workerData }); worker.on('message', cb.bind(null, null)); worker.on('error', cb); worker.on('exit', (exitCode) => { if (exitCode === 0) { return null; } return cb(new Error(`Worker has stopped with code ${exitCode}`)); }); return worker; }
要?jiǎng)?chuàng)建一個(gè) worker,首先必須創(chuàng)建一個(gè) Worker
類的實(shí)例。它的第一個(gè)參數(shù)提供了包含 worker 的代碼的文件的路徑;第二個(gè)參數(shù)提供了一個(gè)名為 workerData
的包含一個(gè)屬性的對(duì)象。這是我們希望線程在開始運(yùn)行時(shí)可以訪問的數(shù)據(jù)。
請(qǐng)注意:不管你是用的是 JavaScript, 還是最終要轉(zhuǎn)換為 JavaScript 的語言(例如,TypeScript),路徑應(yīng)該始終引用帶有 .js
或 .mjs
擴(kuò)展名的文件。
我還想指出為什么使用回調(diào)方法,而不是返回在觸發(fā) message
事件時(shí)將解決的 promise。這是因?yàn)?worker 可以發(fā)送許多 message
事件,而不是一個(gè)。
正如你在上面的例子中所看到的,線程間的通信是基于事件的,這意味著我們?cè)O(shè)置了 worker 在發(fā)送給定事件后調(diào)用的偵聽器。
以下是最常見的事件:
worker.on('error', (error) => {});
只要 worker 中有未捕獲的異常,就會(huì)發(fā)出 error
事件。然后終止 worker,錯(cuò)誤可以作為提供的回調(diào)中的第一個(gè)參數(shù)。
worker.on('exit', (exitCode) => {});
在 worker 退出時(shí)會(huì)發(fā)出 exit
事件。如果在worker中調(diào)用了 process.exit()
,那么 exitCode
將被提供給回調(diào)。如果 worker 以 worker.terminate()
終止,則代碼為1。
worker.on('online', () => {});
只要 worker 停止解析 JavaScript 代碼并開始執(zhí)行,就會(huì)發(fā)出 online
事件。它不常用,但在特定情況下可以提供信息。
worker.on('message', (data) => {});
只要 worker 將數(shù)據(jù)發(fā)送到父線程,就會(huì)發(fā)出 message
事件。
現(xiàn)在讓我們來看看如何在線程之間共享數(shù)據(jù)。
在線程之間交換數(shù)據(jù)要將數(shù)據(jù)發(fā)送到另一個(gè)線程,可以用 port.postMessage()
方法。它的原型如下:
port.postMessage(data[, transferList])
port 對(duì)象可以是 parentPort
,也可以是 MessagePort
的實(shí)例 —— 稍后會(huì)詳細(xì)講解。
第一個(gè)參數(shù) —— 這里被稱為 data
—— 是一個(gè)被復(fù)制到另一個(gè)線程的對(duì)象。它可以是復(fù)制算法所支持的任何內(nèi)容。
數(shù)據(jù)由結(jié)構(gòu)化克隆算法進(jìn)行復(fù)制。引用自 Mozilla:
它通過遞歸輸入對(duì)象來進(jìn)行克隆,同時(shí)保持之前訪問過的引用的映射,以避免無限遍歷循環(huán)。
該算法不復(fù)制函數(shù)、錯(cuò)誤、屬性描述符或原型鏈。還需要注意的是,以這種方式復(fù)制對(duì)象與使用 JSON 不同,因?yàn)樗梢园h(huán)引用和類型化數(shù)組,而 JSON 不能。
由于能夠復(fù)制類型化數(shù)組,該算法可以在線程之間共享內(nèi)存。
在線程之間共享內(nèi)存人們可能會(huì)說像 cluster
或 child_process
這樣的模塊在很久以前就開始使用線程了。這話對(duì),也不對(duì)。
cluster
模塊可以創(chuàng)建多個(gè)節(jié)點(diǎn)實(shí)例,其中一個(gè)主進(jìn)程在它們之間對(duì)請(qǐng)求進(jìn)行路由。集群能夠有效地增加服務(wù)器的吞吐量;但是我們不能用 cluster
模塊生成一個(gè)單獨(dú)的線程。
人們傾向于用 PM2 這樣的工具來集中管理他們的程序,而不是在自己的代碼中手動(dòng)執(zhí)行,如果你有興趣,可以研究一下如何使用 cluster
模塊。
child_process
模塊可以生成任何可執(zhí)行文件,無論它是否是用 JavaScript 寫的。它和 worker_threads
非常相似,但缺少后者的幾個(gè)重要功能。
具體來說 thread workers 更輕量,并且與其父線程共享相同的進(jìn)程 ID。它們還可以與父線程共享內(nèi)存,這樣可以避免對(duì)大的數(shù)據(jù)負(fù)載進(jìn)行序列化,從而更有效地來回傳遞數(shù)據(jù)。
現(xiàn)在讓我們看一下如何在線程之間共享內(nèi)存。為了共享內(nèi)存,必須將 ArrayBuffer
或 SharedArrayBuffer
的實(shí)例作為數(shù)據(jù)參數(shù)發(fā)送到另一個(gè)線程。
這是一個(gè)與其父線程共享內(nèi)存的 worker:
import { parentPort } from 'worker_threads'; parentPort.on('message', () => { const numberOfElements = 100; const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements); const arr = new Int32Array(sharedBuffer); for (let i = 0; i < numberOfElements; i += 1) { arr[i] = Math.round(Math.random() * 30); } parentPort.postMessage({ arr }); });
首先,我們創(chuàng)建一個(gè) SharedArrayBuffer
,其內(nèi)存需要包含100個(gè)32位整數(shù)。接下來創(chuàng)建一個(gè) Int32Array
實(shí)例,它將用緩沖區(qū)來保存其結(jié)構(gòu),然后用一些隨機(jī)數(shù)填充數(shù)組并將其發(fā)送到父線程。
在父線程中:
import path from 'path'; import { runWorker } from '../run-worker'; const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => { if (err) { return null; } arr[0] = 5; }); worker.postMessage({});
把 arr [0]
的值改為5
,實(shí)際上會(huì)在兩個(gè)線程中修改它。
當(dāng)然,通過共享內(nèi)存,我們冒險(xiǎn)在一個(gè)線程中修改一個(gè)值,同時(shí)也在另一個(gè)線程中進(jìn)行了修改。但是我們?cè)谶@個(gè)過程中也得到了一個(gè)好處:該值不需要進(jìn)行序列化就可以另一個(gè)線程中使用,這極大地提高了效率。只需記住管理數(shù)據(jù)正確的引用,以便在完成數(shù)據(jù)處理后對(duì)其進(jìn)行垃圾回收。
共享一個(gè)整數(shù)數(shù)組固然很好,但我們真正感興趣的是共享對(duì)象 —— 這是存儲(chǔ)信息的默認(rèn)方式。不幸的是,沒有 SharedObjectBuffer
或類似的東西,但我們可以自己創(chuàng)建一個(gè)類似的結(jié)構(gòu)。
transferList
中只能包含 ArrayBuffer
和 MessagePort
。一旦它們被傳送到另一個(gè)線程,就不能再次被傳送了;因?yàn)閮?nèi)存里的內(nèi)容已經(jīng)被移動(dòng)到了另一個(gè)線程。
目前,還不能通過 transferList
(可以使用 child_process
模塊)來傳輸網(wǎng)絡(luò)套接字。
線程之間的通信是通過 port 進(jìn)行的,port 是 MessagePort
類的實(shí)例,并啟用基于事件的通信。
使用 port 在線程之間進(jìn)行通信的方法有兩種。第一個(gè)是默認(rèn)值,這個(gè)方法比較容易。在 worker 的代碼中,我們從worker_threads
模塊導(dǎo)入一個(gè)名為 parentPort
的對(duì)象,并使用對(duì)象的 .postMessage()
方法將消息發(fā)送到父線程。
這是一個(gè)例子:
import { parentPort } from 'worker_threads'; const data = { // ... }; parentPort.postMessage(data);
parentPort
是 Node.js 在幕后創(chuàng)建的 MessagePort
實(shí)例,用于與父線程進(jìn)行通信。這樣就可以用 parentPort
和 worker
對(duì)象在線程之間進(jìn)行通信。
線程間的第二種通信方式是創(chuàng)建一個(gè) MessageChannel
并將其發(fā)送給 worker。以下代碼是如何創(chuàng)建一個(gè)新的 MessagePort
并與我們的 worker 共享它:
import path from 'path'; import { Worker, MessageChannel } from 'worker_threads'; const worker = new Worker(path.join(__dirname, 'worker.js')); const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => { console.log('message from worker:', message); }); worker.postMessage({ port: port2 }, [port2]);
在創(chuàng)建 port1
和 port2
之后,我們?cè)?port1
上設(shè)置事件監(jiān)聽器并將 port2
發(fā)送給 worker。我們必須將它包含在 transferList
中,以便將其傳輸給 worker 。
在 worker 內(nèi)部:
import { parentPort, MessagePort } from 'worker_threads'; parentPort.on('message', (data) => { const { port }: { port: MessagePort } = data; port.postMessage('heres your message!'); });
這樣,我們就能使用父線程發(fā)送的 port 了。
使用 parentPort
不一定是錯(cuò)誤的方法,但最好用 MessageChannel
的實(shí)例創(chuàng)建一個(gè)新的 MessagePort
,然后與生成的 worker 共享它。
請(qǐng)注意,在后面的例子中,為了簡(jiǎn)便起見,我用了 parentPort
。
可以通過兩種方式使用 worker。第一種是生成一個(gè) worker,然后執(zhí)行它的代碼,并將結(jié)果發(fā)送到父線程。通過這種方法,每當(dāng)出現(xiàn)新任務(wù)時(shí),都必須重新創(chuàng)建一個(gè)工作者。
第二種方法是生成一個(gè) worker 并為 message
事件設(shè)置監(jiān)聽器。每次觸發(fā) message
時(shí),它都會(huì)完成工作并將結(jié)果發(fā)送回父線程,這會(huì)使 worker 保持活動(dòng)狀態(tài)以供以后使用。
Node.js 文檔推薦第二種方法,因?yàn)樵趧?chuàng)建 thread worker 時(shí)需要?jiǎng)?chuàng)建虛擬機(jī)并解析和執(zhí)行代碼,這會(huì)產(chǎn)生比較大的開銷。所以這種方法比不斷產(chǎn)生新 worker 的效率更高。
這種方法被稱為工作池,因?yàn)槲覀儎?chuàng)建了一個(gè)工作池并讓它們等待,在需要時(shí)調(diào)度 message
事件來完成工作。
以下是一個(gè)產(chǎn)生、執(zhí)行然后關(guān)閉 worker 例子:
import { parentPort } from 'worker_threads'; const collection = []; for (let i = 0; i < 10; i += 1) { collection[i] = i; } parentPort.postMessage(collection);
將 collection
發(fā)送到父線程后,它就會(huì)退出。
下面是一個(gè) worker 的例子,它可以在給定任務(wù)之前等待很長(zhǎng)一段時(shí)間:
import { parentPort } from 'worker_threads'; parentPort.on('message', (data: any) => { const result = doSomething(data); parentPort.postMessage(result); });worker_threads 模塊中可用的重要屬性
worker_threads
模塊中有一些可用的屬性:
當(dāng)不在工作線程內(nèi)操作時(shí),該屬性為 true
。如果你覺得有必要,可以在 worker 文件的開頭包含一個(gè)簡(jiǎn)單的 if
語句,以確保它只作為 worker 運(yùn)行。
import { isMainThread } from 'worker_threads'; if (isMainThread) { throw new Error('Its not a worker'); }workerData
產(chǎn)生線程時(shí)包含在 worker 的構(gòu)造函數(shù)中的數(shù)據(jù)。
const worker = new Worker(path, { workerData });
在工作線程中:
import { workerData } from 'worker_threads'; console.log(workerData.property);parentPort
前面提到的 MessagePort
實(shí)例,用于與父線程通信。
分配給 worker 的唯一標(biāo)識(shí)符。
現(xiàn)在我們知道了技術(shù)細(xì)節(jié),接下來實(shí)現(xiàn)一些東西并在實(shí)踐中檢驗(yàn)學(xué)到的知識(shí)。
實(shí)現(xiàn)setTimeout
setTimeout
是一個(gè)無限循環(huán),顧名思義,用來檢測(cè)程序運(yùn)行時(shí)間是否超時(shí)。它在循環(huán)中檢查起始時(shí)間與給定毫秒數(shù)之和是否小于實(shí)際日期。
import { parentPort, workerData } from 'worker_threads'; const time = Date.now(); while (true) { if (time + workerData.time <= Date.now()) { parentPort.postMessage({}); break; } }
這個(gè)特定的實(shí)現(xiàn)產(chǎn)生一個(gè)線程,然后執(zhí)行它的代碼,最后在完成后退出。
接下來實(shí)現(xiàn)使用這個(gè) worker 的代碼。首先創(chuàng)建一個(gè)狀態(tài),用它來跟蹤生成的 worker:
const timeoutState: { [key: string]: Worker } = {};
然后時(shí)負(fù)責(zé)創(chuàng)建 worker 并將其保存到狀態(tài)的函數(shù):
export function setTimeout(callback: (err: any) => any, time: number) { const id = uuidv4(); const worker = runWorker( path.join(__dirname, './timeout-worker.js'), (err) => { if (!timeoutState[id]) { return null; } timeoutState[id] = null; if (err) { return callback(err); } callback(null); }, { time, }, ); timeoutState[id] = worker; return id; }
首先,我們使用 UUID 包為 worker 創(chuàng)建一個(gè)唯一的標(biāo)識(shí)符,然后用先前定義的函數(shù) runWorker
來獲取 worker。我們還向 worker 傳入一個(gè)回調(diào)函數(shù),一旦 worker 發(fā)送了數(shù)據(jù)就會(huì)被觸發(fā)。最后,把 worker 保存在狀態(tài)中并返回 id
。
在回調(diào)函數(shù)中,我們必須檢查該 worker 是否仍然存在于該狀態(tài)中,因?yàn)橛锌赡軙?huì) cancelTimeout()
,這將會(huì)把它刪除。如果確實(shí)存在,就把它從狀態(tài)中刪除,并調(diào)用傳給 setTimeout
函數(shù)的 callback
。
cancelTimeout
函數(shù)使用 .terminate()
方法強(qiáng)制 worker 退出,并從該狀態(tài)中刪除該這個(gè)worker:
export function cancelTimeout(id: string) { if (timeoutState[id]) { timeoutState[id].terminate(); timeoutState[id] = undefined; return true; } return false; }
如果你有興趣,我也實(shí)現(xiàn)了 setInterval
,代碼在這里,但因?yàn)樗鼘?duì)線程什么都沒做(我們重用setTimeout
的代碼),所以我決定不在這里進(jìn)行解釋。
我已經(jīng)創(chuàng)建了一個(gè)短小的測(cè)試代碼,目的是檢查這種方法與原生方法的不同之處。你可以在這里找到代碼。這些是結(jié)果:
native setTimeout { ms: 7004, averageCPUCost: 0.1416 } worker setTimeout { ms: 7046, averageCPUCost: 0.308 }
我們可以看到 setTimeout
有一點(diǎn)延遲 - 大約40ms - 這時(shí) worker 被創(chuàng)建時(shí)的消耗。平均 CPU 成本也略高,但沒什么難以忍受的(CPU 成本是整個(gè)過程持續(xù)時(shí)間內(nèi) CPU 使用率的平均值)。
如果我們可以重用 worker,就能夠降低延遲和 CPU 使用率,這就是要實(shí)現(xiàn)工作池的原因。
實(shí)現(xiàn)工作池如上所述,工作池是給定數(shù)量的被事先創(chuàng)建的 worker,他們保持空閑并監(jiān)聽 message
事件。一旦 message
事件被觸發(fā),他們就會(huì)開始工作并發(fā)回結(jié)果。
為了更好地描述我們將要做的事情,下面我們來創(chuàng)建一個(gè)由八個(gè) thread worker 組成的工作池:
const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);
如果你熟悉限制并發(fā)操作,那么你在這里看到的邏輯幾乎相同,只是一個(gè)不同的用例。
如上面的代碼片段所示,我們把指向 worker 的路徑和要生成的 worker 數(shù)量傳給了 WorkerPool
的構(gòu)造函數(shù)。
export class WorkerPool{ private queue: QueueItem [] = []; private workersById: { [key: number]: Worker } = {}; private activeWorkersById: { [key: number]: boolean } = {}; public constructor(public workerPath: string, public numberOfThreads: number) { this.init(); } }
這里還有其他一些屬性,如 workersById
和 activeWorkersById
,我們可以分別保存現(xiàn)有的 worker 和當(dāng)前正在運(yùn)行的 worker 的 ID。還有 queue
,我們可以使用以下結(jié)構(gòu)來保存對(duì)象:
type QueueCallback= (err: any, result?: N) => void; interface QueueItem { callback: QueueCallback ; getData: () => T; }
callback
只是默認(rèn)的節(jié)點(diǎn)回調(diào),第一個(gè)參數(shù)是錯(cuò)誤,第二個(gè)參數(shù)是可能的結(jié)果。 getData
是傳遞給工作池 .run()
方法的函數(shù)(如下所述),一旦項(xiàng)目開始處理就會(huì)被調(diào)用。 getData
函數(shù)返回的數(shù)據(jù)將傳給工作線程。
在 .init()
方法中,我們創(chuàng)建了 worker 并將它們保存在以下狀態(tài)中:
private init() { if (this.numberOfThreads < 1) { return null; } for (let i = 0; i < this.numberOfThreads; i += 1) { const worker = new Worker(this.workerPath); this.workersById[i] = worker; this.activeWorkersById[i] = false; } }
為避免無限循環(huán),我們首先要確保線程數(shù) > 1。然后創(chuàng)建有效的 worker 數(shù),并將它們的索引保存在 workersById
狀態(tài)。我們?cè)?activeWorkersById
狀態(tài)中保存了它們當(dāng)前是否正在運(yùn)行的信息,默認(rèn)情況下該狀態(tài)始終為false。
現(xiàn)在我們必須實(shí)現(xiàn)前面提到的 .run()
方法來設(shè)置一個(gè) worker 可用的任務(wù)。
public run(getData: () => T) { return new Promise((resolve, reject) => { const availableWorkerId = this.getInactiveWorkerId(); const queueItem: QueueItem = { getData, callback: (error, result) => { if (error) { return reject(error); } return resolve(result); }, }; if (availableWorkerId === -1) { this.queue.push(queueItem); return null; } this.runWorker(availableWorkerId, queueItem); }); }
在 promise 函數(shù)里,我們首先通過調(diào)用 .getInactiveWorkerId()
來檢查是否存在空閑的 worker 可以來處理數(shù)據(jù):
private getInactiveWorkerId(): number { for (let i = 0; i < this.numberOfThreads; i += 1) { if (!this.activeWorkersById[i]) { return i; } } return -1; }
接下來,我們創(chuàng)建一個(gè) queueItem
,在其中保存?zhèn)鬟f給 .run()
方法的 getData
函數(shù)以及回調(diào)。在回調(diào)中,我們要么 resolve
或者 reject
promise,這取決于 worker 是否將錯(cuò)誤傳遞給回調(diào)。
如果 availableWorkerId
的值是 -1,意味著當(dāng)前沒有可用的 worker,我們將 queueItem
添加到 queue
。如果有可用的 worker,則調(diào)用 .runWorker()
方法來執(zhí)行 worker。
在 .runWorker()
方法中,我們必須把當(dāng)前 worker 的 activeWorkersById
設(shè)置為使用狀態(tài);為 message
和 error
事件設(shè)置事件監(jiān)聽器(并在之后清理它們);最后將數(shù)據(jù)發(fā)送給 worker。
private async runWorker(workerId: number, queueItem: QueueItem) { const worker = this.workersById[workerId]; this.activeWorkersById[workerId] = true; const messageCallback = (result: N) => { queueItem.callback(null, result); cleanUp(); }; const errorCallback = (error: any) => { queueItem.callback(error); cleanUp(); }; const cleanUp = () => { worker.removeAllListeners('message'); worker.removeAllListeners('error'); this.activeWorkersById[workerId] = false; if (!this.queue.length) { return null; } this.runWorker(workerId, this.queue.shift()); }; worker.once('message', messageCallback); worker.once('error', errorCallback); worker.postMessage(await queueItem.getData()); }
首先,通過使用傳遞的 workerId
,我們從 workersById
中獲得 worker 引用。然后,在 activeWorkersById
中,將 [workerId]
屬性設(shè)置為true,這樣我們就能知道在 worker 在忙,不要運(yùn)行其他任務(wù)。
接下來,分別創(chuàng)建 messageCallback
和 errorCallback
用來在消息和錯(cuò)誤事件上調(diào)用,然后注冊(cè)所述函數(shù)來監(jiān)聽事件并將數(shù)據(jù)發(fā)送給 worker。
在回調(diào)中,我們調(diào)用 queueItem
的回調(diào),然后調(diào)用 cleanUp
函數(shù)。在 cleanUp
函數(shù)中,要?jiǎng)h除事件偵聽器,因?yàn)槲覀儠?huì)多次重用同一個(gè) worker。如果沒有刪除監(jiān)聽器的話就會(huì)發(fā)生內(nèi)存泄漏,內(nèi)存會(huì)被慢慢耗盡。
在 activeWorkersById
狀態(tài)中,我們將 [workerId]
屬性設(shè)置為 false
,并檢查隊(duì)列是否為空。如果不是,就從 queue
中刪除第一個(gè)項(xiàng)目,并用另一個(gè) queueItem
再次調(diào)用 worker。
接著創(chuàng)建一個(gè)在收到 message
事件中的數(shù)據(jù)后進(jìn)行一些計(jì)算的 worker:
import { isMainThread, parentPort } from 'worker_threads'; if (isMainThread) { throw new Error('Its not a worker'); } const doCalcs = (data: any) => { const collection = []; for (let i = 0; i < 1000000; i += 1) { collection[i] = Math.round(Math.random() * 100000); } return collection.sort((a, b) => { if (a > b) { return 1; } return -1; }); }; parentPort.on('message', (data: any) => { const result = doCalcs(data); parentPort.postMessage(result); });
worker 創(chuàng)建了一個(gè)包含 100 萬個(gè)隨機(jī)數(shù)的數(shù)組,然后對(duì)它們進(jìn)行排序。只要能夠多花費(fèi)一些時(shí)間才能完成,做些什么事情并不重要。
以下是工作池簡(jiǎn)單用法的示例:
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8); const items = [...new Array(100)].fill(null); Promise.all( items.map(async (_, i) => { await pool.run(() => ({ i })); console.log('finished', i); }), ).then(() => { console.log('finished all'); });
首先創(chuàng)建一個(gè)由八個(gè) worker 組成的工作池。然后創(chuàng)建一個(gè)包含 100 個(gè)元素的數(shù)組,對(duì)于每個(gè)元素,我們?cè)诠ぷ鞒刂羞\(yùn)行一個(gè)任務(wù)。開始運(yùn)行后將立即執(zhí)行八個(gè)任務(wù),其余任務(wù)被放入隊(duì)列并逐個(gè)執(zhí)行。通過使用工作池,我們不必每次都創(chuàng)建一個(gè) worker,從而大大提高了效率。
結(jié)論worker_threads
提供了一種為程序添加多線程支持的簡(jiǎn)單的方法。通過將繁重的 CPU 計(jì)算委托給其他線程,可以顯著提高服務(wù)器的吞吐量。通過官方線程支持,我們可以期待更多來自AI、機(jī)器學(xué)習(xí)和大數(shù)據(jù)等領(lǐng)域的開發(fā)人員和工程師使用 Node.js.
英文原文地址:https://blog.logrocket.com/a-complete-guide-to-threads-in-node-js-4fa3898fe74f
相關(guān)推薦:nodejs 教程
本文名稱:深入了解Node.js多線程(指南)
鏈接分享:http://weahome.cn/article/cpedhh.html