真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Node中的可讀流是什么

這篇文章主要介紹了Node中的可讀流是什么的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Node中的可讀流是什么文章都會(huì)有所收獲,下面我們一起來看看吧。

創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),鐵嶺縣企業(yè)網(wǎng)站建設(shè),鐵嶺縣品牌網(wǎng)站建設(shè),網(wǎng)站定制,鐵嶺縣網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,鐵嶺縣網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

1. 基本概念

1.1. 流的歷史演變

流不是 Nodejs 特有的概念。 它們是幾十年前在 Unix 操作系統(tǒng)中引入的,程序可以通過管道運(yùn)算符(|)對流進(jìn)行相互交互。

在基于Unix系統(tǒng)的MacOS以及Linux中都可以使用管道運(yùn)算符(|),他可以將運(yùn)算符左側(cè)進(jìn)程的輸出轉(zhuǎn)換成右側(cè)的輸入。

在Node中,我們使用傳統(tǒng)的readFile去讀取文件的話,會(huì)將文件從頭到尾都讀到內(nèi)存中,當(dāng)所有內(nèi)容都被讀取完畢之后才會(huì)對加載到內(nèi)存中的文件內(nèi)容進(jìn)行統(tǒng)一處理。

這樣做會(huì)有兩個(gè)缺點(diǎn):

  • 內(nèi)存方面:占用大量內(nèi)存

  • 時(shí)間方面:需要等待數(shù)據(jù)的整個(gè)有效負(fù)載都加載完才會(huì)開始處理數(shù)據(jù)

為了解決上述問題,Node.js效仿并實(shí)現(xiàn)了流的概念,在Node.js流中,一共有四種類型的流,他們都是Node.js中EventEmitter的實(shí)例:

  • 可讀流(Readable Stream)

  • 可寫流(Writable Stream)

  • 可讀可寫全雙工流(Duplex Stream)

  • 轉(zhuǎn)換流(Transform Stream)

為了深入學(xué)習(xí)這部分的內(nèi)容,循序漸進(jìn)的理解Node.js中流的概念,并且由于源碼部分較為復(fù)雜,本人決定先從可讀流開始學(xué)習(xí)這部分內(nèi)容。

1.2. 什么是流(Stream)

流是一種抽象的數(shù)據(jù)結(jié)構(gòu),是數(shù)據(jù)的集合,其中存儲(chǔ)的數(shù)據(jù)類型只能為以下類型(僅針對objectMode === false的情況):

  • string

  • Buffer

我們可以把流看作這些數(shù)據(jù)的集合,就像液體一樣,我們先把這些液體保存在一個(gè)容器里(流的內(nèi)部緩沖區(qū)BufferList),等到相應(yīng)的事件觸發(fā)的時(shí)候,我們再把里面的液體倒進(jìn)管道里,并通知其他人在管道的另一側(cè)拿自己的容器來接里面的液體進(jìn)行處理。

Node中的可讀流是什么

1.3. 什么是可讀流(Readable Stream)

可讀流是流的一種類型,他有兩種模式三種狀態(tài)

兩種讀取模式:

  • 流動(dòng)模式:數(shù)據(jù)會(huì)從底層系統(tǒng)讀取,并通過EventEmitter盡快的將數(shù)據(jù)傳遞給所注冊的事件處理程序中

  • 暫停模式:在這種模式下將不會(huì)讀取數(shù)據(jù),必須顯示的調(diào)用Stream.read()方法來從流中讀取數(shù)據(jù)

三種狀態(tài):

  • readableFlowing === null:不會(huì)產(chǎn)生數(shù)據(jù),調(diào)用Stream.pipe()、Stream.resume會(huì)使其狀態(tài)變?yōu)閠rue,開始產(chǎn)生數(shù)據(jù)并主動(dòng)觸發(fā)事件

  • readableFlowing === false:此時(shí)會(huì)暫停數(shù)據(jù)的流動(dòng),但不會(huì)暫停數(shù)據(jù)的生成,因此會(huì)產(chǎn)生數(shù)據(jù)積壓

  • readableFlowing === true:正常產(chǎn)生和消耗數(shù)據(jù)

2. 基本原理

2.1. 內(nèi)部狀態(tài)定義(ReadableState)

