博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
解读Node核心模块Stream系列一(Writable和pipe)
阅读量:6291 次
发布时间:2019-06-22

本文共 9064 字,大约阅读时间需要 30 分钟。

node中的流

  • node中stream模块是非常,非常,非常重要的一个模块,因为很多模块都是这个模块封装的:
  • Readable:可读流,用来读取数据,比如 fs.createReadStream()。
  • Writable:可写流,用来写数据,比如 fs.createWriteStream()。
  • Duplex:双工流,可读+可写,比如 net.Socket()。
  • Transform:转换流,在读写的过程中,可以对数据进行修改,比如 zlib.createDeflate()(数据压缩/解压)。

系列链接

Writable

Writable的例子

  • 客户端上的 HTTP 请求
  • 服务器上的 HTTP 响应
  • fs 写入的流
  • zlib 流
  • crypto 流
  • TCP socket
  • 子进程 stdin
  • process.stdout、process.stderr

Writable的特点和简化实现

特点

  1. Writable拥有一个缓存数据的buffer,同时有一个length来记录buffer的长度
  2. Writable拥有一个highWaterMark来标明buffer的最大容量,如果length小于highWaterMark,则返回 true,否则返回 false
  3. Writable拥有writing来标识生产者正在增加length
  4. Writable拥有write()从写入缓存区数据的同时也会根据标志判断是否调用消费者消耗缓存区
  5. Writable通过clearBuffer来消费缓存区
  6. Writable订阅'drain'事件当一旦所有当前被缓冲的数据块都被排空了(被操作系统接受来进行输出)触发

构造函数

  1. Writable拥有一个缓存数据的buffer,同时有一个length来记录buffer的长度
  2. Writable拥有一个highWaterMark来标明buffer的最大容量,如果length小于highWaterMark,则返回 true,否则返回 false
  3. Writable拥有writing来标识生产者正在增加length
