首页前端开发JavaScript浅谈Nodejs中的可读流,可读流如何实现?

浅谈Nodejs中的可读流,可读流如何实现?

时间2024-01-29 21:06:02发布访客分类JavaScript浏览637
导读:收集整理的这篇文章主要介绍了浅谈Nodejs中的可读流,可读流如何实现?,觉得挺不错的,现在分享给大家,也给大家做个参考。本篇文章给大家介绍一下Nodejs中的流(stream),看看Node可读流的实现方法。有一定的参考价值,有需要的朋友...
收集整理的这篇文章主要介绍了浅谈Nodejs中的可读流,可读流如何实现?,觉得挺不错的,现在分享给大家,也给大家做个参考。本篇文章给大家介绍一下Nodejs中的流(stream),看看Node可读流的实现方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。

stream的概念

流(stream)是 Node.js 中处理流式数据的抽象接口。 stream 模块用于构建实现了流接口的对象。【推荐学习:《nodejs 教程》】

stream的作用

读写大文件的过程中,不会一次性的读写到内存中。可以控制每次读写的个数

stream的分类

1、可读流-Readable

例:fs.createReadStream;

源码位置:lib/_stream_readable.js

2、可写流-WrITable

例:fs.createWritestream;

源码位置:lib/_stream_writable.js

3、双工流-Duplex:满足读写的功能

例:net.Socket();

源码位置:lib/_stream_duplex.js

4、转化流-transform:用途:压缩,转码

例:

const {
 Transform }
     = require('stream');
    Transform.call(this, '要转换的数据');
    //具体的使用详情 见node官网

-源码位置:lib/_stream_tranform.js

可读流读取文件的过程

  • 读取文件代码过程
const path = require("path");
    const aPath = path.join(__dirname, "a.txt");
    //需要读取的文件const fs = require("fs");
let rs = fs.createReadStream(aPath, {
  flags: "r",  encoding: null,//默认编码格式是buffer,深挖buffer又要学习字符编码,留个坑 到时候写一个编码规范的学习整理  autoClose: true,//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close  start: 0,  highWaterMark: 3,//每次读取的个数 默认是64*1024个字节}
    );
rs.on("oPEn", function (fd) {
      // fd  number类型  console.LOG("fd", fd);
}
    );
// 他会监听用户,绑定了data事件,就会触发对应的回调,不停的触发rs.on("data", function (chunk) {
//这里会打印的是ascII 值 ,所以可以toString查看详情自己看得懂的样子  console.log({
 chunk }
    , "chunk.toString", chunk.toString());
       //如果想每一段事件 读一点 可以用rs.pause() 做暂停,然后计时器 里rs.resume()再次触发data事件  rs.pause();
//暂停读取}
    );
rs.on("close", function () {
      //当文件读取完毕后 会 触发 end事件  console.log("close");
}
    );
    setInterval(() =>
 {
      rs.resume();
 //再次触发data,直到读完数据为止}
    , 1000);
    
  • 题外话:想说下 文件流和普通可读流的区别

1、open 和close是文件流独有,支持open和close便是文件流

2、可读流都具备 (on('data'),on('end'),on('error'),resume,pause;所以只要支持这些方法就是可读流

可写流写入文件的过程

  • 写入文件代码过程
const fs = require("fs");
    const path = require("path");
    const bPath = path.join(__dirname, "b.txt");
let ws = fs.createWriteStream(bPath, {
//参数和可读流的类似  flags: "w",  encoding: "utf-8",  autoClose: true,  start: 0,  highWaterMark: 3,}
    );
ws.on("open", function (fd) {
      console.log("open", fd);
}
    );
ws.on("close", function () {
      console.log("close");
}
    );
    //write的参数string 或者buffer,ws.write 还有一个boolea的返回值表示是真实写入文件还是放入缓存中ws.write("1");
    let flag = ws.write("1");
console.log({
 flag }
    );
    //trueflag = ws.write("1");
console.log({
 flag }
    );
    //trueflag = ws.write("1");
console.log({
 flag }
    );
    //false

双工流的写入和读取过程

  • 写一个本地服务 做例子

1、server(服务器代码)实现

const net = require("net");
 //net 模块是 node自己封装的tcp层//socket 就是双工流 能读能写  http源码就是用net模块写的 基于tcpconst server = net.createServer(function (socket) {
  socket.on("data", function (data) {
    //监听客户端发来的消息    console.log(data.toString)    socket.write("server:hello");
//写入server:hello  }
    );
  socket.on("end", function () {
        console.log("客户端关闭");
  }
    );
}
    );
server.on("err", function (err) {
      console.log(err);
}
    );
    server.listen(8080);
    //服务端监听8080端口

2、client(客户端) 实现

const net = require("net");
     //net 模块是 node自己封装的tcp层const socket = new net.Socket();
     //socket.connect(8080, "localhost");
 //  表示链接服务器本地8080端口socket.on("connect", function (data) {
      //和服务器建立链接后  socket.write("connect server");
}
    );