ReadableState

_readableState: ReadableState {
  objectMode: false, // 操作除了string、Buffer、null之外的其他類型的數(shù)據(jù)需要把這個(gè)模式打開
  highWaterMark: 16384, // 水位限制,1024 \* 16,默認(rèn)16kb,超過這個(gè)限制則會(huì)停止調(diào)用\_read()讀數(shù)據(jù)到buffer中
  buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer鏈表,用于保存數(shù)據(jù)
  length: 0, // 整個(gè)可讀流數(shù)據(jù)的大小,如果是objectMode則與buffer.length相等
  pipes: [], // 保存監(jiān)聽了該可讀流的所有管道隊(duì)列
  flowing: null, // 可獨(dú)流的狀態(tài) null、false、true
  ended: false, // 所有數(shù)據(jù)消費(fèi)完畢
  endEmitted: false, // 結(jié)束事件收否已發(fā)送
  reading: false, // 是否正在讀取數(shù)據(jù)
  constructed: true, // 流在構(gòu)造好之前或者失敗之前,不能被銷毀
  sync: true, // 是否同步觸發(fā)'readable'/'data'事件,或是等到下一個(gè)tick
  needReadable: false, // 是否需要發(fā)送readable事件
  emittedReadable: false, // readable事件發(fā)送完畢
  readableListening: false, // 是否有readable監(jiān)聽事件
  resumeScheduled: false, // 是否調(diào)用過resume方法
  errorEmitted: false, // 錯(cuò)誤事件已發(fā)送
  emitClose: true, // 流銷毀時(shí),是否發(fā)送close事件
  autoDestroy: true, // 自動(dòng)銷毀,在'end'事件觸發(fā)后被調(diào)用
  destroyed: false, // 流是否已經(jīng)被銷毀
  errored: null, // 標(biāo)識流是否報(bào)錯(cuò)
  closed: false, // 流是否已經(jīng)關(guān)閉
  closeEmitted: false, // close事件是否已發(fā)送
  defaultEncoding: 'utf8', // 默認(rèn)字符編碼格式
  awaitDrainWriters: null, // 指向監(jiān)聽了'drain'事件的writer引用,類型為null、Writable、Set
  multiAwaitDrain: false, // 是否有多個(gè)writer等待drain事件 
  readingMore: false, // 是否可以讀取更多數(shù)據(jù)
  dataEmitted: false, // 數(shù)據(jù)已發(fā)送
  decoder: null, // 解碼器
  encoding: null, // 編碼器
  [Symbol(kPaused)]: null
},

2.2. 內(nèi)部數(shù)據(jù)存儲(chǔ)實(shí)現(xiàn)(BufferList)

BufferList是用于流保存內(nèi)部數(shù)據(jù)的容器,它被設(shè)計(jì)為了鏈表的形式,一共有三個(gè)屬性head、tail和length。

BufferList中的每一個(gè)節(jié)點(diǎn)我把它表示為了BufferNode,里面的Data的類型取決于objectMode。

這種數(shù)據(jù)結(jié)構(gòu)獲取頭部的數(shù)據(jù)的速度快于Array.prototype.shift()。

Node中的可讀流是什么

2.2.1. 數(shù)據(jù)存儲(chǔ)類型

如果objectMode === true:

那么data則可以為任意類型,push的是什么數(shù)據(jù)則存儲(chǔ)的就是什么數(shù)據(jù)

objectMode=true

const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: true,
  read() {},
});

readableStream.push({ name: 'lisa'});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push('lisa');
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);

運(yùn)行結(jié)果:

Node中的可讀流是什么

如果objectMode === false:

那么data只能為string或者Buffer或者Uint8Array

objectMode=false

const Stream = require('stream');

const readableStream = new Stream.Readable({

  objectMode: false,

  read() {},

});

readableStream.push({ name: 'lisa'});

運(yùn)行結(jié)果:

Node中的可讀流是什么

2.2.2. 數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)

我們在控制臺(tái)通過node命令行創(chuàng)建一個(gè)可讀流,來觀察buffer中數(shù)據(jù)的變化:

Node中的可讀流是什么

