Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node.js Stream处理 #19

Open
gongmw opened this issue Aug 29, 2020 · 0 comments
Open

Node.js Stream处理 #19

gongmw opened this issue Aug 29, 2020 · 0 comments

Comments

@gongmw
Copy link
Owner

gongmw commented Aug 29, 2020

1. 定义

1.1 什么是Stream

Stream流是一种数据传输手段,是端到端信息交换的一种方式,而且是有顺序的,是逐块读取数据、处理内容,用于顺序读取输入或写入输出。

其实Stream在计算机中一个相当古老的概念,它起源于1960年代早期的Unix。

"Stream是随着时间的推移从源流到目的地的一系列数据" @ddprrt

我们要知道Stream可以有多种类型:文件,计算机的内存或键盘或鼠标等输入设备都可以称谓。一旦打开一个流,数据就会从其源头到消耗它的进程成块地流动。如果是一个文件,则每个字符或字节将被读取一次,在比如我们在键盘的上面敲打的每个按键都将通过流传输数据然后进行响应。

Stream的应用在理论上输入可以是无止境的,而且没有限制所以被广泛应用

1.2 Node.js中的Stream

Node.js中的Stream是由Node核心Stream模块封装提供的功能,是EventEmitter类的实例,基于事件的。

在Node.js中有4种流:

  • 可写:用于写入数据
  • 可读:用于读取数据
  • 双工:用于读取和写入数据
  • 转换:在写入或读取时可以修改数据的位置,例如在压缩功能时候你可以写入和读取解压缩数据。

2. 运用

可能文字太过枯燥,在这里通过一些案例给大家介绍下。

本地添加了一个大约大约1G的test.txt文件,我们用两中方式来读取它,看下它的内存变化。

$ ll
total 1935264
-rw-r--r--  1 fengshi  staff   1.1K May 12 20:10 index.js
drwxr-xr-x  4 fengshi  staff   128B May  6 15:12 node_modules
-rw-r--r--  1 fengshi  staff   352B May  6 15:12 package-lock.json
-rw-r--r--  1 fengshi  staff   244B May  6 15:12 package.json
-rw-r--r--  1 fengshi  staff   942M May 12 20:10 test.txt

先不使用流的方式直接读取到内存返回

const http = require('http');
const fs = require('fs');
const path = require('path');

const textPath=path.resolve(__dirname,'test.txt')
http.createServer((req, res) => {
  fs.readFile(textPath, (err, data) => {
    res.end(data);
  });

}).listen(3000);

因为资源比较大视频生成的gif,可能不是很清晰,注意看清左边MEM内存的变化就好了

在这里插入图片描述

可以清晰看到当执行curl后内存一下子就涨起来满了,几何时增长

我们现在在使用流的方式看下

const http = require('http');
const fs = require('fs');
const path = require('path');

const textPath=path.resolve(__dirname,'test.txt')
http.createServer((req, res) => {
fs.createReadStream(textPath).pipe(res);
}).listen(3000);

在这里插入图片描述
可以明显看到MEM比较缓慢增加,然后比较固定10M左右,右边的数据也一直在输出,如果我们要读的数据在大一些假如10G的电影,结果可想而至!!!

知道了这些我们在看下node中的4种Stream,理解流的思想、其实和理解生产者消费者问题(也称有限缓冲问题)有异曲同工之处。

  • 可读流Readable

  • 可写流Writable:可写入数据的流 fs.createWriteStream() HTTP requests TCP, sockets、child process stdin、process.stdout, process.stderr。

  • 双工流Duplex:TCP sockets、zlib streams、crypto streams。

  • 转换流Transform:zlib streams、crypto streams。

2.1 可读流Readable

Readable流有两种模式运行:flowing 和 paused

  • 在flowing模式下,将自动从底层系统读取数据,并使用事件通过EventEmitter接口将其尽快提供给应用程序 。

  • 在paused模式下,必须显式调用read()该方法才可以从流中读取数据块

2.1.1 flowing模式

