Node中的流

2018-02-11 14:09:01来源:http://www.ayqy.net/blog/node-stream/作者:黯羽轻扬人点击

分享
一.流的概念

stream是数据集合,与数组、字符串差不多。但stream不一次性访问全部数据,而是一部分一部分发送/接收(chunk式的),所以不必占用那么大块内存,尤其适用于处理大量(外部)数据的场景


stream具有管道(pipeline)特性,例如:


const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

很多原生模块都是基于stream的,包括进程的 stdin/stdout/stderr :



例如常见的场景:


const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);

其中 pipe 方法把可读流的输出(数据源)作为可写流的输入(目标),直接把读文件的输出流作为输入连接到HTTP响应的输出流,从而避免把整个文件读入内存


P.S.甚至日常使用的 console.log() 内部实现 也是stream


二.流的类型

Node中有4种基础流:


Readable

可读流是对源的抽象, 从中可以消耗数据,如 fs.createReadStream


Writable

可写流是对可写入数据的目标的抽象,如 fs.createWriteStream


Duplex(双工)

双工流既可读又可写,如TCP socket


Transform(转换)

转换流本质上是双工流,用于在写入和读取数据时对其进行修改或转换,如 zlib.createGzip 用gzip压缩数据

转换流看一看做一个输入可写流,输出可读流的函数

P.S.有一种转换流叫(Pass)Through Stream(通过流),类似于FP中的 identity = x => x


三.管道

src.pipe(res) 要求源必须可读,目标必须可写,所以,如果是对双工流进行管道传输,就可以像Linux的管道一样链式调用:


readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)

pipe() 方法返回目标流,所以:


// a (readable), b and c (duplex), and d (writable)
a.pipe(b).pipe(c).pipe(d)
// 等价于
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Linux下,等价于
$ a | b | c | d
四.流与事件

事件驱动是Node在设计上的一个重要特点,很多Node原生对象都是基于事件机制( EventEmitter 模块)实现的,包括流( stream 模块):


Most of Node’s objects — like HTTP requests, responses, and streams — implement the EventEmitter module so they can provide a way to emit and listen to events.

所有stream都是 EventEmitter 实例,通过事件机制来读写数据,例如上面提到的 pipe() 方法相当于:


// readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});

P.S. pipe 还处理了一些别的事情,比如错误处理,EoF以及某个流的速度较快/较慢的情况


Readable与Writable stream的主要事件和方法如下:



Readable的主要事件有:


data 事件:stream把一个chunk传递给使用者时触发


end 事件:再没有要从stream中获取(consume)的数据时触发


Writable的主要事件有:


drain 事件,断流了,这是Writable stream可以接收更多数据的信号


finish 事件,当所有数据都已flush到下层系统时触发


五.Readable stream的两种模式:Paused与Flowing

一个Readable stream要么流动(Flowing)要么暂停(Paused),也被称为拉(pull)和推(push)两种模式


创建出来后默认处于Paused状态,可以通过 read() 方法读取数据。如果处于Flowing状态,数据会持续地流出来,此时只需要通过监听事件来使用这些数据,如果没有使用者的话,数据会丢失,所以都会监听Readable stream的 data 事件,实际上监听 data 事件会把Readable stream从Paused状态切换到Flowing,移除 data 事件监听会再切回来。需要手动切换的话,可以通过 resume() 和 pause() 来做


使用 pipe() 方式时不用关心这些,都会自动处理妥当:


Readable触发 data 事件,直到Writable忙不过来了


pipe 收到信号后调用 Readable.pause() ,进入Paused模式


Writable再干一会儿压力不大了的时候,会触发 drain 事件,此时 pipe 调用 Readable.resume() 进入Flowing模式,让Readable接着触发 data 事件


highWaterMark与backpressure

其实 drain 事件就是用来应对 Backpressure现象 的,简单的说,Backpressure就是下游的消费速度限制了传输,造成 下游向上游的反向压力


如果消费速度慢于生产速度,会在下游产生堆积,来不及处理的数据会存放到Writable的buffer里,如果不加(限流)处理,这个buffer会持续增长,可能溢出进而造成错误或数据丢失


Backpressure现象发生的标志是 Writable.write() 返回了 false ,说明来自上游的 待处理数据量已经触及highWaterMark (高水位线,默认16kb):


Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

这是下游开始有点紧张了(todo项足够忙一阵子了)的信号。建议在此时对上游限流,即调用 Readable.pause() 先给停了,给下游多点时间处理堆积的数据,下游觉得轻松了会触发 darin 事件,表示此时有能力处理更多数据了,所以这时候应该开闸放水( Readable.resume() )


