nodejs-stream部分
參考:
https://blog.csdn.net/eeewwwddd/article/details/81042225
http://nodejs.cn/api/stream.html#stream_writable_write_chunk_encoding_callback
?
流(stream)是 Node.js 中處理流式數(shù)據(jù)的抽象接口。?stream?模塊提供了一些 API,用于構(gòu)建實(shí)現(xiàn)了流接口的對(duì)象。
Node.js 提供了多種流對(duì)象。 例如,HTTP 服務(wù)器的請(qǐng)求和?process.stdout?都是流的實(shí)例。
流可以是可讀的、可寫(xiě)的、或者可讀可寫(xiě)的。 所有的流都是?EventEmitter?的實(shí)例,即可以通過(guò)事件的監(jiān)聽(tīng)得以觸發(fā)事件并執(zhí)行一定的操作,如:
stream?模塊可以通過(guò)以下方式使用:
const stream = require('stream');盡管理解流的工作方式很重要,但是?stream?模塊本身主要用于開(kāi)發(fā)者創(chuàng)建新類(lèi)型的流實(shí)例。
對(duì)于以消費(fèi)流對(duì)象為主的開(kāi)發(fā)者,極少需要直接使用?stream?模塊。
?
Node.js 中有四種基本的流類(lèi)型:
Writable - 可寫(xiě)入數(shù)據(jù)的流(例如 fs.createWriteStream())。Readable - 可讀取數(shù)據(jù)的流(例如 fs.createReadStream())。Duplex - 可讀又可寫(xiě)的流(例如 net.Socket)。Transform - 在讀寫(xiě)過(guò)程中可以修改或轉(zhuǎn)換數(shù)據(jù)的 Duplex 流(例如 zlib.createDeflate())?
兩種模式
?二進(jìn)制模式
每個(gè)分塊都是buffer、string對(duì)象
對(duì)象模式
Node.js 創(chuàng)建的流都是運(yùn)作在字符串和?Buffer(或?Uint8Array)上。 當(dāng)然,流的實(shí)現(xiàn)也可以使用其它類(lèi)型的 JavaScript 值(除了?null)。 這些流會(huì)以“對(duì)象模式”進(jìn)行操作。
當(dāng)創(chuàng)建流時(shí),可以使用?objectMode?選項(xiàng)把流實(shí)例切換到對(duì)象模式。 將已存在的流切換到對(duì)象模式是不安全的。
?
?比如如果想創(chuàng)建一個(gè)的可以壓入任意形式數(shù)據(jù)的可讀流,只要在創(chuàng)建流的時(shí)候設(shè)置參數(shù)objectMode為true即可,例如:Readable({ objectMode: true })
如果readable stream寫(xiě)入的是字符串,那么字符串會(huì)默認(rèn)轉(zhuǎn)換為Buffer,如果在創(chuàng)建流的時(shí)候設(shè)置Writable({ decodeStrings: false })參數(shù),那么不會(huì)做轉(zhuǎn)換。
如果readable stream寫(xiě)入的數(shù)據(jù)是對(duì)象,那么需要這樣創(chuàng)建writable stream,Writable({ objectMode: true })
??就是如果輸入的數(shù)據(jù)并不是Buffer(或?Uint8Array)格式的時(shí)候,那么在創(chuàng)建這個(gè)流的時(shí)候就要將其設(shè)置為對(duì)象模式,即設(shè)置其的objectMode: true,舉例:
const DuplexStream = require('readable-stream').Duplex const inherits = require('util').inheritsmodule.exports = PostMessageStreaminherits(PostMessageStream, DuplexStream)function PostMessageStream (opts) {DuplexStream.call(this, { objectMode: true, }) ... }?
緩沖
可寫(xiě)流和可讀流都會(huì)在內(nèi)部的緩沖器中存儲(chǔ)數(shù)據(jù),可以分別使用的?writable.writableBuffer?或?readable.readableBuffer?來(lái)獲取。
可緩沖的數(shù)據(jù)大小取決于傳入流構(gòu)造函數(shù)的?highWaterMark?選項(xiàng)。 對(duì)于普通的流,highWaterMark?指定了字節(jié)的總數(shù)。 對(duì)于對(duì)象模式的流,highWaterMark?指定了對(duì)象的總數(shù)。
當(dāng)調(diào)用?stream.push(chunk)?時(shí),數(shù)據(jù)會(huì)被緩沖在可讀流中。 如果流的消費(fèi)者沒(méi)有調(diào)用?stream.read(),則數(shù)據(jù)會(huì)保留在內(nèi)部隊(duì)列中直到被消費(fèi)。
一旦內(nèi)部的可讀緩沖的總大小達(dá)到?highWaterMark?指定的閾值時(shí),流會(huì)暫時(shí)停止從底層資源讀取數(shù)據(jù),直到當(dāng)前緩沖的數(shù)據(jù)被消費(fèi) (也就是說(shuō),流會(huì)停止調(diào)用內(nèi)部的用于填充可讀緩沖的?readable._read())。
當(dāng)調(diào)用?writable.write(chunk)?時(shí),數(shù)據(jù)會(huì)被緩沖在可寫(xiě)流中。 當(dāng)內(nèi)部的可寫(xiě)緩沖的總大小小于?highWaterMark?設(shè)置的閾值時(shí),調(diào)用?writable.write()?會(huì)返回?true。 一旦內(nèi)部緩沖的大小達(dá)到或超過(guò)?highWaterMark?時(shí),則會(huì)返回?false。
stream?API 的主要目標(biāo),特別是?stream.pipe(),是為了限制數(shù)據(jù)的緩沖到可接受的程度,也就是讀寫(xiě)速度不一致的源頭與目的地不會(huì)壓垮內(nèi)存。
因?yàn)?span id="ze8trgl8bvbq" class="Apple-converted-space">?Duplex?和?Transform?都是可讀又可寫(xiě)的,所以它們各自維護(hù)著兩個(gè)相互獨(dú)立的內(nèi)部緩沖器用于讀取和寫(xiě)入, 這使得它們?cè)诰S護(hù)數(shù)據(jù)流時(shí),讀取和寫(xiě)入兩邊可以各自獨(dú)立地運(yùn)作。 例如,net.Socket?實(shí)例是?Duplex?流,它的可讀端可以消費(fèi)從 socket 接收的數(shù)據(jù),而可寫(xiě)端則可以將數(shù)據(jù)寫(xiě)入到 socket。 因?yàn)閿?shù)據(jù)寫(xiě)入到 socket 的速度可能比接收數(shù)據(jù)的速度快或者慢,所以在讀寫(xiě)兩端獨(dú)立地進(jìn)行操作(或緩沖)就顯得很重要了。
?
【1】用于消費(fèi)流的 API(即讀取流中數(shù)據(jù))
test.js
const http = require('http');const server = http.createServer((req, res) => {// req 是一個(gè) http.IncomingMessage 實(shí)例,它是可讀流。// res 是一個(gè) http.ServerResponse 實(shí)例,它是可寫(xiě)流。 let body = '';// 接收數(shù)據(jù)為 utf8 字符串,// 如果沒(méi)有設(shè)置字符編碼,則會(huì)接收到 Buffer 對(duì)象。req.setEncoding('utf8');// 如果添加了監(jiān)聽(tīng)器,則可讀流會(huì)觸發(fā) 'data' 事件。req.on('data', (chunk) => {body += chunk;});// 'end' 事件表明整個(gè)請(qǐng)求體已被接收。 req.on('end', () => {try {const data = JSON.parse(body);// 響應(yīng)信息給用戶(hù)。res.write(typeof data);res.end();//end()表示寫(xiě)結(jié)束} catch (er) {// json 解析失敗。res.statusCode = 400;return res.end(`錯(cuò)誤: ${er.message}`);}}); });server.listen(1337);然后在終端使用node test.js運(yùn)行該服務(wù)器
然后在另一個(gè)終端使用curl localhost:1337 -d "{}" 連接服務(wù)器localhost:1337 ,-d即post數(shù)據(jù)data為{} ,返回object
curl localhost:1337 -d "{}" 返回object curl localhost:1337 -d "\"foo\"" 返回string curl localhost:1337 -d "not json" 返回 錯(cuò)誤: Unexpected token o in JSON at position 1?
可寫(xiě)流(比如例子中的?res)會(huì)暴露了一些方法,比如?write()?和?end()?用于寫(xiě)入數(shù)據(jù)到流。
當(dāng)數(shù)據(jù)可以從流讀取時(shí),可讀流會(huì)使用?EventEmitter?API 來(lái)通知應(yīng)用程序。 從流讀取數(shù)據(jù)的方式有很多種。
可寫(xiě)流和可讀流都通過(guò)多種方式使用?EventEmitter?API 來(lái)通訊流的當(dāng)前狀態(tài)。
Duplex?流和?Transform?流都是可寫(xiě)又可讀的。
對(duì)于只需寫(xiě)入數(shù)據(jù)到流或從流消費(fèi)數(shù)據(jù)的應(yīng)用程序,并不需要直接實(shí)現(xiàn)流的接口,通常也不需要調(diào)用?require('stream')
?
《1》可寫(xiě)流
可寫(xiě)流是對(duì)數(shù)據(jù)要被寫(xiě)入的目的地的一種抽象。
可寫(xiě)流的例子包括:
- 客戶(hù)端的 HTTP 請(qǐng)求
- 服務(wù)器的 HTTP 響應(yīng)
- fs 的寫(xiě)入流
- zlib 流
- crypto 流
- TCP socket
- 子進(jìn)程 stdin
- process.stdout、process.stderr
上面的一些例子事實(shí)上是實(shí)現(xiàn)了可寫(xiě)流接口的?Duplex?流。
所有可寫(xiě)流都實(shí)現(xiàn)了?stream.Writable?類(lèi)定義的接口。
盡管可寫(xiě)流的具體實(shí)例可能略有差別,但所有的可寫(xiě)流都遵循同一基本的使用模式,如以下例子所示:
const myStream = getWritableStreamSomehow(); myStream.write('一些數(shù)據(jù)'); myStream.write('更多數(shù)據(jù)'); myStream.end('完成寫(xiě)入數(shù)據(jù)');//說(shuō)明完成寫(xiě)入?
stream.Writable 類(lèi)
下面介紹幾類(lèi)事件:
'close' 事件
當(dāng)流或其底層資源(比如文件描述符)被關(guān)閉時(shí)觸發(fā)。 表明不會(huì)再觸發(fā)其他事件,也不會(huì)再發(fā)生操作。
不是所有可寫(xiě)流都會(huì)觸發(fā)?'close'?事件。
'drain' 事件
如果調(diào)用?stream.write(chunk)?返回?false,可能緩沖區(qū)已滿,需要等待,則當(dāng)有空間可以繼續(xù)寫(xiě)入數(shù)據(jù)到流時(shí)會(huì)觸發(fā)?'drain'?事件。
// 向可寫(xiě)流中寫(xiě)入數(shù)據(jù)一百萬(wàn)次。 // 留意背壓(back-pressure)。 function writeOneMillionTimes(writer, data, encoding, callback) {let i = 1000000;write();function write() {let ok = true;do {i--;if (i === 0) {// 最后一次寫(xiě)入。 writer.write(data, encoding, callback);} else {// 檢查是否可以繼續(xù)寫(xiě)入。 // 不要傳入回調(diào),因?yàn)閷?xiě)入還沒(méi)有結(jié)束。ok = writer.write(data, encoding);}} while (i > 0 && ok);if (i > 0) {// 被提前中止。// 當(dāng)觸發(fā) 'drain' 事件時(shí)繼續(xù)寫(xiě)入,繼續(xù)運(yùn)行write()函數(shù)。writer.once('drain', write);}} }'error' 事件
當(dāng)寫(xiě)入數(shù)據(jù)發(fā)生錯(cuò)誤時(shí)觸發(fā)。
當(dāng)觸發(fā)?'error'?事件時(shí),流還未被關(guān)閉
'finish' 事件
調(diào)用?stream.end()?且緩沖數(shù)據(jù)都已傳給底層系統(tǒng)之后觸發(fā)。
const http = require('http');const server = http.createServer((req, res) => {// req 是一個(gè) http.IncomingMessage 實(shí)例,它是可讀流。// res 是一個(gè) http.ServerResponse 實(shí)例,它是可寫(xiě)流。 let body = '';// 接收數(shù)據(jù)為 utf8 字符串,// 如果沒(méi)有設(shè)置字符編碼,則會(huì)接收到 Buffer 對(duì)象。req.setEncoding('utf8');// 如果添加了監(jiān)聽(tīng)器,則可讀流會(huì)觸發(fā) 'data' 事件。req.on('data', (chunk) => {body += chunk;});// 'end' 事件表明整個(gè)請(qǐng)求體已被接收。 req.on('end', () => {try {const data = JSON.parse(body);// 響應(yīng)信息給用戶(hù)。res.write(typeof data);res.end();//會(huì)觸發(fā)finish事件 res.on('finish', () => {console.error('寫(xiě)入已完成');});} catch (er) {// json 解析失敗。res.statusCode = 400;return res.end(`錯(cuò)誤: ${er.message}`);}}); });server.listen(1337);運(yùn)行結(jié)果:
'pipe' 事件
- src?<stream.Readable>?通過(guò)管道流入到可寫(xiě)流的來(lái)源流。
當(dāng)在可讀流上調(diào)用?stream.pipe()?時(shí)觸發(fā)。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => {console.error('有數(shù)據(jù)正通過(guò)管道流入寫(xiě)入器');assert.equal(src,reader);//兩者相等console.log(src); }); reader.pipe(writer);返回:
有數(shù)據(jù)正通過(guò)管道流入寫(xiě)入器 ReadStream {connecting: false,_hadError: false,_handle:TTY { owner: [Circular], onread: [Function: onread], reading: false },_parent: null,_host: null,_readableState:ReadableState {objectMode: false,//非對(duì)象模式highWaterMark: 0,buffer: BufferList { length: 0 },length: 0,pipes:WriteStream {connecting: false,_hadError: false,_handle: [TTY],_parent: null,_host: null,_readableState: [ReadableState],readable: false,_events: [Object],_eventsCount: 7,_maxListeners: undefined,_writableState: [WritableState],writable: true,allowHalfOpen: false,_sockname: null,_writev: null,_pendingData: null,_pendingEncoding: '',server: null,_server: null,columns: 80,rows: 24,_type: 'tty',fd: 1,_isStdio: true,destroySoon: [Function: destroy],_destroy: [Function],[Symbol(asyncId)]: 2,[Symbol(lastWriteQueueSize)]: 0,[Symbol(timeout)]: null,[Symbol(kBytesRead)]: 0,[Symbol(kBytesWritten)]: 0 },pipesCount: 1,flowing: true,ended: false,endEmitted: false,reading: false,sync: false,needReadable: true,emittedReadable: false,readableListening: false,resumeScheduled: true,emitClose: false,destroyed: false,defaultEncoding: 'utf8',awaitDrain: 0,readingMore: false,decoder: null,encoding: null },readable: true,_events:{ end: [ [Function: onReadableStreamEnd], [Function] ],pause: [Function],data: [Function: ondata] },_eventsCount: 3,_maxListeners: undefined,_writableState:WritableState {objectMode: false,highWaterMark: 0,finalCalled: false,needDrain: false,ending: false,ended: false,finished: false,destroyed: false,decodeStrings: false,defaultEncoding: 'utf8',length: 0,writing: false,corked: 0,sync: true,bufferProcessing: false,onwrite: [Function: bound onwrite],writecb: null,writelen: 0,bufferedRequest: null,lastBufferedRequest: null,pendingcb: 0,prefinished: false,errorEmitted: false,emitClose: false,bufferedRequestCount: 0,corkedRequestsFree:{ next: null,entry: null,finish: [Function: bound onCorkedFinish] } },writable: false,allowHalfOpen: false,_sockname: null,_writev: null,_pendingData: null,_pendingEncoding: '',server: null,_server: null,isRaw: false,isTTY: true,fd: 0,[Symbol(asyncId)]: 5,[Symbol(lastWriteQueueSize)]: 0,[Symbol(timeout)]: null,[Symbol(kBytesRead)]: 0,[Symbol(kBytesWritten)]: 0 } View Code?
'unpipe' 事件
- src?<stream.Readable>?被移除可寫(xiě)流管道的來(lái)源流。
當(dāng)在可讀流上調(diào)用?stream.unpipe()?時(shí)觸發(fā)。
當(dāng)可讀流通過(guò)管道流向可寫(xiě)流發(fā)生錯(cuò)誤時(shí),也會(huì)觸發(fā)?'unpipe'?事件。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => {console.error('有數(shù)據(jù)正通過(guò)管道流入寫(xiě)入器');assert.equal(src,reader);// console.log(src); }); writer.on('unpipe', (src) => {console.error('已移除可寫(xiě)流管道');assert.equal(src, reader); }); reader.pipe(writer);//觸發(fā)'pipe'事件 reader.unpipe(writer);//觸發(fā)'unpipe'事件返回:
userdeMacBook-Pro:stream-learning user$ node test.js 有數(shù)據(jù)正通過(guò)管道流入寫(xiě)入器 已移除可寫(xiě)流管道?
?
下面是可使用的方法:
writable.write(chunk[, encoding][, callback])
- chunk?<string>?|?<Buffer>?|?<Uint8Array>?|?<any>?要寫(xiě)入的數(shù)據(jù)。 ?對(duì)于非對(duì)象模式的流chunk?必須是字符串、Buffer?或?Uint8Array。 對(duì)于對(duì)象模式的流,chunk?可以是任何 JavaScript 值,除了?null。
- encoding?<string>?如果?chunk?是字符串,則指定字符編碼。
- callback?<Function>?當(dāng)數(shù)據(jù)塊被輸出到目標(biāo)后的回調(diào)函數(shù)。
- 返回:?<boolean>?如果流需要等待?'drain'?事件觸發(fā)才能繼續(xù)寫(xiě)入更多數(shù)據(jù),則返回?false,否則返回?true。
writable.write()?寫(xiě)入數(shù)據(jù)到流,并在數(shù)據(jù)被完全處理之后調(diào)用?callback。 如果發(fā)生錯(cuò)誤,則?callback?可能被調(diào)用也可能不被調(diào)用。 為了可靠地檢測(cè)錯(cuò)誤,可以為?'error'?事件添加監(jiān)聽(tīng)器。
在接收了?chunk?后,如果內(nèi)部的緩沖小于創(chuàng)建流時(shí)配置的?highWaterMark,則返回?true?。 如果返回?false?,則應(yīng)該停止向流寫(xiě)入數(shù)據(jù),直到?'drain'?事件被觸發(fā)。
當(dāng)流還未被排空時(shí),調(diào)用?write()?會(huì)緩沖?chunk,并返回?false。 一旦所有當(dāng)前緩沖的數(shù)據(jù)塊都被排空了(被操作系統(tǒng)接收并傳輸),則觸發(fā)?'drain'?事件。 建議一旦?write()?返回 false,則不再寫(xiě)入任何數(shù)據(jù)塊,直到?'drain'?事件被觸發(fā)。 當(dāng)流還未被排空時(shí),也是可以調(diào)用?write(),Node.js 會(huì)緩沖所有被寫(xiě)入的數(shù)據(jù)塊,直到達(dá)到最大內(nèi)存占用,這時(shí)它會(huì)無(wú)條件中止。 甚至在它中止之前, 高內(nèi)存占用將會(huì)導(dǎo)致垃圾回收器的性能變差和 RSS 變高(即使內(nèi)存不再需要,通常也不會(huì)被釋放回系統(tǒng))。 如果遠(yuǎn)程的另一端沒(méi)有讀取數(shù)據(jù),TCP 的 socket 可能永遠(yuǎn)也不會(huì)排空,所以寫(xiě)入到一個(gè)不會(huì)排空的 socket 可能會(huì)導(dǎo)致遠(yuǎn)程可利用的漏洞。?
對(duì)于?Transform, 寫(xiě)入數(shù)據(jù)到一個(gè)不會(huì)排空的流尤其成問(wèn)題,因?yàn)?span id="ze8trgl8bvbq" class="Apple-converted-space">?Transform?流默認(rèn)會(huì)被暫停,直到它們被 pipe 或者添加了?'data'?或?'readable'?事件句柄。?
如果要被寫(xiě)入的數(shù)據(jù)可以根據(jù)需要生成或者取得,建議將邏輯封裝為一個(gè)可讀流并且使用?stream.pipe()。 如果要優(yōu)先調(diào)用?write(),則可以使用?'drain'?事件來(lái)防止背壓與避免內(nèi)存問(wèn)題:
var assert = require('assert'); const writer = process.stdout; // const reader = process.stdin; function write(data, cb) { if (!writer.write(data)) { writer.once('drain', cb); } else { process.nextTick(cb); } } // 在回調(diào)函數(shù)被執(zhí)行后再進(jìn)行其他的寫(xiě)入。 write('hello', () => { console.log('完成寫(xiě)入,可以進(jìn)行更多的寫(xiě)入'); });返回:
node test.js hello完成寫(xiě)入,可以進(jìn)行更多的寫(xiě)入?
舉一個(gè)例子說(shuō)明write和drain:
參考https://blog.csdn.net/eeewwwddd/article/details/81042225
- 如果文件不存在會(huì)創(chuàng)建,如果有內(nèi)容會(huì)被清空
- 讀取到highWaterMark的時(shí)候就會(huì)輸出
- 第一次是真的寫(xiě)到文件 后面就是寫(xiě)入緩存區(qū) 再?gòu)木彺鎱^(qū)里面去取
?
let fs = require('fs') let ws = fs.createWriteStream('./foo1.txt',{flags: 'w',encoding: 'utf8',start: 0,//write的highWaterMark只是用來(lái)觸發(fā)是不是干了highWaterMark: 19 //寫(xiě)是默認(rèn)16k,當(dāng)這里設(shè)置的長(zhǎng)度小于或者等于我一下子要寫(xiě)入的字符串長(zhǎng)度時(shí),會(huì)觸發(fā)一次drain,也僅觸發(fā)一次,然后將剩余部分的所有內(nèi)容放入緩存,后面將不會(huì)再觸發(fā)drain了 }) //返回boolean 每當(dāng)write一次都會(huì)在ws中吃下一個(gè)饅頭 當(dāng)吃下的饅頭數(shù)量達(dá)到highWaterMark時(shí) 就會(huì)返回false 吃不下了會(huì)把其余放入緩存 其余狀態(tài)返回true //write只能放string或者buffer var flag = ws.write('today is a good day','utf8',()=>{console.log('write'); }); ws.on('drain', ()=>{console.log('drain'); });返回:
node test.js drain write如果改為highWaterMark: 20,大于輸入內(nèi)容,則不會(huì)觸發(fā)drain
則返回:
?
node test.js write?
?
?
writable.end([chunk][, encoding][, callback])
- chunk?<string>?|?<Buffer>?|?<Uint8Array>?|?<any>?要寫(xiě)入的數(shù)據(jù)。 對(duì)于非對(duì)象模式的流chunk?必須是字符串、Buffer、或?Uint8Array。 對(duì)于對(duì)象模式的流,?chunk?可以是任何 JavaScript 值,除了?null。
- encoding?<string>?如果?chunk?是字符串,則指定字符編碼。
- callback?<Function>?當(dāng)流結(jié)束時(shí)的回調(diào)函數(shù)。
- 返回:?<this>
調(diào)用?writable.end()?表明已沒(méi)有數(shù)據(jù)要被寫(xiě)入可寫(xiě)流。 可選的?chunk?和?encoding?參數(shù)可以在關(guān)閉流之前再寫(xiě)入一塊數(shù)據(jù)。 如果傳入了?callback?函數(shù),則會(huì)做為監(jiān)聽(tīng)器添加到?'finish'?事件。
調(diào)用?stream.end()?之后再調(diào)用?stream.write()?會(huì)導(dǎo)致錯(cuò)誤
writable.cork()
強(qiáng)制把所有寫(xiě)入的數(shù)據(jù)都緩沖到內(nèi)存中。 當(dāng)調(diào)用?stream.uncork()?或?stream.end()?時(shí),緩沖的數(shù)據(jù)才會(huì)被輸出。
當(dāng)寫(xiě)入大量小塊數(shù)據(jù)到流時(shí),內(nèi)部緩沖可能失效,從而導(dǎo)致性能下降,writable.cork()?主要用于避免這種情況。 對(duì)于這種情況,實(shí)現(xiàn)了?writable._writev()?的流可以用更優(yōu)的方式對(duì)寫(xiě)入的數(shù)據(jù)進(jìn)行緩沖。
writable.uncork()
將調(diào)用?stream.cork()?后緩沖的所有數(shù)據(jù)輸出到目標(biāo)。
當(dāng)使用?writable.cork()?和?writable.uncork()?來(lái)管理流的寫(xiě)入緩沖時(shí),建議使用?process.nextTick()?來(lái)延遲調(diào)用?writable.uncork()。 通過(guò)這種方式,可以對(duì)單個(gè) Node.js 事件循環(huán)中調(diào)用的所有?writable.write()?進(jìn)行批處理。
?
擴(kuò)展:?process.nextTick()
process.nextTick(callback[, ...args])
- callback?<Function>
- ...args?<any>?調(diào)用?callback時(shí)傳遞給它的額外參數(shù)
process.nextTick()方法將?callback?添加到"next tick 隊(duì)列"。 一旦當(dāng)前事件輪詢(xún)隊(duì)列的任務(wù)全部完成,在next tick隊(duì)列中的所有callbacks會(huì)被依次調(diào)用。
這種方式不是setTimeout(fn, 0)的別名。它更加有效率。事件輪詢(xún)隨后的ticks 調(diào)用,會(huì)在任何I/O事件(包括定時(shí)器)之前運(yùn)行。
舉例:
console.log('start'); process.nextTick(() => {console.log('nextTick callback'); }); console.log('scheduled'); // Output: // start // scheduled // nextTick callback?
回到writable.uncork(),舉例:
var assert = require('assert'); const writer = process.stdout;writer.cork(); writer.write('一些 '); writer.write('數(shù)據(jù) '); process.nextTick(() => writer.uncork());如果沒(méi)有這一句,運(yùn)行時(shí)沒(méi)有輸出結(jié)果的返回:
node test.js 一些 數(shù)據(jù)如果一個(gè)流上多次調(diào)用?writable.cork(),則必須調(diào)用同樣次數(shù)的?writable.uncork()?才能輸出緩沖的數(shù)據(jù)。
var assert = require('assert'); const writer = process.stdout; writer.cork(); writer.write('一些 '); writer.cork(); writer.write('數(shù)據(jù) '); process.nextTick(() => {writer.uncork();// 數(shù)據(jù)不會(huì)被輸出,直到第二次調(diào)用 uncork()。writer.uncork();//注釋掉這一句就不會(huì)有輸出,正確輸出為一些 數(shù)據(jù) });writable.destroy([error])
- error?<Error>
- 返回:?<this>
銷(xiāo)毀流,并觸發(fā)?'error'?事件且傳入?error?參數(shù)。 調(diào)用該方法后,可寫(xiě)流就結(jié)束了,之后再調(diào)用?write()?或?end()?都會(huì)導(dǎo)致?ERR_STREAM_DESTROYED?錯(cuò)誤。 實(shí)現(xiàn)流時(shí)不應(yīng)該重寫(xiě)這個(gè)方法,而是重寫(xiě)?writable._destroy()
?
writable.setDefaultEncoding(encoding)
- encoding?<string>?默認(rèn)的字符編碼。
- 返回:?<this>
為可寫(xiě)流設(shè)置默認(rèn)的?encoding。
?
轉(zhuǎn)自https://blog.csdn.net/eeewwwddd/article/details/81042225
let fs = require('fs') let EventEmitter = require('events') //只有第一次write的時(shí)候直接用_write寫(xiě)入文件 其余都是放到cache中 但是len超過(guò)了highWaterMark就會(huì)返回false告知需要drain 很占緩存 //從第一次的_write開(kāi)始 回去一直通過(guò)clearBuffer遞歸_write寫(xiě)入文件 如果cache中沒(méi)有了要寫(xiě)入的東西 會(huì)根據(jù)needDrain來(lái)判斷是否觸發(fā)干點(diǎn) class WriteStream extends EventEmitter{constructor(path,options = {}){super()this.path = paththis.highWaterMark = options.highWaterMark || 64*1024this.flags = options.flags || 'r'this.start = options.start || 0this.pos = this.startthis.autoClose = options.autoClose || truethis.mode = options.mode || 0o666//默認(rèn)null就是bufferthis.encoding = options.encoding || null//打開(kāi)這個(gè)文件this.open()//寫(xiě)文件的時(shí)候需要哪些參數(shù)//第一次寫(xiě)入的時(shí)候 是給highWaterMark個(gè)饅頭 他會(huì)硬著頭皮寫(xiě)到文件中 之后才會(huì)把多余吃不下的放到緩存中this.writing = false//緩存數(shù)組this.cache = []this.callbackList = []//數(shù)組長(zhǎng)度this.len = 0//是否觸發(fā)drain事件this.needDrain = false}clearBuffer(){//取緩存中最上面的一個(gè)let buffer = this.cache.shift()if(buffer){//有buffer的情況下this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)}else{//沒(méi)有的話 先看看需不需要drainif(this.needDrain){//觸發(fā)drain 并初始化所有狀態(tài)this.writing = falsethis.needDrain = falsethis.callbackList.shift()()this.emit('drain')}this.callbackList.map(v=>{v()})this.callbackList.length = 0}}_write(chunk,encoding,clearBuffer,callback){//因?yàn)閣rite方法是同步調(diào)用的 所以可能還沒(méi)獲取到fdif(typeof this.fd != 'number'){//直接在open的時(shí)間對(duì)象上注冊(cè)一個(gè)一次性事件 當(dāng)open被emit的時(shí)候會(huì)被調(diào)用return this.once('open',()=>this._write(chunk,encoding,clearBuffer,callback))}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{this.pos += byteWrite//每次寫(xiě)完 相應(yīng)減少內(nèi)存中的數(shù)量this.len -= byteWriteif(callback) this.callbackList.push(callback)//第一次寫(xiě)完 clearBuffer()})}//寫(xiě)入方法write(chunk,encoding=this.encoding,callback){//判斷chunk必須是字符串或者buffer 為了統(tǒng)一都變成bufferchunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding)//維護(hù)緩存的長(zhǎng)度 3this.len += chunk.lengthlet ret = this.len < this.highWaterMarkif(!ret){//表示要觸發(fā)drain事件this.needDrain = true}//正在寫(xiě)入的應(yīng)該放到內(nèi)存中if(this.writing){this.cache.push({chunk,encoding,callback})}else{//這里是第一次寫(xiě)的時(shí)候this.writing = true//專(zhuān)門(mén)實(shí)現(xiàn)寫(xiě)的方法this._write(chunk,encoding,()=>this.clearBuffer(),callback)}// console.log(ret)//能不能繼續(xù)寫(xiě)了 false代表下次寫(xiě)的時(shí)候更占內(nèi)存return ret}destory(){if(typeof this.fd != 'number'){return this.emit('close')}//如果文件被打開(kāi)過(guò) 就關(guān)閉文件并且觸發(fā)close事件fs.close(this.fd,()=>{this.emit('close')})}open(){//fd表示的就是當(dāng)前this.path的這個(gè)文件,從3開(kāi)始(number類(lèi)型)fs.open(this.path,this.flags,(err,fd)=>{//有可能fd這個(gè)文件不存在 需要做處理if(err){//如果有自動(dòng)關(guān)閉 則幫他銷(xiāo)毀if(this.autoClose){//銷(xiāo)毀(關(guān)閉文件,出發(fā)關(guān)閉文件事件)this.destory()}//如果有錯(cuò)誤 就會(huì)觸發(fā)error事件this.emit('error',err)return}//保存文件描述符this.fd = fd//當(dāng)文件打開(kāi)成功時(shí)觸發(fā)open事件this.emit('open',this.fd)})} }?
自定義可寫(xiě)流
因?yàn)閏reateWriteStream內(nèi)部調(diào)用了WriteStream類(lèi),WriteStream又實(shí)現(xiàn)了Writable接口,WriteStream實(shí)現(xiàn)了_write()方法,所以我們通過(guò)自定義一個(gè)類(lèi)繼承stream模塊的Writable,并在原型上自定義一個(gè)_write()就可以自定義自己的可寫(xiě)流
返回:
node test.js <Buffer 79 65 73> ok?
?
《2》可讀流
可讀流是對(duì)提供數(shù)據(jù)的來(lái)源的一種抽象。
可讀流的例子包括:
- 客戶(hù)端的 HTTP 響應(yīng)
- 服務(wù)器的 HTTP 請(qǐng)求
- fs 的讀取流
- zlib 流
- crypto 流
- TCP socket
- 子進(jìn)程 stdout 與 stderr
- process.stdin
所有可讀流都實(shí)現(xiàn)了?stream.Readable?類(lèi)定義的接口。
?
兩種讀取模式
可讀流運(yùn)作于兩種模式之一:流動(dòng)模式(flowing)或暫停模式(paused)。
- 在流動(dòng)模式中,數(shù)據(jù)自動(dòng)從底層系統(tǒng)讀取,并通過(guò)?EventEmitter?接口的事件盡可能快地被提供給應(yīng)用程序。
- 在暫停模式中,必須顯式調(diào)用?stream.read()?讀取數(shù)據(jù)塊。
所有可讀流都開(kāi)始于暫停模式,可以通過(guò)以下方式切換到流動(dòng)模式:
- 添加?'data'?事件句柄。
- 調(diào)用?stream.resume()。
- 調(diào)用?stream.pipe()。
可讀流可以通過(guò)以下方式切換回暫停模式:
- 如果沒(méi)有管道目標(biāo),則調(diào)用?stream.pause()。
- 如果有管道目標(biāo),則移除所有管道目標(biāo)。調(diào)用?stream.unpipe()?可以移除多個(gè)管道目標(biāo)。
只有提供了消費(fèi)或忽略數(shù)據(jù)的機(jī)制后,可讀流才會(huì)產(chǎn)生數(shù)據(jù)。 如果消費(fèi)的機(jī)制被禁用或移除,則可讀流會(huì)停止產(chǎn)生數(shù)據(jù)。
為了向后兼容,移除?'data'?事件句柄不會(huì)自動(dòng)地暫停流。 如果有管道目標(biāo),一旦目標(biāo)變?yōu)?span id="ze8trgl8bvbq" class="Apple-converted-space">?drain?狀態(tài)并請(qǐng)求接收數(shù)據(jù)時(shí),則調(diào)用?stream.pause()?也不能保證流會(huì)保持暫停模式。
如果可讀流切換到流動(dòng)模式,且沒(méi)有可用的消費(fèi)者來(lái)處理數(shù)據(jù),則數(shù)據(jù)將會(huì)丟失。 例如,當(dāng)調(diào)用?readable.resume()?時(shí),沒(méi)有監(jiān)聽(tīng)?'data'?事件或?'data'?事件句柄已移除。
添加?'readable'?事件句柄會(huì)使流自動(dòng)停止流動(dòng),并通過(guò)?readable.read()?消費(fèi)數(shù)據(jù)。 如果?'readable'?事件句柄被移除,且存在?'data'?事件句柄,則流會(huì)再次開(kāi)始流動(dòng)。
?
三種狀態(tài)
可讀流的兩種模式是對(duì)發(fā)生在可讀流中更加復(fù)雜的內(nèi)部狀態(tài)管理的一種簡(jiǎn)化的抽象。
在任意時(shí)刻,可讀流會(huì)處于以下三種狀態(tài)之一:
- readable.readableFlowing === null
- readable.readableFlowing === false
- readable.readableFlowing === true
當(dāng)?readable.readableFlowing?為?null?時(shí),沒(méi)有提供消費(fèi)流數(shù)據(jù)的機(jī)制,所以流不會(huì)產(chǎn)生數(shù)據(jù)。 在這個(gè)狀態(tài)下,監(jiān)聽(tīng)?'data'?事件、調(diào)用?readable.pipe()、或調(diào)用?readable.resume()?都會(huì)使?readable.readableFlowing?切換到?true,可讀流開(kāi)始主動(dòng)地產(chǎn)生數(shù)據(jù)并觸發(fā)事件。
調(diào)用?readable.pause()、readable.unpipe()、或接收到背壓,則?readable.readableFlowing?會(huì)被設(shè)為?false,暫時(shí)停止事件流動(dòng)但不會(huì)停止數(shù)據(jù)的生成。 在這個(gè)狀態(tài)下,為?'data'?事件綁定監(jiān)聽(tīng)器不會(huì)使?readable.readableFlowing?切換到?true。
const { PassThrough, Writable } = require('stream'); const pass = new PassThrough(); const writable = new Writable(); pass.pipe(writable); pass.unpipe(writable); // readableFlowing 現(xiàn)在為 false。 pass.on('data', (chunk) => { console.log(chunk.toString()); }); pass.write('ok'); // 不會(huì)觸發(fā) 'data' 事件。 pass.resume(); // 必須調(diào)用它才會(huì)觸發(fā) 'data' 事件。如果注釋掉它則不會(huì)返回結(jié)果ok當(dāng)?readable.readableFlowing?為?false?時(shí),數(shù)據(jù)可能會(huì)堆積在流的內(nèi)部緩沖中。
?
選擇一種接口風(fēng)格
可讀流的 API 貫穿了多個(gè) Node.js 版本,且提供了多種方法來(lái)消費(fèi)流數(shù)據(jù)。 ??開(kāi)發(fā)者通常應(yīng)該選擇其中一種方法來(lái)消費(fèi)數(shù)據(jù),不要在單個(gè)流使用多種方法來(lái)消費(fèi)數(shù)據(jù)。 混合使用?on('data')、on('readable')、pipe()?或異步迭代器,會(huì)導(dǎo)致不明確的行為。
對(duì)于大多數(shù)用戶(hù),建議使用?readable.pipe(),因?yàn)樗窍M(fèi)流數(shù)據(jù)最簡(jiǎn)單的方式。 如果開(kāi)發(fā)者需要精細(xì)地控制數(shù)據(jù)的傳遞與產(chǎn)生,可以使用?EventEmitter、readable.on('readable')/readable.read()?或?readable.pause()/readable.resume()。
?
stream.Readable 類(lèi)
下面是事件的介紹:
'error' 事件
- <Error>
當(dāng)流因底層內(nèi)部出錯(cuò)而不能產(chǎn)生數(shù)據(jù)、或推送無(wú)效的數(shù)據(jù)塊時(shí)觸發(fā)。
'close' 事件
當(dāng)流或其底層資源(比如文件描述符)被關(guān)閉時(shí)觸發(fā)。 表明不會(huì)再觸發(fā)其他事件,也不會(huì)再發(fā)生操作。
不是所有可讀流都會(huì)觸發(fā)?'close'?事件。
'data' 事件
- chunk?<Buffer>?|?<string>?|?<any>?數(shù)據(jù)塊。 對(duì)于非對(duì)象模式的流,?chunk?可以是字符串或?Buffer。 對(duì)于對(duì)象模式的流,chunk?可以是任何 JavaScript 值,除了?null。
當(dāng)流將數(shù)據(jù)塊傳送給消費(fèi)者后觸發(fā)。 當(dāng)調(diào)用?readable.pipe(),readable.resume()?或綁定監(jiān)聽(tīng)器到?'data'?事件時(shí),流會(huì)轉(zhuǎn)換到流動(dòng)模式。 當(dāng)調(diào)用?readable.read()?且有數(shù)據(jù)塊返回時(shí),也會(huì)觸發(fā)?'data'?事件。
如果使用?readable.setEncoding()?為流指定了默認(rèn)的字符編碼,則監(jiān)聽(tīng)器回調(diào)傳入的數(shù)據(jù)為字符串,否則傳入的數(shù)據(jù)為?Buffer。
const fs = require('fs'); const rr = fs.createReadStream('data.txt');//hello data rr.on('data', (chunk) => {//readable不行,報(bào)錯(cuò)TypeError: Cannot read property 'length' of undefined console.log(`接收到 ${chunk.length} 個(gè)字節(jié)的數(shù)據(jù)`); //chunk為undefined });返回:
node test.js 接收到 10 個(gè)字節(jié)的數(shù)據(jù)?
process.stdin.setEncoding('utf8'); process.stdin.on('data', (chunk) => {//readable不行,會(huì)閃退??????? console.log(`接收到 ${chunk.length} 個(gè)字節(jié)的數(shù)據(jù)`); });返回:
node test.js 今天天氣好 //自己輸入并回車(chē),這個(gè)內(nèi)容就會(huì)被process.stdin收到 接收到 6 個(gè)字節(jié)的數(shù)據(jù)?
之前有試一個(gè)例子一直沒(méi)有成功:
process.stdin.setEncoding('utf8'); // process.stdout.write("請(qǐng)輸入用戶(hù)名:"); process.stdin.on('data', (chunk) => { // var chunk = process.stdin.read(); console.log(chunk); if (chunk !== null) { process.stdout.write(`data: ${chunk}`); } }); process.stdin.on('end', () => { process.stdout.write('end'); });?
'end' 事件
當(dāng)流中沒(méi)有數(shù)據(jù)可供消費(fèi)時(shí)觸發(fā)。
'end'?事件只有在數(shù)據(jù)被完全消費(fèi)掉后才會(huì)觸發(fā)。 要想觸發(fā)該事件,可以將流轉(zhuǎn)換到流動(dòng)模式,或反復(fù)調(diào)用?stream.read()?直到數(shù)據(jù)被消費(fèi)完。
'readable' 事件
當(dāng)流中有數(shù)據(jù)可供讀取時(shí)觸發(fā)
當(dāng)?shù)竭_(dá)流數(shù)據(jù)尾部時(shí),?'readable'?事件也會(huì)觸發(fā)。觸發(fā)順序在?'end'?事件之前。
事實(shí)上,?'readable'?事件表明流有了新的動(dòng)態(tài):要么是有了新的數(shù)據(jù),要么是到了流的尾部。 對(duì)于前者,?stream.read()?將返回可用的數(shù)據(jù)。而對(duì)于后者,?stream.read()?將返回?null。 例如,下面的例子中的?foo.txt?是一個(gè)空文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt'); rr.on('readable', () => { console.log(`讀取的數(shù)據(jù): ${rr.read()}`); }); rr.on('end', () => { console.log('結(jié)束'); });返回:
node test.js 讀取的數(shù)據(jù): null 結(jié)束?
有問(wèn)題:
const fs = require('fs'); const rr = fs.createReadStream('data.txt'); rr.on('readable', function(){//不能是'data'事件,為什么,如果是data,返回只有null和end,明天好好查查這兩者的對(duì)比 var chunk = rr.read(); // 獲取到輸入的信息 console.log(chunk); if(chunk === ''){ rr.emit('end'); // 觸發(fā)end事件 return } if (chunk !== null) { process.stdout.write('data: '+ chunk +'\n'); } // rr.emit('end'); }); rr.on('end', function() { process.stdout.write('end'+'\n'); //也輸出了,只是被擋住了,加上+'\n'就看出來(lái)了 });返回:
node test.js <Buffer 68 65 6c 6c 6f 20 64 61 74 61> data: hello data null end?上面標(biāo)明的錯(cuò)誤都是因?yàn)橐婚_(kāi)始沒(méi)能弄清楚data和readable的區(qū)別,看了博客https://blog.csdn.net/eeewwwddd/article/details/81042225?utm_source=copy后終于明白
參數(shù)
path:讀取的文件的路徑
?
data與readable的區(qū)別:
- readable和讀流的data的區(qū)別就是,readable可以控制自己從緩存區(qū)讀多少和控制讀的次數(shù),而data是每次讀取都清空緩存,讀多少輸出多少
- readable是暫停模式,data是流動(dòng)模式;就是readable需要使用read()來(lái)讀取數(shù)據(jù),data則是從回調(diào)中就能夠得到數(shù)據(jù)
//因?yàn)樯厦娴膁ata事件把數(shù)據(jù)讀了,清空緩存區(qū)。所以導(dǎo)致下面的readable讀出為null rs.on('readable',() => {console.log('readable');console.log(rs.read()); });
返回:
node test.jsdata
he
data
ll
data
o
readable
null
如果把'data'監(jiān)聽(tīng)去掉,那么返回結(jié)果就是:
node test.js readable he readable ll readable o readable null?
舉例說(shuō)明readable的使用情況:
(1)
let rs = fs.createReadStream('./foo.txt', {//內(nèi)容為 Today is a good day.i want to go out for fun.//每次讀7個(gè)highWaterMark: 7,encoding: 'utf8' }) //如果讀流第一次全部讀下來(lái)并且小于highWaterMark,就會(huì)再讀一次(再觸發(fā)一次readable事件) rs.on('readable', () => {let result = rs.read(2);console.log(result) })返回:
node test.js To da(2)
//如果rs.read()不加參數(shù),一次性讀完,會(huì)從緩存區(qū)再讀一次,為null rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js Today i s a goo d day.iwant t o go ou t for f un. null(3)
//如果readable每次都剛好讀完(即rs.read()的參數(shù)剛好和highWaterMark相等),就會(huì)一直觸發(fā)readable事件,如果最后不足他想喝的數(shù),他就會(huì)先觸發(fā)一次null,最后把剩下的喝完 rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js Today i s a goo d day.iwant t o go ou t for f null un.(4)
//一開(kāi)始緩存區(qū)為0的時(shí)候也會(huì)默認(rèn)調(diào)一次readable事件,將foo.txt內(nèi)容清零 rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js null?
實(shí)戰(zhàn):行讀取器(平常我們的文件可能有回車(chē)、換行,此時(shí)如果要每次想讀一行的數(shù)據(jù),就得用到readable)
let EventEmitter = require('events') //如果要將內(nèi)容全部讀出就用on('data'),精確讀取就用on('readable') class LineReader extends EventEmitter {constructor(path) {super()this.rs = fs.createReadStream(path)//回車(chē)符的十六進(jìn)制let RETURN = 0x0d//換行符的十六進(jìn)制let LINE = 0x0alet arr = []this.on('newListener', (type) => {//每次使用 on 監(jiān)聽(tīng)事件時(shí)觸發(fā)'newListener'事件if (type === 'newLine') {//自定義的一個(gè)事件'newLine',觸發(fā)后就調(diào)用'readable',然后自行設(shè)定一次讀取一行的操作this.rs.on('readable', () => {let char//每次讀一個(gè),當(dāng)讀完的時(shí)候會(huì)返回null,終止循環(huán)while (char = this.rs.read(1)) {//讀到文件最后char = nullswitch (char[0]) {case RETURN:break;//Mac下只有換行符,windows下是回車(chē)符和換行符,需要根據(jù)不同的轉(zhuǎn)換。因?yàn)槲疫@里是Maccase LINE://如果是換行符就把數(shù)組轉(zhuǎn)換為字符串let r = Buffer.from(arr).toString('utf8')//把數(shù)組清空arr.length = 0//觸發(fā)newLine事件,把得到的一行數(shù)據(jù)輸出this.emit('newLine', r)break;default://如果不是換行符,就放入數(shù)組中arr.push(char[0])}}})}})//以上只能取出換行符之前的代碼,最后一行的后面沒(méi)有換行符,所以需要特殊處理。當(dāng)讀流讀完需要觸發(fā)end事件時(shí)this.rs.on('end', () => {//取出最后一行數(shù)據(jù),轉(zhuǎn)成字符串let r = Buffer.from(arr).toString('utf8')arr.length = 0this.emit('newLine', r)})} }let lineReader = new LineReader('./foo.txt') lineReader.on('newLine', function (data) {console.log('a line');console.log(data); })返回:
node test.js //可見(jiàn)一次是只讀取一行的 a line if the truth is : a line I a line Am a line A a line boy一般是將整個(gè)文件讀取完的:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('readable', () => {console.log('one time');console.log(rr.read()); }); rr.on('end', () => {console.log('結(jié)束'); });返回:
node test.js one time if the truth is : I Am A boy one time null 結(jié)束?
下面接著方法的介紹:
readable.destroy([error])
銷(xiāo)毀流,并且觸發(fā)error事件。然后,可讀流將釋放所有的內(nèi)部資源。
開(kāi)發(fā)者不應(yīng)該覆蓋這個(gè)方法,應(yīng)該覆蓋readable._destroy方法。
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('open', function () {console.log('文件被打開(kāi)'); }); rr.destroy('something wrong');//有參數(shù)則為出現(xiàn)的錯(cuò)誤,會(huì)觸發(fā)error事件 rr.on('data', function (data) {console.log('data');console.log(data);}); rr.on('error', function (err) {console.log('error');console.log(err); }); rr.on('close', function (err) {console.log('close'); }); rr.on('end', () => {console.log('end'); });返回:
node test.js 文件被打開(kāi) error something wrong如果rr.destroy();參數(shù)為空,則不會(huì)觸發(fā)error事件,而是觸發(fā)close事件,那么返回為:
?
node test.js 文件被打開(kāi) closereadable.isPaused()
- 返回:?<boolean>
readable.isPaused()?方法返回可讀流的當(dāng)前操作狀態(tài)。 該方法主要是在?readable.pipe()?方法的底層機(jī)制中用到。大多數(shù)情況下,沒(méi)有必要直接使用該方法
readable.pause()
- 返回:?this
readable.pause()?方法將會(huì)使 flowing 模式的流停止觸發(fā)?'data'?事件, 進(jìn)而切出 flowing 模式。任何可用的數(shù)據(jù)都將保存在內(nèi)部緩存中。
?
readable.read([size])
- size?<number>?可選參數(shù),確定讀取數(shù)據(jù)的大小.
- 返回?<string>?|?<Buffer>?|?<null>
readable.read()方法從內(nèi)部緩沖區(qū)中抽出并返回一些數(shù)據(jù)。 如果沒(méi)有可讀的數(shù)據(jù),返回null。readable.read()方法默認(rèn)數(shù)據(jù)將作為“Buffer”對(duì)象返回 ,除非已經(jīng)使用readable.setEncoding()方法設(shè)置編碼或流運(yùn)行在對(duì)象模式。
可選的size參數(shù)指定要讀取的特定數(shù)量的字節(jié)。如果size字節(jié)不可讀,將返回null除非流已經(jīng)結(jié)束,在這種情況下所有保留在內(nèi)部緩沖區(qū)的數(shù)據(jù)將被返回。
如果沒(méi)有指定size參數(shù),則內(nèi)部緩沖區(qū)包含的所有數(shù)據(jù)將返回。
readable.read()方法只應(yīng)該在暫停模式下的可讀流上運(yùn)行。在流模式下,readable.read()自動(dòng)調(diào)用直到內(nèi)部緩沖區(qū)的數(shù)據(jù)完全耗盡。
一般來(lái)說(shuō),建議開(kāi)發(fā)人員避免使用'readable'事件和readable.read()方法,使用readable.pipe()或'data'事件代替。
無(wú)論size參數(shù)的值是什么,對(duì)象模式中的可讀流將始終返回調(diào)用readable.read(size)的單個(gè)項(xiàng)目。
注意:如果readable.read()方法返回一個(gè)數(shù)據(jù)塊,那么一個(gè)'data'事件也將被發(fā)送。
注意:在已經(jīng)被發(fā)出的'end'事件后調(diào)用stream.read([size])事件將返回null。不會(huì)拋出運(yùn)行時(shí)錯(cuò)誤。
?
//fd文件描述符,一般通過(guò)fs.open中獲取 //buffer是讀取后的數(shù)據(jù)放入的緩存目標(biāo) //0,從buffer的0位置開(kāi)始放入 //BUFFER_SIZE,每次放BUFFER_SIZE這么長(zhǎng)的長(zhǎng)度 //index,每次從文件的index的位置開(kāi)始讀 //bytesRead,真實(shí)讀到的個(gè)數(shù) fs.read(fd,buffer,0,BUFFER_SIZE,index,function(err,bytesRead){})?
?
?
?
readable.resume()
- 返回:?this
readable.resume()?方法會(huì)重新觸發(fā)?'data'?事件, 將暫停模式切換到流動(dòng)模式。
readable.resume()?方法可以用來(lái)充分使用流中的數(shù)據(jù),而不用實(shí)際處理任何數(shù)據(jù),如以下示例所示:
getReadableStreamSomehow().resume().on('end', () => {console.log('Reached the end, but did not read anything.');});readable.setEncoding(encoding)
- encoding?<string>?要使用的編碼
- Returns:?this
readble.setEncoding()?方法會(huì)為從可讀流讀入的數(shù)據(jù)設(shè)置字符編碼
默認(rèn)返回Buffer對(duì)象。設(shè)置編碼會(huì)使得該流數(shù)據(jù)返回指定編碼的字符串而不是Buffer對(duì)象。例如,調(diào)用readable.setEncoding('utf8')會(huì)使得輸出數(shù)據(jù)作為UTF-8數(shù)據(jù)解析,并作為字符串返回。調(diào)用readable.setEncoding('hex')使得數(shù)據(jù)被編碼成16進(jìn)制字符串格式。
可讀流會(huì)妥善處理多字節(jié)字符,如果僅僅直接從流中取出Buffer對(duì)象,很可能會(huì)導(dǎo)致錯(cuò)誤解碼。
?
?
舉例說(shuō)明上面的事件和方法的使用:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('open', function () {//1 先響應(yīng)openconsole.log('文件被打開(kāi)'); }); rr.on('data', function (data) {//2 console.log('data');console.log(rr.isPaused()); //falserr.pause();//3 改為暫停模式,不讀取數(shù)據(jù)了console.log(rr.isPaused());//true console.log(data);}); setTimeout(function () {//7 兩秒后恢復(fù)成流動(dòng)模式繼續(xù)讀取數(shù)據(jù)console.log('resume');console.log(rr.isPaused());//true rr.resume();console.log(rr.isPaused());//true,因?yàn)樘砑?'readable' 事件句柄會(huì)使流自動(dòng)停止流動(dòng),并通過(guò) readable.read() 消費(fèi)數(shù)據(jù)。 如果 'readable' 事件句柄被移除,且存在 'data' 事件句柄,則流會(huì)再次開(kāi)始流動(dòng) },1000); //注釋掉readable后,結(jié)果就為false rr.on('error', function (err) {console.log(err); }); rr.on('readable', () => {//4 因?yàn)閐ata將所有數(shù)據(jù)都讀完并將緩存清空,所以readable只輸出nullconsole.log('readable');console.log(rr.read()); }); rr.on('close', function (err) {//6 關(guān)閉console.log('close'); }); rr.on('end', () => {//5 結(jié)束console.log('end'); });返回:
node test.js 文件被打開(kāi) data false true if the truth is : I Am A boy readable null end close resume true true注釋掉readable返回:
node test.js 文件被打開(kāi) data false true if the truth is : I Am A boy resume true false end close?
readable.pipe(destination[, options])
- destination?<stream.Writable>?數(shù)據(jù)寫(xiě)入目標(biāo)
-
options?<Object>?Pipe 選項(xiàng)
- end?<boolean>?在 reader 結(jié)束時(shí)結(jié)束 writer 。默認(rèn)為?true。
readable.pipe()?綁定一個(gè) [Writable][] 到?readable?上, 將可寫(xiě)流自動(dòng)切換到 flowing 模式并將所有數(shù)據(jù)傳給綁定的 [Writable][]。數(shù)據(jù)流將被自動(dòng)管理。這樣,即使是可讀流較快,目標(biāo)可寫(xiě)流也不會(huì)超負(fù)荷(overwhelmed)。
下面例子將?readable?中的所有數(shù)據(jù)通過(guò)管道傳遞給名為?foo.txt?的文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.pipe(process.stdout);返回:
node test.js if the truth is : I Am A boy可以在單個(gè)可讀流上綁定多個(gè)可寫(xiě)流。
readable.pipe()?方法返回?目標(biāo)流?的引用,這樣就可以對(duì)流進(jìn)行鏈?zhǔn)降毓艿啦僮?#xff1a;
const fs = require('fs'); const zlib = require('zlib'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); const z = zlib.createGzip(); const w = fs.createWriteStream('foo.txt.gz'); rr.pipe(z).pipe(w); //運(yùn)行后,文件夾中果然出現(xiàn)了一個(gè)壓縮文件默認(rèn)情況下,當(dāng)源可讀流(the source Readable stream)觸發(fā)?'end'?事件時(shí),目標(biāo)流也會(huì)調(diào)用?stream.end()?方法從而結(jié)束寫(xiě)入。要禁用這一默認(rèn)行為,?end?選項(xiàng)應(yīng)該指定為?false, 這將使目標(biāo)流保持打開(kāi), 如下面例子所示:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); const writer = fs.createWriteStream('foo2.txt'); rr.pipe(writer,{end:false}); rr.on('end', () => {console.log('end reader'); }); setTimeout(function(){writer.write('請(qǐng)輸入num1的值:');writer.end(); },2000);返回:
node test.jsend reader
且foo2.txt文件中內(nèi)容為:
if the truth is : I Am A boy請(qǐng)輸入num1的值:如果去掉{ end: false },則出錯(cuò):
node test.js end reader events.js:167throw er; // Unhandled 'error' event^Error [ERR_STREAM_WRITE_AFTER_END]: write after end //這就是因?yàn)楫?dāng)源可讀流觸發(fā)?'end'?事件時(shí),目標(biāo)流也會(huì)調(diào)用?stream.end()?方法從而結(jié)束寫(xiě)入這里有一點(diǎn)要警惕,如果可讀流在處理時(shí)發(fā)生錯(cuò)誤,目標(biāo)可寫(xiě)流?不會(huì)?自動(dòng)關(guān)閉。 如果發(fā)生錯(cuò)誤,需要?手動(dòng)?關(guān)閉所有流以避免內(nèi)存泄漏。
注意:不管對(duì)?process.stderr?和?process.stdout?指定什么選項(xiàng),它們都是直到 Node.js 進(jìn)程退出才關(guān)閉。
?
readable.unpipe([destination])
- destination?<stream.Writable>?可選的,指定需要分離的目標(biāo)流
readable.unpipe()?方法將之前通過(guò)stream.pipe()方法綁定的流分離
如果?destination?沒(méi)有傳入, 則所有綁定的流都會(huì)被分離.
如果傳入?destination, 但它沒(méi)有被pipe()綁定過(guò),則該方法不作為.
const readable = getReadableStreamSomehow(); const writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt', // but only for the first second readable.pipe(writable); setTimeout(() => {console.log('Stop writing to file.txt');readable.unpipe(writable);console.log('Manually close the file stream');writable.end(); }, 1000);?
readable源碼實(shí)現(xiàn),轉(zhuǎn)自https://blog.csdn.net/eeewwwddd/article/details/81042225
let fs = require('fs') let EventEmitter = require('events') class ReadStream extends EventEmitter{constructor(path,options = {}){super()this.path = paththis.highWaterMark = options.highWaterMark || 64*1024this.flags = options.flags || 'r'this.start = options.start || 0this.pos = this.start //會(huì)隨著讀取的位置改變this.autoClose = options.autoClose || truethis.end = options.end || null//默認(rèn)null就是bufferthis.encoding = options.encoding || null//參數(shù)的問(wèn)題this.reading = false //非流動(dòng)模式//創(chuàng)建個(gè)buffer用來(lái)存儲(chǔ)每次讀出來(lái)的數(shù)據(jù)this.buffers = []//緩存區(qū)長(zhǎng)度this.len = 0//是否要觸發(fā)readable事件this.emittedReadable = false//觸發(fā)open獲取文件的fd標(biāo)識(shí)符this.open()//此方法默認(rèn)同步調(diào)用 每次設(shè)置on監(jiān)聽(tīng)事件時(shí)都會(huì)調(diào)用之前所有的newListener事件this.on('newListener',(type)=>{// 等待著他監(jiān)聽(tīng)data事件if(type === 'readable'){//開(kāi)始讀取 客戶(hù)已經(jīng)監(jiān)聽(tīng)的data事件this.read()}})}//readable真正的源碼中的方法,計(jì)算出和n最接近的2的冪次數(shù) computeNewHighWaterMark(n) {n--;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;n++;return n;}read(n){//當(dāng)讀的數(shù)量大于水平線,會(huì)通過(guò)取2的冪次取比他大和最接近的數(shù)if(this.len < n){this.highWaterMark = this.computeNewHighWaterMark(n)//重新觸發(fā)readbale的callback,所以第一次會(huì)觸發(fā)nullthis.emittedReadable = true//重新讀新的水位線this._read()}//真正讀取到的let buffer = null//說(shuō)明緩存里有這么多,取出來(lái)if(n>0 && n<=this.len){//定義一個(gè)bufferbuffer = Buffer.alloc(n)let buflet flag = truelet index = 0//[buffer<1,2,3,4>,buffer<1,2,3,4>,buffer<1,2,3,4>]//每次取出緩存前的第一個(gè)bufferwhile(flag && (buf = this.buffers.shift())){for(let i=0;i<buf.length;i++){//把取出的一個(gè)buffer中的數(shù)據(jù)放入新定義的buffer中buffer[index++] = buf[i]//當(dāng)buffer的長(zhǎng)度和n(參數(shù))長(zhǎng)度一樣時(shí),停止循環(huán)if(index === n){flag = false//維護(hù)緩存,因?yàn)榭赡芫彺嬷械腷uffer長(zhǎng)度大于n,當(dāng)取出n的長(zhǎng)度時(shí),還會(huì)剩下其余的buffer,我們需要切割buf并且放到緩存數(shù)組之前this.len -= nlet r = buf.slice(i+1)if(r.length){this.buffers.unshift(r)}break}}}}//如果緩存區(qū)沒(méi)有東西,等會(huì)讀完需要觸發(fā)readable事件//這里會(huì)有一種狀況,就是如果每次Readable讀取的數(shù)量正好等于highWaterMark(流讀取到緩存的長(zhǎng)度),就會(huì)每次都等于0,每次都觸發(fā)Readable事件,就會(huì)每次讀,讀到?jīng)]有為止,最后還會(huì)觸發(fā)一下nullif(this.len === 0){this.emittedReadable = true}if(this.len < this.highWaterMark){//默認(rèn),一開(kāi)始的時(shí)候開(kāi)始讀取if(!this.reading){this.reading = true//真正多讀取操作this._read()}}return buffer&&buffer.toString()}_read(){if(typeof this.fd != 'number'){//等待著觸發(fā)open事件后fd肯定拿到了 再去執(zhí)行read方法return this.once('open',()=>{this._read()})}//先讀這么多bufferlet buffer = Buffer.alloc(this.highWaterMark)fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{if(byteRead > 0){//當(dāng)?shù)谝淮巫x到數(shù)據(jù)后,改變r(jià)eading的狀態(tài),如果觸發(fā)read事件,可能還會(huì)在觸發(fā)第二次_readthis.reading = false//每次讀到數(shù)據(jù)增加緩存取得長(zhǎng)度this.len += byteRead//每次讀取之后,會(huì)增加讀取的文件的讀取開(kāi)始位置this.pos += byteRead//將讀到的buffer放入緩存區(qū)buffers中this.buffers.push(buffer.slice(0,byteRead))//觸發(fā)readableif(this.emittedReadable){this.emittedReadable = false//可以讀取了,默認(rèn)開(kāi)始的時(shí)候杯子填滿了this.emit('readable')}}else{//沒(méi)讀到就出發(fā)end事件this.emit('end')}})}destory(){if(typeof this.fd != 'number'){return this.emit('close')}//如果文件被打開(kāi)過(guò) 就關(guān)閉文件并且觸發(fā)close事件fs.close(this.fd,()=>{this.emit('close')})}open(){//fd表示的就是當(dāng)前this.path的這個(gè)文件,從3開(kāi)始(number類(lèi)型)fs.open(this.path,this.flags,(err,fd)=>{//有可能fd這個(gè)文件不存在 需要做處理if(err){//如果有自動(dòng)關(guān)閉 則幫他銷(xiāo)毀if(this.autoClose){//銷(xiāo)毀(關(guān)閉文件,觸發(fā)關(guān)閉文件事件)this.destory()}//如果有錯(cuò)誤 就會(huì)觸發(fā)error事件this.emit('error',err)return}//保存文件描述符this.fd = fd//當(dāng)文件打開(kāi)成功時(shí)觸發(fā)open事件this.emit('open',this.fd)})} }?
自定義可讀流
因?yàn)閏reateReadStream內(nèi)部調(diào)用了ReadStream類(lèi),ReadStream又實(shí)現(xiàn)了Readable接口,ReadStream實(shí)現(xiàn)了_read()方法,所以我們通過(guò)自定義一個(gè)類(lèi)繼承stream模塊的Readable,并在原型上自定義一個(gè)_read()就可以自定義自己的可讀流
返回:
node test.js 100?
?
pipe——管道 可以控制速率,因?yàn)樽x快寫(xiě)慢
let fs = require('fs') //pipe方法叫管道 可以控制速率 let rs = fs.createReadStream('./foo.txt',{highWaterMark: 4 }) let ws = fs.createWriteStream('./foo1.txt',{highWaterMark: 1 }) //會(huì)監(jiān)聽(tīng)rs的on('data')將讀取到的數(shù)據(jù),通過(guò)ws.write的方法寫(xiě)入文件 //調(diào)用寫(xiě)的一個(gè)方法 返回boolean類(lèi)型 //如果返回false就調(diào)用rs的pause方法 暫停讀取 //等待可寫(xiě)流 寫(xiě)入完畢在監(jiān)聽(tīng)drain resume rs rs.pipe(ws) //會(huì)控制速率 防止淹沒(méi)可用內(nèi)存?
let fs = require('fs') //這兩個(gè)是上面自己寫(xiě)的ReadStream和WriteStream let { Readable } = require('stream');class MyRead extends Readable{//流需要一個(gè)_read方法,方法中push什么,外面就接收什么 _read(){//push方法就是上面_read方法中的push一樣,把數(shù)據(jù)放入緩存區(qū)中this.push('100');//如果push了null就表示沒(méi)有東西可讀了,停止(如果不寫(xiě),就會(huì)一直push上面的值,死循環(huán))this.push(null);} }let writer = fs.createWriteStream('./foo1.txt',{highWaterMark: 1 });//如果用原來(lái)的讀寫(xiě),因?yàn)閷?xiě)比較耗時(shí),所以會(huì)多讀少寫(xiě),耗內(nèi)存 MyRead.prototype.pipe = function(dest){this.on('data',(data)=>{let flag = dest.write(data)//如果寫(xiě)入的時(shí)候嘴巴吃滿了就不繼續(xù)讀了,暫停if(!flag){this.pause()}});//如果寫(xiě)的時(shí)候嘴巴里的吃完了,就會(huì)繼續(xù)讀dest.on('drain',()=>{this.resume()});this.on('end',()=>{this.destroy()//銷(xiāo)毀ReadStream//清空緩存中的數(shù)據(jù)fs.fsync(1,()=>{//fs.fsync作用是同步磁盤(pán)緩存,1代表的是文件描述符,0,1,2 文件描述符代表標(biāo)準(zhǔn)輸入設(shè)備(比如鍵盤(pán)),標(biāo)準(zhǔn)輸出設(shè)備(顯示器)和標(biāo)準(zhǔn)錯(cuò)誤dest.destroy()//銷(xiāo)毀WriteStream,之前dest設(shè)的是,但是報(bào)錯(cuò)process.stdout cannot be closed });}); } var reader = new MyRead(); reader.pipe(writer);//結(jié)果就是將100寫(xiě)到了文件foo1.txt中上面的文件描述符處本來(lái)寫(xiě)的是dest.fd,但是報(bào)錯(cuò):
TypeError [ERR_INVALID_ARG_TYPE]: The "fd" argument must be of type number. Received type object
查看writer的fd為null,不知原因,待查明???????????
?
stream.pipeline(...streams[, callback])
- ...streams?<Stream>?兩個(gè)或多個(gè)要用管道連接的流
- callback?<Function>?一個(gè)回調(diào)函數(shù),可以帶有一個(gè)錯(cuò)誤信息參數(shù)
該模塊方法用于在多個(gè)流之間架設(shè)管道,可以自動(dòng)傳遞錯(cuò)誤和完成掃尾工作,并且可在管道架設(shè)完成時(shí)提供一個(gè)回調(diào)函數(shù):
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib');// 使用 pipeline API 輕松連接多個(gè)流 // 并在管道完成時(shí)獲得通知// 使用pipeline高效壓縮一個(gè)可能很大的tar文件: pipeline(fs.createReadStream('foo.txt'),zlib.createGzip(),fs.createWriteStream('foo.tar.gz'),//運(yùn)行后成功壓縮并返回 管道架設(shè)成功 信息(err) => {if (err) {console.error('管道架設(shè)失敗', err);} else {console.log('管道架設(shè)成功');}} );pipeline?API 也可做成承諾:
const util = require('util'); const stream = require('stream'); const fs = require('fs'); const zlib = require('zlib'); const pipeline = util.promisify(stream.pipeline);async function run() {await pipeline(fs.createReadStream('foo.txt'),zlib.createGzip(),fs.createWriteStream('foo.tar.gz')////運(yùn)行后成功壓縮并返回 管道架設(shè)成功 信息 );console.log('管道架設(shè)成功'); }run().catch(console.error);?
?
用于實(shí)現(xiàn)流的 API
其實(shí)就是覆寫(xiě)下面的這些方法來(lái)實(shí)現(xiàn)自己的流操作:
新的流類(lèi)必須實(shí)現(xiàn)一個(gè)或多個(gè)特定的方法,根據(jù)所創(chuàng)建的流類(lèi)型,如下圖所示:
| 只讀流 | Readable | _read |
| 只寫(xiě)流 | writable | _write?,_writev,_final |
| 可讀可寫(xiě)流 | Duplex | _read?,_write?,_writev,_final |
| 操作寫(xiě)數(shù)據(jù),然后讀結(jié)果 | Transform | _transform,_flush,_final |
注意:實(shí)現(xiàn)流的代碼里面不應(yīng)該出現(xiàn)調(diào)用“public”方法的地方因?yàn)檫@些方法是給使用者使用的(流使用者部分的API所述)。這樣做可能會(huì)導(dǎo)致使用流的應(yīng)用程序代碼產(chǎn)生不利的副作用。
const { Writable } = require('stream');class MyWritable extends Writable {constructor(options) {super(options);// ... } }?
?
雙工流
有了雙工流,我們可以在同一個(gè)對(duì)象上同時(shí)實(shí)現(xiàn)可讀和可寫(xiě),就好像同時(shí)繼承這兩個(gè)接口。 重要的是雙工流的可讀性和可寫(xiě)性操作完全獨(dú)立于彼此。這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象。
let { Duplex } = require('stream') //雙工流,可讀可寫(xiě) class MyDuplex extends Duplex{_read(){this.push('hello Duplex')this.push(null)}_write(chunk,encoding,clearBuffer){console.log(chunk)clearBuffer()} }let myDuplex = new MyDuplex() //process.stdin是node自帶的process進(jìn)程中的可讀流,會(huì)監(jiān)聽(tīng)命令行的輸入 //process.stdout是node自帶的process進(jìn)程中的可寫(xiě)流,會(huì)監(jiān)聽(tīng)并輸出在命令行中 //所以這里的意思就是在命令行先輸出hello,然后我們輸入什么他就出來(lái)對(duì)應(yīng)的buffer(先作為可讀流出來(lái)) process.stdin.pipe(myDuplex).pipe(process.stdout)返回:
node test.js hello Duplex?
?
轉(zhuǎn)換流
在讀寫(xiě)過(guò)程中可以修改或轉(zhuǎn)換數(shù)據(jù)的?Duplex?流(例如?zlib.createDeflate())
轉(zhuǎn)換流的輸出是從輸入中計(jì)算出來(lái)的。對(duì)于轉(zhuǎn)換流,我們不必實(shí)現(xiàn)read或write的方法,我們只需要實(shí)現(xiàn)一個(gè)transform方法,將兩者結(jié)合起來(lái)。它有write方法的意思,我們也可以用它來(lái)push數(shù)據(jù)。
let { Transform } = require('stream');class MyTransform extends Transform{_transform(chunk,encoding,callback){//5 myTransform2 push時(shí)則觸發(fā)myTransform的_transformconsole.log(chunk.toString().toUpperCase());//6 然后輸出from MyTransform2的大寫(xiě)內(nèi)容callback();} } let myTransform = new MyTransform();class MyTransform2 extends Transform{_transform(chunk,encoding,callback){//2 觸發(fā)myTransform2的_transformconsole.log(chunk.toString().toUpperCase());//3 輸出input的大寫(xiě)內(nèi)容INPUTthis.push('from MyTransform2');//4 將from MyTransform2內(nèi)容寫(xiě)入myTransformthis.push(null);callback();} } let myTransform2 = new MyTransform2();//此時(shí)myTransform2被作為可寫(xiě)流觸發(fā)_transform,輸出輸入的大寫(xiě)字符后,會(huì)通過(guò)可讀流push字符到下一個(gè)轉(zhuǎn)換流中 //當(dāng)寫(xiě)入的時(shí)候才會(huì)觸發(fā)transform的值,此時(shí)才會(huì)push,所以后面的pipe拿到的chunk是前面的push的值 process.stdin.pipe(myTransform2).pipe(myTransform);返回:
node test.js input //1 輸入回車(chē) INPUTFROM MYTRANSFORM2?
總結(jié)
可讀流
在 flowing 模式下, 可讀流自動(dòng)從系統(tǒng)底層讀取數(shù)據(jù),并通過(guò) EventEmitter 接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。
在 paused 模式下,必須顯式調(diào)用 stream.read() 方法來(lái)從流中讀取數(shù)據(jù)片段。
所有初始工作模式為 paused 的 Readable 流,可以通過(guò)下面三種途徑切換到 flowing 模式:
- 監(jiān)聽(tīng) ‘data’ 事件
- 調(diào)用 stream.resume() 方法
- 調(diào)用 stream.pipe() 方法將數(shù)據(jù)發(fā)送到 Writable
可讀流可以通過(guò)下面途徑切換到 paused 模式:
- 如果不存在管道目標(biāo)(pipe destination),可以通過(guò)調(diào)用 stream.pause() 方法實(shí)現(xiàn)。
- 如果存在管道目標(biāo),可以通過(guò)取消 ‘data’ 事件監(jiān)聽(tīng),并調(diào)用 stream.unpipe() 方法移除所有管道目標(biāo)來(lái)實(shí)現(xiàn)。
可寫(xiě)流
需要知道只有在嘴真正的吃滿了,并且等到把嘴里的和地上的饅頭(緩存中的)都吃下了才會(huì)觸發(fā)drain事件
第一次寫(xiě)入會(huì)直接寫(xiě)入文件中,后面會(huì)從緩存中一個(gè)個(gè)取
雙工流
只是對(duì)可寫(xiě)可讀流的一種應(yīng)用,既可作為可讀流,也能作為可寫(xiě)流,并且作為可讀或者可寫(xiě)時(shí)是隔離的
轉(zhuǎn)換流
一般轉(zhuǎn)換流是邊輸入邊輸出的,而且一般只有觸發(fā)了寫(xiě)入操作時(shí)才會(huì)進(jìn)入_transform方法中。跟雙工流的區(qū)別就是,他的可讀可寫(xiě)是在一起的
?
轉(zhuǎn)載于:https://www.cnblogs.com/wanghui-garcia/p/9798158.html
總結(jié)
以上是生活随笔為你收集整理的nodejs-stream部分的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Ghost配置1——删除社交Link
- 下一篇: kafka channle的应用案例