真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Nodejs中怎么實現(xiàn)多線程

Nodejs中怎么實現(xiàn)多線程,針對這個問題,這篇文章詳細介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名注冊、虛擬空間、營銷軟件、網(wǎng)站建設(shè)、信陽網(wǎng)站維護、網(wǎng)站推廣。

1 背景

需求中有以下場景

1 對稱解密、非對稱解密

2 壓縮、解壓

3 大量文件的增刪改查

4 處理大量的字符串,解析協(xié)議

上面的場景都是非常耗時間的,解密、壓縮、文件操作,nodejs使用了內(nèi)置的線程池支持了異步。但是處理字符串和解析協(xié)議是單純消耗cpu的操作。而且nodejs對解密的支持似乎不是很好。我使用了純js的解密庫,所以無法在nodejs主線程里處理。尤其rsa解密,非常耗時間。

所以這時候就要探索解決方案,nodejs提供了多線程的能力。所以自然就選擇了這種方案。但是這只是初步的想法和方案。因為nodejs雖然提供了多線程能力,但是沒有提供一個應(yīng)用層的線程池。所以如果我們單純地使用多線程,一個請求一個線程,這顯然不現(xiàn)實。我們不得不實現(xiàn)自己的線程池。本文分享的內(nèi)容是這個線程池的實現(xiàn)。

線程池的設(shè)計涉及到很多方面,對于純cpu型的任務(wù),線程數(shù)和cpu核數(shù)要相等才能達到最優(yōu)的性能,否則過多的線程引起的上下文切換反而會導(dǎo)致性能下降。而對于io型的任務(wù),更多的線程理論上是會更好,因為可以更早地給硬盤發(fā)出命令,磁盤會優(yōu)化并持續(xù)地處理請求,想象一下,如果發(fā)出一個命令,硬盤處理一個,然后再發(fā)下一個命令,再處理一個,這樣顯然效率很低。當(dāng)然,線程數(shù)也不是越多越好。線程過多會引起系統(tǒng)負載過高,過多上下文切換也會帶來性能的下降。下面看一下線程池的實現(xiàn)方案。

2 設(shè)計思路

首先根據(jù)配置創(chuàng)建多個線程(分為預(yù)創(chuàng)建和懶創(chuàng)建),然后對用戶暴露提交任務(wù)的接口,由調(diào)度中心負責(zé)接收任務(wù),然后根據(jù)策略選擇處理該任務(wù)的線程。子線程一直在輪詢是否有任務(wù)需要處理。處理完通知調(diào)度中心。

Nodejs中怎么實現(xiàn)多線程

下面看一下具體的實現(xiàn)

2.1 和用戶通信的數(shù)據(jù)結(jié)構(gòu)

class UserWork extends EventEmitter {     constructor({ workId, threadId }) {         super();         this.workId = workId;         this.threadId = threadId;         workPool[workId] = this;     } }

用戶提交任務(wù)的時候,調(diào)度中心返回一個UserWork對象。用戶可以使用該對象和調(diào)度中心通信。

2.2 調(diào)度中心的實現(xiàn)

調(diào)度中心的實現(xiàn)大致分為以下幾個邏輯。

2.2.1 初始化

constructor(options = {}) {        this.options = options;        // 線程池總?cè)蝿?wù)數(shù)        this.totalWork = 0;        // 子線程隊列        this.workerQueue = [];        // 核心線程數(shù)        this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;        // 線程池最大線程數(shù),如果不支持動態(tài)擴容則最大線程數(shù)等于核心線程數(shù)        this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;        // 工作線程處理任務(wù)的模式        this.sync = options.sync !== false;        // 超過任務(wù)隊列長度時的處理策略        this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;        // 是否預(yù)創(chuàng)建子線程        this.preCreate = options.preCreate === true;        this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;        this.pollIntervalTime = ~~options.pollIntervalTime || config.POLL_INTERVAL_TIME;        this.maxWork = ~~options.maxWork || config.MAX_WORK;        // 是否預(yù)創(chuàng)建線程池        this.preCreate && this.preCreateThreads();    }

從初始化代碼中我們看到線程池大致支持的能力。

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 核心線程數(shù)

  3. 最大線程數(shù)

  4. 過載時的處理策略,和過載的閾值

  5. 子線程空閑退出的時間和輪詢?nèi)蝿?wù)的時間

  6. 是否預(yù)創(chuàng)建線程池

  7. 是否支持動態(tài)擴容