當(dāng)然在push數(shù)據(jù)之前我們需要實(shí)現(xiàn)他的_read方法,或者在構(gòu)造函數(shù)的參數(shù)中實(shí)現(xiàn)read方法:

const Stream = require('stream');

const readableStream = new Stream.Readable();

RS._read = function(size) {}

或者

const Stream = require('stream');

const readableStream = new Stream.Readable({
  
  read(size) {}

});

經(jīng)過readableStream.push('abc')操作之后,當(dāng)前的buffer為:

Node中的可讀流是什么

可以看到目前的數(shù)據(jù)存儲(chǔ)了,頭尾存儲(chǔ)的數(shù)據(jù)都是字符串'abc'的ascii碼,類型為Buffer類型,length表示當(dāng)前保存的數(shù)據(jù)的條數(shù)而非數(shù)據(jù)內(nèi)容的大小。

2.2.3. 相關(guān)API

打印一下BufferList的所有方法可以得到:

Node中的可讀流是什么

除了join是將BufferList序列化為字符串之外,其他都是對數(shù)據(jù)的存取操作。

這里就不一一講解所有的方法了,重點(diǎn)講一下其中的consume 、_getString和_getBuffer。

2.2.3.1. consume

源碼地址:BufferList.consume
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80

comsume

// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // `slice` is the same for buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    return this.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this.\_getString(n) : this.\_getBuffer(n);
}

代碼一共有三個(gè)判斷條件:

  • 如果所消耗的數(shù)據(jù)的字節(jié)長度小于鏈表頭節(jié)點(diǎn)存儲(chǔ)數(shù)據(jù)的長度,則將頭節(jié)點(diǎn)的數(shù)據(jù)取前n字節(jié),并把當(dāng)前頭節(jié)點(diǎn)的數(shù)據(jù)設(shè)置為切片之后的數(shù)據(jù)

  • 如果所消耗的數(shù)據(jù)恰好等于鏈表頭節(jié)點(diǎn)所存儲(chǔ)的數(shù)據(jù)的長度,則直接返回當(dāng)前頭節(jié)點(diǎn)的數(shù)據(jù)

Node中的可讀流是什么

  • 如果所消耗的數(shù)據(jù)的長度大于鏈表頭節(jié)點(diǎn)的長度,那么會(huì)根據(jù)傳入的第二個(gè)參數(shù)進(jìn)行最后一次判斷,判斷當(dāng)前的BufferList底層存儲(chǔ)的是string還是Buffer

2.2.3.2. _getBuffer

源碼地址:BufferList._getBuffer
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137

comsume

// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
       TypedArrayPrototypeSet(ret,
                              new Uint8Array(buf.buffer, buf.byteOffset, n),
                              retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}

總的來說就是循環(huán)對鏈表中的節(jié)點(diǎn)進(jìn)行操作,新建一個(gè)Buffer數(shù)組用于存儲(chǔ)返回的數(shù)據(jù)。

首先從鏈表的頭節(jié)點(diǎn)開始取數(shù)據(jù),不斷的復(fù)制到新建的Buffer中,直到某一個(gè)節(jié)點(diǎn)的數(shù)據(jù)大于等于要取的長度減去已經(jīng)取得的長度。

或者說讀到鏈表的最后一個(gè)節(jié)點(diǎn)后,都還沒有達(dá)到要取的長度,那么就返回這個(gè)新建的Buffer。

2.2.3.3. _getString

源碼地址:BufferList._getString
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106

comsume

// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = '';
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
    ret += str;
    n -= str.length;
  } else {
    if (n === str.length) {
      ret += str;
      ++c;
      if (p.next)
        this.head = p.next;
      else
        this.head = this.tail = null;
    } else {
      ret += StringPrototypeSlice(str, 0, n);
      this.head = p;
      p.data = StringPrototypeSlice(str, n);
    }
    break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}

對于操作字符串來說和操作Buffer是一樣的,也是循環(huán)從鏈表的頭部開始讀數(shù)據(jù),只是進(jìn)行數(shù)據(jù)的拷貝存儲(chǔ)方面有些差異,還有就是_getString操作返回的數(shù)據(jù)類型是string類型。

2.3. 為什么可讀流是EventEmitter的實(shí)例?