最简易的使用就是比较常用的监听data事件,和使用pipe()来获取数据源

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk} bytes of data.`);
});

fs.createReadStream(textPath).pipe(res);

当添加'data'事件或者pipe()处理就自动切换到flowing模式了,但是在flowing模式下一定要消费这些数据,不然数据可能会丢失。

(flowing模式画了一个比较***的图,只是想举例奈何...,)

在这里插入图片描述

在flowing模式下数据并不会直接指向消费者,会先存在水池中,消费者再通过监听data事件从水池中获取数据来生长,但是水池的容量有限,如果
消费速度比数据流入到池的速度慢,为了不浪费,和缓解这种压力,在可读流中有highWatermark这个值表示触发警戒线,在源码里的默认大小是
options.highWaterMark = 64 * 1024;超过这个警戒线,数据就不会在往水池里输入了,或者当消费者主动调用了pause()方法也不会输了。

2.1.2 paused模式

所有Readable流都以paused模式开始的,监听 readable 的回调函数参数不会传递数据,在暂停模式下需要用户手动调用 read() 方法才能得到数据。

const fs = require('fs');
rs = fs.createReadStream(sourcePath);
//监听 readable事件的时候,会进入暂停模式
rs.on('readable', () => {
        const ch = rs.read(1);
});

可读流Readable的一些场景:fs.createReadStream() HTTP responses TCP, sockets

我们可以手动实现一个简易的可读流

const { Readable } = require('stream'); 
const inStream = new Readable({
  read(){}
});
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

2.2 可写流Writable

Writable流是对数据流向终端设备的抽象,用来消费处理上游的数据,通过可写流程序可以把数据写入设备。

基本原理和读流Stream比较相似,当数据流过来的时候,会直接写入到资源池,当写入速度比较缓慢或者写入暂停时,数据流会进入缓存起来。

实现一个可写流Writable

const { Writable } = require('stream'); 
const writeStream = new Writable({
  write(){}
});
writeStream.write('11');
writeStream.write('22');
writeStream.write('33');
writeStream.end();

最后需要调用end()表示已无数据传入

理解了这些在来看下我们常用的 fs.createReadStream(textPath).pipe(res);这个代码的pipe()方法就会清晰很多。

可写流Writable的一些场景 fs.createWriteStream() HTTP requests TCP, sockets、process.stdout

2.3 双工流Duplex

双工流Duplex,可能一下看到双工这个词会又有点陌生,如果熟悉Websocket通信的应该清楚它的一个特性全双工,就是值发送方和接受方都是各自独立的方法,
发送和接收都没有任何关系。(后面的章节会通过Websocket实现实时股票行情数据展示的项目案例给大家讲解下,希望多多关注)

双工流的读写是完全独立操作 对于 Duplex 流来说直接把writable流与reabable 流两者进行结合来,但是写入的数据与读取的数据没有任何的联系。

实现一个 Duplex同时实现一个writable流和reabable流就可以:

const { Duplex } = require('stream');
const inoutStream = new Duplex({
  write(chunk) {
  },
  read() {
  }
});
process.stdin.pipe(inoutStream).pipe(process.stdout);

双工流Duplex的一些场景:TCP sockets、zlib streams、crypto streams

2.4 转换流Transform

transform 流其实可以当作一个特殊的 Duplex 流,因为它集成了双工流Duplex的一些作用,拥有Readable和Writable的能力。

不同的是转换流transform的读取与写入数据端是关联的在中间做了转换处理,最比较典型的应用就是zlib模块对于文件的压缩和解压了。

在Node的官方中提供了zlib模块,内置了转换流Transform
使用可读流读取test.txt文件,通过pipe到zli中,内部通过转换流Transform处理后,在通过管道输出到可写流,这样通过转换流Transform就处理了一个文件的压缩应用。

const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('test.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('test.gz'));

转换流Transform的一些场景 zlib streams、crypto streams

小结

对于Stream的理解本文来说描述的应该还是属于比较基础的一些作用和原理,尽量解释清楚一些概念、背景知识,更多细节和实践可能没有描述出来,但是对于你在写或者看到这些知识点的时候这些关于buffer strean处理的时候可能会更清楚一些它背后的故事。

如果你在学习的过程中有任何问题或者文章有所疏漏和问题,欢迎加入本专栏的讨论群

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant