Node.js Stream.pipeline() Method

Why Stream.pipeline

通过流我们可以将一大块数据拆分为一小部分一点一点的流动起来,而无需一次性全部读入,在 Linux 下我们可以通过 | 符号实现,类似的在 Nodejs 的 Stream 模块中同样也为我们提供了 pipe() 方法来实现。

未使用 Stream pipe 情况

在 Nodejs 中 I/O 操作都是异步的,先用 util 模块的 promisify 方法将 fs.readFile 的 callback 形式转为 Promise 形式,这块代码看似没问题,但是它的体验不是很好,因为它是将数据一次性读入内存再进行的返回,当数据文件很大的时候也是对内存的一种消耗,因此不推荐它。

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = require('path');
const readFile = promisify(fs.readFile);

app.use(async ctx => {
  try {
    ctx.body = await readFile(resolve(__dirname, 'test.json'));
  } catch(err) { ctx.body = err };
});

app.listen(3000);

使用 Stream pipe 情况

下面,再看看怎么通过 Stream 的方式在 Koa 框架中响应数据

...
app.use(async ctx => {
  try {
    const readable = fs.createReadStream(resolve(__dirname, 'test.json'));
    ctx.body = readable;
  } catch(err) { ctx.body = err };
});

以上在 Koa 中直接创建一个可读流赋值给 ctx.body 就可以了,你可能疑惑了为什么没有 pipe 方法,因为框架给你封装好了,不要被表象所迷惑了,看下相关源码:

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {
  ...
  let body = ctx.body;
  if (body instanceof Stream) return body.pipe(res);
  ...
}

没有神奇之处,框架在返回的时候做了层判断,因为 res 是一个可写流对象,如果 body 也是一个 Stream 对象(此时的 Body 是一个可读流),则使用 body.pipe(res) 以流的方式进行响应。

使用 Stream VS 不使用 Stream

动图封面

动图封面

What Stream.pipeline function

The stream.pipeline() method is a module method that is used to the pipe by linking the streams passing on errors and accurately cleaning up and providing a callback function when the pipeline is done. 

Stream.pipeline() 方法是一种模块方法,用于通过链接传递错误的流并在管道完成时准确地清理并提供回调函数来用于管道。

Syntax:

stream.pipeline(...streams, callback)

Parameters: 

This method accepts two parameters as mentioned above and described below.该方法接受如上所述和如下所述的两个参数。

  • …streams: These are two or more streams that are to be piped together.这些是要通过管道连接在一起的两个或多个流。
  • callback: This function is called when the pipeline is fully done and it shows an ‘error’ if the pipeline is not accomplished.当管道完全完成时调用此函数,如果管道未完成,则会显示“错误”。

Return Value:

 It returns a cleanup function. 返回值:它返回一个清理函数。

The below examples illustrate the use of the stream.pipeline() method in Node.js: 

以下示例说明了 Node.js 中的

stream.pipeline() 方法的用法

Example 1: 

// Node.js program to demonstrate the   

// stream.pipeline() method

// Including fs and zlib module

const fs = require('fs');

const zlib = require('zlib');

// Constructing finished from stream

const { pipeline } = require('stream');

// Constructing promisify from

// util

const { promisify } = require('util');

// Defining pipelineAsync method

const pipelineAsync = promisify(pipeline);

// Constructing readable stream

const readable = fs.createReadStream("input.text");

// Constructing writable stream

const writable = fs.createWriteStream("output.text");

// Creating transform stream

const transform = zlib.createGzip();

// Async function

(async function run() {

    try {

        // pipelining three streams

        await pipelineAsync(

            readable,

            transform,

            writable

        );

        console.log("pipeline accomplished.");

    }

    // Shows error

    catch (err) {

        console.error('pipeline failed with error:', err);

    }

})();

Output:

Promise {  }
pipeline accomplished.

Example 2: 

// Node.js program to demonstrate the   

// stream.pipeline() method

// Including fs and zlib module

const fs = require('fs');

const zlib = require('zlib');

