Welcome, readable-stream@3!
By Matteo Collina


I am so excited to be able to release the new version of readable-stream, v3! For those of you who might not know, readable-stream is the most downloaded module on NPM, and part of the dependency chain of practically every JavaScript application. readable-stream is a userland port of the Node.js require('stream') module, so that module authors can rely on a stable streams implementation across the various Node.js release lines (more details at https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). readable-stream@3 is an export of require('stream') from Node.js 10, with some code transformations so that it runs smoothly on all the supported Node.js release lines: Node 6, Node 8 and Node 10. The API is mostly unchanged, but see the docs for details. We will keep readable-stream@3 up to date until Node v10 reaches end of life, in April 2021.

What’s new?

stream.pipeline

The new pipeline utility in Node.js streams was born from the experience Mathias Buus had with pump, and it enables you to safely pipe together a list of streams, creating a pipeline. The key feature of this method is safety: it ensures that every stream that is part of the pipeline is destroyed (stream.destroy() is called) if an error occurs or the pipeline ends prematurely. pipeline ensures that no internal resources are leaked, and we encourage all developers to use it instead of pipe.

const { pipeline } = require('readable-stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Note that pipeline supports util.promisify, so it can also be awaited:

const stream = require('readable-stream');
const { promisify } = require('util');
const fs = require('fs');
const zlib = require('zlib');
const pipeline = promisify(stream.pipeline);

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file, using await:

async function run () {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'));

  console.log('Pipeline succeeded');
}

run().catch(console.log);

stream.finished

One of the oldest problems with streams is knowing when one has ended; it was solved in userland by the end-of-stream module. Multiple stream implementations and different authors have made it very hard to detect when a stream has finished reading or writing, so that we can stop listening for events. This utility is so critical for the usage of streams that we have added it to Node.js core, and it is available in readable-stream as well.

const { finished } = require('readable-stream');
const fs = require('fs');
const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

rs.resume(); // drain the stream

Note that finished supports util.promisify, so it can also be awaited:

const stream = require('readable-stream');
const { promisify } = require('util');
const fs = require('fs');
const finished = promisify(stream.finished);

async function run () {
  const rs = fs.createReadStream('archive.tar');
  rs.resume(); // drain the stream

  await finished(rs);

  console.log('Stream is done reading');
}

run().catch(console.log);

for await (let chunk of readable)

Readable is now an async iterable: you can consume the data from a stream with a plain for await loop, as follows:

'use strict'

const { Readable, finished } = require('stream')
const { promisify } = require('util')

async function run (origin, dest) {
  try {
    const write = buildWrite(dest)
    // This is an async iterator
    for await (let chunk of origin) {
      await write(chunk.toString().toUpperCase())
    }
    await promisify(finished)(dest)
  } catch (err) {
    origin.destroy(err)
    dest.destroy(err)
  }
}


function buildWrite (stream) {
  // This is a good way of wrapping stream.write into a Promise.
  // We are waiting for a drain event to resolve, and we are wrapping
  // the error event. A consumer should probably use finished to
  // know if the stream has completed.
  let streamError = null
  stream.on('error', function (err) {
    streamError = err
  })

  return write

  function write (chunk) {
    if (streamError) {
      return Promise.reject(streamError)
    }

    return new Promise(function (resolve, reject) {
      const res = stream.write(chunk)
      if (res) {
        resolve()
      } else {
        stream.once('drain', resolve)
      }
    })
  }
}

// startup and run the pipeline
const origin = new Readable({
  read (n) {
    this.push('hello')
    this.push(' ')
    this.push('world')
    this.push(null)
  }
})
run(origin, process.stdout).catch(console.log)

Note that the above pipeline supports backpressure and handles 'error' events. Async iterators are currently experimental in Node.js, as asynchronous iteration is a relatively new language feature, part of ES2018.

stream.push() always calls _read() asynchronously

This breaking change simplifies the logic for implementers by making sure that only a single _read() call is in flight at any given time. In readable-stream@2 and Node.js < 10, up to two _read() calls could happen at the same time, making streams very complicated to implement.

Before Node 10, the following would error with “Error: stream.push() after EOF“:

const { Readable } = require('stream')

let count = 0
const read = new Readable({
  objectMode: true,
  read (n) {
    setImmediate(() => {
      this.push(count++)

      if (count > 10) {
        this.push(null)
      }
    })
  }
})

read.on('data', console.log)

Can you spot why this errors? In readable-stream@2 (and Node < 10), this.push(count++) would call _read() synchronously, causing the callback passed to setImmediate to be called one more time after we pushed null. Fixing these kinds of problems is hard, because it requires a good knowledge of the stream machinery. The good news is that with readable-stream@3 we no longer have to worry about this problem.

Error Codes

readable-stream now exposes the same error codes that Node.js core provides, under the ERR_STREAM_* namespace. For more details, see https://nodejs.org/api/errors.html.

Clear Browser support commitment

readable-stream@3 commits to supporting all evergreen browsers (Google Chrome, Opera, Firefox, Edge), plus Internet Explorer 11 and the latest version of Safari. We make sure that all our tests pass thanks to Sauce Labs, which provides us their service for free under their Open Sauce offering. We also use the formidable https://www.npmjs.com/package/airtap, which enables us to run our tape tests in the various browsers.


Check it out!

readable-stream is part of some very long dependency chains on NPM, so please help your maintainers update their modules! The Stream Working Group is looking for help: if you would like to contribute, open an issue at https://github.com/nodejs/readable-stream.