const EE = require('events');const util = require('util');const fs = require('fs');function Writable(path,options) {//这个参数是源码没有的,这里主要是为了读取fs为案例加的    EE.call(this);//构造函数继承EventEmiter        this.path = path;    this.autoClose = options.autoClose || true;    this.highWaterMark = options.highWaterMark || 64 * 1024;//64k    this.encoding = options.encoding || null;    this.flags = options.flags || 'w'; 这个源码没有的,这里主要是为了fs读取案例加的    this.needEmitDrain = false;// 需要触发drain事件,默认不需要    this.position = 0;// 偏移量    this.cache = []; // 缓存区    this.writing = false;// 是否正在从缓存中读取,生产者增加    this.length = 0; // 缓存区大小,控制长度    this.open(); // 这个源码没有的,这里主要是为了fs读取案例加的}util.inherits(Writable, EE);//原型继承EventEmiter复制代码

write和_write

  1. Writable拥有write()从写入缓存区数据的同时也会根据标志判断是否调用消费者消耗缓存区
  2. Writable通过clearBuffer来消费缓存区
  3. Writable订阅'drain'事件当一旦所有当前被缓冲的数据块都被排空了(被操作系统接受来进行输出)触发
Writable.prototype.write = function (chunk, encoding=this.encoding, callback=()=>{}) {    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);    //第一次虽然数据没有放入到缓存,但是由于后面会调用_write会将这个长度减去,所以先加上,保证length的正确性    this.length += chunk.length;    if (this.length >= this.highWaterMark ) {//消耗缓存的长度大于缓存的最大容量触发drain        this.needDrain = true;     }    if (this.writing) {//如果正在执行写操作,则后面要写入目标的数据先存入缓存        this.cache.push({            chunk, encoding, callback        })    } else {// 没有执行写操作则执行写操作        this.writing = true;         //源码中在这里调用dowrite()然后调用_write()和__writev()        this._write(chunk, encoding, () => {callback();this.clearBuffer()});    }    return this.length < this.highWaterMark //如果缓存区的内容大于了highWaterMark 那就返回false  }  // 源码中在write()中调用dowrite()然后调用_write()和__writev()来进行读操作Writable.prototype._write = function (chunk, encoding, callback) {    if (typeof this.fd !== 'number') {//这里是异步打开的操作,要保证有fd,没有则绑定once等文件open再触发        return this.once('open', () => this._write(chunk, encoding, callback));    }        // 源码中clearBuffer()调用dowrite()来消耗缓存    // 源码中dowrite()再调用onwriteStateUpdate()对length进行更新    // 所以源码中这里不需要调用clearBuffer    {        this.position += bytesWritten // 位置增加便宜        this.length -= bytesWritten;// 缓存长度更新        callback();//里面包含了clearBuffer()        }}//源码中clearBuffer()实是在end的时候调用的,//源码中clearBuffer()调用dowrite()然后调用_write()和__writev()来消耗内存//源码中dowrite()再调用onwriteStateUpdate()对缓存length进行更新//这里只是为了简化function clearBuffer(){     let obj = this.cache.shift();     if(obj){        this._write(obj.chunk,obj.encoding,()=>{obj.callback();this.clearBuffer()});    }else{        if(this.needDrain){            this.writing = false;            this.needDrain = false;            this.emit('drain'); // 触发drain事件        }    } }复制代码

WriteStream

WriteStream和writable的关系

WriteStream其实是writabl的子类,它继承了writabl,以fs.createWriteStream为例(node/lib/internal/fs/streams.js)

然后对上面的_write方法进行了覆盖:
以及对_writev方法进行了覆盖:
并且在其上扩展了open和close:

WriteStream简化实现

只需要对上面的Writable进行showier的修改

const EE = require('events');const util = require('util');const fs = require('fs');function Writable(path,options) {    EE.call(this);        this.path = path;    this.autoClose = options.autoClose || true;    this.highWaterMark = options.highWaterMark || 64 * 1024;    this.encoding = options.encoding || null;    this.flags = options.flags || 'w';    this.needEmitDrain = false;    this.position = 0;    this.cache = [];     this.writing = false;    this.length = 0;     this.open(); }util.inherits(Writable, EE);Writable.prototype.write = function (chunk, encoding=this.encoding, callback=()=>{}) {    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);    this.length += chunk.length;    if (this.length >= this.highWaterMark ) {        this.needDrain = true;     }    if (this.writing) {        this.cache.push({            chunk, encoding, callback        })    } else {        this.writing = true;         this._write(chunk, encoding, () => {callback();this.clearBuffer()});    }    return this.length < this.highWaterMark   }  Writable.prototype._write = function (chunk, encoding, callback) {    if (typeof this.fd !== 'number') {//这里是异步打开的操作,要保证有fd,没有则绑定once等文件open再触发        return this.once('open', () => this._write(chunk, encoding, callback));    }        //将_write和fs.write结合    //源码中是覆盖_write和_writev    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {        this.pos += bytesWritten         this.len -= bytesWritten;        callback();    });}Writable.prototype.destroy = function () {    if (typeof this.fd != 'number') {        this.emit('close');    } else {        fs.close(this.fd, () => {            this.emit('close');        })    }}Writable.prototype.open = function () {    fs.open(this.path, this.flags, (err, fd) => { // fd文件描述符 只要文件打开了就是number        if (err) { // 销毁文件            if (this.autoClose) { // 如果需要自动关闭 触发一下销毁事件            this.destroy();             }            return this.emit('error', err);        }        this.fd = fd;        this.emit('open', fd);    });};function clearBuffer(){     let obj = this.cache.shift();     if(obj){        this._write(obj.chunk,obj.encoding,()=>{obj.callback();this.clearBuffer()});    }else{        if(this.needDrain){            this.writing = false;            this.needDrain = false;            this.emit('drain'); // 触发drain事件        }    } }复制代码

pipe

pipe的使用

const fs = require('fs');const ReadStream = require('./ReadStream');const WriteStream = require('./WriteStream');let rs = new ReadStream('./1.txt',{    highWaterMark:4});let ws = new WriteStream('./3.txt',{    highWaterMark:1});rs.pipe(ws);复制代码

pipe的实现

由于pipe方法是在ReadStream上调用的,所以我们可以修改上篇的ReadStream来实现,源码中Readable和Writable都有pipe的实现

const EE = require('events');const util = require('util');const fs = require('fs');function ReadStream (path,options) {    this.path = path;    this.flags = options.flags || 'r'; //用来标识打开文件的模式    this.encoding = options.encoding || null;    this.highWaterMark = options.highWaterMark || 64 * 1024;    this.start = options.start || 0; //读取(文件)的开始位置    this.end = options.end || null; //读取(文件)的结束位置    this.autoClose = options.autoClose || true;    this.flowing = null; // 默认非流动模式    this.position = this.start // 记录读取数据的位置    this.open(); // 打开文夹    this.on('newListener', function (type) {        if (type === 'data') { // 用户监听了data事件            this.flowing = true;            this.read();        }    })}ReadStream.prototype.read = function (){    if (typeof this.fd !== 'number') {// open操作是异步的,所以必须等待文件打开this.fd存在说明打开文件        return this.once('open', () => this.read());    }    let buffer = Buffer.alloc(this.highWaterMark); // 把数据读取到这个buffer中    //判断每次读取的数据是多少exp:数据源1234567890 highWaterMark=3    //最后一次读取长度为1    let howMuchToRead = Math.min(this.end - this.pos + 1, this.highWaterMark);    fs.read(this.fd, buffer, 0, howMuchToRead, this.position, (err, byteRead) => {    if (byteRead > 0) {        this.emit('data', buffer.slice(0, byteRead));        this.position += byteRead;//更新读取的起点        if (this.flowing) {//处在flowing模式中就一直读            this.read();        }    }else{//读取完毕        this.flowing = null;        this.emit('end');        if(this.autoClose){            this.destroy();        }    }}//通过flowing控制暂停还是继续读取ReadStream.prototype.pause = function(){    this.flowing = false;}ReadStream.prototype.resume = function(){    this.flowing = true;    this.read();}ReadStream.prototype.pipe = function (ws){    this.on('data', (data)=> {        let flag = ws.write(data);//读完之后写,根据flag判断不需要读操作来增加缓存的长度        if (!flag) {            this.pause();        }    });    ws.on('drain',()=> {//当写完缓存之后,lenght=0,发射drain来恢复读取往缓存中添加内容        this.resume();    })  }ReadStream.prototype.destroy = function () {    if (typeof this.fd != 'number') {        this.emit('close');    } else {        fs.close(this.fd, () => {        this.emit('close');        })    }};ReadStream.prototype.open = function() {    fs.open(this.path, this.flags, (err, fd) => {// fd文件描述符 只要文件打开了就是number        if (err) {            if (this.autoClose) { // 如果需要自动关闭 触发一下销毁事件            this.destroy(); // 销毁文件        }        return this.emit('error', err);    }    this.fd = fd;    this.emit('open', fd);    });};复制代码

结语:

希望这篇文章能够让各位看官对Stream熟悉,因为这个模块是node中的核心,很多模块都是继承这个模块实现的,如果熟悉了这个模块,对node的使用以及koa等框架的使用将大有好处,接下来会逐步介绍其他流模式本文参考:

  1. node API
  2. node 源码

转载地址:http://hqkta.baihongyu.com/

你可能感兴趣的文章
对象的继承及对象相关内容探究
查看>>
Spring: IOC容器的实现
查看>>
Serverless五大优势,成本和规模不是最重要的,这点才是
查看>>
Nginx 极简入门教程!
查看>>
iOS BLE 开发小记[4] 如何实现 CoreBluetooth 后台运行模式
查看>>
Item 23 不要在代码中使用新的原生态类型(raw type)
查看>>
为网页添加留言功能
查看>>
JavaScript—数组(17)
查看>>
Android 密钥保护和 C/S 网络传输安全理论指南
查看>>
以太坊ERC20代币合约优化版
查看>>
Why I Began
查看>>
同一台电脑上Windows 7和Ubuntu 14.04的CPU温度和GPU温度对比
查看>>
js数组的操作
查看>>
springmvc Could not write content: No serializer
查看>>
Python系语言发展综述
查看>>
新手 开博
查看>>
借助开源工具高效完成Java应用的运行分析
查看>>
163 yum
查看>>
第三章:Shiro的配置——深入浅出学Shiro细粒度权限开发框架
查看>>
80后创业的经验谈(转,朴实但实用!推荐)
查看>>