這篇文章將為大家詳細(xì)講解有關(guān)如何用nodejs源碼分析線程,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)是一家專業(yè)提供港閘企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、網(wǎng)站制作、成都h5網(wǎng)站建設(shè)、小程序制作等業(yè)務(wù)。10年已為港閘眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站制作公司優(yōu)惠進行中。
我們先看一下一般的使用例子。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做點耗時的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我們先分析一下這個代碼的意思。因為上面的代碼在主線程和子線程都會被執(zhí)行一遍。所以首先通過isMainThread判斷當(dāng)前是主線程還是子線程。主線程的話,就創(chuàng)建一個子線程,然后監(jiān)聽子線程發(fā)過來的消息。子線程的話,首先執(zhí)行業(yè)務(wù)相關(guān)的代碼,還可以監(jiān)聽主線程傳過來的消息。下面我們開始分析源碼。分析完,會對上面的代碼有更多的理解。
首先我們從worker_threads模塊開始分析。這是一個c++模塊。我們看一下他導(dǎo)出的功能。require("work_threads")的時候就是引用了InitWorker函數(shù)導(dǎo)出的功能。
void InitWorker(Local
翻譯成js大概是
function c++Worker(object) {
// 關(guān)聯(lián)起來,后續(xù)在js層調(diào)用c++層函數(shù)時,取出來,拿到c++層真正的worker對象
object[0] = this;
...
}
function New(object) {
const worker = new c++Worker(object);
}
function Worker() {
New(this);
}
Worker.prototype = {
startThread,StartThread,
StopThread: StopThread,
...
}
module.exports = {
Worker: Worker,
getEnvMessagePort: GetEnvMessagePort,
isMainThread: true | false
...
}
了解work_threads模塊導(dǎo)出的功能后,我們看new Worker的時候的邏輯。根據(jù)上面代碼導(dǎo)出的邏輯,我們知道這時候首先會新建一個c++對象。對應(yīng)上面的Worker函數(shù)中的this。然后執(zhí)行New回調(diào),并傳入tihs。我們看New函數(shù)的邏輯。我們省略一系列的參數(shù)處理,主要代碼如下。
// args.This()就是我們剛才傳進來的this
Worker* worker = new Worker(env, args.This(),
url, per_isolate_opts,
std::move(exec_argv_out));
我們再看Worker類。
Worker::Worker(Environment* env,
Local
新建一個Worker,結(jié)構(gòu)如下
constructor(filename, options = {}) {
super();
// 忽略一系列參數(shù)處理,new Worker就是上面提到的c++層的
this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
// messagePort就是上面圖中的messagePort,指向_parent_port
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
// 開始接收消息,我們這里不深入messagePort,后續(xù)單獨分析
this[kPort].start();
// 申請一個通信管道,兩個端口
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
// 向另一端發(fā)送消息
this[kPort].postMessage({
argv,
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// 開啟線程
this[kHandle].startThread();
}
上面的代碼主要邏輯如下
1 保存messagePort,然后給messagePort的對端(看上面的圖)發(fā)送消息,但是這時候還沒有接收者,所以消息會緩存到MessagePortData,即child_port_data_ 中。
2 申請一個通信管道,用于主線程和子線程通信。_parent_port和child_port是給nodejs使用的,新申請的管道是給用戶使用的。
3 創(chuàng)建子線程。
我們看創(chuàng)建線程的時候,做了什么。
void Worker::StartThread(const FunctionCallbackInfo& args) {
Worker* w;
// 解包出對應(yīng)的Worker對象
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
// 新建一個子線程,然后執(zhí)行Run函數(shù),從此在子線程里執(zhí)行
uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
w->Run();
}, static_cast(w))
}
我們繼續(xù)看Run
void Worker::Run() {
{
// 新建一個env
env_.reset(new Environment(data.isolate_data_.get(),
context,
std::move(argv_),
std::move(exec_argv_),
Environment::kNoFlags,
thread_id_));
// 初始化libuv,往libuv注冊
env_->InitializeLibuv(start_profiler_idle_notifier_);
// 創(chuàng)建一個MessagePort
CreateEnvMessagePort(env_.get());
// 執(zhí)行internal/main/worker_thread.js
StartExecution(env_.get(), "internal/main/worker_thread");
// 開始事件循環(huán)
do {
uv_run(&data.loop_, UV_RUN_DEFAULT);
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
if (more && !is_stopped()) continue;
more = uv_loop_alive(&data.loop_);
} while (more == true && !is_stopped());
}
}
我們分步驟分析上面的代碼
1 CreateEnvMessagePort
void Worker::CreateEnvMessagePort(Environment* env) {
child_port_ = MessagePort::New(env,
env->context(),
std::move(child_port_data_));
if (child_port_ != nullptr)
env->set_message_port(child_port_->object(isolate_));
}
child_port_data_這個變量我們應(yīng)該很熟悉,在這里首先申請一個新的端口。負(fù)責(zé)端口中數(shù)據(jù)管理的對象是child_port_data_。然后在env緩存起來。一會要用。
// 設(shè)置process對象
patchProcessObject();
// 獲取剛才緩存的端口
onst port = getEnvMessagePort();
port.on('message', (message) => {
// 加載腳本
if (message.type === LOAD_SCRIPT) {
const {
argv,
cwdCounter,
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
const CJSLoader = require('internal/modules/cjs/loader');
loadPreloadModules();
/*
由主線程申請的MessageChannel管道中,某一端的端口,
設(shè)置publicWorker的parentPort字段,publicWorker就是worker_threads導(dǎo)出的對象,后面需要用
*/
publicWorker.parentPort = publicPort;
// 執(zhí)行時使用的數(shù)據(jù)
publicWorker.workerData = workerData;
// 通知主線程,正在執(zhí)行腳本
port.postMessage({ type: UP_AND_RUNNING });
// 執(zhí)行new Worker(filename)時傳入的文件
CJSLoader.Module.runMain(filename);
})
// 開始接收消息
port.start()
這時候我們再回頭看一下,我們調(diào)用new Worker(filename),然后在子線程里執(zhí)行我們的filename時的場景。我們再次回顧前面的代碼。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做點耗時的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我們知道isMainThread在子線程里是false,parentPort 就是就是messageChannel中的一端。所以parentPort.postMessage給對端發(fā)送消息,就是給主線程發(fā)送消息,我們再看看worker.postMessage('Hello, world!')。
postMessage(...args) {
this[kPublicPort].postMessage(...args);
}
kPublicPort指向的就是messageChannel的另一端。即給子線程發(fā)送消息。那么on('message')就是接收對端發(fā)過來的消息。
關(guān)于如何用nodejs源碼分析線程就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。