// Constructing finished from stream

const { pipeline } = require('stream');

// Constructing promisify from

// util

const { promisify } = require('util');

// Defining pipelineAsync method

const pipelineAsync = promisify(pipeline);

// Constructing readable stream

const readable = fs.createReadStream("input.text");

// Constructing writable stream

const writable = fs.createWriteStream("output.text");

// Creating transform stream

const transform = zlib.createGzip();

// Async function

(async function run() {

    try {

        // pipelining three streams

        await pipelineAsync(

            readable,

            writable,

            transform

        );

        console.log("pipeline accomplished.");

    }

    // Shows error

    catch (err) {

        console.error('pipeline failed with error:', err);

    }

})();

Output: Here, the order of streams is not proper while piping so an error occurs.

Promise {  }
pipeline failed with error: Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable
    at WriteStream.Writable.pipe (_stream_writable.js:243:24)
    at pipe (internal/streams/pipeline.js:57:15)
    at Array.reduce ()
    at pipeline (internal/streams/pipeline.js:88:18)
    at Promise (internal/util.js:274:30)
    at new Promise ()
    at pipeline (internal/util.js:273:12)
    at run (/home/runner/ThirstyTimelyKey/index.js:33:11)
    at /home/runner/ThirstyTimelyKey/index.js:45:5
    at Script.runInContext (vm.js:133:20)

解析Stream.PipeLine

在应用层我们调用了 fs.createReadStream() 这个方法,顺藤摸瓜找到这个方法创建的可读流对象的 pipe 方法实现,以下仅列举核心代码实现,基于 Nodejs v12.x 源码。

2.1.1 /lib/fs.js

导出一个 createReadStream 方法,在这个方法里面创建了一个 ReadStream 可读流对象,且 ReadStream 来自 internal/fs/streams 文件,继续向下找。

// https://github.com/nodejs/node/blob/v12.x/lib/fs.js
// 懒加载,主要在用到的时候用来实例化 ReadStream、WriteStream ... 等对象
function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options); // 创建一个可读流
}

module.exports = fs = {
  createReadStream, // 导出 createReadStream 方法
  ...
}

2.1.2 /lib/internal/fs/streams.js

这个方法里定义了构造函数 ReadStream,且在原型上定义了 open、_read、_destroy 等方法,并没有我们要找的 pipe 方法。

但是呢通过 ObjectSetPrototypeOf 方法实现了继承,ReadStream 继承了 Readable 在原型中定义的函数,接下来继续查找 Readable 的实现

// https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
const { Readable, Writable } = require('stream');

function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  ...
  Readable.call(this, options);
  ...
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() { ... };

ReadStream.prototype._read = function(n) { ... };;

ReadStream.prototype._destroy = function(err, cb) { ... };
...

module.exports = {
  ReadStream,
  WriteStream
};

2.1.3 /lib/stream.js

在 stream.js 的实现中,有条注释:在 Readable/Writable/Duplex/... 之前导入 Stream,原因是为了避免 cross-reference(require),为什么会这样?

第一步 stream.js 这里将 require('internal/streams/legacy') 导出复制给了 Stream。

在之后的 _stream_readable、Writable、Duplex ... 模块也会反过来引用 stream.js 文件,具体实现下面会看到。

Stream 导入了 internal/streams/legacy

上面 /lib/internal/fs/streams.js 文件从 stream 模块获取了一个 Readable 对象,就是下面的 Stream.Readable 的定义。

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
...

2.1.4 /lib/internal/streams/legacy.js

上面的 Stream 等于 internal/streams/legacy,首先继承了 Events 模块,之后呢在原型上定义了 pipe 方法,刚开始看到这里的时候以为实现是在这里了,但后来看 _stream_readable 的实现之后,发现 _stream_readable 继承了 Stream 之后自己又重新实现了 pipe 方法,那么疑问来了这个模块的 pipe 方法是干嘛的?什么时候会被用?翻译文件名 “legacy=遗留”?有点没太理解,难道是遗留了?有清楚的大佬可以指点下,也欢迎在公众号 “Nodejs技术栈” 后台加我微信一块讨论下!

