這篇文章主要講解了“nodejs流基類怎么實現(xiàn)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“nodejs流基類怎么實現(xiàn)”吧!
創(chuàng)新新互聯(lián),憑借10年的成都網(wǎng)站設(shè)計、成都做網(wǎng)站經(jīng)驗,本著真心·誠心服務(wù)的企業(yè)理念服務(wù)于成都中小企業(yè)設(shè)計網(wǎng)站有上千余家案例。做網(wǎng)站建設(shè),選創(chuàng)新互聯(lián)公司。
流是對數(shù)據(jù)生產(chǎn),消費(fèi)的一種抽象,今天先分析一下流基類的實現(xiàn)
const EE = require('events');
const util = require('util');
// 流的基類
function Stream() {
EE.call(this);
}
// 繼承事件訂閱分發(fā)的能力
util.inherits(Stream, EE);
流的基類只提供了一個函數(shù)就是pipe。用于實現(xiàn)管道化。這個方法代碼比較多,分開說。
function ondata(chunk) {
// 源流有數(shù)據(jù)到達(dá),并且目的流可寫
if (dest.writable) {
// 目的流過載并且源流實現(xiàn)了pause方法,那就暫停可讀流的讀取操作,等待目的流觸發(fā)drain事件
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
}
// 監(jiān)聽data事件,可讀流有數(shù)據(jù)的時候,會觸發(fā)data事件
source.on('data', ondata);
function ondrain() {
// 目的流可寫了,并且可讀流可讀,切換成自動讀取模式
if (source.readable && source.resume) {
source.resume();
}
}
// 監(jiān)聽drain事件,目的流可以消費(fèi)數(shù)據(jù)了就會觸發(fā)該事件
dest.on('drain', ondrain);
這是管道化時流控實現(xiàn)的地方,主要是利用了write返回值和drain事件。
// 目的流不是標(biāo)準(zhǔn)輸出或標(biāo)準(zhǔn)錯誤,并且end不等于false
if (!dest._isStdio && (!options || options.end !== false)) {
// 源流沒有數(shù)據(jù)可讀了,執(zhí)行end回調(diào),告訴目的流,沒有數(shù)據(jù)可讀了
source.on('end', onend);
// 源流關(guān)閉了,執(zhí)行close回調(diào)
source.on('close', onclose);
}
// 兩個函數(shù)只會執(zhí)行一次,也只會執(zhí)行一個
var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;
// 執(zhí)行目的流的end函數(shù),說明寫數(shù)據(jù)完畢
dest.end();
}
function onclose() {
if (didOnEnd) return;
didOnEnd = true;
// 銷毀目的流
if (typeof dest.destroy === 'function') dest.destroy();
}
這里是處理源流結(jié)束和關(guān)閉后,通知目的流的邏輯。
// remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);
source.removeListener('close', onclose);
source.removeListener('error', onerror);
dest.removeListener('error', onerror);
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
dest.removeListener('close', cleanup);
}
function onerror(er) {
// 出錯了,清除注冊的事件,包括正在執(zhí)行的onerror函數(shù)
cleanup();
// 如果用戶沒有監(jiān)聽流的error事件,則拋出錯誤,所以我們業(yè)務(wù)代碼需要監(jiān)聽error事件
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
}
}
// 監(jiān)聽流的error事件
source.on('error', onerror);
dest.on('error', onerror);
// 源流關(guān)閉或者沒有數(shù)據(jù)可讀時,清除注冊的事件
source.on('end', cleanup);
source.on('close', cleanup);
// 目的流關(guān)閉了也清除他注冊的事件
dest.on('close', cleanup);
這里主要是處理了error事件和流關(guān)閉/結(jié)束/出錯時清除訂閱的事件。這就是流基類的所有邏輯。
感謝各位的閱讀,以上就是“nodejs流基類怎么實現(xiàn)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對nodejs流基類怎么實現(xiàn)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!