注意,Readable的数据会存放在缓存中,直到有个Writable来消耗这些数据。所以Paused状态只是说不往下流了,已经缓存的数据还在Readable的buffer里。所以如果不限流,来不及处理的数据就缓存在下游,并持续堆积,限流的话,这部分数据被缓存在上游,因为限流了而不再持续堆积


另外,Readable也有highWaterMark的概念:


The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams

是对从实际数据源读取速度的限制(比如从磁盘读文件),防止生产速度太快引发缓存堆积(比如一顿猛 push() )。所以Flowing Readable的正常工作方式是被 push() – push() – push() …诶,发现buffer里的量已经攒够一个chunk了,吐给下游。同样,Readable触及highWaterMark的标志是 push() 返回 false ,说明Readable的buffer不那么十分空了,此时如果还持续 push() ,没错, 也会出现BackPressure (Readable消费能力限制了从数据源到Readable的传输速度):


快-------------慢
数据源-------->Readable------->Writable
快--------------慢

只要上游(生产)快,下游(消费)慢就会出现BackPressure,所以在 readable.pipe(writable) 的简单场景,可能会出现上面两段BackPressure


六.示例
Writable stream

常见的造大文件:


const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum./n');
}
file.end();

通过 fs.createWriteStream() 创建指向文件的Writable stream,通过 write() 填充数据,写完后 end()


或者更一般的,直接 new 一个 Writable :


const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
// nowrap version
// process.stdout.write(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);

一个最简单的 echo 实现,把当前进程的标准输入接到自定义输出流 outStream ,像日志中间件一样(标准输入流经 outStream ,再该干嘛干嘛去 callback ):


cc
oo
nn
ss
oo
ll
ee
Console {
log: [Function: bound consoleCall],
...
}

write() 方法的3个参数中, chunk 是个Buffer, encoding 在某些场景下需要,大多数时候可以忽略, callback 是应该在 chunk 处理完毕后调用的通知函数,表明写入成功与否(失败的话,传Error对象进去),类似于尾触发机制中的 next()


或者更简单的 echo 实现:


process.stdin.pipe(process.stdout);

直接把标准输入流连接到标准输出流


Readable stream
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

通过 push 向Readable stream里填充数据, push(null) 表示结束。上例中把所有数据都读进来,然后才交给标准输出,实际上有更高效的方式(按需推数据给使用者):


const { Readable } = require('stream');
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

read() 方法每次吐一个字符,使用者从Readable stream取数据的时候, read() 会持续触发


Duplex/Transform stream

Duplex stream兼具Readable和Writable的特点:既可以作为数据源(生产者),也可以作为目标(消费者)。例如:


const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

上例把前2个例子结合起来了, inoutStream 被连接到标准输出流了, A-Z 会作为数据源传递给标准输出(打印出来),同时标准输入流被接到 inoutStream ,来自标准输入的所有数据会被 log 出来,效果如下:


ABCDEFGHIJKLMNOPQRSTUVWXYZcc
oo
nn
ss
oo
ll
ee
Console {
log: [Function: bound consoleCall],
...
}

P.S.先输出 A-Z 是因为 pipe() 会把Readable stream切换到Flowing模式,所以一开始就把 A-Z “流”出来了


注意 ,Duplex stream的Readable与Writable部分是完全独立的,读写互不影响,Duplex只是把两个特性组合成一个对象了,就像 两根筷子一样绑在一起的单向管道


Transform stream是一种有意思的Duplex stream:其 输出是根据输入计算得来的 。所以不用分别实现 read/write() 方法,只实现一个 transform() 方法就够了:


const { Transform } = require('stream');
const upperCaseTr = new Transform({
// 函数签名与write一致
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);

同样,Transform stream的Readable与Writable部分也是独立的(不手动 push 就不会自动传递到Readable部分),只是形式上结合起来了


P.S.另外,stream之间除了可以传递Buffer/String,还可以传递Object(包括Array),具体见 Streams Object Mode


Node提供了一些原生Transform stream,例如 zlib 和 crypto stream:


const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));

简单的命令行工具, gzip 压缩。更多示例见 Node’s built-in transform streams


参考资料

Node.js Streams: Everything you need to know


Node.js writable.write return false?


探究 Node.js 中的 drain 事件


深入理解 Node.js Stream 内部机制


Backpressuring in Streams



最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台