對于這個(gè)問題而言,首先要了解什么是發(fā)布訂閱模式,發(fā)布訂閱模式在大多數(shù)API中都有重要的應(yīng)用,無論是Promise還是Redux,基于發(fā)布訂閱模式實(shí)現(xiàn)的高級API隨處可見。

它的優(yōu)點(diǎn)在于能將事件的相關(guān)回調(diào)函數(shù)存儲(chǔ)到隊(duì)列中,然后在將來的某個(gè)時(shí)刻通知到對方去處理數(shù)據(jù),從而做到關(guān)注點(diǎn)分離,生產(chǎn)者只管生產(chǎn)數(shù)據(jù)和通知消費(fèi)者,而消費(fèi)者則只管處理對應(yīng)的事件及其對應(yīng)的數(shù)據(jù),而Node.js流模式剛好符合這一特點(diǎn)。

那么Node.js流是怎樣實(shí)現(xiàn)基于EventEmitter創(chuàng)建實(shí)例的呢?

這部分源碼在這兒:stream/legacy
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10

legacy

function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

然后在可讀流的源碼中有這么幾行代碼:

這部分源碼在這兒:readable
https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77

legacy

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

首先將Stream的原型對象繼承自EventEmitter,這樣Stream的所有實(shí)例都可以訪問到EventEmitter上的方法。

同時(shí)通過ObjectSetPrototypeOf(Stream, EE)將EventEmitter上的靜態(tài)方法也繼承過來,并在Stream的構(gòu)造函數(shù)中,借用構(gòu)造函數(shù)EE來實(shí)現(xiàn)所有EventEmitter中的屬性的繼承,然后在可讀流里,用同樣的的方法實(shí)現(xiàn)對Stream類的原型繼承和靜態(tài)屬性繼承,從而得到:

Readable.prototype.__proto__ === Stream.prototype;

Stream.prototype.__proto__ === EE.prototype

因此:

Readable.prototype.__proto__.__proto__ === EE.prototype

所以捋著可讀流的原型鏈可以找到EventEmitter的原型,實(shí)現(xiàn)對EventEmitter的繼承

2.4. 相關(guān)API的實(shí)現(xiàn)

這里會(huì)按照源碼文檔中API的出現(xiàn)順序來展示,且僅解讀其中的核心API實(shí)現(xiàn)。

注:此處僅解讀Node.js可讀流源碼中所聲明的函數(shù),不包含外部引入的函數(shù)定義,同時(shí)為了減少篇幅,不會(huì)將所有代碼都拷貝下來。

Readable.prototype

Stream {
  destroy: [Function: destroy],
  _undestroy: [Function: undestroy],
  _destroy: [Function (anonymous)],
  push: [Function (anonymous)],
  unshift: [Function (anonymous)],
  isPaused: [Function (anonymous)],
  setEncoding: [Function (anonymous)],
  read: [Function (anonymous)],
  _read: [Function (anonymous)],
  pipe: [Function (anonymous)],
  unpipe: [Function (anonymous)],
  on: [Function (anonymous)],
  addListener: [Function (anonymous)],
  removeListener: [Function (anonymous)],
  off: [Function (anonymous)],
  removeAllListeners: [Function (anonymous)],
  resume: [Function (anonymous)],
  pause: [Function (anonymous)],
  wrap: [Function (anonymous)],
  iterator: [Function (anonymous)],
  [Symbol(nodejs.rejection)]: [Function (anonymous)],
  [Symbol(Symbol.asyncIterator)]: [Function (anonymous)]
}

2.4.1. push

readable.push

Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

push方法的主要作用就是將數(shù)據(jù)塊通過觸發(fā)'data'事件傳遞給下游管道,或者將數(shù)據(jù)存儲(chǔ)到自身的緩沖區(qū)中。

以下代碼為相關(guān)偽代碼,僅展示主流程:

readable.push

