這篇文章將為大家詳細講解有關Node Stream中運行機制的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
創(chuàng)新互聯(lián)公司是專業(yè)的廣南網(wǎng)站建設公司,廣南接單;提供網(wǎng)站制作、成都網(wǎng)站制作,網(wǎng)頁設計,網(wǎng)站設計,建網(wǎng)站,PHP網(wǎng)站建設等專業(yè)做網(wǎng)站服務;采用PHP框架,可快速的進行廣南網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!你可以把流理解成一種傳輸?shù)哪芰ΑMㄟ^流,可以以平緩的方式,無副作用的將數(shù)據(jù)傳輸?shù)侥康牡?。在Node中,Node Stream創(chuàng)建的流都是專用于String和Buffer上的,一般情況下使用Buffer。Stream表示的是一種傳輸能力,Buffer是傳輸內(nèi)容的載體 (可以這樣理解,Stream:外賣小哥哥, Buffer:你的外賣)。創(chuàng)建流的時候?qū)bjectMode設置true ,Stream同樣可以傳輸任意類型的JS對象(除了null,null在流中有特殊用途)。
現(xiàn)在有個需求,我們要向客戶端傳輸一個大文件。如果采用下面的方式
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
每次接收一個請求,就要把這個大文件讀入內(nèi)存,然后再傳輸給客戶端。通過這種方式可能會產(chǎn)生以下三種后果:
內(nèi)存耗盡
拖慢其他進程
增加垃圾回收器的負載
所以這種方式在傳輸大文件的情況下,不是一個好的方案。并發(fā)量一大,幾百個請求過來很容易就將內(nèi)存耗盡。
如果采用流呢?
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
采用這種方式,不會占用太多內(nèi)存,讀取一點就傳輸一點,整個過程平緩進行,非常優(yōu)雅。如果想在傳輸?shù)倪^程中,想對文件進行處理,比如壓縮、加密等等,也很好擴展(后面會具體介紹)。
流在Node中無處不在。從下圖中可以看出:
Stream分為四大類:
Readable(可讀流)
Writable (可寫流)
Duplex (雙工流)
Transform (轉(zhuǎn)換流)
可讀流中的數(shù)據(jù),在以下兩種模式下都能產(chǎn)生數(shù)據(jù)。
Flowing Mode
Non-Flowing Mode
兩種模式下,觸發(fā)的方式以及消耗的方式不一樣。
Flowing Mode:數(shù)據(jù)會源源不斷地生產(chǎn)出來,形成“流動”現(xiàn)象。監(jiān)聽流的data
事件便可進入該模式。
Non-Flowing Mode下:需要顯示地調(diào)用read()
方法,才能獲取數(shù)據(jù)。
兩種模式可以互相轉(zhuǎn)換
流的初始狀態(tài)是Null,通過監(jiān)聽data
事件,或者pipe
方法,調(diào)用resume
方法,將流轉(zhuǎn)為Flowing Mode
狀態(tài)。Flowing Mode
狀態(tài)下調(diào)用pause
方法,將流置為Non-Flowing Mode
狀態(tài)。Non-Flowing Mode
狀態(tài)下調(diào)用resume
方法,同樣可以將流置為Flowing Mode
狀態(tài)。
下面詳細介紹下兩種模式下,Readable流的運行機制。
在Flowing Mode狀態(tài)下,創(chuàng)建的myReadable讀流,直接監(jiān)聽data事件,數(shù)據(jù)就源源不斷的流出來進行消費了。
myReadable.on('data',function(chunk){ consume(chunk);//消費流 })
一旦監(jiān)聽data事件之后,Readable內(nèi)部的流程如下圖所示
核心的方法是流內(nèi)部的read方法,它在參數(shù)n為不同值時,分別觸發(fā)不同的操作。下面描述中的hightwatermark表示的是流內(nèi)部的緩沖池的大小。
n=undefined(消費數(shù)據(jù),并觸發(fā)一次可讀流)
n=0(觸發(fā)一次可讀流,但是不會消費)
n>hightwatermark(修改hightwatermark的值)
n
n>buffer (可以返回null,也可以返回buffer所有的數(shù)據(jù)(當時最后一次讀?。?
圖中黃色標識的_read(),是用戶實現(xiàn)流所需要自己實現(xiàn)的方法,這個方法就是實際讀取流的方式(可以這樣理解,外賣平臺給你提供外賣的能力,那_read()方法就相當于你下單點外賣)。后面會詳細介紹如何實現(xiàn)_read方法。
以上的流程可以描述為:監(jiān)聽data方法,Readable內(nèi)部就會調(diào)用read方法,來進行觸發(fā)讀流操作,通過判斷是同步還是異步讀取,來決定讀取的數(shù)據(jù)是否放入緩沖區(qū)。如果為異步的,那么就要調(diào)用flow方法,來繼續(xù)觸發(fā)read方法,來讀取流,同時根據(jù)size參數(shù)判定是否emit('data')來消費流,循環(huán)讀取。如果是同步的,那就emit('data')來消費流,同時繼續(xù)觸發(fā)read方法,來讀取流。一旦push方法傳入的是null,整個流就結(jié)束了。
從使用者的角度來看,在這種模式下,你可以通過下面的方式來使用流
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk){ writeFile1.write(chunk); })
相對于Flowing mode,Non-Flowing Mode要相對簡單很多。
消費該模式下的流,需要使用下面的方式
myReadable.on(‘readable’,function(){ const chunk = myReadable.read() consume(chunk);//消費流 })
在Non-Flowing Mode下,Readable內(nèi)部的流程如下圖:
從這個圖上看出,你要實現(xiàn)該模式的讀流,同樣要實現(xiàn)一個_read方法。
整個流程如下:監(jiān)聽readable方法,Readable內(nèi)部就會調(diào)用read方法。調(diào)用用戶實現(xiàn)的_read方法,來push數(shù)據(jù)到緩沖池,然后發(fā)送emit readable事件,通知用戶端消費。
從使用者的角度來看,你可以通過下面的方式來使用該模式下的流
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('readable',function(chunk) { while (null !== (chunk = myReadable.read())) { writeFile.write(chunk); } });
相對于讀流,寫流的機制就更容易理解了。
寫流使用下面的方式進行數(shù)據(jù)寫入
myWrite.write(chunk);
調(diào)用write后,內(nèi)部Writable的流程如下圖所示
類似于讀流,實現(xiàn)一個寫流,同樣需要用戶實現(xiàn)一個_write方法。
整個流程是這樣的:調(diào)用write之后,會首先判定是否要寫入緩沖區(qū)。如果不需要,那就調(diào)用用戶實現(xiàn)的_write方法,將流寫入到相應的地方,_write會調(diào)用一個writeable內(nèi)部的一個回調(diào)函數(shù)。
從使用者的角度來看,使用一個寫流,采用下面的代碼所示的方式。
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk) { writeFile.write(chunk); })
可以看到,使用寫流是非常簡單的。
我們先講解一下如何實現(xiàn)一個讀流和寫流,再來看Duplex和Transform是什么,因為了解了如何實現(xiàn)一個讀流和寫流,再來理解Duplex和Transform就非常簡單了。
實現(xiàn)自定義的Readable,只需要實現(xiàn)一個_read方法即可,需要在_read方法中調(diào)用push方法來實現(xiàn)數(shù)據(jù)的生產(chǎn)。如下面的代碼所示:
const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }); } } // 模擬資源池 const dataSource = { data: new Array(10).fill('-'), makeData() { if (!dataSource.data.length) return null; return dataSource.data.pop(); } }; const myReadable = new MyReadable(dataSource,); myReadable.on('readable', () => { let chunk; while (null !== (chunk = myReadable.read())) { console.log(chunk); } });
實現(xiàn)自定義的writable,只需要實現(xiàn)一個_write方法即可。在_write中消費chunk寫入到相應地方,并且調(diào)用callback回調(diào)。如下面代碼所示:
const Writable = require('stream').Writable; class Mywritable extends Writable{ constuctor(options){ super(options); } _write(chunk,endcoding,callback){ console.log(chunk); callback && callback(); } } const myWritable = new Mywritable();
雙工流:簡單理解,就是講一個Readable流和一個Writable流綁定到一起,它既可以用來做讀流,又可以用來做寫流。
實現(xiàn)一個Duplex流,你需要同時實現(xiàn)_read和_write方法。
有一點需要注意的是:它所包含的 Readable流和Writable流是完全獨立,互不影響的兩個流,兩個流使用的不是同一個緩沖區(qū)。通過下面的代碼可以驗證
// 模擬資源池1 const dataSource1 = { data: new Array(10).fill('a'), makeData() { if (!dataSource1.data.length) return null; return dataSource1.data.pop(); } }; // 模擬資源池2 const dataSource2 = { data: new Array(10).fill('b'), makeData() { if (!dataSource2.data.length) return null; return dataSource2.data.pop(); } }; const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } } const Writable = require('stream').Writable; class MyWritable extends Writable{ constructor(options){ super(options); } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const Duplex = require('stream').Duplex; class MyDuplex extends Duplex{ constructor(dataSource,options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const myWritable = new MyWritable(); const myReadable = new MyReadable(dataSource1); const myDuplex = new MyDuplex(dataSource1); myReadable.pipe(myDuplex).pipe(myWritable);
打印的結(jié)果是
abababababababababab
從這個結(jié)果可以看出,myReadable.pipe(myDuplex)
,myDuplex充當?shù)氖菍懥?,寫入的?nèi)容是a;myDuplex.pipe(myWritable)
,myDuplex充當?shù)氖亲x流,往myWritable寫的卻是b;所以說它所包含的 Readable流和Writable流是完全獨立的。
理解了Duplex,就更好理解Transform了。Transform是一個轉(zhuǎn)換流,它既有讀的功能又有寫的功能,但是它和Duplex不同的是,它的讀流和寫流共用同一個緩沖區(qū);也就是說,通過它讀入什么,那它就能寫入什么。
實現(xiàn)一個Transform,你只需要實現(xiàn)一個_transform方法。比如最簡單的Transform:PassThrough,其源代碼如下所示
PassThrough就是一個Transform,但是這個轉(zhuǎn)換流,什么也沒做,相當于一個透明的轉(zhuǎn)換流??梢钥吹?strong>_transform中什么都沒有,只是簡單的將數(shù)據(jù)進行回調(diào)。
如果我們在這個環(huán)節(jié)做些擴展,只需要在_transform中直接擴展就行了。比如我們可以對流進行壓縮,加密,混淆等等操作。
最后介紹一個流中非常重要的一個概念:背壓。要了解這個,我們首先來看下pipe和highWaterMaker是什么。
首先看下下面的代碼
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.pipe(writeFile);
上面的代碼和下面是等價的
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(data){ var flag = ws.write(data); if(!flag){ // 當前寫流緩沖區(qū)已滿,暫停讀數(shù)據(jù) readFile.pause(); } }) writeFile.on('drain',function()){ readFile.resume();// 當前寫流緩沖區(qū)已清空,重新開始讀流 } readFile.on('end',function(data){ writeFile.end();//將寫流緩沖區(qū)的數(shù)據(jù)全部寫入,并且關閉寫入的文件 })
pipe所做的操作就是相當于為寫流和讀流自動做了速度的匹配。
讀寫流速度不匹配的情況下,一般情況下不會造成什么問題,但是會造成內(nèi)存增加。內(nèi)存消耗增加,就有可能會帶來一系列的問題。所以在使用的流的時候,強烈推薦使用pipe。
highWaterMaker說白了,就是定義緩沖區(qū)的大小。
默認16Kb(Readable大8M)
可以自定義
背壓的概念可以理解為:為了防止讀寫流速度不匹配而產(chǎn)生的一種調(diào)整機制;背壓該調(diào)整機制的觸發(fā)時機,受限于highWaterMaker設置的大小。
如上面的代碼 var flag = ws.write(data);
,一旦寫流的緩沖區(qū)滿了,那flag
就會置為false,反向促進讀流的速度調(diào)整。
主要有以下場景
文件操作(復制,壓縮,解壓,加密等)
下面的就很容易就實現(xiàn)了文件復制的功能。
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big_copy.file'); readFile.pipe(writeFile);
那我們想在復制的過程中對文件進行壓縮呢?
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big.gz'); const zlib = require('zlib'); readFile.pipe(zlib.createGzip()).pipe(writeFile);
實現(xiàn)解壓、加密也是類似的。
靜態(tài)文件服務器
比如需要返回一個html,可以使用如下代碼。
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.createReadStream('./a.html').pipe(res); }).listen(8000);
關于“Node Stream中運行機制的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
另外有需要云服務器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。