socket.on("data", function (data) {
      //监听数据,读取服务器传来的数据  console.log(data.toString());
  socket.destroy()}
    );
socket.write('ok')socket.on("error", function (err) {
      console.log(err);
}
    );
    

3.题外话 如果想看tcp的三次握手和四次挥手 可以 通过我上述代码 用Wireshark(一个抓包工具)看实际过程

转化流 transform过程

转化流是双工流的一种, 允许实现输入,并在对数据执行某些操作后返回输出,两者有依赖关系

  • 代码过程(这个例子我的参考来处)
const stream = require('stream')let c = 0;
const readable = stream.Readable({
  highWaterMark: 2,  read: function () {
        let data = c  26 ? Number(c++ + 97) : null;
        console.log('push', data);
        this.push( String.FromCharCode(data));
}
}
)const transform = stream.Transform({
  highWaterMark: 2,  transform: function (buf, enc, next) {
        console.log('transform', buf.toString());
        next(null, buf);
  }
}
    )readable.pipe(transform);
    
  • 打印结果

可读流的实现

跟着断点先了解 可读流的调用过程

就前面可读流文件的读取过程的代码为例子 打断点

rs.on('open')@H_112_126@

rs.on('open')为断点入口进入

1、通过Stream.PRototype.on.call 继承Stream类

源文件位置:no dlib/_stream_readable.js(我是通过断点点到这里 直接找,我也没找到)

  • 再点进去 发现 Stream 是EventEmitter的子类 那么 可读流也可以支持发布订阅

2、监听的事件类型是否是data和readable任意一个 不是 继续 下一个事件的监听

rs.on('data')

  • data的部分做两件事

    1、判断flowing(默认值是null)不为false 就自动resume方法执行继续 文件读取(这里我的案例是rs.pause(); 手动将flowing 值为false了所以不会继续调用)

    2、那如果我没有调用rs.pause() 会继续调用resume 看看resume里做了什么

2.1 最终调用了 stream.read()继续读取文件; 直到文件读取结束依次去emit end 和close事件

小结:所以data默认是会不断的读取文件直到文件读取完毕 ,如果想要文件读取变可控可以和我一样用rs.pause()

自己实现

实现思路

继承EventEmitter发布订阅管理我们的事件

const fs = require("fs");
    const EventEmitter = require("events");
class ReadStream extends EventEmitter {
}
    module.exports = ReadStream;
    

数据初始化

constructor(path, options = {
}
) {
        super();
        //参考fs 写实例需要用到的参数    this.path = path;
        this.flags = options.flags || "r";
        this.encoding - options.encoding || null;
    //默认编码格式是buffer    this.autoClose = options.autoClose || true;
    //相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close    this.start = options.start || 0;
    //数据读取的开始位置    this.end = options.end;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
    //默认一次读取64个字节的数据     this.offset = this.start;
    //fs.read的偏移量    this.fd = undefined;
     //初始化fd 用于 open成功后的fd做赋值  供 read里使用    this.flowing = false;
    //实现pause和resume备用,设置flag,当监听到data事件的时候 改 flowing为true,    this.open();
 //初始化的时候就要调用Open    this.on("readStreaMListener", function (type) {
      // console.log(type)//这里打印就能看到 实例上所有 通过on 绑定的事件名称      if (type === "data") {
          //监听到data事件的时候 改 flowing为true        this.flowing = true;
            this.read();
      }
    }
    );
    }
    

文件读取方法read,pause,resume,open和destroy的实现

open()

 open() {
     // 调用fs.open 读取目标文件     fs.open(this.path, this.flags, (err, fd) =>
 {
           this.fd = fd;
     //赋值一个fd 供后面的 read()方式使用,文件读取成功,fd是返回一个数字      this.emit("open", fd);
    }
    );
    