// https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
const {
  ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

Stream.prototype.pipe = function(dest, options) {
  ...
};

module.exports = Stream;

2.1.5 /lib/_stream_readable.js

在 _stream_readable.js 的实现里面定义了 Readable 构造函数,且继承于 Stream,这个 Stream 正是我们上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加载了 internal/streams/legacy 文件且重写了里面定义的 pipe 方法。

经过上面一系列的分析,终于找到可读流的 pipe 在哪里,同时也更进一步的认识到了在创建一个可读流时的执行调用过程,下面将重点来看这个方法的实现。

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('stream');

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...
  Stream.call(this, options); // 继承自 Stream 构造函数的定义
}
...

2.2 _stream_readable 实现分析

2.2.1 声明构造函数 Readable

声明构造函数 Readable 继承 Stream 的构造函数和原型。

Stream 是 /lib/stream.js 文件,上面分析了,这个文件继承了 events 事件,此时也就拥有了 events 在原型中定义的属性,例如 on、emit 等方法。

const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...

  Stream.call(this, options);
}

2.2.2 声明 pipe 方法,订阅 data 事件

在 Stream 的原型上声明 pipe 方法,订阅 data 事件,src 为可读流对象,dest 为可写流对象。

我们在使用 pipe 方法的时候也是监听的 data 事件,一边读取数据一边写入数据。

看下 ondata() 方法里的几个核心实现:

  • dest.write(chunk):接收 chunk 写入数据,如果内部的缓冲小于创建流时配置的 highWaterMark,则返回 true,否则返回 false 时应该停止向流写入数据,直到 'drain' 事件被触发
  • src.pause():可读流会停止 data 事件,意味着此时暂停数据写入了。

之所以调用 src.pause() 是为了防止读入数据过快来不及写入,什么时候知道来不及写入呢,要看 dest.write(chunk) 什么时候返回 false,是根据创建流时传的 highWaterMark 属性,默认为 16384 (16kb),对象模式的流默认为 16。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      src.pause();
    }
  }
  ...
};

2.2.3 订阅 drain 事件,继续流动数据

上面提到在 data 事件里,如果调用 dest.write(chunk) 返回 false,就会调用 src.pause() 停止数据流动,什么时候再次开启呢?

如果说可以继续写入事件到流时会触发 drain 事件,也是在 dest.write(chunk) 等于 false 时,如果 ondrain 不存在则注册 drain 事件。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source.  This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src);
        dest.on('drain', ondrain);
      }
      src.pause();
    }
  }
  ...
};

// 当可写入流 dest 耗尽时,它将会在可读流对象 source 上减少 awaitDrain 计数器
// 为了确保所有需要缓冲的写入都完成,即 state.awaitDrain === 0 和 src 可读流上的 data 事件存在,切换流到流动模式
function pipeOnDrain(src) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

// stream.read() 从内部缓冲拉取并返回数据。如果没有可读的数据,则返回 null。在可读流上 src 还有一个 readable 属性,如果可以安全地调用 readable.read(),则为 true
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

2.2.4 触发 data 事件

调用 readable 的 resume() 方法,触发可读流的 'data' 事件,进入流动模式。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }
  ...

然后实例上的 resume(Readable 原型上定义的)会在调用 resume() 方法,在该方法内部又调用了 resume_(),最终执行了 stream.read(0) 读取了一次空数据(size 设置的为 0),将会触发实例上的 _read() 方法,之后会在触发 data 事件。

function resume(stream, state) {
  ...
  process.nextTick(resume_, stream, state);
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  ...
}

2.2.5 订阅 end 事件

end 事件:当可读流中没有数据可供消费时触发,调用 onend 函数,执行 dest.end() 方法,表明已没有数据要被写入可写流,进行关闭(关闭可写流的 fd),之后再调用 stream.write() 会导致错误。

Readable.prototype.pipe = function(dest, options) {
  ...
  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  ...

  function onend() {
    debug('onend');
    dest.end();
  }
}

