
Welcome, Readable-stream@3!

Readable-stream v3 is here!

I am so excited to release the new version of readable-stream, v3! For those of you who might not know, readable-stream is the most downloaded module on npm, and part of practically every JavaScript application's dependency chain. readable-stream is a userland port of the Node.js require('stream') module, so that module authors can rely on a stable streams implementation across the various Node.js release lines (more details at https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). readable-stream@3 is an export of require('stream') from Node.js 10, with some code transformations so that it runs smoothly on all the supported Node.js release lines: Node 6, Node 8 and Node 10. The API is mostly the same, but see the docs for details. We guarantee to keep readable-stream@3 up to date until Node v10 reaches end of life, in April 2021.
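
Swapping the core module for the userland port is a one-line change. As a minimal sketch (the UppercaseTransform class below is a hypothetical example, not from this post):

JavaScript
// Before: const { Transform } = require("stream");
const { Transform } = require("readable-stream");

// A hypothetical transform stream that upper-cases incoming text.
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
}

module.exports = UppercaseTransform;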

What’s new?

stream.pipeline

The new pipeline utility in Node.js streams is born from the experience Mathias Buus had with pump, and it enables you to safely pipe together a list of streams, forming a pipeline. The key part of this method is safety: it ensures that every stream in the pipeline is destroyed (stream.destroy() is called) if an error occurs or the pipeline ends prematurely. pipeline ensures that no internal resources are leaked, and we encourage all developers to use it instead of pipe.

JavaScript
const { pipeline } = require("readable-stream");
const fs = require("fs");
const zlib = require("zlib");

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream("archive.tar"),
  zlib.createGzip(),
  fs.createWriteStream("archive.tar.gz"),
  err => {
    if (err) {
      console.error("Pipeline failed", err);
    } else {
      console.log("Pipeline succeeded");
    }
  }
);

Note that pipeline supports util.promisify, and it can be awaited in that manner.

JavaScript
const stream = require("readable-stream");
const { promisify } = require("util");
const fs = require("fs");
const zlib = require("zlib");
const pipeline = promisify(stream.pipeline);

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file, using await:

async function run() {
  await pipeline(
    fs.createReadStream("archive.tar"),
    zlib.createGzip(),
    fs.createWriteStream("archive.tar.gz")
  );

  console.log("Pipeline succeeded");
}

run().catch(console.log);

stream.finished

One of the oldest problems with streams is knowing when one has ended; it was solved in userland by the end-of-stream module. Multiple stream implementations by different authors have made it very hard to detect when a stream has finished reading or writing and we can stop listening for events. This utility is so critical to working with streams that we have added it to Node.js core, and it is available in readable-stream as well.

JavaScript
const { finished } = require("readable-stream");
const fs = require("fs");
const rs = fs.createReadStream("archive.tar");

finished(rs, err => {
  if (err) {
    console.error("Stream failed", err);
  } else {
    console.log("Stream is done reading");
  }
});

rs.resume(); // drain the stream

Note that finished supports util.promisify, and it can be awaited in that manner.

JavaScript
const stream = require("readable-stream");
const { promisify } = require("util");
const fs = require("fs");
const finished = promisify(stream.finished);

async function run() {
  const rs = fs.createReadStream("archive.tar");
  rs.resume(); // drain the stream

  await finished(rs);

  console.log("Stream is done reading");
}

run().catch(console.log);

for await (let chunk of readable)

Readable is now async iterable: you can consume the data from a stream with a plain for await loop, as follows:

JavaScript
"use strict";

const { Readable, finished } = require("stream");
const { promisify } = require("util");

async function run(origin, dest) {
  try {
    const write = buildWrite(dest);
    // This is an async iterator
    for await (let chunk of origin) {
      await write(chunk.toString().toUpperCase());
    }
    await promisify(finished)(dest);
  } catch (err) {
    origin.destroy(err);
    dest.destroy(err);
  }
}

function buildWrite(stream) {
  // This is a good way of wrapping stream.write into a Promise.
  // We are waiting for a drain event to resolve, and we are wrapping
  // the error event. A consumer should probably use finished to
  // know if the stream has completed.
  let streamError = null;
  stream.on("error", function(err) {
    streamError = err;
  });

  return write;

  function write(chunk) {
    if (streamError) {
      return Promise.reject(streamError);
    }

    return new Promise(function(resolve, reject) {
      const res = stream.write(chunk);
      if (res) {
        resolve();
      } else {
        stream.once("drain", resolve);
      }
    });
  }
}

// startup and run the pipeline
const origin = new Readable({
  read(n) {
    this.push("hello");
    this.push(" ");
    this.push("world");
    this.push(null);
  }
});
run(origin, process.stdout).catch(console.log);

Note that the above pipeline supports backpressure and handles error events. Async iterators are currently experimental, as they are a relatively new feature of the language, part of ES2018.

stream.push() always calls _read() asynchronously

This breaking change simplifies some logic for implementers by ensuring that only a single _read() call is in flight at any given time. In readable-stream@2 and Node.js < 10, up to two _read() calls could happen at the same time, which made implementing streams very complicated.

Before Node 10, the following would fail with “Error: stream.push() after EOF”:

JavaScript
const { Readable } = require("stream");

let count = 0;
const read = new Readable({
  objectMode: true,
  read(n) {
    setImmediate(() => {
      this.push(count++);

      if (count > 10) {
        this.push(null);
      }
    });
  }
});

read.on("data", console.log);

Can you spot why this causes an error? In readable-stream@2 (and Node < 10), this.push(count++) would call _read() synchronously, causing the callback passed to setImmediate to run one more time after we pushed null. Fixing these types of problems is hard, because it requires a good knowledge of the stream machinery. The good news is that with readable-stream@3 we no longer have to worry about this problem; a guarded version that stays safe on both release lines is sketched below.
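
As a sketch of that fix (this guarded variant is mine, not from the original snippet), the usual workaround on readable-stream@2 was to make sure nothing is pushed once null has been pushed:

JavaScript
const { Readable } = require("stream");

let count = 0;
const read = new Readable({
  objectMode: true,
  read(n) {
    setImmediate(() => {
      // Guard: once we have pushed null, never push again,
      // even if _read() was called one extra time.
      if (count > 10) {
        return;
      }

      this.push(count++);

      if (count > 10) {
        this.push(null);
      }
    });
  }
});

read.on("data", console.log);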

Error Codes

readable-stream now supports the same error codes that Node.js core provides, in the ERR_STREAM_* namespace. For more details, see https://nodejs.org/api/errors.html.
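
As a minimal sketch of what these codes look like in practice (assuming readable-stream@3), writing after end() should emit an error carrying the ERR_STREAM_WRITE_AFTER_END code:

JavaScript
const { Writable } = require("readable-stream");

const ws = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

ws.on("error", err => {
  console.log(err.code); // 'ERR_STREAM_WRITE_AFTER_END'
});

ws.end();
ws.write("too late"); // writing after end() emits an error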

Clear Browser Support Commitment

readable-stream@3 commits to supporting all evergreen browsers (Google Chrome, Opera, Firefox, Edge), plus Internet Explorer 11 and the latest version of Safari. We ensure that all our tests pass thanks to Sauce Labs, which provides us its service for free under its Open Sauce offering. We also use the formidable https://www.npmjs.com/package/airtap, which enables us to run our tape tests in the various browsers.

Check it Out!

readable-stream is part of some very long dependency chains on npm, so please help your maintainers update their modules! The Stream Working Group is looking for help; if you would like to contribute, open an issue at https://github.com/nodejs/readable-stream.