核心線程數(shù)是任務(wù)數(shù)沒有達到閾值時的工作線程集合。是處理任務(wù)的主力軍。任務(wù)數(shù)達到閾值后,如果支持動態(tài)擴容(可配置)則會創(chuàng)建新的線程去處理更多的任務(wù)。一旦負載變低,線程空閑時間達到閾值則會自動退出。如果擴容的線程數(shù)達到閾值,還有新的任務(wù)到來,則根據(jù)丟棄策略進行相關(guān)的處理。

2.2.2 創(chuàng)建線程

newThread() {         let { sync } = this;         const worker = new Worker(workerPath, {workerData: { sync, maxIdleTime: this.maxIdleTime, pollIntervalTime: this.pollIntervalTime, }});         const node = {             worker,             // 該線程處理的任務(wù)數(shù)量             queueLength: 0,         };         this.workerQueue.push(node);         const threadId = worker.threadId;         worker.on('exit', (status) => {             // 異常退出則補充線程,正常退出則不補充             if (status) {                 this.newThread();             }             this.totalWork -= node.queueLength;             this.workerQueue = this.workerQueue.filter((worker) => {                 return worker.threadId !== threadId;             });         });         // 和子線程通信         worker.on('message', (result) => {             const {                 work,                 event,             } = result;             const { data, error, workId } = work;             // 通過workId拿到對應(yīng)的userWorker             const userWorker = workPool[workId];             delete workPool[workId];             // 任務(wù)數(shù)減一             node.queueLength--;             this.totalWork--;             switch(event) {                 case 'done':                     // 通知用戶,任務(wù)完成                     userWorker.emit('done', data);                     break;                 case 'error':                     // 通知用戶,任務(wù)出錯                     if (EventEmitter.listenerCount(userWorker, 'error')) {                         userWorker.emit('error', error);                     }                     break;                 default: break;             }         });         worker.on('error', (...rest) => {             console.log(...rest)         });         return node;     }

創(chuàng)建線程主要是調(diào)用nodejs提供的模塊進行創(chuàng)建。然后監(jiān)聽子線程的退出和message、error事件。如果是異常退出則補充線程。調(diào)度中心維護了一個子線程的隊列。記錄了每個子線程(worker)的實例和任務(wù)數(shù)。

2.2.3 選擇執(zhí)行任務(wù)的線程

selectThead() {         let min = Number.MAX_SAFE_INTEGER;         let i = 0;         let index = 0;         // 找出任務(wù)數(shù)最少的線程,把任務(wù)交給他         for (; i < this.workerQueue.length; i++) {             const { queueLength } = this.workerQueue[i];             if (queueLength < min) {                 index = i;                 min = queueLength;             }         }         return this.workerQueue[index];     }

選擇策略目前是選擇任務(wù)數(shù)最少的,本來還支持隨機和輪詢方式,但是貌似沒有什么場景和必要,就去掉了。

2.2.4 暴露提交任務(wù)的接口

submit(filename, options = {}) {         return new Promise(async (resolve, reject) => {             let thread;             // 沒有線程則創(chuàng)建一個             if (this.workerQueue.length) {                 thread = this.selectThead();                 // 任務(wù)隊列非空                 if (thread.queueLength !== 0) {                     // 子線程個數(shù)還沒有達到核心線程數(shù),則新建線程處理                     if (this.workerQueue.length < this.coreThreads) {                         thread = this.newThread();                     } else if (this.totalWork + 1 > this.maxWork){                         // 總?cè)蝿?wù)數(shù)已達到閾值,還沒有達到線程數(shù)閾值,則創(chuàng)建                         if(this.workerQueue.length < this.maxThreads) {                             thread = this.newThread();                         } else {                             // 處理溢出的任務(wù)                             switch(this.discardPolicy) {                                 case DISCARD_POLICY.ABORT:                                      return reject(new Error('queue overflow'));                                 case DISCARD_POLICY.CALLER_RUNS:                                      const userWork =  new UserWork({workId: this.generateWorkId(), threadId});                                      try {                                         const asyncFunction = require(filename);                                         if (!isAsyncFunction(asyncFunction)) {                                             return reject(new Error('need export a async function'));                                         }                                         const result = await asyncFunction(options);                                         resolve(userWork);                                         setImmediate(() => {                                             userWork.emit('done', result);                                         });                                     } catch (error) {                                         resolve(userWork);                                         setImmediate(() => {                                             userWork.emit('error', error);                                         });                                     }                                     return;                                 case DISCARD_POLICY.DISCARD_OLDEST:                                      thread.worker.postMessage({cmd: 'delete'});                                     break;                                 case DISCARD_POLICY.DISCARD:                                     return reject(new Error('discard'));                                 case DISCARD_POLICY.NOT_DISCARD:                                     break;                                 default:                                      break;                             }                         }                     }                 }             } else {                 thread = this.newThread();             }             // 生成一個任務(wù)id             const workId = this.generateWorkId();             // 新建一個work,交給對應(yīng)的子線程             const work = new Work({ workId, filename, options });             const userWork = new UserWork({workId, threadId: thread.worker.threadId});             thread.queueLength++;             this.totalWork++;             thread.worker.postMessage({cmd: 'add', work});             resolve(userWork);         })     }

提交任務(wù)的函數(shù)比較復(fù)雜,提交一個任務(wù)的時候,調(diào)度中心會根據(jù)當(dāng)前的負載情況和線程數(shù),決定對一個任務(wù)做如何處理。如果可以處理,則把任務(wù)交給選中的子線程。最后給用戶返回一個UserWorker對象。

2.3調(diào)度中心和子線程的通信數(shù)據(jù)結(jié)構(gòu)

class Work {     constructor({workId, filename, options}) {         // 任務(wù)id         this.workId = workId;         // 文件名         this.filename = filename;         // 處理結(jié)果,由用戶代碼返回         this.data = null;         // 執(zhí)行出錯         this.error = null;         // 執(zhí)行時入?yún)?nbsp;        this.options = options;     } }

一個任務(wù)對應(yīng)一個id,目前只支持文件的執(zhí)行模式,后續(xù)會支持字符串。

2.4 子線程的實現(xiàn)

子線程的實現(xiàn)主要分為幾個部分

2.4.1 監(jiān)聽調(diào)度中心分發(fā)的命令

parentPort.on('message', ({cmd, work}) => {     switch(cmd) {         case 'delete':             return queue.shift();         case 'add':             return queue.push(work);     } });

2.4.2 輪詢是否有任務(wù)需要處理

function poll() {     const now = Date.now();     if (now - lastWorkTime > maxIdleTime && !queue.length) {         process.exit(0);     }     setTimeout(async () => {         // 處理任務(wù)         poll();     }     }, pollIntervalTime); } // 輪詢判斷是否有任務(wù) poll();

不斷輪詢是否有任務(wù)需要處理,如果沒有并且空閑時間達到閾值則退出。

2.4.3 處理任務(wù)

處理任務(wù)模式分為同步和異步

while(queue.length) {           const work = queue.shift();           try {               const { filename, options } = work;               const asyncFunction = require(filename);               if (!isAsyncFunction(asyncFunction)) {                   return;               }               lastWorkTime = now;                const result = await asyncFunction(options);               work.data = result;               parentPort.postMessage({event: 'done', work});           } catch (error) {               work.error = error.toString();               parentPort.postMessage({event: 'error', work});           }       }

用戶需要導(dǎo)出一個async函數(shù),使用這種方案主要是為了執(zhí)行時可以給用戶傳入?yún)?shù)。并且實現(xiàn)同步。處理完后通知調(diào)度中心。下面是異步處理方式,子線程不需要同步等待用戶的代碼結(jié)果。

const arr = [];        while(queue.length) {            const work = queue.shift();            try {                const { filename } = work;                const asyncFunction = require(filename);                if (!isAsyncFunction(asyncFunction)) {                    return;                }                arr.push({asyncFunction, work});            } catch (error) {                work.error = error.toString();                parentPort.postMessage({event: 'error', work});            }        }        arr.map(async ({asyncFunction, work}) => {            try {                const { options } = work;                lastWorkTime = now;                const result = await asyncFunction(options);                work.data = result;                parentPort.postMessage({event: 'done', work});            } catch (e) {                work.error = error.toString();                parentPort.postMessage({event: 'done', work});            }        })

最后還有一些配置和定制化的功能。

module.exports = {     // 最大的線程數(shù)     MAX_THREADS: 50,     // 線程池最大任務(wù)數(shù)     MAX_WORK: Infinity,     // 默認核心線程數(shù)     CORE_THREADS: 10,     // 最大空閑時間     MAX_IDLE_TIME: 10 * 60 * 1000,     // 子線程輪詢時間     POLL_INTERVAL_TIME: 10, }; // 丟棄策略 const DISCARD_POLICY = {     // 報錯     ABORT: 1,     // 在主線程里執(zhí)行     CALLER_RUNS: 2,     // 丟棄最老的的任務(wù)     DISCARD_OLDEST: 3,     // 丟棄     DISCARD: 4,     // 不丟棄     NOT_DISCARD: 5, };

支持多個類型的線程池

class AsyncThreadPool extends ThreadPool {     constructor(options) {         super({...options, sync: false});     } }  class SyncThreadPool extends ThreadPool {     constructor(options) {         super({...options, sync: true});     } } // cpu型任務(wù)的線程池,線程數(shù)和cpu核數(shù)一樣,不支持動態(tài)擴容 class CPUThreadPool extends ThreadPool {     constructor(options) {         super({...options, coreThreads: cores, expansion: false});     } } // 線程池只有一個線程,類似消息隊列 class SingleThreadPool extends ThreadPool {     constructor(options) {         super({...options, coreThreads: 1, expansion: false });     } } // 線程數(shù)固定的線程池,不支持動態(tài)擴容線程 class FixedThreadPool extends ThreadPool {     constructor(options) {         super({ ...options, expansion: false });     } }

這就是線程池的實現(xiàn),有很多細節(jié)還需要思考。下面是一個性能測試的例子。

3 測試

const { MAX } = require('./constants'); module.exports = async function() {     let ret = 0;     let i = 0;     while(i++ < MAX) {         ret++;         Buffer.from(String(Math.random())).toString('base64');     }     return ret; }

服務(wù)器以單線程和多線程的方式執(zhí)行以上代碼,下面是MAX為10000和100000時,使用CPUThreadPool類型線程池的性能對比(具體代碼參考https://github.com/theanarkh/nodejs-threadpool)。

10000

單線程 [ 358.35, 490.93, 705.23, 982.6, 1155.72 ]

多線程 [ 379.3, 230.35, 315.52, 429.4, 496.04 ]

100000

單線程 [ 2485.5, 4454.63, 6894.5, 9173.16, 11011.16 ]

多線程 [ 1791.75, 2787.15, 3275.08, 4093.39, 3674.91 ]

關(guān)于Nodejs中怎么實現(xiàn)多線程問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。


名稱欄目:Nodejs中怎么實現(xiàn)多線程
轉(zhuǎn)載來于:http://weahome.cn/article/jojidi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部