skip to Main Content

Executing the code below randomly either prints "DONE" or does not. Why does this happen?
How do I make it reach the console.log("DONE"); line every time?

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

/**
 * Demo entry point, executed by both the main thread and the worker (the
 * worker is started from this same file).
 *
 * Main thread: awaits one promise per worker message until a null arrives,
 * then prints "DONE". Worker: posts 0..9999 followed by a null terminator.
 */
async function main() {
  if (isMainThread) {
    const worker = new Worker(__filename);
    // resultResolve always holds the resolver of the most recently created promise.
    let resultResolve = null;
    let resultPromise = new Promise(resolve => resultResolve = resolve);
    // Every message calls whichever resolver is current at that moment; calling
    // a resolver whose promise is already settled has no effect.
    worker.on('message', (msg) => resultResolve(msg));
    // A fresh promise/resolver pair is installed only after the previous await
    // resumes, so any messages delivered before that point hit the old resolver.
    while (await resultPromise != null) {
      resultPromise = new Promise(resolve => resultResolve = resolve);
    }
    console.log("DONE");
  } else {
    // Worker side: send the numbers 0..9999, then null as the end marker.
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();

My guess is that there is a race with the worker exiting: the main thread's event loop either reaches the await resultPromise line before the worker exits, or it does not.

UPDATE 1

I am trying to write an async generator that yields values produced by a worker thread.

A more meaningful example would be:

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

/**
 * Async generator that starts a worker from this file and yields each
 * non-null message it receives; iteration ends when a null (or undefined)
 * message is awaited.
 */
async function* fooGenerator() {
  const worker = new Worker(__filename);
  // resultResolve always holds the resolver of the most recently created promise.
  let resultResolve = null;
  let resultPromise = new Promise(resolve => resultResolve = resolve);
  // Every message calls whichever resolver is current at that moment; calling
  // a resolver whose promise is already settled has no effect.
  worker.on('message', (msg) => resultResolve(msg));
  let result = null;
  while ((result = await resultPromise) != null) {
    // Install the next promise before yielding, so a message that arrives
    // while the consumer is busy can settle it (only one such message can).
    resultPromise = new Promise(resolve => resultResolve = resolve);
    yield result;
  }
}

/**
 * Demo entry point, executed by both the main thread and the worker.
 *
 * Main thread: drains fooGenerator() and then prints "DONE".
 * Worker: posts 0..9999 followed by a null terminator.
 */
async function main() {
  if (isMainThread) {
    // The loop body is intentionally empty: values are consumed and discarded.
    for await (let value of fooGenerator());
    console.log("DONE");
  } else {
    // Worker side: send the numbers 0..9999, then null as the end marker.
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();

UPDATE 2

Adding setInterval doesn’t solve the problem. It still won’t print "DONE".

const {Worker, isMainThread, parentPort} = require('node:worker_threads');

/**
 * Variant of the demo with a no-op interval timer registered on the main
 * thread before the worker is started.
 *
 * Main thread: awaits one promise per worker message until a null arrives,
 * then prints "DONE". Worker: posts 0..9999 followed by a null terminator.
 */
async function main() {
  if (isMainThread) {
    // No-op repeating timer: the attempted workaround in this variant.
    setInterval(() => {}, 1000);
    const worker = new Worker(__filename);
    // resultResolve always holds the resolver of the most recently created promise.
    let resultResolve = null;
    let resultPromise = new Promise(resolve => resultResolve = resolve);
    // Every message calls whichever resolver is current at that moment; calling
    // a resolver whose promise is already settled has no effect.
    worker.on('message', (msg) => resultResolve(msg));
    // A fresh promise/resolver pair is installed only after the previous await
    // resumes, so any messages delivered before that point hit the old resolver.
    while ((await resultPromise) != null) {
      resultPromise = new Promise(resolve => resultResolve = resolve);
    }
    console.log("DONE");
  } else {
    // Worker side: send the numbers 0..9999, then null as the end marker.
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();

2

Answers


  1. I suspect the race condition is rather between

    worker.on('message', (msg) => resultResolve(msg));
    

    calling resultResolve and

    let resultPromise = new Promise(resolve => resultResolve = resolve);
    

    assigning a new resultResolve. There might be multiple message events happening before you assign a new resolver function to resultResolve, which leads to skipped messages. To confirm this suspicion, use

    const {Worker, isMainThread, parentPort} = require('node:worker_threads');
    
    /**
     * Diagnostic version of the demo: reports worker messages that arrive
     * while no resolver is installed ("unhandled") and gaps in the received
     * sequence ("missed").
     */
    async function main() {
      if (isMainThread) {
        const worker = new Worker(__filename);
        // Non-null only while the loop below is waiting for a message.
        let resultResolve = null;
        worker.on('message', (msg) => {
          if (resultResolve) resultResolve(msg);
          else console.log(`unhandled ${msg}`);
          // Clear the resolver so a second message before the next await is
          // reported as unhandled instead of silently hitting a settled promise.
          resultResolve = null;
        });
        // Next sequence number expected from the worker.
        let i = 0;
        while (true) {
          const result = await new Promise(resolve => {
            resultResolve = resolve;
          });
          // null (or undefined) is the worker's end marker.
          if (result == null) break;
          if (result != i) console.log(`missed ${i}`);
          i = result + 1;
        }
        console.log("DONE");
      } else {
        // Worker side: send the numbers 0..9999, then null as the end marker.
        for (let i = 0; i < 10000; i++) {
          parentPort.postMessage(i);
        }
        parentPort.postMessage(null);
      }
    }
    
    main();
    

    This will definitely happen when your fooGenerator() experiences backpressure, e.g. in

    // Slow consumer: every iteration waits before requesting the next value.
    // NOTE(review): `delay` is not defined in this snippet; presumably a
    // promise-returning sleep helper taking milliseconds; confirm.
    for await (const value of fooGenerator()) {
      await delay(50);
    }
    

    What you need to solve this problem is a proper queue such as this one. You’d then write

    const {Worker, isMainThread, parentPort} = require('node:worker_threads');
        
    // Both threads run this file: the main thread consumes worker messages
    // through a queue, the worker produces 0..9999 followed by null.
    // NOTE(review): AsyncBlockingQueue is not defined in this snippet; it is
    // assumed to provide enqueue() and to be async-iterable.
    if (isMainThread) {
      (async function main() {
        const worker = new Worker(__filename);
        const queue = new AsyncBlockingQueue();
        // The handler only stores the message; no resolver juggling, so no
        // message can be dropped between two awaits.
        worker.on('message', msg => queue.enqueue(msg));
        for await (const value of queue) {
          // Stop explicitly on the worker's null terminator. Without this the
          // loop ends only if the queue's iterator itself ends, which cannot
          // be relied on for a queue that simply hands out enqueued values.
          if (value == null) break;
        }
        console.log("DONE");
      })();
    } else {
      // Worker side: send the numbers 0..9999, then null as the end marker.
      for (let i = 0; i < 10000; i++) {
        parentPort.postMessage(i);
      }
      parentPort.postMessage(null);
    }
    
    Login or Signup to reply.
  2. Your code relies on some critical timing that apparently is not always the way you think it is.

    With this:

     worker.on('message', (msg) => resultResolve(msg));
    

    You are apparently assuming that each time this event handler gets called, it will be a new and different resultResolve() function that will trigger yet a different promise to resolve. But, if you instrument your code and check to see that this is actually a new and different promise, you will find that it is not. So, you have a race condition between the worker message and the code that assigns a new value to resultResolve, thus you’re reusing resultResolve values and not resolving all your promises.

    So, the Worker finishes its work, communicates all the results back to the parent, and calls all the resultResolve() functions, but because you reuse some of the resultResolve values, you don’t resolve all the promises. Your generator therefore gets stuck on a promise that never resolves, and since nodejs has no other work to do, the program exits without getting to the "DONE" message.


    I’d rather implement this code without a couple of things. First, I have a strong distaste for manually created promises that assign the resolve, reject functions outside the executor. There’s a reason the promise designers didn’t expose them. It can be messy to do so and that’s part of the cause of the problem here. If you absolutely must use that type of construct, then I’d encapsulate that logic inside an explicit Deferred class that does this for you. But, you usually don’t need to do it that way at all.

    Second, you have to completely get rid of the race condition. In my mind, that’s easiest to do by not doing the resultResolve thing at all. If the worker.on('message', ...) code has just one job, and that’s to grab the data it was sent, store it, and keep track of when there is no more data, then you can make another piece of code that lets some other code iterate that data, waiting when it needs to and being told it’s done when it’s done.

    You can pick your desired architecture and put iterating the queue behind a generator, an async iterator or a class/method approach.

    I would implement this with a queue. There are plenty of already built queuing classes to use, but here’s a simple home-grown one that works here:

    const { Worker, isMainThread, parentPort } = require('node:worker_threads');
    const { EventEmitter, once } = require('node:events');
    
    /**
     * Minimal async FIFO queue built on EventEmitter.
     *
     * Producers call add(item) for each item and done() once no more items
     * will come. Consumers either await next() repeatedly or use
     * `for await (const item of queue)`.
     *
     * null is reserved as the end-of-data sentinel returned by next(), so
     * null must not be queued as a regular item.
     */
    class Queue extends EventEmitter {
        constructor() {
            super();
            // Items added but not yet consumed, oldest first.
            this.data = [];
            // Set by done(); once true and data is empty, next() yields null.
            this.complete = false;
        }

        /**
         * Take the oldest queued item, waiting for one if necessary.
         *
         * @returns {Promise<*>} the next item, or null once done() has been
         *   called and every queued item has been consumed.
         */
        async next() {
            // Re-check after every wake-up: the event may be a done() signal,
            // or another waiting consumer may already have taken the item.
            // A loop (rather than recursion) keeps the promise chain flat.
            while (this.data.length === 0) {
                if (this.complete) {
                    // return sentinel value that we're done
                    return null;
                }
                // wait for next Queue event to happen
                // either an item to be added or a done signal
                await once(this, 'data');
            }
            return this.data.shift();
        }

        /** Signal that no more items will be added and wake any waiters. */
        done() {
            this.complete = true;
            this.emit('data');
        }

        /**
         * Append an item to the queue.
         *
         * @param {*} data - item to queue (must not be null, see class note).
         */
        add(data) {
            this.data.push(data);
            // notify watchers that there is now some data
            this.emit('data');
        }

        /**
         * Async iteration support: yields items in FIFO order and finishes
         * when next() reports the end-of-data sentinel.
         */
        async *[Symbol.asyncIterator]() {
            for (let item = await this.next(); item !== null; item = await this.next()) {
                yield item;
            }
        }
    }
    
    /**
     * Entry point executed by both threads.
     *
     * Worker: posts 0..9999 and then a null terminator.
     * Main thread: funnels worker messages into a Queue, counts the items it
     * reads back, and logs the total once the queue reports completion.
     */
    async function main() {
        if (!isMainThread) {
            // this is what the Worker is doing
            let i = 0;
            while (i < 10000) {
                parentPort.postMessage(i);
                i++;
            }
            parentPort.postMessage(null);
            return;
        }

        const worker = new Worker(__filename);
        const q = new Queue();
        // null from the worker marks the end of the stream; anything else is data.
        worker.on('message', (msg) => {
            if (msg !== null) {
                q.add(msg);
                return;
            }
            q.done();
        });

        // Count every item until the queue hands back its null sentinel.
        let cntr = 0;
        for (let val = await q.next(); val !== null; val = await q.next()) {
            ++cntr;
        }
        console.log(`Done: got ${cntr} results`);
    }
    
    main();
    

    If you really want to use for await ..., then you could put an asyncIterator on the Queue or convert it to a generator.

    Also, note that there’s not a single manually created promise here. It uses a trick I’ve used several times before where it awaits an event and that allows you to await something that some other agent will trigger more cleanly (at least in my opinion).

    I added a counter to make sure that all the Worker messages were being processed.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search