Correct usage of Worker Threads with Piscina

Since they were introduced as an experimental feature in Node.js v10, Worker threads have become one of the more exciting stable core capabilities. There is, however, a noticeable lack of information about how they are best used, their key benefits and, importantly, their limitations.

A few months ago in the Node.js core GitHub repository, we received an issue from a user who was experiencing performance issues when using workers. It turned out that their application was happily spinning up several thousand concurrent Worker threads to churn through data stored on the filesystem. While the issue ended up helping us identify a minor memory leak in the Worker implementation, the key problem with that application was that it was abusing Worker threads in a number of fundamental ways.

The issue was quickly resolved but it highlighted the fact that there are still a large number of misconceptions about Worker threads and multi-threading in Node.js, and there are very few good examples out there about how to use them correctly. It was that realization that led directly to the creation of Piscina, a new open source project focused on the correct usage of Worker threads in Node.js, sponsored by NearForm Research.

Getting Started with Piscina

Piscina is an MIT-licensed open source project, published to the public npm registry, developed by NearForm Research.

It can be installed by running:
npm i piscina

(Or, if you’re using yarn, yarn add piscina)

Once installed, a new instance of Piscina can be created:

const path = require('path');
const Piscina = require('piscina');

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker.js')
});

(async function() {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result);  // Prints 10
})();

The filename option passed to the Piscina constructor identifies the code that will run within the pool of Node.js Worker threads that Piscina creates. It must be specified as either an absolute file path or a `file://` URL, and may point to a regular JavaScript file, an ECMAScript module (*.mjs), a Node.js module path or a Node.js native addon.

When the Piscina instance is created, a minimum number of Worker threads is spawned automatically based on the number of CPU cores available to the Node.js process. Every spawned worker will load its own copy of the worker code identified by filename.
The piscina.runTask() method creates a new task and submits that to the first available worker. Every time piscina.runTask() is called, the task will either be immediately passed off to an available worker or pushed onto an internal task queue, where it will wait for a worker to become available. Either way, the method returns a Promise that will be resolved once the worker has completed the task (or rejected if there was an error).

The worker code pointed to by the filename option must be in the form of a module that exports a single function as the default export, as in:

module.exports = ({ a, b }) => {
  return a + b;
};

A Promise that resolves to a function may also be exported when some initialization work needs to be performed first:

async function initialize() {
  await doSomeAsyncInitialization();
  return ({ a, b }) => a + b;
}

module.exports = initialize();

Every time a task is dispatched to a worker, this exported function is called to process the task. Piscina handles all of the communication between the main Node.js thread and the worker thread, allowing developers to stay focused on the functionality.
As we will explore a bit later, Piscina workers are best suited for offloading synchronous tasks to run off the main Node.js event loop. That said, there are many cases where the worker needs to perform at least some asynchronous work. To help make that as natural as possible, the worker may export an asynchronous function:

module.exports = async ({ a, b }) => {
  await somethingElse();
  return a + b;
};

Piscina is intelligent enough to tell the difference between these alternatives and handles all errors accordingly, rejecting the Promise returned by piscina.runTask() should any part of the initialization or processing fail.
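
To make the previous paragraph more concrete, here is a minimal sketch, written for illustration and not taken from Piscina's source, of how a pool could accept all three export styles and route every failure into a rejected Promise. The names loadHandler and runWorkerTask are hypothetical, and the sketch calls the handler directly on the current thread rather than inside a worker.

```javascript
// Hypothetical sketch, not Piscina's source: normalize the three export
// styles shown above and turn any failure into a rejected Promise.
async function loadHandler(exported) {
  // Awaiting a plain function yields it unchanged; awaiting a Promise waits
  // for initialization to finish and rethrows if initialization failed.
  const handler = await exported;
  if (typeof handler !== 'function') {
    throw new TypeError('worker module must export a function or a Promise of one');
  }
  return handler;
}

async function runWorkerTask(exported, task) {
  const handler = await loadHandler(exported);
  // Inside an async function, a synchronous throw from `handler` and a
  // rejected Promise returned by it both reject runWorkerTask().
  return handler(task);
}

// The three export styles from above, as plain values:
const syncExport = ({ a, b }) => a + b;
const initExport = Promise.resolve(({ a, b }) => a + b);
const asyncExport = async ({ a, b }) => a + b;

Promise.all([syncExport, initExport, asyncExport].map((e) => runWorkerTask(e, { a: 4, b: 6 })))
  .then((results) => console.log(results)); // [ 10, 10, 10 ]
```

Because everything funnels through one async function, the caller only ever has to handle a single Promise, whichever style the worker module uses.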

Cloning vs. Transfer

By default, all arguments passed into the worker are cloned in accordance with a Web API standard called the “structured clone algorithm”. This algorithm allows all primitive JavaScript types (booleans, strings, numbers, etc.) and other “primordial” JavaScript types such as Date, RegExp, ArrayBuffer, Arrays, Maps, Sets and plain objects to be copied and passed to the worker.

Worker threads allow certain special kinds of objects (currently ArrayBuffers and MessagePorts in Node.js) to be transferred across threads instead of copied. Transferring an object moves ownership of that object from one Node.js thread to another, making further use of those objects in the originating thread impossible unless it is transferred back. This is helpful, for instance, when working with large amounts of data that are expensive to copy or when opening additional communication channels between threads.
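
The difference between cloning and transferring can be observed without Piscina, using the MessageChannel and receiveMessageOnPort() APIs from the Node.js worker_threads module, which rely on the same message-passing machinery as Worker threads. In this sketch (the roundTrip helper is our own), cloning leaves the sender's typed array intact, while transferring its underlying ArrayBuffer detaches it on the sending side:

```javascript
const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads');

// Post `value` through a MessageChannel and synchronously read it back
// from the other end. Objects listed in `transferList` are moved, not copied.
function roundTrip(value, transferList = []) {
  const { port1, port2 } = new MessageChannel();
  port1.postMessage(value, transferList);
  const { message } = receiveMessageOnPort(port2);
  port1.close();
  return message;
}

// Cloning: the receiver gets a copy and the sender's array stays usable.
const cloneSrc = new Uint8Array([1, 2, 3, 4]);
const cloned = roundTrip(cloneSrc);

// Transfer: ownership of the ArrayBuffer moves to the receiver, so the
// sender's view is detached and now reports a length of 0.
const moveSrc = new Uint8Array([5, 6, 7, 8]);
const moved = roundTrip(moveSrc, [moveSrc.buffer]);

console.log(cloneSrc.length, cloned.length); // 4 4
console.log(moveSrc.length, moved.length);   // 0 4
```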

The return value of a worker function is cloned by default, but Piscina supports transferring the return value if it fits into the “transferable object” category by using Piscina.move():

const { move } = require('piscina');

module.exports = () => {
  return move(new Uint8Array(10));
}

For more complex objects that contain transferable objects, Piscina provides mechanisms to transfer those as well. Transfer lists are not something that we will cover in too much detail in this blog post but you can refer to the Piscina documentation and examples to see transfer lists and MessagePorts in use.

Because input arguments and return values are copied by default, there is some overhead associated with passing complex values. Typically, this cost is minimal and largely inconsequential, but it is definitely there.

Cancelling Tasks

Piscina supports a mechanism that allows submitted tasks to be cancelled even if the task has already been submitted to a worker and is being actively processed. This can be used, for instance, to cancel tasks that are taking too long to run.

The mechanism works by passing an AbortSignal object along with the submitted task:

// `piscina` is the instance created earlier. Node.js 15 and later also
// provide AbortController as a global, making this package optional.
const { AbortController } = require('abort-controller');

(async function() {
  const abortController = new AbortController();

  // Cancel the task after 5 seconds
  const timeout =
    setTimeout(() => abortController.abort(), 5000);
  try {
    await piscina.runTask(
      { a: 4, b: 6 }, abortController.signal);
  } catch (err) {
    console.log('The task was cancelled');
  } finally {
    clearTimeout(timeout);
  }
})();

When abort() is called while the task is being processed, the worker thread itself is interrupted and torn down (Piscina restarts it automatically if and when necessary), and the Promise returned by piscina.runTask() is rejected. A plain Node.js EventEmitter can also be used in place of the AbortSignal, in which case emitting an abort event on it has the same effect.
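
The underlying mechanism can be sketched with the raw worker_threads API. The runCancellable helper below is hypothetical and far simpler than Piscina (it starts a new Worker per task instead of reusing a pool), but it shows why aborting works even for purely synchronous code: worker.terminate() stops the thread regardless of what its JavaScript is doing. It assumes a Node.js version with a global AbortController (15 or later).

```javascript
const { Worker } = require('node:worker_threads');

// Hypothetical helper, not Piscina's implementation: run `source` in a fresh
// Worker and tear the thread down if `signal` is aborted before it replies.
function runCancellable(source, data, signal) {
  return new Promise((resolve, reject) => {
    const abortError = () =>
      Object.assign(new Error('The task was cancelled'), { name: 'AbortError' });
    if (signal.aborted) return reject(abortError());

    const worker = new Worker(source, { eval: true, workerData: data });
    const onAbort = () => {
      worker.terminate(); // stops the thread even mid-way through a sync loop
      reject(abortError());
    };
    signal.addEventListener('abort', onAbort, { once: true });

    worker.once('message', (result) => {
      signal.removeEventListener('abort', onAbort);
      worker.terminate();
      resolve(result);
    });
    worker.once('error', (err) => {
      signal.removeEventListener('abort', onAbort);
      reject(err);
    });
  });
}

// A deliberately long, purely synchronous task.
const busyLoop = `
  const { parentPort, workerData } = require('worker_threads');
  const end = Date.now() + workerData.ms;
  while (Date.now() < end) {}
  parentPort.postMessage('finished');
`;

const controller = new AbortController();
setTimeout(() => controller.abort(), 100); // cancel after 100 ms
runCancellable(busyLoop, { ms: 60_000 }, controller.signal)
  .then((result) => console.log(result))
  .catch((err) => console.log(err.name)); // AbortError
```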

Benchmarks and Performance

Before worker thread support in Node.js existed, there was speculation that workers would help developers resolve a great number of common performance issues within their Node.js applications. This perception was based on the idea that Node.js has always executed JavaScript on a single thread that prevents tasks from running in parallel and taking full advantage of a multi-core machine.

This is a common misconception about Node.js. While it is true that JavaScript does not run in parallel, there has always been a pool of native threads in a Node.js process handling tasks like file system reads and writes, garbage collection, cryptography, compression and more. An application properly designed to leverage the Node.js asynchronous I/O optimizations can achieve absolutely phenomenal performance despite having the JavaScript execution limited to a single thread.

So the question should be: what additional benefit do Node.js Worker threads (and, by extension, Piscina) provide? To answer it, we ran a study of our own to find out what performance benefits Worker threads can offer.

Sync vs. Async Tasks, CPU vs. I/O

Our study starts by characterizing all workloads in Node.js across a pair of dimensions: Synchronous or Asynchronous, and CPU-bound or I/O-bound.

A synchronous workload is one in which JavaScript is running and not letting the Node.js event loop progress while it executes. (Essentially, any time JavaScript is running, the Node.js event loop is not.) During a synchronous workload, the event loop is unable to perform any other tasks (other than garbage collection, but that’s another story).

An asynchronous workload is one in which the execution of JavaScript is waiting for some other event to occur. These can be activities such as reading from a file, waiting for a network connection, or writing out to a socket. These activities do not block the event loop — and in fact, rely on the event loop continuing in order to complete. Importantly, synchronous workloads block the completion of asynchronous workloads.
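
A quick way to see synchronous work blocking asynchronous work is to schedule a 0 ms timer and then keep the main thread busy. This sketch (measureTimerLag is a hypothetical helper) uses a busy-wait loop as a stand-in for CPU-bound JavaScript; the timer cannot fire until the loop returns:

```javascript
const { performance } = require('node:perf_hooks');

// Schedule a 0 ms timer, then block the main thread with synchronous work.
// Resolves with how many milliseconds after scheduling the timer really fired.
function measureTimerLag(blockMs) {
  return new Promise((resolve) => {
    const scheduledAt = performance.now();
    setTimeout(() => resolve(performance.now() - scheduledAt), 0);

    // Synchronous, CPU-bound busy-wait: the event loop cannot run the
    // timer callback until this loop returns.
    const end = Date.now() + blockMs;
    while (Date.now() < end) {}
  });
}

measureTimerLag(200).then((lag) => console.log(`timer fired after ${lag.toFixed(1)} ms`));
```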

On the other dimension, synchronous workloads are nearly always CPU-bound. (There are exceptions to this rule, such as synchronous filesystem I/O operations or uses of Atomics.wait(), that we won’t get into here.) This means that synchronous workloads typically grind away on an available CPU core until the task is completed. Examples of CPU-bound operations include cryptography, relatively advanced mathematics (such as image processing) and synchronous iterations over large collections.

I/O-bound workloads are those that are intrinsically tied to tasks such as reading or writing to the file system, querying a database or waiting on a timer to fire. These workloads are typically not very expensive in terms of using system resources but can introduce latency into an application waiting on tasks to complete.

For the first part of our study into the performance characteristics of Worker threads, we wanted to get a better sense of the kinds of workloads for which workers are best suited. To do so, we created a simple test that compares different versions of a given workload.

The scrypt function in Node.js is a cryptographic operation that uses a password to derive a key. It is designed to be an expensive operation that consumes both CPU and memory resources to make guessing the key more difficult. Node.js provides both synchronous and asynchronous versions of the scrypt function. The synchronous version runs on the main Node.js thread and blocks all other activity until it is complete. The asynchronous version runs in a separate background thread provided and managed by libuv using the same pool of threads that support asynchronous file system operations and other async tasks.

In our example workload, we create an async generator that produces 10 random bytes (the password) on each iteration. Once those bytes are generated, we create a random 16-byte salt value, pass both the password and the salt into the scrypt function, and wait on the results.

'use strict';
const crypto = require('crypto');
const { promisify } = require('util');
const randomFill = promisify(crypto.randomFill);
const scrypt = promisify(crypto.scrypt);
const { performance, PerformanceObserver } =
  require('perf_hooks');

const salt = Buffer.allocUnsafe(16);

const obs = new PerformanceObserver((entries) => {
  console.log(entries.getEntries()[0].duration);
});
obs.observe({ entryTypes: ['measure'] });

async function * generateInput () {
  let max = parseInt(process.argv[2] || 10);
  const data = Buffer.allocUnsafe(10);
  while (max-- > 0) {
    yield randomFill(data);
  }
}

(async function () {
  performance.mark('start');
  const keylen = 64;

  for await (const input of generateInput()) {
    (await scrypt(input,
                  await randomFill(salt),
                  keylen)).toString('hex');
  }

  performance.mark('end');
  performance.measure('start to end', 'start', 'end');
})();

We use the Node.js performance hooks API to measure the length of time it takes to generate keys for the entire batch. By default, we start with a batch of 10 randomly generated passwords. It is important to note that this is using the asynchronous version of the scrypt function, which means all of the actual cryptographic work is happening off the main Node.js thread.

On the modest 4-core Lenovo desktop running Ubuntu that is sitting under my desk here in my home office, using Node.js 14.2, the execution time of this application is approximately 625 milliseconds on average.

We can alter this example to use the synchronous version of scrypt by changing a single statement from:

(await scrypt(input,
              await randomFill(salt),
              keylen)).toString('hex');

To:

crypto.scryptSync(input,
                  crypto.randomFillSync(salt),
                  keylen).toString('hex');

When we run this on the same 4-core Lenovo desktop, we get an execution time averaging around 580 milliseconds. Some might be surprised that the synchronous version runs faster! There are multiple reasons for this, but the most significant is that moving execution off the main thread incurs its own small additional performance overhead. When the number of blocking operations is relatively small, the event loop blockage typically will not be high enough to matter much. Increasing the number of those operations, however, can have a significant impact.

For instance, if we run the same two versions of the scrypt example with a sample size of 1000 randomly generated passwords, the total execution time extends to around 58 seconds for both. Execution time is only part of the story: we also have to pay attention to how much each example blocks the event loop from turning. The higher the event loop blockage, the longer asynchronous workloads take to complete.
Learning to Swim with Piscina, the node.js worker pool
Without going into the details of how we measured event loop blockage, profiling the asynchronous version of our example with 1000 samples shows a 99th-percentile event loop delay of around 20 milliseconds, while in the synchronous version the event loop delay was over 60 milliseconds. This means that while the synchronous version of the code was running, any other asynchronous workloads (file reads, timers, etc.) would have experienced a significant delay. That’s not good, and such delays are the primary reason why synchronous operations are discouraged in many Node.js programs.
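
The post does not describe how event loop delay was measured. One way to collect comparable numbers, offered here as an assumption rather than the authors' method, is perf_hooks.monitorEventLoopDelay(), which samples the loop on a timer and records the results in a histogram. The helper below (our own naming) runs the sync or async scrypt variant a few times and reports the 99th-percentile and maximum delay in milliseconds:

```javascript
const crypto = require('node:crypto');
const { promisify } = require('node:util');
const { monitorEventLoopDelay } = require('node:perf_hooks');
const { setTimeout: sleep } = require('node:timers/promises');

const scrypt = promisify(crypto.scrypt);
const password = crypto.randomBytes(10);
const salt = crypto.randomBytes(16);

// Synchronous variant: hashing runs on the main thread and blocks the loop.
const syncScrypt = () => crypto.scryptSync(password, salt, 64);
// Asynchronous variant: hashing runs on the libuv thread pool.
const asyncScrypt = () => scrypt(password, salt, 64);

// Run `task` several times while sampling event loop delay every 10 ms.
// Returns the 99th-percentile and maximum delay in milliseconds.
async function measureLoopDelay(task, iterations = 5) {
  const histogram = monitorEventLoopDelay({ resolution: 10 });
  histogram.enable();
  for (let i = 0; i < iterations; i++) {
    await task();
    await sleep(25); // let the sampling timer run between tasks
  }
  histogram.disable();
  // Histogram values are reported in nanoseconds.
  return {
    p99: histogram.percentile(99) / 1e6,
    max: histogram.max / 1e6,
  };
}

// Usage: measureLoopDelay(syncScrypt).then(console.log);
```

Measuring the two variants one after the other (not concurrently, since they share the same event loop) should show a much larger delay for syncScrypt; the absolute numbers depend on the machine and the sampling resolution.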

So how do Worker threads fit into this picture? Many people have speculated that moving expensive crypto operations into Node.js workers would yield better performance. In theory, these speculations are correct, but it depends entirely on what you are comparing against.

In our test, we modified our two examples (the async and sync versions of scrypt) to each use a Piscina worker pool. This was really as simple as creating a separate worker file (scrypt.js) that calls either version of the scrypt function, then modifying the main script to create the Piscina pool and defer to runTask:

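const Piscina = require('piscina');
const { resolve } = require('path');
const crypto = require('crypto');
const { promisify } = require('util');
const randomFill = promisify(crypto.randomFill);
const { performance, PerformanceObserver } =
  require('perf_hooks');

// Print the duration of the 'start to end' measurement.
const obs = new PerformanceObserver((entries) => {
  console.log(entries.getEntries()[0].duration);
});
obs.observe({ entryTypes: ['measure'] });

const piscina = new Piscina({
  filename: resolve(__dirname, 'scrypt.js'),
  concurrentTasksPerWorker: 10
});

async function * generateInput () {
  let max = parseInt(process.argv[2] || 10);
  const data = Buffer.allocUnsafe(10);
  while (max-- > 0) {
    yield randomFill(data);
  }
}

(async function () {
  performance.mark('start');
  const keylen = 64;

  for await (const input of generateInput()) {
    await piscina.runTask({ input, keylen });
  }

  performance.mark('end');
  performance.measure('start to end', 'start', 'end');
})();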

When we run the pooled asynchronous version with a sample size of 1000 randomly generated passwords, we end up with a total execution time of about 58 seconds and an event loop delay of about 20 milliseconds 99% of the time: results identical to the unpooled asynchronous version! When we run the pooled synchronous version with the same sample size, we again get identical results: about 58 seconds, with the same event loop delay. What is happening here? Why are the unpooled async, pooled async and pooled sync variations all giving identical results?

The reason, it turns out, is that in the asynchronous version of scrypt the cryptographic operations are already performed in a separate background thread, whether or not Piscina is used. When we use the asynchronous scrypt in a Piscina worker, all we are actually doing is using one thread to schedule work on yet another thread, which saves no compute time or resources. The unpooled async scrypt is already as optimized as we can make it! We do, however, see an improvement of the pooled synchronous variation over the unpooled one, because the main thread's event loop is no longer blocked, but the pooled version can only ever perform as well as the unpooled asynchronous version.

If you’re looking at the results above and thinking that the asynchronous version that is not using Piscina has the best performance, you’re absolutely correct. The lesson we learn from this example is simple:

If a task is already optimized for asynchronous execution in Node.js, moving that task into a worker thread is not going to make it perform any better. Put another way, Worker threads only provide performance benefits when they move synchronous operations off the main Node.js thread.

This matches the warning at the beginning of the official Worker threads documentation:

Workers (threads) are useful for performing CPU-intensive JavaScript operations. They will not help much with I/O-intensive work. Node.js’s built-in asynchronous I/O operations are more efficient than Workers can be.

Limiting system resources and balancing queue pressure

Piscina works by creating a pool of threads and maintaining a queue of tasks to be processed by those threads. Accordingly, there are a number of considerations that need to be given to the number of threads Piscina should create, the number of tasks it should allow to be queued up and the amount of system resources each worker should be allowed to consume.

A number of configuration options are supported:

  • minThreads / maxThreads – Respectively, these specify the absolute minimum number of Worker threads Piscina will maintain and the absolute maximum Piscina will allow to exist at any one time. By default, both are calculated from the number of CPU cores available to the Node.js process. For instance, on the 4-core Lenovo system we are using for benchmarking, the default maxThreads is 6 (1.5 times the number of cores) and the default minThreads is 2. There is no single rule for determining the “correct” minimum or maximum number of threads, as it depends on a wide range of factors that are beyond the scope of this blog post.
  • idleTimeout – By default, once a worker thread finishes a task, if there are no additional tasks to work on and the pool is above the minThreads limit, the worker shuts down immediately. In many cases this behaviour is desirable, because it keeps idle threads from consuming system resources unnecessarily. The idleTimeout option may be set to a number of milliseconds to keep an idle thread alive, waiting for an additional task to be posted.
  • maxQueue – Specifies the maximum number of tasks that may wait in the Piscina queue for a free thread. This limits the overall latency of tasks posted to the thread pool. If the queue is at this limit and another task is posted, the corresponding Promise will be rejected. Whether or not you use maxQueue, it often makes sense to check the current queue size before posting a task. By default, maxQueue is unbounded.
  • concurrentTasksPerWorker – A typical worker performs one task at a time, which is desirable if the task is a purely synchronous workload, since such a worker can only work on one thing at a time anyway. However, if the worker task is partially asynchronous, it becomes possible and safe for the worker to take on additional concurrent tasks. The concurrentTasksPerWorker option specifies the maximum number of tasks an asynchronous worker can work on at once.
  • resourceLimits.maxOldGenerationSizeMb, resourceLimits.maxYoungGenerationSizeMb, and resourceLimits.codeRangeSizeMb are advanced configuration options inherited from Node.js that allow the amount of memory consumed by a worker thread to be strictly limited. Once these thresholds are met, the worker thread will be immediately destroyed and any pending Promises will be rejected.
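
To make the interplay between threads, the task queue and maxQueue more concrete, here is a deliberately tiny fixed-size pool built directly on worker_threads. It is an illustrative sketch, not how Piscina is implemented: TinyPool, run(), queueSize and destroy() are hypothetical names, there is no dynamic sizing, idle timeout or cancellation, and a worker that crashes is not replaced. It does show the basic flow: a task goes to an idle worker if one exists, otherwise it waits in the queue, and once the queue is at maxQueue new tasks are rejected.

```javascript
const { Worker } = require('node:worker_threads');

// Illustrative fixed-size pool: `threads` workers, one task per worker at a
// time, and a FIFO queue capped at `maxQueue` waiting tasks.
class TinyPool {
  constructor(fn, { threads = 2, maxQueue = Infinity } = {}) {
    // `fn` must be self-contained: its source text is copied into each worker.
    const source = `
      const { parentPort } = require('worker_threads');
      const fn = (${fn.toString()});
      parentPort.on('message', async (task) => {
        try {
          parentPort.postMessage({ ok: true, value: await fn(task) });
        } catch (err) {
          parentPort.postMessage({ ok: false, message: String(err && err.message) });
        }
      });`;
    this.maxQueue = maxQueue;
    this.queue = [];          // jobs waiting for a free worker
    this.idle = [];           // workers with nothing to do
    this.current = new Map(); // worker -> job it is processing
    this.workers = Array.from({ length: threads }, () => {
      const worker = new Worker(source, { eval: true });
      worker.on('message', (msg) => this._finish(worker, msg));
      this.idle.push(worker);
      return worker;
    });
  }

  get queueSize() {
    return this.queue.length;
  }

  run(task) {
    return new Promise((resolve, reject) => {
      const job = { task, resolve, reject };
      const worker = this.idle.pop();
      if (worker) {
        this._dispatch(worker, job);
      } else if (this.queue.length >= this.maxQueue) {
        reject(new Error('Task queue is at limit'));
      } else {
        this.queue.push(job);
      }
    });
  }

  _dispatch(worker, job) {
    this.current.set(worker, job);
    worker.postMessage(job.task);
  }

  _finish(worker, msg) {
    const job = this.current.get(worker);
    this.current.delete(worker);
    if (msg.ok) job.resolve(msg.value);
    else job.reject(new Error(msg.message));
    // Hand the freed worker the oldest queued job, or mark it idle.
    const next = this.queue.shift();
    if (next) this._dispatch(worker, next);
    else this.idle.push(worker);
  }

  destroy() {
    return Promise.all(this.workers.map((worker) => worker.terminate()));
  }
}

const pool = new TinyPool(({ a, b }) => a + b, { threads: 2, maxQueue: 4 });
pool.run({ a: 4, b: 6 })
  .then((result) => console.log(result)) // 10
  .finally(() => pool.destroy());
```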

Among these options, the first four are the most important to determining the overall efficiency of your worker thread pool. To help measure the efficiency of the worker pool we define the concept of “Queue Pressure”, which is the ratio between the rate of new tasks being added to the queue (inbound rate) and the rate of tasks being picked up and processed by the workers in our pool (outbound rate). Low Queue Pressure means that the inbound rate is slower than the outbound rate and the queue is not able to maintain enough entries to prevent Worker threads from going idle. High Queue Pressure means that the inbound rate is higher than the outbound rate and tasks are accumulating faster than we can process them. Both conditions can be detrimental to overall performance.
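
As a worked example of this definition, with hypothetical numbers: if six workers can each complete roughly 100 tasks per second, the outbound rate is about 600 tasks per second. An inbound rate of 900 tasks per second gives a queue pressure of 1.5, and the queue grows by about 300 tasks every second; an inbound rate of 300 gives a pressure of 0.5, leaving the workers idle about half of the time. The helper below (our own naming) simply encodes that arithmetic:

```javascript
// Queue Pressure as defined above: inbound rate / outbound rate.
// Hypothetical helper; the rates used below are illustrative only.
function queuePressure(inboundPerSec, outboundPerSec) {
  const pressure = inboundPerSec / outboundPerSec;
  // Positive: the queue grows by this many tasks per second.
  // Negative: the queue stays empty and workers sit idle part of the time.
  const netInflowPerSec = inboundPerSec - outboundPerSec;
  let state = 'balanced';
  if (pressure > 1) state = 'high (tasks accumulate in the queue)';
  else if (pressure < 1) state = 'low (workers go idle)';
  return { pressure, netInflowPerSec, state };
}

const outbound = 6 * 100; // six workers, each finishing ~100 tasks/s
console.log(queuePressure(900, outbound).pressure); // 1.5
console.log(queuePressure(300, outbound).pressure); // 0.5
```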

When queue pressure is low, Worker threads go idle and are terminated. If the queue pressure begins to rise again, new Worker threads will need to spawn to replace those that went idle, which is a fairly expensive operation on its own. To prevent the churn that this causes, the minThreads option can be set to a higher value (causing Piscina to consistently maintain a larger number of idle threads) or the idleTimeout can be extended, delaying the termination of the idle thread while waiting for new tasks. The important concept here is that when queue pressure is low, the destruction and recreation of Worker threads can cause a significant performance loss.

When queue pressure is high, on the other hand, Piscina will stack tasks up in the internal queue and allocate a Promise for each one. While this prevents Worker threads from going idle, it causes increased latency waiting for tasks to execute as they sit for longer periods of time in the queue. If the task objects are large, it can also lead to higher memory usage.

To illustrate this idea, we created an example application that implements an HTTP server performing a simple React server-side rendering operation. In the application, every received HTTP request is immediately deferred to the Piscina worker pool to perform the rendering operation. (React server-side rendering has always been an infamously slow synchronous workload that blocks the main Node.js event loop, so it’s a perfect type of workload for us to move into a worker.)

The initial implementation of the example uses Fastify (a fast, efficient, open source HTTP server framework whose development was also sponsored by NearForm) and a plugin we created to easily use Piscina and Fastify together:

'use strict';
const fastify = require('fastify')();
const fastifyPiscina = require('fastify-piscina');
const { resolve } = require('path');

fastify.register(fastifyPiscina, {
  filename: resolve(__dirname, 'worker.js')
});

// Declare a route: every request is rendered by the worker pool
fastify.get('/', async () => fastify.runTask({ name: 'James' }));

// Run the server!
const start = async () => {
  try {
    await fastify.listen({ port: 3000 });
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};

start();

Notice that we create the Piscina instance initially without using any of the performance-tuning configuration options.

In the worker, we render a simple HTML page using an example React component that generates a block of random Lorem-Ipsum style text. It’s not a particularly expensive or complex rendering operation, but it is enough to illustrate the point.

'use strict';
const React = require('react');
const ReactDOMServer = require('react-dom/server');
const { Greeting, Lorem } = require('./components');

module.exports = ({ name }) => {
  return `
  <!doctype html>
    <html>
    <body>
    <div id="root">${
      ReactDOMServer.renderToString(React.createElement(Greeting, { name }))
    }</div>
    ${
      ReactDOMServer.renderToString(React.createElement(Lorem))
    }
    <script src="/static/home.js"></script>
  </body>
  </html>`;
};

Once our server code is in place, we start the server and test it using autocannon, an HTTP benchmarking tool also developed at NearForm (npm i -g autocannon). By default, autocannon opens 10 concurrent connections and attempts to send as many HTTP requests as it can to the server over a 10-second period. With the application set up as it is, we get the following results:

Running 10s test @ http://localhost:3000

10 connections

┌─────────┬──────┬──────┬───────┬──────┬─────────┬─────────┬──────────┐
│ Stat    │ 2.5% │ 50%  │ 97.5% │ 99%  │ Avg     │ Stdev   │ Max      │
├─────────┼──────┼──────┼───────┼──────┼─────────┼─────────┼──────────┤
│ Latency │ 0 ms │ 1 ms │ 5 ms  │ 7 ms │ 1.21 ms │ 1.42 ms │ 36.58 ms │
└─────────┴──────┴──────┴───────┴──────┴─────────┴─────────┴──────────┘
┌───────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐
│ Stat      │ 1%      │ 2.5%    │ 50%     │ 97.5%   │ Avg     │ Stdev   │ Min     │
├───────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Req/Sec   │ 1832    │ 1832    │ 6603    │ 7415    │ 5881.7  │ 1795.16 │ 1832    │
├───────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Bytes/Sec │ 6.28 MB │ 6.28 MB │ 22.8 MB │ 25.9 MB │ 20.4 MB │ 6.24 MB │ 6.28 MB │
└───────────┴─────────┴─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘

Req/Bytes counts sampled once per second.

59k requests in 10.07s, 204 MB read

If we examine the wait time of items in the Piscina queue with the default workload of only 10 concurrent requests, the maximum latency is just over 11 milliseconds with a mean of 0.02 milliseconds. We end up with an average of just under 5900 requests per second processed.

If we run the test again with autocannon creating 1000 concurrent connections against the same server, we get:

Running 10s test @ http://localhost:3000

1000 connections

┌─────────┬────────┬────────┬────────┬────────┬───────────┬───────────┬────────────┐
│ Stat    │ 2.5%   │ 50%    │ 97.5%  │ 99%    │ Avg       │ Stdev     │ Max        │
├─────────┼────────┼────────┼────────┼────────┼───────────┼───────────┼────────────┤
│ Latency │ 131 ms │ 148 ms │ 334 ms │ 438 ms │ 172.01 ms │ 134.43 ms │ 1783.75 ms │
└─────────┴────────┴────────┴────────┴────────┴───────────┴───────────┴────────────┘

┌───────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐
│ Stat      │ 1%      │ 2.5%    │ 50%     │ 97.5%   │ Avg     │ Stdev   │ Min     │
├───────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Req/Sec   │ 1621    │ 1621    │ 6743    │ 6939    │ 5808.73 │ 1758.45 │ 1621    │
├───────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Bytes/Sec │ 5.59 MB │ 5.59 MB │ 23.1 MB │ 24.1 MB │ 20.1 MB │ 6.07 MB │ 5.59 MB │
└───────────┴─────────┴─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘

Req/Bytes counts sampled once per second.

64k requests in 11.3s, 221 MB read

If we examine the maximum wait time for queued items in this run, we see that it jumps to 351.70 milliseconds, with a mean of 156.71 milliseconds. The average number of requests per second is just over 5800. You’ll see that the latency per request reported by autocannon also increases.

In this example, on the benchmark machine we are using, it is difficult for queue pressure to drop low enough to cause a performance loss. (While we were writing this blog post, Anna made a change to Piscina that makes it significantly less likely that low queue pressure causes these performance issues in the first place.) We can, however, clearly see the impact that increased queue pressure has on processing latency.

Excessively high queue pressure can be diagnosed by looking at the queue wait time latency. Excessively low queue pressure can be diagnosed by simply not seeing enough of a performance improvement when enabling Piscina. There is no single rule of thumb to follow here. It is recommended that you profile your application with various workloads and tune both the minThreads and idleTimeout until the processing throughput reaches an acceptable level. Setting either of those too high, however, can cause system resources to be wasted.
In case you’re wondering about the equivalent performance of the React server-side rendering when Piscina is not being used, the best performance we were able to achieve with autocannon using 1000 concurrent connections is about 3400 requests per second. Enabling Piscina, in this case, yielded roughly a 1.7x throughput improvement (about 5800 versus 3400 requests per second) and cut the latency per request in half.

Applying Backpressure

With the server-side rendering example above, we intentionally allow the Piscina task queue to fill up, trading increased memory consumption for improved throughput. There are times, however, when we want to limit the amount of memory consumed by the worker pool. We do so by limiting the inbound rate of new tasks flowing into the queue; that is, we intentionally keep queue pressure low.

In this next example, we have a simple comma-separated values document containing about 25,000 individual lines. For each line, we want to convert it into an object that is passed off to a Piscina worker for processing. We don’t really care about the return value so we just dispatch the task and move on.

We are using the open source csvtojson module (npm i csvtojson) to open the csv file and efficiently stream it into our application one line at a time. This module takes care of the details of reading the data from the disk and transforming it into an object we can use. From there, we hand the item off to the queue.

const { resolve } = require('path');
const csv = require('csvtojson');
const Pool = require('piscina');

const pool = new Pool({
  filename: resolve(__dirname, 'worker.js')
});

const stream = csv().fromFile('./data.csv');

stream
  .on('data', (data) => {
    const line = data.toString('utf8');
    pool.runTask(line).catch((err) => {
      console.error(err.message);
      process.exit(1);
    });
  })
  .on('error', console.error)
  .on('end', () => {
    console.log('done');
  });

If we run this example, we will find that Piscina happily allows us to keep queuing tasks up forever, and we’ll quickly end up with nearly 25,000 items in the queue (along with 25,000 Promises allocated to track them). This is undesirable for multiple reasons but there are two we care most about here: overall memory usage and queue wait time latency. Allocating so many pending Promises and tasks at once will cause heap usage to spike immediately, and given that we have a maximum of 6 Worker threads by default on this 4-core machine, it’s going to take some time to get through the entire set of queued tasks.

To help balance things out, the maxQueue configuration option can provide an upper limit on the number of pending tasks that may be queued. Once that limit is reached, Piscina will reject any attempt to add a new task until pending tasks are processed and there is room again in the queue. Once the size of the queue drops back to zero, the Piscina instance will emit the drain event, signalling that the queue is completely empty. We can use these mechanisms to control the flow of tasks into the queue to maintain a reasonable and balanced queue pressure:

const { resolve } = require('path');
const csv = require('csvtojson');
const Pool = require('piscina');

const maxQueue = 16;
const pool = new Pool({
  filename: resolve(__dirname, 'worker.js'),
  maxQueue
});

const stream = csv().fromFile('./data.csv');
let counter = 0; // number of lines submitted so far

// Resume reading once the queue has been completely emptied.
pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on('data', (data) => {
    const line = data.toString('utf8');
    counter++;
    pool.runTask(line).catch((err) => {
      console.error(err.message);
      process.exit(1);
    });
    // Stop reading once the queue is full.
    if (pool.queueSize >= maxQueue) {
      console.log('pausing...', counter, pool.queueSize);
      stream.pause();
    }
  })
  .on('error', console.error)
  .on('end', () => {
    console.log('done');
  });

Here we limit the queue size to a maximum of 16 items. Once that limit is reached, we pause the csvtojson stream feeding us items until the Piscina queue has been completely processed. We then resume the feeding stream to collect an additional set of tasks. Rather than allocating over 25k tasks and Promises, we allocate only a handful at any given time. What we see when we run this example is that while the total execution time of the application increases by a small amount, the total memory usage over the entire test is much lower and much more predictable.

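Because the drain event is emitted only once the queue is completely empty, this approach has a drawback when creating new tasks takes a non-negligible amount of time: workers can sit idle while the paused stream resumes and refills the queue. A similar effect can be achieved by awaiting submitted tasks and submitting an additional task each time one completes, keeping a fixed number of tasks in flight.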
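
The alternative of keeping a fixed number of tasks in flight and submitting a new one each time one completes can be sketched as a small generic helper. runWithLimit is a hypothetical name; its runTask argument stands in for something like (line) => pool.runTask(line), and items can be any iterable or async iterable (a Node.js Readable stream, such as the csvtojson stream, is async iterable). Results are collected in completion order, not input order.

```javascript
// Run `runTask` over `items`, keeping at most `limit` tasks in flight.
// A new task is started as soon as any running one settles.
async function runWithLimit(items, limit, runTask) {
  const inFlight = new Set();
  const results = [];
  for await (const item of items) {
    const p = Promise.resolve(runTask(item))
      .then((result) => { results.push(result); })
      .finally(() => inFlight.delete(p));
    // Mark as handled here; a failure still surfaces via race()/all() below.
    p.catch(() => {});
    inFlight.add(p);
    if (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
  }
  await Promise.all(inFlight);
  return results;
}

// Demo with a stand-in task that just waits 10 ms.
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
runWithLimit([1, 2, 3, 4, 5], 2, async (n) => {
  await wait(10);
  return n * n;
}).then((squares) => console.log(squares.sort((x, y) => x - y))); // [ 1, 4, 9, 16, 25 ]
```

Unlike the drain-based version, this never lets the number of pending tasks fall to zero while input remains, at the cost of a little more bookkeeping.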

Whichever approach is used, the idea is fundamentally the same: limiting the number of items queued at any given time keeps memory usage and queue wait times in check, usually at the cost of only a small increase in total execution time.

Wrapping Up

Our goal in writing this blog post has been to introduce Piscina and lay out many of the fundamental principles of worker thread performance in Node.js.

We developed the Piscina project specifically to make it easier to correctly integrate Worker threads into your Node.js applications. The examples provided seek not only to show how to use Piscina but how to do so effectively and efficiently in multiple scenarios. Work is continuing to improve both Piscina and Worker threads in general in Node.js core. If you’re interested in contributing to either, please reach out and let us know!

This post was co-written by James Snell & Anna Henningsen. 

For further reading, we suggest:

Published by James Snell