本篇內(nèi)容介紹了“node中的stream有哪些類型”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)公司基于成都重慶香港及美國等地區(qū)分布式IDC機房數(shù)據(jù)中心構(gòu)建的電信大帶寬,聯(lián)通大帶寬,移動大帶寬,多線BGP大帶寬租用,是為眾多客戶提供專業(yè)服務(wù)器托管報價,主機托管價格性價比高,為金融證券行業(yè)成都棕樹機房,ai人工智能服務(wù)器托管提供bgp線路100M獨享,G口帶寬及機柜租用的專業(yè)成都idc公司。
node stream有4種類型:1、Readable(可讀流)。需要實現(xiàn)“_read”方法來返回內(nèi)容;2、Writable(可寫流),需要實現(xiàn)“_write”方法來接受內(nèi)容;3、Duplex(可讀可寫流),需要實現(xiàn)“_read”和“_write”方法來接受和返回內(nèi)容;4、Transform(轉(zhuǎn)換流),需要實現(xiàn)“_transform”方法來把接受的內(nèi)容轉(zhuǎn)換之后返回內(nèi)容。
本教程操作環(huán)境:windows7系統(tǒng)、nodejs16版,DELL G3電腦。
流(Stream)在 Nodejs 中是個十分基礎(chǔ)的概念,很多基礎(chǔ)模塊都是基于流實現(xiàn)的,扮演著十分重要的角色。同時流也是是一個十分難以理解的概念,這主要是相關(guān)的文檔比較缺少,對于 NodeJs 初學(xué)者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對于大部分 NodeJs 使用者來說,僅僅是用來開發(fā) Web 應(yīng)用,對流的不充分認識并不影響使用。但是,理解流能夠?qū)?NodeJs 中的其他模塊有更好的理解,同時在某些情況下,使用流來處理數(shù)據(jù)會有更好的效果。
Stream 是在 Node.js 中處理流數(shù)據(jù)的抽象接口。Stream 并不是一個實際的接口,而是對所有流的一種統(tǒng)稱。實際的接口有 ReadableStream、 WritableStream、ReadWriteStream 這幾個。
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; pipe(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator ; } interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this; } interface ReadWriteStream extends ReadableStream, WritableStream { }
可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類的接口(ts中接口是可以繼承類的,因為他們只是在進行類型的合并)。
上面這些接口對應(yīng)的實現(xiàn)類分別是 Readable、Writable 和 Duplex
NodeJs中的流有4種:
Readable 可讀流(實現(xiàn)ReadableStream)
Writable 可寫流(實現(xiàn)WritableStream)
Duplex 可讀可寫流(繼承Readable后實現(xiàn)WritableStream)
Transform 轉(zhuǎn)換流(繼承Duplex)
它們都有要實現(xiàn)的方法:
Readable 需要實現(xiàn) _read 方法來返回內(nèi)容
Writable 需要實現(xiàn) _write 方法來接受內(nèi)容
Duplex 需要實現(xiàn) _read 和 _write 方法來接受和返回內(nèi)容
Transform 需要實現(xiàn) _transform 方法來把接受的內(nèi)容轉(zhuǎn)換之后返回
可讀流(Readable)是流的一種類型,他有兩種模式三種狀態(tài)
兩種讀取模式:
流動模式:數(shù)據(jù)會從底層系統(tǒng)讀取寫入到緩沖區(qū),當緩沖區(qū)被寫滿后自動通過 EventEmitter 盡快的將數(shù)據(jù)傳遞給所注冊的事件處理程序中
暫停模式:在這種模式下將不會主動觸發(fā) EventEmitter 傳輸數(shù)據(jù),必須顯示的調(diào)用 Readable.read()
方法來從緩沖區(qū)中讀取數(shù)據(jù),read 會觸發(fā)響應(yīng)到 EventEmitter 事件。
三種狀態(tài):
readableFlowing === null(初始狀態(tài))
readableFlowing === false(暫停模式)
readableFlowing === true(流動模式)
初始時流的 readable.readableFlowing
為 null
添加data事件后變?yōu)?true 。調(diào)用 pause()
、unpipe()
、或接收到背壓或者添加 readable
事件,則 readableFlowing
會被設(shè)為 false ,在這個狀態(tài)下,為 data 事件綁定監(jiān)聽器不會使 readableFlowing 切換到 true。
調(diào)用 resume()
可以讓可讀流的 readableFlowing
切換到 true
移除所有的 readable 事件是使 readableFlowing 變?yōu)?null 的唯一方法。
事件名 | 說明 |
---|---|
readable | 當緩沖區(qū)有新的可讀取數(shù)據(jù)時觸發(fā)(每一個想緩存池插入節(jié)點都會觸發(fā)) |
data | 每一次消費數(shù)據(jù)后都會觸發(fā),參數(shù)是本次消費的數(shù)據(jù) |
close | 流關(guān)閉時觸發(fā) |
error | 流發(fā)生錯誤時觸發(fā) |
方法名 | 說明 |
---|---|
read(size) | 消費長度為size的數(shù)據(jù),返回null表示當前數(shù)據(jù)不足size,否則返回本次消費的數(shù)據(jù)。size不傳遞時表示消費緩存池中所有數(shù)據(jù) |
const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 緩存池浮標值 }) readStreams.on('readable', () => { console.log('緩沖區(qū)滿了') readStreams.read()// 消費緩存池的所有數(shù)據(jù),返回結(jié)果并且觸發(fā)data事件 }) readStreams.on('data', (data) => { console.log('data') })
https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
當 size 為 0 會觸發(fā) readable 事件。
當緩存池中的數(shù)據(jù)長度達到浮標值 highWaterMark
后,就不會在主動請求生產(chǎn)數(shù)據(jù),而是等待數(shù)據(jù)被消費后在生產(chǎn)數(shù)據(jù)
暫停狀態(tài)的流如果不調(diào)用 read
來消費數(shù)據(jù)時,后續(xù)也不會在觸發(fā) data
和 readable
,當調(diào)用 read
消費時會先判斷本次消費后剩余的數(shù)據(jù)長度是否低于 浮標值
,如果低于 浮標值
就會在消費前請求生產(chǎn)數(shù)據(jù)。這樣在 read
后的邏輯執(zhí)行完成后新的數(shù)據(jù)大概率也已經(jīng)生產(chǎn)完成,然后再次觸發(fā) readable
,這種提前生產(chǎn)下一次消費的數(shù)據(jù)存放在緩存池的機制也是緩存流為什么快的原因
流動狀態(tài)下的流有兩種情況
生產(chǎn)速度慢于消費速度時:這種情況下每一個生產(chǎn)數(shù)據(jù)后一般緩存池中都不會有剩余數(shù)據(jù),直接將本次生產(chǎn)的數(shù)據(jù)傳遞給 data 事件即可(因為沒有進入緩存池,所以也不用調(diào)用 read 來消費),然后立即開始生產(chǎn)新數(shù)據(jù),待上一次數(shù)據(jù)消費完后新數(shù)據(jù)才生產(chǎn)好,再次觸發(fā) data ,一只到流結(jié)束。
生產(chǎn)速度快于消費速度時:此時每一次生產(chǎn)完數(shù)據(jù)后一般緩存池都還存在未消費的數(shù)據(jù),這種情況一般會在消費數(shù)據(jù)時開始生產(chǎn)下一次消費的數(shù)據(jù),待舊數(shù)據(jù)消費完后新數(shù)據(jù)已經(jīng)生產(chǎn)完并且放入緩存池
他們的區(qū)別僅僅在于數(shù)據(jù)生產(chǎn)后緩存池是否還存在數(shù)據(jù),如果存在數(shù)據(jù)則將生產(chǎn)的數(shù)據(jù) push 到緩存池等待消費,如果不存在則直接將數(shù)據(jù)交給 data 而不加入緩存池。
值得注意的是當一個緩存池中存在數(shù)據(jù)的流從暫停模式進入的流動模式時,會先循環(huán)調(diào)用 read 來消費數(shù)據(jù)只到返回 null
暫停模式下,一個可讀流讀創(chuàng)建時,模式是暫停模式,創(chuàng)建后會自動調(diào)用 _read
方法,把數(shù)據(jù)從數(shù)據(jù)源 push
到緩沖池中,直到緩沖池中的數(shù)據(jù)達到了浮標值。每當數(shù)據(jù)到達浮標值時,可讀流會觸發(fā)一個 " readable
" 事件,告訴消費者有數(shù)據(jù)已經(jīng)準備好了,可以繼續(xù)消費。
一般來說, 'readable'
事件表明流有新的動態(tài):要么有新的數(shù)據(jù),要么到達流的盡頭。所以,數(shù)據(jù)源的數(shù)據(jù)被讀完前,也會觸發(fā)一次 'readable'
事件;
消費者 " readable
" 事件的處理函數(shù)中,通過 stream.read(size)
主動消費緩沖池中的數(shù)據(jù)。
const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, // 參數(shù)的 read 方法會作為流的 _read 方法,用于獲取源數(shù)據(jù) read(size) { // 假設(shè)我們的源數(shù)據(jù)上 1000 個1 let chunk = null // 讀取數(shù)據(jù)的過程一般是異步的,例如IO操作 setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) // 每一次成功 push 數(shù)據(jù)到緩存池后都會觸發(fā) readable myReadable.on('readable', () => { const chunk = myReadable.read()//消費當前緩存池中所有數(shù)據(jù) console.log(chunk.toString()) })
值得注意的是, 如果 read(size) 的 size 大于浮標值,會重新計算新的浮標值,新浮標值是size的下一個二次冪(size <= 2^n,n取最小值)
// hwm 不會大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB限制 n = MAX_HWM; } else { //取下一個2最高冪,以防止過度增加hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; } return n; }
所有可讀流開始的時候都是暫停模式,可以通過以下方法可以切換至流動模式:
添加 " data
" 事件句柄;
調(diào)用 “ resume
”方法;
使用 " pipe
" 方法把數(shù)據(jù)發(fā)送到可寫流
流動模式下,緩沖池里面的數(shù)據(jù)會自動輸出到消費端進行消費,同時,每次輸出數(shù)據(jù)后,會自動回調(diào) _read
方法,把數(shù)據(jù)源的數(shù)據(jù)放到緩沖池中,如果此時緩存池中不存在數(shù)據(jù)則會直接吧數(shù)據(jù)傳遞給 data 事件,不會經(jīng)過緩存池;直到流動模式切換至其他暫停模式,或者數(shù)據(jù)源的數(shù)據(jù)被讀取完了( push(null)
);
可讀流可以通過以下方式切換回暫停模式:
如果沒有管道目標,則調(diào)用 stream.pause()
。
如果有管道目標,則移除所有管道目標。調(diào)用 stream.unpipe()
可以移除多個管道目標。
const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) myReadable.on('data', data => { console.log(data.toString()) })
相對可讀流來說,可寫流要簡單一些。
當生產(chǎn)者調(diào)用 write(chunk)
時,內(nèi)部會根據(jù)一些狀態(tài)(corked,writing等)選擇是否緩存到緩沖隊列中或者調(diào)用 _write
,每次寫完數(shù)據(jù)后,會嘗試清空緩存隊列中的數(shù)據(jù)。如果緩沖隊列中的數(shù)據(jù)大小超出了浮標值(highWaterMark),消費者調(diào)用 write(chunk)
后會返回 false
,這時候生產(chǎn)者應(yīng)該停止繼續(xù)寫入。
那么什么時候可以繼續(xù)寫入呢?當緩沖中的數(shù)據(jù)都被成功 _write
之后,清空了緩沖隊列后會觸發(fā) drain
事件,這時候生產(chǎn)者可以繼續(xù)寫入數(shù)據(jù)。
當生產(chǎn)者需要結(jié)束寫入數(shù)據(jù)時,需要調(diào)用 stream.end
方法通知可寫流結(jié)束。
const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// 會作為_write方法 setTimeout(()=>{ fileContent += chunk callback()// 寫入結(jié)束后調(diào)用 }, 500) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()
注意,在緩存池中數(shù)據(jù)到達浮標值后,此時緩存池中可能存在多個節(jié)點,在清空緩存池的過程中(循環(huán)調(diào)用_read),并不會向可讀流一樣盡量一次消費長度為浮標值的數(shù)據(jù),而是每次消費一個緩沖區(qū)節(jié)點,即使這個緩沖區(qū)長度于浮標值不一致也是如此
const { Writable } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log('消費', chunk.toString()) callback()// 寫入結(jié)束后調(diào)用 }, 100) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) let count = 0 function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end() } } productionData() myWritable.on('drain', productionData)
上述是一個浮標值為 10
的可寫流,現(xiàn)在數(shù)據(jù)源是一個 0——20
到連續(xù)的數(shù)字字符串,productionData
用于寫入數(shù)據(jù)。
首先第一次調(diào)用 myWritable.write("0")
時,因為緩存池不存在數(shù)據(jù),所以 "0"
不進入緩存池,而是直接交給 _wirte
,myWritable.write("0")
返回值為 true
當執(zhí)行 myWritable.write("1")
時,因為 _wirte
的 callback
還未調(diào)用,表明上一次數(shù)據(jù)還未寫入完,位置保證數(shù)據(jù)寫入的有序性,只能創(chuàng)建一個緩沖區(qū)將 "1"
加入緩存池中。后面 2-9
都是如此
當執(zhí)行 myWritable.write("10")
時,此時緩沖區(qū)長度為 9
(1-9),還未到達浮標值, "10"
繼續(xù)作為一個緩沖區(qū)加入緩存池中,此時緩存池長度變?yōu)?11
,所以 myWritable.write("1")
返回 false
,這意味著緩沖區(qū)的數(shù)據(jù)已經(jīng)足夠,我們需要等待 drain
事件通知時再生產(chǎn)數(shù)據(jù)。
100ms過后,_write("0", encoding, callback)
的 callback
被調(diào)用,表明 "0"
已經(jīng)寫入完成。然后會檢查緩存池中是否存在數(shù)據(jù),如果存在則會先調(diào)用 _read
消費緩存池的頭節(jié)點("1"
),然后繼續(xù)重復(fù)這個過程直到緩存池為空后觸發(fā) drain
事件,再次執(zhí)行 productionData
調(diào)用 myWritable.write("11")
,觸發(fā)第1步開始的過程,直到流結(jié)束。
在理解了可讀流與可寫流后,雙工流就好理解了,雙工流事實上是繼承了可讀流然后實現(xiàn)了可寫流(源碼是這么寫的,但是應(yīng)該說是同時實現(xiàn)了可讀流和可寫流更加好)。
Duplex 流需要同時實現(xiàn)下面兩個方法
實現(xiàn) _read() 方法,為可讀流生產(chǎn)數(shù)據(jù)
實現(xiàn) _write() 方法,為可寫流消費數(shù)據(jù)
上面兩個方法如何實現(xiàn)在上面可寫流可讀流的部分已經(jīng)介紹過了,這里需要注意的是,雙工流是存在兩個獨立的緩存池分別提供給兩個流,他們的數(shù)據(jù)源也不一樣
以 NodeJs 的標準輸入輸出流為例:
當我們在控制臺輸入數(shù)據(jù)時會觸發(fā)其 data 事件,這證明他有可讀流的功能,每一次用戶鍵入回車相當于調(diào)用可讀的 push 方法推送生產(chǎn)的數(shù)據(jù)。
當我們調(diào)用其 write 方法時也可以向控制臺輸出內(nèi)容,但是不會觸發(fā) data 事件,這說明他有可寫流的功能,而且有獨立的緩沖區(qū),_write 方法的實現(xiàn)內(nèi)容就是讓控制臺展示文字。
// 每當用戶在控制臺輸入數(shù)據(jù)(_read),就會觸發(fā)data事件,這是可讀流的特性 process.stdin.on('data', data=>{ process.stdin.write(data); }) // 每隔一秒向標準輸入流生產(chǎn)數(shù)據(jù)(這是可寫流的特性,會直接輸出到控制臺上),不會觸發(fā)data setInterval(()=>{ process.stdin.write('不是用戶控制臺輸入的數(shù)據(jù)') }, 1000)
可以將 Duplex 流視為具有可寫流的可讀流。兩者都是獨立的,每個都有獨立的內(nèi)部緩沖區(qū)。讀寫事件獨立發(fā)生。
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|
Transform 流是雙工的,其中讀寫以因果關(guān)系進行。雙工流的端點通過某種轉(zhuǎn)換鏈接。讀取要求發(fā)生寫入。
Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------
對于創(chuàng)建 Transform 流,最重要的是要實現(xiàn) _transform
方法而不是 _write
或者 _read
。 _transform
中對可寫流寫入的數(shù)據(jù)做處理(消費)然后為可讀流生產(chǎn)數(shù)據(jù)。
轉(zhuǎn)換流還經(jīng)常會實現(xiàn)一個 `_flush` 方法,他會在流結(jié)束前被調(diào)用,一般用于對流的末尾追加一些東西,例如壓縮文件時的一些壓縮信息就是在這里加上的
const { write } = require('fs') const { Transform, PassThrough } = require('stream') const reurce = '1312123213124341234213423428354816273513461891468186499126412' const transform = new Transform({ highWaterMark: 10, transform(chunk ,encoding, callback){// 轉(zhuǎn)換數(shù)據(jù),調(diào)用push將轉(zhuǎn)換結(jié)果加入緩存池 this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){// end觸發(fā)前執(zhí)行 this.push('<<<') callback() } }) // write 不斷寫入數(shù)據(jù) let count = 0 transform.write('>>>') function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() } } productionData() transform.on('drain', productionData) let result = '' transform.on('data', data=>{ result += data.toString() }) transform.on('end', ()=>{ console.log(result) // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })
“node中的stream有哪些類型”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!