function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream.\_readableState;
  if (chunk === null) { // push null 流結(jié)束信號,之后不能再寫入數(shù)據(jù)
    state.reading = false;
    onEofChunk(stream, state);
  } else if (!state.objectMode) { // 如果不是對象模式
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk);
    } else if (chunk instanceof Buffer) { //如果是Buffer
    // 處理一下編碼
    } else if (Stream.\_isUint8Array(chunk)) {
      chunk = Stream.\_uint8ArrayToBuffer(chunk);
    } else if (chunk != null) {
      err = new ERR\_INVALID\_ARG\_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (state.objectMode || (chunk && chunk.length > 0)) { // 是對象模式或者chunk是Buffer
    // 這里省略幾種數(shù)據(jù)的插入方式的判斷
    addChunk(stream, state, chunk, true);
  }
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
    stream.listenerCount('data') > 0) { // 如果處于流動(dòng)模式,有監(jiān)聽data的訂閱者
      stream.emit('data', chunk);
  } else { // 否則保存數(shù)據(jù)到緩沖區(qū)中
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
  }
  maybeReadMore(stream, state); // 嘗試多讀一點(diǎn)數(shù)據(jù)
}

push操作主要分為對objectMode的判斷,不同的類型對傳入的數(shù)據(jù)會(huì)做不同的操作:

  • objectMode === false: 將數(shù)據(jù)(chunk)轉(zhuǎn)換成Buffer

  • objectMode === true: 將數(shù)據(jù)原封不動(dòng)的傳遞給下游

其中addChunk的第一個(gè)判斷主要是處理Readable處于流動(dòng)模式、有data監(jiān)聽器、并且緩沖區(qū)數(shù)據(jù)為空時(shí)的情況。

這時(shí)主要將數(shù)據(jù)passthrough透傳給訂閱了data事件的其他程序,否則就將數(shù)據(jù)保存到緩沖區(qū)里面。

2.4.2. read

除去對邊界條件的判斷、流狀態(tài)的判斷,這個(gè)方法主要有兩個(gè)操作

  • 調(diào)用用戶實(shí)現(xiàn)的_read方法,對執(zhí)行結(jié)果進(jìn)行處理

  • 從緩沖區(qū)buffer中讀取數(shù)據(jù),并觸發(fā)'data'事件

readable.read

// 如果read的長度大于hwm,則會(huì)重新計(jì)算hwm
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);  
}
// 調(diào)用用戶實(shí)現(xiàn)的\_read方法
try {
  const result = this.\_read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === 'function') {
      then.call(
        result,
        nop,
        function(err) {
          errorOrDestroy(this, err);
        });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}

如果說用戶實(shí)現(xiàn)的_read方法返回的是一個(gè)promise,則調(diào)用這個(gè)promise的then方法,將成功和失敗的回調(diào)傳入,便于處理異常情況。

read方法從緩沖區(qū)里讀區(qū)數(shù)據(jù)的核心代碼如下:

readable.read

function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    return null;
  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) { // 處理n為空或者大于緩沖區(qū)的長度的情況
    // Read it all, truncate the list.
    if (state.decoder) // 有解碼器,則將結(jié)果序列化為字符串
      ret = state.buffer.join('');
    else if (state.buffer.length === 1) // 只有一個(gè)數(shù)據(jù),返回頭節(jié)點(diǎn)數(shù)據(jù)
      ret = state.buffer.first();
    else // 將所有數(shù)據(jù)存儲(chǔ)到一個(gè)Buffer中
      ret = state.buffer.concat(state.length);
    state.buffer.clear(); // 清空緩沖區(qū)
  } else {
    // 處理讀取長度小于緩沖區(qū)的情況
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}

2.4.3. _read

用戶初始化Readable stream時(shí)必須實(shí)現(xiàn)的方法,可以在這個(gè)方法里調(diào)用push方法,從而持續(xù)的觸發(fā)read方法,當(dāng)我們push null時(shí)可以停止流的寫入操作。

示例代碼:

readable._read

const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// abcdefghijklmnopqrstuvwxyz%

2.4.4. pipe(重要)

將一個(gè)或多個(gè)writable流綁定到當(dāng)前的Readable流上,并且將Readable流切換到流動(dòng)模式。

這個(gè)方法里面有很多的事件監(jiān)聽句柄,這里不會(huì)一一介紹:

readable.pipe

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this.\_readableState;
  state.pipes.push(dest); // 收集Writable流

  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }
  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);
  // 啟動(dòng)流,如果流處于暫停模式
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    src.resume();
  }
  return dest;
}