2.2.6 触发 pipe 事件

在 pipe 方法里面最后还会触发一个 pipe 事件,传入可读流对象

Readable.prototype.pipe = function(dest, options) {
  ...
  const source = this;
  dest.emit('pipe', src);
  ...
};

在应用层使用的时候可以在可写流上订阅 pipe 事件,做一些判断,具体可参考官网给的这个示例 stream_event_pipe[1]

2.2.7 支持链式调用

最后返回 dest,支持类似 unix 的用法:A.pipe(B).pipe(C)

Readable.prototype.pipe = function(dest, options) {
  return dest;
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/337396.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

实时云渲染服务:流式传输 VR 和 AR 内容

想象一下无需专用的物理计算机,甚至无需实物连接,就能获得高质量的 AR/VR 体验是种什么样的体验? 过去,与 VR 交互需要专用的高端工作站,并且根据头显、壁挂式传感器和专用的物理空间。VR 中的复杂任务会突破传感器范…

快速傅里叶变化检测轻微划痕

像这种轻微划痕,普通算法鲁棒性差,通用性也不是很好,通过一些特殊处理,基本上可以满足客户需求. 图像处理,检测无非这个几个步骤. 预处理----分割----筛选—满足设定条件NG read_image (Image, ‘轻微划痕.bmp’) dev_close_window() get_image_size(Image, Width, Height) dev…

Chatgpt+Comfyui绘图源码线上部署文档

源码仓库: https://gitee.com/BTYY/wailikeji-chatgpt 其他文档地址: ChatgptComfyui绘图源码运营文档 ChatgptComfyui绘图源码说明及本地部署文档 一、云服务器购买 (一)购买云服务 有两种部署方案,不同方案对服务…

【MongoDB】下载安装、指令操作

目录 1.下载安装 2.指令 2.1.基础操作指令 2.2.增加 2.3.查询 2.4.修改 2.5.删除 前言: 关于MongoDB的核心概念请移步: 【文档数据库】ES和MongoDB的对比-CSDN博客 1.下载安装 本文以安装Windows版本的mongodb为例,Linux版本的其实…

IP劫持的危害分析及应对策略

在当今数字化时代,网络安全问题备受关注,其中IP劫持是一种常见而危险的威胁。本文将深入探讨IP劫持的危害,并提供一些有效的应对策略。 第一部分:IP劫持的定义 IP劫持是指黑客通过各种手段获取并篡改目标IP地址的控制权&#xf…

如何在地图资源下载工具中下载和共享自定义资源

在获取GIS或遥感资源时,有时候可能会有一些您自己的下载资源!您也可以在地图资源下载工具增加这些下载资源!这样既可以方便以后下载,也可以与其它人分享下载资源! 设置方式如下: 下载方式如下:…

【汇编】 13.3 对int iret和栈的深入理解

书中示例 assume cs:codecode segment start:mov ax,csmov ds,axmov si,offset lpmov ax,0mov es,axmov di,200hmov cx,offset end0-offset lpcldrep movsb ;lp到end0的指令传送到0:200处mov ax,0mov es,axmov word ptr es:[7ch*4],200hmov word ptr es:[7ch*42],0 ;设置7c表项…

NeRF - 神经辐射场 原理分析与解释

标题:NeRF:Representing Scenes as Neural Radiance Fields for View Synthesis 顾名思义:通过NeRF 神经辐射场合成新视角来表达场景 这是一篇来源于伯克利大学的论文研究 摘要 论文提出了一种方法,可以通过优化一个连续体积场…

力扣每日一题---1547. 切棍子的最小成本

//当我们将棍子分段之后,我们是不是想到了怎么组合这些棍子 //并且这些棍子有一个性质就是只能与相邻的进行组合 //暴力搜索的话复杂度很高 //在思考暴力搜索的时候,我们发现一个规律 //比如棍子长度1 2 1 1 2 //那么与最后一个2组合的棍子有&#xff0c…

「绝世唐门」七怪一死六伤,98级玄子饕餮真身,伊莱克斯宣布神位

Hello,小伙伴们,我是拾荒君。 《斗罗大陆Ⅱ绝世唐门》第32期超前爆料,据透露史莱克监察团深入邪魂师的老巢,发现许多城中的百姓遭到残忍的毒手。为了对抗这些残忍的邪魂师,史莱克监察团成员纷纷发动武魂,展现出强大的…

人工智能原理实验2(1)——八数码问题(BFS、DFS、UCS、IDS、A*算法)

🧡🧡实验内容🧡🧡 要求对空格执行空格左移、空格右移、空格上移和空格下移这四个操作使得棋盘从初始状态(左)到目标状态(右) 🧡🧡BFS、DFS实现🧡…

使用Scrapy 爬取“http://tuijian.hao123.com/”网页中左上角“娱乐”、“体育”、“财经”、“科技”、历史等名称和URL

一、网页信息 二、检查网页,找出目标内容 三、根据网页格式写正常爬虫代码 from bs4 import BeautifulSoup import requestsheaders {User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/53…

恢复因各种情况丢失的数据和文件的恢复软件汇集。

数据和文件恢复软件工具是直观的应用程序,可以从各种存储介质中恢复有价值且敏感的业务相关数据。这些宝贵的救生应用程序使企业能够恢复因不可预测的情况而丢失的数据。 存储介质解决方案可能会因网络攻击、病毒、数据泄露、硬盘故障等多种原因而丢失或损坏数据。…

Dart安装(Winodws)

Dart官网: https://dart.dev/ 一、命令行安装 https://dart.dev/get-dart You can install the Dart SDK using Chocolatey. error Important: These commands require administrator rights. Here’s one way to open a Command Prompt window that has admin …

ROS第 12 课 Launch 启动文件的使用方法

文章目录 第 12 课 Launch 启动文件的使用方法1.本节前言2.Lanuch 文件基本语法2.2 参数设置2.3 重映射嵌套 3.实操练习 第 12 课 Launch 启动文件的使用方法 1.本节前言 我们在前面的教程里面通过命令行来尝试运行新的节点。但随着创建越来越复杂的机器人系统中,打…

前后置、断言、提取变量、数据库操作功能

前置操作和后置操作都是 API 请求在发送和响应过程中执行的脚本,主要用于在发起 API 请求前和获得响应后完成验证或执行某些操作,目的是为了提高 API 调试和测试的效率,并确保接口的正确性。 前置操作​ 前置操作是在 API 请求之前执行的脚本…

[计算机网络]基本概念

目录 1.ip地址和端口号 1.1IP地址 1.2端口号 2.认识协议 2.1概念: 2.2知名协议的默认端口 3.五元组 4.协议分层 4.1分层的作用 4.2OSI七层模型 4.3TCP/IP五层(四层)模型 ​编辑4.4网络设备对应的分层: ​编辑以下为跨…

【51、32单片机】模块化编程(.c .h文件)

0、前言 USER:存放工程文件、主函数文件 main.c,以及其他包括system_stm32f10x.c等 CORE :用来存放核心文件和启动文件 OBJ :是用来存放编译过程文件以及hex 文件 STM32F10x_FWLib :用来存放 ST 官方提供的库函数源码文件 SY…

【开源】基于JAVA的CRM客户管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统设计3.1 用例设计3.2 E-R 图设计3.3 数据库设计3.3.1 客户表3.3.2 商品表3.3.3 客户跟踪表3.3.4 客户消费表3.3.5 系统角色表 四、系统展示五、核心代码5.1 查询客户5.2 新增客户跟踪记录5.3 新增客户消费订单5.4 查…

让uniapp小程序支持多色图标icon:iconfont-tools-cli

前景: uniapp开发小程序项目时,对于iconfont多色图标无法直接支持;若将多色icon下载引入项目则必须关注包体,若将图标放在oss或者哪里管理,加载又是一个问题,因此大多采用iconfont-tools工具,但…