Node.js Multithreading, Part 1

Concurrent Programming with Node.js Multithreading using workers and atomics

Node.js is known as a single-threaded runtime environment: a program’s JavaScript code runs on a single thread, so two parts of the program can never execute at the same time.

This may seem like a disadvantage when compared to multithreaded runtimes such as .NET’s CLR or the JVM, which can leverage the multi-core nature of modern CPUs.

In practice, this comes with the great advantage of simplifying the programming model. For example, you can safely assume that a shared data structure is never accessed concurrently.
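To make that guarantee concrete, here is a minimal sketch (the variable names are ours, chosen for illustration). A timer is scheduled with a 0 ms delay, yet its callback cannot touch the shared array until the synchronous loop has run to completion:

```javascript
// A data structure shared between the synchronous code and a timer callback.
const queue = []
let timerFired = false

// Scheduled with a 0 ms delay, but it can only run once the current
// synchronous block has finished.
setTimeout(() => {
  timerFired = true
  queue.push('from timer')
}, 0)

// While this loop runs, nothing else can observe or modify the queue.
for (let i = 0; i < 1e6; i++) {
  queue.push(i)
  queue.pop()
}

console.log(timerFired, queue.length) // false 0
```

No lock is needed around the loop, because the callback and the loop can never interleave.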

“Node.js has great resources to explain how this works”

Node.js Multithreading support

Although Node.js runs user code in a single thread by default, it also exposes multithreading APIs, the core of which is the worker_threads module.

The worker_threads programming model is fairly simple: you instantiate a Worker by providing the path to the file to be executed.

A somewhat contrived example consists of a single program that behaves differently depending on whether it’s executed from the main thread or from a worker thread:

Plain Text
// simple-worker.js
import { Worker, isMainThread, threadId } from 'worker_threads'

if (isMainThread) {
  console.log('inside main thread')
  // Load this same file inside a Worker instance.
  // A relative path is resolved against the current working directory.
  new Worker('./simple-worker.js')
} else {
  console.log('inside worker', threadId)
}

The main thread, identified by the boolean export isMainThread, logs to the console and then starts a new worker using the same file. The worker will then execute the code in the else branch and print its own threadId.

The result of running this program will be similar to:

Plain Text
inside main thread
inside worker 1

You can follow along by cloning this repository, where you will find all the code shown in this blog post.

Concurrency

A program that runs on multiple threads, where each thread does something different and is isolated from the others, is not a very useful program. More often, threads need to collaborate and communicate to carry out a common task.

For example, in order to avoid blocking the event loop, you may want to offload a CPU-intensive operation to a worker by sending it some data, letting it run in the background, then getting the result back for further processing in the main thread.
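As a rough sketch of that round trip, the snippet below sends a number to a worker and waits for the reply. To keep it in a single file, the worker source is passed inline with the eval option (inline worker code is evaluated as CommonJS, hence the require call); sumInWorker and the summing task are hypothetical stand-ins for a real CPU-intensive job.

```javascript
import { Worker } from 'node:worker_threads'

// Inline worker source: waits for one message, does the work, replies.
const workerSource = `
  const { parentPort } = require('node:worker_threads')

  parentPort.once('message', n => {
    // Stand-in for an expensive computation: sum the integers from 1 to n.
    let sum = 0
    for (let i = 1; i <= n; i++) sum += i
    parentPort.postMessage(sum)
  })
`

// Hypothetical helper: sends n to a fresh worker and resolves with its reply.
function sumInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.postMessage(n)
  })
}

// The event loop of the main thread stays free while the worker computes.
const total = await sumInWorker(1e6)
console.log('sum computed in worker:', total)
```

Messages sent this way are copied between threads, which is fine for passing inputs and results back and forth.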

This requires some level of synchronization between the main and worker threads because they are running concurrently.

Sharing data

A typical way to communicate across multiple threads is by accessing shared data.

As an example, let’s compute the Nth prime number, a fairly expensive operation for high values of N, and offload this work to a number of background workers. Then, we want to determine which one managed to begin the computation first.

For reference, this is the code we use to calculate the Nth prime number:

Plain Text
// source: https://stackoverflow.com/a/57012040/32093
export function nthPrime(n) {
  const prime = []
  let i = 1

  while (i++ && prime.length < n)
    prime.reduce((a, c) => (i % c) * a, 2) && prime.push(i)

  return prime.length ? prime.pop() : -1
}
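The snippet above is quite dense. For readers who prefer something more explicit, here is a sketch of an equivalent function using plain trial division. The name nthPrimeReadable is ours and is not part of the repository. It is also cheaper than the original, so it is meant for readability rather than as a drop-in CPU-heavy workload.

```javascript
// Hypothetical, more explicit alternative to nthPrime.
// Returns the nth prime (1-based), or -1 when n is smaller than 1.
function nthPrimeReadable(n) {
  if (n < 1) return -1

  const primes = []
  for (let candidate = 2; primes.length < n; candidate++) {
    let isPrime = true
    for (const p of primes) {
      // No need to test divisors larger than the square root of candidate.
      if (p * p > candidate) break
      if (candidate % p === 0) {
        isPrime = false
        break
      }
    }
    if (isPrime) primes.push(candidate)
  }

  return primes[primes.length - 1]
}

console.log(nthPrimeReadable(10)) // 29
```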

A naive approach

The following code is the entry point of our program, running on the main thread.

It spawns a certain number of workers (the specific number is unimportant), supplying them with the index of the prime we want to compute and a result variable to store the result. It then waits for all workers to finish and prints the value of the prime number and the threadId of the worker that computed it.

“The reason why we decided to use an array to store the result will be clearer later on.”
Plain Text
// index.js
import { Worker } from 'worker_threads'
import { __dirname, numberOfWorkers } from '../util.js'

const workers = []
// [threadId, nthPrime]
const result = [-1, -1]
// a number between 1k and 2k
const whichPrime = Math.floor(Math.random() * 1e3) + 1e3

for (let i = 0; i < numberOfWorkers; i++) {
  const worker = new Worker(__dirname(import.meta) + '/worker.js', {
    workerData: {
      result,
      whichPrime
    }
  })

  workers.push(new Promise(resolve => worker.once('exit', resolve)))
}

await Promise.all(workers)

console.log(
  `main thread: worker ${result[0]} calculated ${whichPrime}th prime as ${result[1]}`
)

The following code is the worker’s code, which introduces an arbitrary short delay, checks if no other worker has yet computed the result and, in that case, does the computation and sets the result.

Plain Text
// worker.js
import { workerData, threadId } from 'worker_threads'
import { setTimeout } from 'timers/promises'
import { nthPrime } from '../util.js'

await setTimeout(Math.random() * 100 + 100)

if (workerData.result[0] === -1) {
  workerData.result[0] = threadId

  const prime = nthPrime(workerData.whichPrime)
  workerData.result[1] = prime

  console.log(
    `worker(${threadId}): the ${workerData.whichPrime}th prime is ${prime}`
  )
}

Running this code leads to a result similar to this:

Plain Text
worker(4): the 1809th prime is 15493
worker(2): the 1809th prime is 15493
worker(1): the 1809th prime is 15493
worker(3): the 1809th prime is 15493
worker(5): the 1809th prime is 15493
main thread: worker -1 calculated 1809th prime as -1

Each worker is unaware of the other workers computing the prime number, so it stores its own threadId and prime number in the result. None of these changes is propagated back to the main thread, which keeps seeing the initial value instead of the value we expect.

The reason for this is that data sent from one thread to another is copied using the structured clone algorithm, thereby causing each thread to see its own copy and preventing changes from being propagated across threads.
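The copy semantics can be observed without spawning any worker. The sketch below uses the global structuredClone function (available in recent Node.js versions, 17 and later), which applies the same algorithm; the variable receivedByWorker is only an illustration of what a worker would get in workerData.

```javascript
// The array owned by the main thread.
const result = [-1, -1]

// What a worker would receive: a structured clone, not a reference.
const receivedByWorker = structuredClone(result)
receivedByWorker[0] = 4
receivedByWorker[1] = 15493

console.log(result)           // [ -1, -1 ]
console.log(receivedByWorker) // [ 4, 15493 ]
```

The original array is untouched, which is exactly what the main thread observed in the output above.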

Shared memory

The problem with the previous example can be addressed by using a data structure that, instead of being copied, is shared across multiple threads.

JavaScript offers a fairly flexible object for this, called SharedArrayBuffer. Its usage can be unintuitive at first, but it basically represents an array of raw, arbitrary data. The data can then be manipulated using views, such as Int32Array, which behave like plain arrays.
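To get a feel for the relationship between a buffer and its views, here is a small single-threaded sketch (the variable names are ours). Several views can wrap the same SharedArrayBuffer, and a write through one of them is visible through the others:

```javascript
// Room for two 32-bit integers: 2 * 4 = 8 bytes of raw memory.
const buffer = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT)

// Two independent views over the same underlying memory.
const viewA = new Int32Array(buffer)
const viewB = new Int32Array(buffer)

viewA[0] = 42

console.log(buffer.byteLength) // 8
console.log(viewA.length)      // 2
console.log(viewB[0])          // 42
```

When a view like this is sent to a worker, the view object itself is cloned, but the memory behind it is not.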

The relevant part of the main thread code of the previous example becomes:

Plain Text
const rawResult = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT)
// [threadId, nthPrime]
const result = new Int32Array(rawResult).fill(-1)

The changes are minimal: instead of using a plain array, we create a SharedArrayBuffer big enough to contain two 32-bit integers, then we initialize it to -1 using an Int32Array view on it. The worker code remains unchanged.

If we execute this code, most of the time we’ll get something that looks like a correct result:

Plain Text
worker(5): the 1560th prime is 13109
main thread: worker 5 calculated 1560th prime as 13109

Unfortunately, at some point we’ll come across a bug, which manifests itself in a way similar to this:

Plain Text
worker(4): the 1417th prime is 11821
worker(1): the 1417th prime is 11821
main thread: worker 1 calculated 1417th prime as 11821

This is called a race condition. Two workers each believed they were the first to start the computation, when only one of them can be.

This is the nature of asynchronous, concurrent code: access to shared data must be thread-safe and atomic, to guarantee correctness.

In this case, the problem lies in these two lines of code:

Plain Text
if (workerData.result[0] === -1) {
  workerData.result[0] = threadId
}

Two threads were able to enter the if statement, and both changed the result. The last one to do so is the “winner”, but we ran the computation twice unnecessarily.
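The problematic interleaving can be replayed step by step. The following is a single-threaded simulation, not a real race: we perform by hand, in an unlucky order, the reads and writes that workers 4 and 1 would perform. All variable names are illustrative.

```javascript
const result = new Int32Array(
  new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT)
).fill(-1)

let computationsStarted = 0

// Step 1: both workers read the slot before either of them has written to it.
const seenByWorker4 = result[0]
const seenByWorker1 = result[0]

// Step 2: both checks pass, because each worker acts on a stale read.
if (seenByWorker4 === -1) {
  result[0] = 4
  computationsStarted++
}
if (seenByWorker1 === -1) {
  result[0] = 1 // overwrites the id stored by worker 4
  computationsStarted++
}

console.log(computationsStarted, result[0]) // 2 1
```

The check and the write are two separate steps, and nothing prevents another thread from running between them.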

Achieving thread safety

In order to fix the race condition, we rely on the atomic operations provided by the Atomics object.

The operation we’re doing consists of:

  • Reading a value
  • Checking if the value is equal to -1
  • If so, setting it to a different value

We will use the Atomics.compareExchange method, which performs these same three steps as a single, thread-safe operation.

Our main thread code is unchanged, and the worker code changes as such:

Plain Text
if (Atomics.compareExchange(workerData.result, 0, -1, threadId) === -1) {
  const prime = nthPrime(workerData.whichPrime)
  Atomics.store(workerData.result, 1, prime)

  console.log(`worker(${threadId}): found ${prime}`)
}

Atomics.compareExchange does this:

  1. Read the value at index 0 of the result property
  2. If the value is equal to -1, set it to threadId
  3. Return the value read in step 1, whether or not it was replaced

This is all happening in an atomic fashion, thereby preventing the race condition of the previous example from happening.
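The return value is what tells a caller whether it won. Here is a minimal single-threaded sketch of two consecutive calls, using 4 and 1 as made-up thread ids:

```javascript
const result = new Int32Array(
  new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT)
).fill(-1)

// The slot holds -1, so the exchange happens and the old value is returned.
const first = Atomics.compareExchange(result, 0, -1, 4)

// The slot now holds 4, so nothing is written and 4 is returned.
const second = Atomics.compareExchange(result, 0, -1, 1)

console.log(first, second, result[0]) // -1 4 4
```

Only the caller that gets -1 back knows it was the one to claim the slot.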

Using the right primitives for thread-safe programming opens up many opportunities for multithreaded programs, which distribute workloads across multiple workers.
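As a quick sanity check, the sketch below spawns a few workers that all try to claim the same slot, and counts how many of them succeed. It is a simplified variant of the example above, not the repository code: the worker source is passed inline with the eval option (evaluated as CommonJS), and the second slot of the array is repurposed as a counter of winners.

```javascript
import { Worker } from 'node:worker_threads'

// Layout of the shared array: [winnerThreadId, numberOfWinners]
const shared = new Int32Array(
  new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT)
)
Atomics.store(shared, 0, -1)

const workerSource = `
  const { workerData, threadId } = require('node:worker_threads')

  // Only the thread that swaps -1 for its own id counts itself as a winner.
  if (Atomics.compareExchange(workerData.shared, 0, -1, threadId) === -1) {
    Atomics.add(workerData.shared, 1, 1)
  }
`

const exits = []
for (let i = 0; i < 4; i++) {
  const worker = new Worker(workerSource, {
    eval: true,
    workerData: { shared }
  })
  exits.push(
    new Promise((resolve, reject) => {
      worker.once('error', reject)
      worker.once('exit', resolve)
    })
  )
}

await Promise.all(exits)

const winnerId = Atomics.load(shared, 0)
const winners = Atomics.load(shared, 1)
console.log(`worker ${winnerId} won, winners counted: ${winners}`)
```

Which worker wins varies from run to run, but the counter should always end up at 1.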

The code shown in this blog post is available in this repository.

In the next blog post of this Node.js multithreading series, we will see how to apply these concepts to a more interesting scenario.
