如果你正在学习node,那么流一定是一个你需要掌握的概念。如果你想成为一个node高手,那么流一定是武功秘籍中不可缺少的一个部分。
引用自stream-handbook。由此可见,流对于深入学习node的重要性。
流是什么?你可以把流理解成一种传输的能力。通过流,可以以平缓的方式,无副作用的将数据传输到目的地。在node中,node stream创建的流都是专用于string和buffer上的,一般情况下使用buffer。stream表示的是一种传输能力,buffer是传输内容的载体 (可以这样理解,stream:外卖小哥哥, buffer:你的外卖)。创建流的时候将objectmode设置true ,stream同样可以传输任意类型的js对象(除了null,null在流中有特殊用途)。
为什么要使用流?现在有个需求,我们要向客户端传输一个大文件。如果采用下面的方式
const fs = require('fs');const server = require('http').createserver();server.on('request', (req, res) => { fs.readfile('./big.file', (err, data) => { if (err) throw err; res.end(data); });});server.listen(8000);
每次接收一个请求,就要把这个大文件读入内存,然后再传输给客户端。通过这种方式可能会产生以下三种后果:
内存耗尽
拖慢其他进程
增加垃圾回收器的负载
所以这种方式在传输大文件的情况下,不是一个好的方案。并发量一大,几百个请求过来很容易就将内存耗尽。
如果采用流呢?
const fs = require('fs');const server = require('http').createserver();server.on('request', (req, res) => { const src = fs.createreadstream('./big.file'); src.pipe(res);});server.listen(8000);
采用这种方式,不会占用太多内存,读取一点就传输一点,整个过程平缓进行,非常优雅。如果想在传输的过程中,想对文件进行处理,比如压缩、加密等等,也很好扩展(后面会具体介绍)。
流在node中无处不在。从下图中可以看出:
stream分类stream分为四大类:
readable(可读流)
writable (可写流)
duplex (双工流)
transform (转换流)
readable可读流中的数据,在以下两种模式下都能产生数据。
flowing mode
non-flowing mode
两种模式下,触发的方式以及消耗的方式不一样。
flowing mode:数据会源源不断地生产出来,形成“流动”现象。监听流的data事件便可进入该模式。
non-flowing mode下:需要显示地调用read()方法,才能获取数据。
两种模式可以互相转换
流的初始状态是null,通过监听data事件,或者pipe方法,调用resume方法,将流转为flowing mode状态。flowing mode状态下调用pause方法,将流置为non-flowing mode状态。non-flowing mode状态下调用resume方法,同样可以将流置为flowing mode状态。
下面详细介绍下两种模式下,readable流的运行机制。
flowing mode在flowing mode状态下,创建的myreadable读流,直接监听data事件,数据就源源不断的流出来进行消费了。
myreadable.on('data',function(chunk){ consume(chunk);//消费流})
一旦监听data事件之后,readable内部的流程如下图所示
核心的方法是流内部的read方法,它在参数n为不同值时,分别触发不同的操作。下面描述中的hightwatermark表示的是流内部的缓冲池的大小。
n=undefined(消费数据,并触发一次可读流)
n=0(触发一次可读流,但是不会消费)
n>hightwatermark(修改hightwatermark的值)
n<buffer的总数据数(直接返回n个字节的数据)
n>buffer (可以返回null,也可以返回buffer所有的数据(当时最后一次读取))
图中黄色标识的_read(),是用户实现流所需要自己实现的方法,这个方法就是实际读取流的方式(可以这样理解,外卖平台给你提供外卖的能力,那_read()方法就相当于你下单点外卖)。后面会详细介绍如何实现_read方法。
以上的流程可以描述为:监听data方法,readable内部就会调用read方法,来进行触发读流操作,通过判断是同步还是异步读取,来决定读取的数据是否放入缓冲区。如果为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数判定是否emit('data')来消费流,循环读取。如果是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。
从使用者的角度来看,在这种模式下,你可以通过下面的方式来使用流
const fs = require('./fs');const readfile = fs.createreadstream('./big.file');const writefile = fs.createwritestream('./writefile.js');readfile.on('data',function(chunk){ writefile1.write(chunk);})
non-flowing mode相对于flowing mode,non-flowing mode要相对简单很多。
消费该模式下的流,需要使用下面的方式
myreadable.on(‘readable’,function(){ const chunk = myreadable.read() consume(chunk);//消费流})
在non-flowing mode下,readable内部的流程如下图:
从这个图上看出,你要实现该模式的读流,同样要实现一个_read方法。
整个流程如下:监听readable方法,readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,然后发送emit readable事件,通知用户端消费。
从使用者的角度来看,你可以通过下面的方式来使用该模式下的流
const fs = require('fs');const readfile = fs.createreadstream('./big.file');const writefile = fs.createwritestream('./writefile.js');readfile.on('readable',function(chunk) { while (null !== (chunk = myreadable.read())) { writefile.write(chunk); }});
writable相对于读流,写流的机制就更容易理解了。
写流使用下面的方式进行数据写入
mywrite.write(chunk);
调用write后,内部writable的流程如下图所示
类似于读流,实现一个写流,同样需要用户实现一个_write方法。
整个流程是这样的:调用write之后,会首先判定是否要写入缓冲区。如果不需要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。
从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。
const fs = require('fs');const readfile = fs.createreadstream('./big.file');const writefile = fs.createwritestream('./writefile.js');readfile.on('data',function(chunk) { writefile.write(chunk);})
可以看到,使用写流是非常简单的。
我们先讲解一下如何实现一个读流和写流,再来看duplex和transform是什么,因为了解了如何实现一个读流和写流,再来理解duplex和transform就非常简单了。
实现自定义的readable实现自定义的readable,只需要实现一个_read方法即可,需要在_read方法中调用push方法来实现数据的生产。如下面的代码所示:
const readable = require('stream').readable;class myreadable extends readable { constructor(datasource, options) { super(options); this.datasource = datasource; } _read() { const data = this.datasource.makedata(); settimeout(()=>{ this.push(data); }); }}// 模拟资源池const datasource = { data: new array(10).fill('-'), makedata() { if (!datasource.data.length) return null; return datasource.data.pop(); }};const myreadable = new myreadable(datasource,);myreadable.on('readable', () => { let chunk; while (null !== (chunk = myreadable.read())) { console.log(chunk); }});
实现自定义的writable实现自定义的writable,只需要实现一个_write方法即可。在_write中消费chunk写入到相应地方,并且调用callback回调。如下面代码所示:
const writable = require('stream').writable;class mywritable extends writable{ constuctor(options){ super(options); } _write(chunk,endcoding,callback){ console.log(chunk); callback && callback(); }}const mywritable = new mywritable();
duplex双工流:简单理解,就是讲一个readable流和一个writable流绑定到一起,它既可以用来做读流,又可以用来做写流。
实现一个duplex流,你需要同时实现_read和_write方法。
有一点需要注意的是:它所包含的 readable流和writable流是完全独立,互不影响的两个流,两个流使用的不是同一个缓冲区。通过下面的代码可以验证
// 模拟资源池1const datasource1 = { data: new array(10).fill('a'), makedata() { if (!datasource1.data.length) return null; return datasource1.data.pop(); }};// 模拟资源池2const datasource2 = { data: new array(10).fill('b'), makedata() { if (!datasource2.data.length) return null; return datasource2.data.pop(); }};const readable = require('stream').readable;class myreadable extends readable { constructor(datasource, options) { super(options); this.datasource = datasource; } _read() { const data = this.datasource.makedata(); settimeout(()=>{ this.push(data); }) }}const writable = require('stream').writable;class mywritable extends writable{ constructor(options){ super(options); } _write(chunk, encoding, callback) { console.log(chunk.tostring()); callback && callback(); }}const duplex = require('stream').duplex;class myduplex extends duplex{ constructor(datasource,options) { super(options); this.datasource = datasource; } _read() { const data = this.datasource.makedata(); settimeout(()=>{ this.push(data); }) } _write(chunk, encoding, callback) { console.log(chunk.tostring()); callback && callback(); }}const mywritable = new mywritable();const myreadable = new myreadable(datasource1);const myduplex = new myduplex(datasource1);myreadable.pipe(myduplex).pipe(mywritable);
打印的结果是
abababababababababab
从这个结果可以看出,myreadable.pipe(myduplex),myduplex充当的是写流,写入的内容是a;myduplex.pipe(mywritable),myduplex充当的是读流,往mywritable写的却是b;所以说它所包含的 readable流和writable流是完全独立的。
transform理解了duplex,就更好理解transform了。transform是一个转换流,它既有读的功能又有写的功能,但是它和duplex不同的是,它的读流和写流共用同一个缓冲区;也就是说,通过它读入什么,那它就能写入什么。
实现一个transform,你只需要实现一个_transform方法。比如最简单的transform:passthrough,其源代码如下所示
passthrough就是一个transform,但是这个转换流,什么也没做,相当于一个透明的转换流。可以看到_transform中什么都没有,只是简单的将数据进行回调。
如果我们在这个环节做些扩展,只需要在_transform中直接扩展就行了。比如我们可以对流进行压缩,加密,混淆等等操作。
backpress最后介绍一个流中非常重要的一个概念:背压。要了解这个,我们首先来看下pipe和highwatermaker是什么。
pipe首先看下下面的代码
const fs = require('./fs');const readfile = fs.createreadstream('./big.file');const writefile = fs.createwritestream('./writefile.js');readfile.pipe(writefile);
上面的代码和下面是等价的
const fs = require('./fs');const readfile = fs.createreadstream('./big.file');const writefile = fs.createwritestream('./writefile.js');readfile.on('data',function(data){ var flag = ws.write(data); if(!flag){ // 当前写流缓冲区已满,暂停读数据 readfile.pause(); }})writefile.on('drain',function()){ readfile.resume();// 当前写流缓冲区已清空,重新开始读流}readfile.on('end',function(data){ writefile.end();//将写流缓冲区的数据全部写入,并且关闭写入的文件})
pipe所做的操作就是相当于为写流和读流自动做了速度的匹配。
读写流速度不匹配的情况下,一般情况下不会造成什么问题,但是会造成内存增加。内存消耗增加,就有可能会带来一系列的问题。所以在使用的流的时候,强烈推荐使用pipe。
highwatermakerhighwatermaker说白了,就是定义缓冲区的大小。
默认16kb(readable最大8m)
可以自定义
背压的概念可以理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highwatermaker设置的大小。
如上面的代码 var flag = ws.write(data);,一旦写流的缓冲区满了,那flag就会置为false,反向促进读流的速度调整。
stream的应用场景主要有以下场景
文件操作(复制,压缩,解压,加密等)
下面的就很容易就实现了文件复制的功能。
const fs = require('fs');const readfile = fs.createreadstream('big.file');const writefile = fs.createwritestream('big_copy.file');readfile.pipe(writefile);
那我们想在复制的过程中对文件进行压缩呢?
const fs = require('fs');const readfile = fs.createreadstream('big.file');const writefile = fs.createwritestream('big.gz');const zlib = require('zlib');readfile.pipe(zlib.creategzip()).pipe(writefile);
实现解压、加密也是类似的。
静态文件服务器
比如需要返回一个html,可以使用如下代码。
var http = require('http');var fs = require('fs');http.createserver(function(req,res){ fs.createreadstream('./a.html').pipe(res);}).listen(8000);
以上就是node stream的运行机制讲解(附示例)的详细内容。