pipe操作和Linux的管道操作符'|'非常相似,將左側(cè)輸出變?yōu)橛覀?cè)輸入,這個(gè)方法會(huì)將可寫流收集起來進(jìn)行維護(hù),并且當(dāng)可讀流觸發(fā)'data'事件。

有數(shù)據(jù)流出時(shí),就會(huì)觸發(fā)可寫流的寫入事件,從而做到數(shù)據(jù)傳遞,實(shí)現(xiàn)像管道一樣的操作。并且會(huì)自動(dòng)將處于暫停模式的可讀流變?yōu)榱鲃?dòng)模式。

2.4.5. resume

使流從'暫停'模式切換到'流動(dòng)'模式,如果設(shè)置了'readable'事件監(jiān)聽,那么這個(gè)方法其實(shí)是沒有效果的

readable.resume

Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    state.flowing = !state.readableListening; // 是否處于流動(dòng)模式取決于是否設(shè)置了'readable'監(jiān)聽句柄
    resume(this, state);
  }
};

function resume(stream, state) {
  if (!state.resumeScheduled) { // 開關(guān),使resume_方法僅在同一個(gè)Tick中調(diào)用一次
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
}

function flow(stream) { // 當(dāng)流處于流模式該方法會(huì)不斷的從buffer中讀取數(shù)據(jù),直到緩沖區(qū)為空
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null); 
  // 因?yàn)檫@里會(huì)調(diào)用read方法,設(shè)置了'readable'事件監(jiān)聽器的stream,也有可能會(huì)調(diào)用read方法,
  //從而導(dǎo)致數(shù)據(jù)不連貫(不影響data,僅影響在'readable'事件回調(diào)中調(diào)用read方法讀取數(shù)據(jù))
}

2.4.6. pause

將流從流動(dòng)模式轉(zhuǎn)變?yōu)闀和DJ?,停止觸發(fā)'data'事件,將所有的數(shù)據(jù)保存到緩沖區(qū)

readable.pause

Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};

2.5. 使用方法與工作機(jī)制

使用方法在BufferList部分已經(jīng)講過了,創(chuàng)建一個(gè)Readable實(shí)例,并實(shí)現(xiàn)其_read()方法,或者在構(gòu)造函數(shù)的第一個(gè)對象參數(shù)中實(shí)現(xiàn)read方法。

2.5.1. 工作機(jī)制

Node中的可讀流是什么

這里只畫了大致的流程,以及Readable流的模式轉(zhuǎn)換觸發(fā)條件。

其中:

  • needReadable(true): 暫停模式并且buffer數(shù)據(jù)<=hwm、綁定了readable事件監(jiān)聽函數(shù)、read數(shù)據(jù)時(shí)緩沖區(qū)沒有數(shù)據(jù)或者返回?cái)?shù)據(jù)為空

  • push: 如果處于流動(dòng)模式,緩沖區(qū)里沒有數(shù)據(jù)會(huì)觸發(fā)'data'事件;否則將數(shù)據(jù)保存到緩沖區(qū)根據(jù)needReadable狀態(tài)觸發(fā)'readable'事件

  • read: 讀length=0長度的數(shù)據(jù)時(shí),buffer中的數(shù)據(jù)已經(jīng)到達(dá)hwm或者溢出需要觸發(fā)'readable'事件;從buffer中讀取數(shù)據(jù)并觸發(fā)'data'事件

  • resume: 有'readable'監(jiān)聽,該方法不起作用;否則將流由暫停模式轉(zhuǎn)變?yōu)榱鲃?dòng)模式,并清空緩沖區(qū)里的數(shù)據(jù)

  • readable觸發(fā)條件:綁定了'readable'事件并且緩沖區(qū)里有數(shù)據(jù)、push數(shù)據(jù)時(shí)緩沖區(qū)有數(shù)據(jù),并且needReadable === true、讀length=0長度的數(shù)據(jù)時(shí),buffer中的數(shù)據(jù)已經(jīng)到達(dá)hwm或者溢出。

關(guān)于“Node中的可讀流是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“Node中的可讀流是什么”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


文章標(biāo)題:Node中的可讀流是什么
路徑分享:http://weahome.cn/article/ihigis.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部