一文学会 Node.js 中的流[每日前端夜话0xF4]

Node.js 中的流(Stream)是出了名的难用甚至是难以理解。

用 Dominic Tarr 的话来说:“流是 Node 中最好的,也是最容易被误解的想法。”即使是 Redux 的创建者和 React.js 的核心团队成员 Dan Abramov 也害怕 Node 流。

本文将帮助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!

什么是流?

流是为 Node.js 应用提供动力的基本概念之一。它们是数据处理方法,用于将输入的数据顺序读取或把数据写入输出。

流是一种以有效方式处理读写文件、网络通信或任何类型的端到端信息交换的方式。

流的处理方式非常独特,流不是像传统方式那样将文件 一次全部 读取到存储器中,而是逐段读取数据块并处理数据的内容,不将其全部保留在内存中。

这种方式使流在处理 大量数据 时非常强大,例如,文件的大小可能大于可用的内存空间,从而无法将整个文件读入内存进行处理。那是流的用武之地!

既能用流来处理较小的数据块,也可以读取较大的文件。

以 YouTube 或 Netflix 之类的“流媒体”服务为例:这些服务不会让你你立即下载视频和音频文件。取而代之的是,你的浏览器以连续的块流形式接收视频,从而使接收者几乎可以立即开始观看和收听。

但是,流不仅涉及处理媒体和大数据。它们还在代码中赋予了我们“可组合性”的力量。考虑可组合性的设计意味着能够以某种方式组合多个组件以产生相同类型的结果。在 Node.js 中,可以通过流在其他较小的代码段中传递数据,从而组成功能强大的代码段。

为什么会用到流

与其他数据处理方法相比,流基本上具有两个主要优点:

  1. 内存效率:你无需事先把大量数据加载到内存中即可进行处理

  2. 时间效率:得到数据后立即开始处所需的时间大大减少,不必等到整个有效数据全部发送完毕才开始处理

Node.js 中有 4 种类型的流:

  1. 可写:可以向其中写入数据的流。例如, fs.createWriteStream() 使我们可以使用流将数据写入文件。

  2. 可读:可从中读取数据的流。例如: fs.createReadStream() 让我们读取文件的内容。

  3. 双工:可读和可写的流。例如, net.Socket

  4. Transform:可在写入和读取时修改或转换数据。例如在文件压缩的情况下,你可以在文件中写入压缩数据,也可以从文件中读取解压缩的数据。

如果你已经使用过 Node.js,则可能遇到过流。例如在基于 Node.js 的 HTTP 服务器中, request 是可读流,而 response 是可写流。你可能用过 fs 模块,该模块可让你用可读和可写文件流。每当使用 Express 时,你都在使用流与客户端进行交互,而且由于 TCP 套接字、TLS栈和其他连接都基于 Node.js,所以在每个可以使用的数据库连接驱动的程序中使用流。

一个实际的例子

如何创建可读流

首先需要可读性流,然后将其初始化。

1const Stream = require('stream')
2const readableStream = new Stream.Readable()

现在,流已初始化,可以向其发送数据了:

1readableStream.push('ping!')
2readableStream.push('pong!')

异步迭代器

强烈建议在使用流时配合异步迭代器(async iterator)。根据 Axel Rauschmayer【 https://twitter.com/rauschma 】 博士的说法,异步迭代是一种用于异步检索数据容器内容的协议(这意味着当前“任务”可以在检索项目之前被暂停)。另外必须提及的是,流异步迭代器实现使用内部的 readable 事件。

从可读流中读取时,可以使用异步迭代器:

 1import * as fs from 'fs';
 2
 3async function logChunks(readable) {
 4  for await (const chunk of readable) {
 5    console.log(chunk);
 6  }
 7}
 8
 9const readable = fs.createReadStream(
10  'tmp/test.txt', {encoding: 'utf8'});
11logChunks(readable);
12
13// Output:
14// 'This is a test!\n'

也可以用字符串收集可读流的内容:

 1import {Readable} from 'stream';
 2
 3async function readableToString2(readable) {
 4  let result = '';
 5  for await (const chunk of readable) {
 6    result += chunk;
 7  }
 8  return result;
 9}
10
11const readable = Readable.from('Good morning!', {encoding: 'utf8'});
12assert.equal(await readableToString2(readable), 'Good morning!');

注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。

请切记不要将异步功能与 EventEmitter 混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。这个 pull request 【 https://github.com/nodejs/node/pull/27867 】旨在解决一旦其落在 Node 核心上产生的问题。

要了解有关异步迭代的 Node.js 流的更多信息,请查看这篇很棒的文章【 https://2ality.com/2019/11/nodejs-streams-async-iteration.html 】。

Readable.from():从可迭代对象创建可读流