read()

 read() {
       // console.log("一开始read里的", this.fd);
 //但是这样依旧拿不到 open后的fd,用 发布订阅 通过on来获取 绑定的事件type    //这里要做一个容错处理 ,因为open是异步读取文件,read里无法马上拿到open结果  if (typeof this.fd !== "number") {
          //订阅open,给绑定一个回调事件read 直到this.fd有值      return this.once("open", () =>
     this.read());
    }
 }
      //fd打开后 调用fs.read  //实例上的start值是未知number,存在实际剩余的可读的文件大小highWaterMar的情况 ,用howMuchToRead 替换highWaterMark 去做fs.read的每次读取buffer的大小    let howMuchToRead = this.end      ? Math.min(this.end - this.offset + 1, this.highWaterMark)      : this.highWaterMark;
      //定义一个用户 传进来的highWaterMark 大小的Buffer对象    const buffer = Buffer.alloc(this.highWaterMark);
           //读取文件中的内容fd给buffer 从0位置开始,每次读取howMuchToRead个。插入数据,同时更新偏移量    fs.read(      this.fd,      buffer,      0,      howMuchToRead,      this.offset,      (err, bytesRead) =>
 {
        if (bytesRead) {
              // 每读完一次,偏移量=已经读到的数量          this.offset += bytesRead;
              this.emit("data", buffer.slice(0, bytesRead));
              //写到这里实例上的data 已经可以打印出数据了 但是 继续读取 调用this.read() 直到bytesRead不存在 说明数据读取完毕了 走else          //回调 this.read();
时候判断 this.flowing 是否为true          //pause调用后this.flowing将为false          if (this.flowing) {
                this.read();
          }
        }
 else {
              // 执行到这 bytesRead不存在说明  文件数据读取完毕了已经 触发end          this.emit("end");
    //emit 实例上绑定的end事件          //destroy 还没写到 稍等 马上后面就实现...          this.destroy();
        }
      }
        );
    

resume()

文件读取不去data事件,会触发对应的回调,不停的触发 所以想要变可控可以手动调用 resume()& pause()

  • pause的实现,调用的时候设置 this.flowing=false,打断 read()
  pause() {
        this.flowing = false;
  }
    

pause()

  • pause 打断 read()多次读取,可以使用resume 打开 this.flowing=true 并调用read
resume() {
    if (!this.flowing) {
          this.flowing = true;
          this.read();
    }
  }
    

destroy()

  • 文件open不成功时候抛错时调用
  • 文件读取完毕后& & this.autoClose===true ,read()里文件读取end的时候 就执行close
  destroy(err) {
    if (err) {
          this.emit("error");
    }
    // 把close放destroy里 并 在read里调用    if (this.autoClose) {
          fs.close(this.fd, () =>
 {
            this.emit("close");
      }
    );
    }
  }
    

完整代码

  • 实现代码
/** *实现简单的可读流 */const fs = require("fs");
    const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {
}
) {
        super();
        //参考fs 写实例需要用到的参数    this.path = path;
        this.flags = options.flags || "r";
        this.encoding - options.encoding || null;
        this.autoClose = options.autoClose || true;
        this.start = options.start || 0;
        this.end = options.end;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.fd = undefined;
        this.offset = this.start;
        this.flowing = false;
        this.open();
     this.on("newListener", function (type) {
      if (type === "data") {
            this.flowing = true;
            this.read();
      }
    }
    );
  }
  destroy(err) {
    if (err) {
          this.emit("error");
    }
    if (this.autoClose) {
          fs.close(this.fd, () =>
 {
            this.emit("close");
      }
    );
    }
  }
  open() {
        fs.open(this.path, this.flags, (err, fd) =>
 {
      if (err) {
            return this.destroy(err);
      }
          this.fd = fd;
          this.emit("open", fd);
    }
    );
  }
  resume() {
    if (!this.flowing) {
          this.flowing = true;
          this.read();
    }
  }
  pause() {
        this.flowing = false;
  }
  read() {
    if (typeof this.fd !== "number") {
          return this.once("open", () =>
     this.read());
    }
        let howMuchToRead = this.end      ? Math.min(this.end - this.offset + 1, this.highWaterMark)      : this.highWaterMark;
        const buffer = Buffer.alloc(this.highWaterMark);
        fs.read(      this.fd,      buffer,      0,      howMuchToRead,      this.offset,      (err, bytesRead) =>
 {
        if (bytesRead) {
              this.offset += bytesRead;
              this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
                this.read();
          }
        }
 else {
              this.emit("end");
              this.destroy();
        }
      }
        );
  }
}
    module.exports = ReadStream;
    
  • 调用代码
const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
  flags: "r",  encoding: null, //默认编码格式是buffer  autoClose: true, //相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close  start: 0,  highWaterMark: 3, //每次读取的个数 默认是64*1024个字节}
    );
    

可写流的实现

待续...

更多编程相关知识,请访问:编程视频!!

以上就是浅谈Nodejs中的可读流,可读流如何实现?的详细内容,更多请关注其它相关文章!

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!

Nodejs

若转载请注明出处: 浅谈Nodejs中的可读流,可读流如何实现?
本文地址: https://pptw.com/jishu/591598.html
聊聊Angular中的指令(Directive) javascript dom对象怎么转字符串

游客 回复需填写必要信息