stream.Readable.from(iterable, [options]) 这是一种实用方法,用于从迭代器中创建可读流,该迭代器保存可迭代对象中包含的数据。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,除其他作用外,还可以用于指定文本编码。

 1const { Readable } = require('stream');
 2
 3async function * generate() {
 4  yield 'hello';
 5  yield 'streams';
 6}
 7
 8const readable = Readable.from(generate());
 9
10readable.on('data', (chunk) => {
11  console.log(chunk);
12});

两种读取模式

根据 Streams API,可读流有效地以两种模式之一运行: flowingpaused 。可读流可以处于对象模式,无论处于 flowing 模式还是 paused 模式。

  • 流模式 下,将自动从底层系统读取数据,并通过 EventEmitter 接口使用事件将其尽快提供给程序。

  • paused 模式 下,必须显式调用 stream.read() 方法以从流中读取数据块。

在 flowing 模式中,要从流中读取数据,可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件,并执行你的回调。看下面的代码片段:

 1var fs = require("fs");
 2var data = '';
 3
 4var readerStream = fs.createReadStream('file.txt'); //Create a readable stream
 5
 6readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 
 7
 8// Handle stream events --> data, end, and error
 9readerStream.on('data', function(chunk) {
10   data += chunk;
11});
12
13readerStream.on('end',function() {
14   console.log(data);
15});
16
17readerStream.on('error', function(err) {
18   console.log(err.stack);
19});
20
21console.log("Program Ended");

函数调用 fs.createReadStream() 给你一个可读流。最初流处于静态状态。一旦你侦听数据事件并附加了回调,它就会开始流动。之后将读取大块数据并将其传递给你的回调。流实现者决定发送数据事件的频率。例如,每当有几 KB 的数据被读取时,HTTP 请求就可能发出一个数据事件。当从文件中读取数据时,你可能会决定读取一行后就发出数据事件。

当没有更多数据要读取(结束)时,流将发出结束事件。在以上代码段中,我们监听此事件以在结束时得到通知。

另外,如果有错误,流将发出并通知错误。

在 paused 模式下,你只需在流实例上重复调用 read() ,直到读完所有数据块为止,如以下示例所示:

 1var fs = require('fs');
 2var readableStream = fs.createReadStream('file.txt');
 3var data = '';
 4var chunk;
 5
 6readableStream.on('readable', function() {
 7    while ((chunk=readableStream.read()) != null) {
 8        data += chunk;
 9    }
10});
11
12readableStream.on('end', function() {
13    console.log(data)
14});

read() 函数从内部缓冲区读取一些数据并将其返回。当没有内容可读取时返回 null 。所以在 while 循环中,我们检查是否为 null 并终止循环。请注意,当可以从流中读取大量数据时,将会发出可读事件。

所有 Readable 流均以 paused 模式 开始,但可以通过以下方式之一切换为 flowing 模式

  • 添加一个 ‘data’ 事件处理。

  • 调用 stream.resume() 方法。

  • 调用 stream.pipe() 方法将数据发送到可写对象。

Readable 可以使以下方法之一切换回 paused 模式:

  • 如果没有管道目标,则通过调用 stream.pause() 方法。

  • 如果有管道目标,请删除所有管道目标。可以通过调用 stream.unpipe() 方法来删除多个管道目标。

一个需要记住的重要概念是,除非提供了一种用于消耗或忽略该数据的机制,否则 Readable 将不会生成数据。如果使用机制被禁用或取消,则 Readable 将会试图停止生成数据。添加 readable 事件处理会自动使流停止 flowing,并通过 read.read() 得到数据。如果删除了 readable 事件处理,那么如果存在 ‘data’ 事件处理,则流将再次开始 flowing。

如何创建可写流

要将数据写入可写流,你需要在流实例上调用 write() 。如以下示例所示:

1var fs = require('fs');
2var readableStream = fs.createReadStream('file1.txt');
3var writableStream = fs.createWriteStream('file2.txt');
4
5readableStream.setEncoding('utf8');
6
7readableStream.on('data', function(chunk) {
8    writableStream.write(chunk);
9});

上面的代码很简单。它只是简单地从输入流中读取数据块,并使用 write() 写入目的地。该函数返回一个布尔值,指示操作是否成功。如果为 true ,则写入成功,你可以继续写入更多数据。如果返回 false ,则表示出了点问题,你目前无法写任何内容。可写流将通过发出 drain 事件来通知你什么时候可以开始写入更多数据。

调用 writable.end() 方法表示没有更多数据将被写入 Writable。如果提供,则可选的回调函数将作为 finish 事件的侦听器附加。

1// Write 'hello, ' and then end with 'world!'.
2const fs = require('fs');
3const file = fs.createWriteStream('example.txt');
4file.write('hello, ');
5file.end('world!');
6// Writing more now is not allowed!

你可以用可写流从可读流中读取数据:

 1const Stream = require('stream')
 2
 3const readableStream = new Stream.Readable()
 4const writableStream = new Stream.Writable()
 5
 6writableStream._write = (chunk, encoding, next) => {
 7    console.log(chunk.toString())
 8    next()
 9}
10
11readableStream.pipe(writableStream)
12
13readableStream.push('ping!')
14readableStream.push('pong!')
15
16writableStream.end()

还可以用异步迭代器来写入可写流,建议使用

 1import * as util from 'util';
 2import * as stream from 'stream';
 3import * as fs from 'fs';
 4import {once} from 'events';
 5
 6const finished = util.promisify(stream.finished); // (A)
 7
 8async function writeIterableToFile(iterable, filePath) {
 9  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
10  for await (const chunk of iterable) {
11    if (!writable.write(chunk)) { // (B)
12      // Handle backpressure
13      await once(writable, 'drain');
14    }
15  }
16  writable.end(); // (C)
17  // Wait until done. Throws if there are errors.
18  await finished(writable);
19}
20
21await writeIterableToFile(
22  ['One', ' line of text.\n'], 'tmp/log.txt');
23assert.equal(
24  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
25  'One line of text.\n');

stream.finished() 的默认版本是基于回调的,但是可以通过 util.promisify() 转换为基于 Promise 的版本(A行)。

在此例中,使用以下两种模式:

Writing to a writable stream while handling backpressure (line B):

在处理 backpressure 时写入可写流(B行):

1if (!writable.write(chunk)) {
2  await once(writable, 'drain');
3}

关闭可写流,并等待写入完成(C行):

1writable.end();
2await finished(writable);

pipeline()

管道是一种机制,可以将一个流的输出作为另一流的输入。它通常用于从一个流中获取数据并将该流的输出传递到另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流数据。

在 Node 10.x 中引入了 stream.pipeline() 。这是一种模块方法,用于在流转发错误和正确清理之间进行管道传输,并在管道完成后提供回调。

这是使用管道的例子:

 1const { pipeline } = require('stream');
 2const fs = require('fs');
 3const zlib = require('zlib');
 4
 5// 使用 pipeline API 可以轻松将一系列流
 6// 通过管道传输在一起,并在管道完全完成后得到通知。
 7// 一个有效地用 gzip压缩巨大视频文件的管道:
 8
 9pipeline(
10  fs.createReadStream('The.Matrix.1080p.mkv'),
11  zlib.createGzip(),
12  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
13  (err) => {
14    if (err) {
15      console.error('Pipeline failed', err);
16    } else {
17      console.log('Pipeline succeeded');
18    }
19  }
20);

由于 pipe 不安全,应使用 pipeline 代替 pipe

流模块

Node.js 流模块【 https://nodejs.org/api/stream.html 】 提供了构建所有流 API 的基础。

Stream 模块是 Node.js 中默认提供的原生模块。Stream 是 EventEmitter 类的实例,该类在 Node 中异步处理事件。因此流本质上是基于事件的。

要访问流模块:

1const stream = require('stream');

stream 模块对于创建新型流实例非常有用。通常不需要使用 stream 模块来消耗流。

流驱动的 Node API

由于它们的优点,许多 Node.js 核心模块提供了原生流处理功能,最值得注意的是:

  • net.Socket 是流所基于的主 API 节点,它是以下大多数 API 的基础

  • process.stdin 返回连接到 stdin 的流

  • process.stdout 返回连接到 stdout 的流

  • process.stderr 返回连接到 stderr 的流

  • fs.createReadStream() 创建一个可读的文件流

  • fs.createWriteStream() 创建可写的文件流

  • net.connect() 启动基于流的连接

  • http.request() 返回 http.ClientRequest 类的实例,它是可写流

  • zlib.createGzip() 使用gzip(一种压缩算法)将数据压缩到流中

  • zlib.createGunzip() 解压缩 gzip 流。

  • zlib.createDeflate() deflate(压缩算法)将数据压缩到流中

  • zlib.createInflate() 解压缩一个deflate流

流备忘单:

Webp.net-resizeimage

查看更多:Node.js 流速查表【 https://devhints.io/nodejs-stream

以下是与可写流相关的一些重要事件:

  • error –表示在写或配置管道时发生了错误。

  • pipeline – 当把可读流传递到可写流中时,该事件由可写流发出。

  • unpipe – 当你在可读流上调用 unpipe 并停止将其输送到目标流中时发出。

结论

这就是所有关于流的基础知识。流、管道和链是 Node.js 的核心和最强大的功能。流确实可以帮你编写简洁而高效的代码来执行 I/O。

另外,还有一个值得期待的 Node.js 战略计划【 https://github.com/nodejs/TSC/blob/master/Strategic-Initiatives.md#current-initiatives 】,名为 BOB【 https://github.com/Fishrock123/bob 】,旨在改善 Node.js 的内部数据流以及希望作为未来 Node.js 流数据接口的公共 API 的。

原文: https://nodesource.com/blog/understanding-streams-in-nodejs