Executing the code below sometimes prints "DONE" and sometimes doesn't. Why does this happen?
How do I make it reach the `console.log("DONE");` line every time?
```javascript
const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function main() {
  if (isMainThread) {
    const worker = new Worker(__filename);
    let resultResolve = null;
    let resultPromise = new Promise(resolve => resultResolve = resolve);
    worker.on('message', (msg) => resultResolve(msg));
    while (await resultPromise != null) {
      resultPromise = new Promise(resolve => resultResolve = resolve);
    }
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();
```
My guess is that the race is between the worker exiting and the main thread's event loop reaching the `await resultPromise` line: sometimes the event loop gets there before the worker exits and sometimes it doesn't.
UPDATE 1
I am trying to build an async generator that yields values produced by a worker thread.
A more meaningful example would be:
```javascript
const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function* fooGenerator() {
  const worker = new Worker(__filename);
  let resultResolve = null;
  let resultPromise = new Promise(resolve => resultResolve = resolve);
  worker.on('message', (msg) => resultResolve(msg));
  let result = null;
  while ((result = await resultPromise) != null) {
    resultPromise = new Promise(resolve => resultResolve = resolve);
    yield result;
  }
}

async function main() {
  if (isMainThread) {
    for await (let value of fooGenerator());
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();
```
UPDATE 2
Adding `setInterval` doesn't solve the problem. It still won't print "DONE".
```javascript
const {Worker, isMainThread, parentPort} = require('node:worker_threads');

async function main() {
  if (isMainThread) {
    setInterval(() => {}, 1000);
    const worker = new Worker(__filename);
    let resultResolve = null;
    let resultPromise = new Promise(resolve => resultResolve = resolve);
    worker.on('message', (msg) => resultResolve(msg));
    while ((await resultPromise) != null) {
      resultPromise = new Promise(resolve => resultResolve = resolve);
    }
    console.log("DONE");
  } else {
    for (let i = 0; i < 10000; i++) {
      parentPort.postMessage(i);
    }
    parentPort.postMessage(null);
  }
}

main();
```
Answers
I suspect the race condition is rather between calling `resultResolve` and assigning a new `resultResolve`. There might be multiple `message` events happening before you assign a new resolver function to `resultResolve`, which leads to skipped messages. To confirm this suspicion, use:

This will definitely happen when your `fooGenerator()` experiences backpressure, e.g. in:

What you need to solve this problem is a proper queue such as this one. You'd then write:
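A minimal sketch of that approach, assuming a small hand-rolled queue rather than any particular library: `AsyncQueue`, `push()`, `shift()` and `workerSource` are hypothetical names used only for illustration. The `message` handler only enqueues, so messages that arrive back to back are buffered instead of overwriting a shared resolver. The worker body is passed with `{ eval: true }` so the sketch doesn't depend on `__filename`.

```javascript
const { Worker } = require('node:worker_threads');

// Hypothetical minimal queue: every pushed item is either handed to a waiting
// consumer or buffered, so no message can be dropped.
class AsyncQueue {
  constructor() {
    this.items = [];   // values received but not yet consumed
    this.waiters = []; // resolve functions of shift() calls still waiting
  }

  push(item) {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);   // a consumer is already waiting: hand it over
    else this.items.push(item); // otherwise buffer it for later
  }

  shift() {
    if (this.items.length > 0) return Promise.resolve(this.items.shift());
    return new Promise(resolve => this.waiters.push(resolve));
  }
}

// Worker body as a string (eval: true) so the sketch does not rely on __filename.
const workerSource = `
  const { parentPort } = require('node:worker_threads');
  for (let i = 0; i < 10000; i++) parentPort.postMessage(i);
  parentPort.postMessage(null); // end-of-stream marker
`;

async function* fooGenerator() {
  const worker = new Worker(workerSource, { eval: true });
  const queue = new AsyncQueue();
  worker.on('message', msg => queue.push(msg)); // the handler only enqueues
  let result;
  while ((result = await queue.shift()) != null) {
    yield result;
  }
}

async function main() {
  let count = 0;
  for await (const value of fooGenerator()) count++;
  console.log('DONE', count);
}

main();
```

Because the queue buffers without limit, a slow consumer simply lets items pile up in `items` rather than losing them.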
Your code relies on some critical timing that apparently is not always the way you think it is.

With this handler, `worker.on('message', (msg) => resultResolve(msg))`, you are apparently assuming that each time it gets called, it will be a new and different `resultResolve()` function that resolves yet another promise. But if you instrument your code and check whether it is actually a new and different promise each time, you will find that it is not. So you have a race condition between the worker's messages and the code that assigns a new value to `resultResolve`: you're reusing `resultResolve` values and not resolving all your promises.

So the worker finishes its work, sends all the results back to the parent and triggers all the `resultResolve()` calls, but because some of those calls reuse a `resultResolve` that was already used, not every promise gets resolved. Your generator gets stuck waiting, Node.js has no other work to do, and the program exits without getting to the "DONE" message.

I'd rather implement this code without a couple of things. First, I have a strong distaste for manually created promises whose `resolve` and `reject` functions are assigned outside the executor. There's a reason the promise designers didn't expose them: it can be messy to do so, and that's part of the cause of the problem here. If you absolutely must use that type of construct, then I encapsulate that logic inside an explicit `Deferred` class that does this for you. But you usually don't need to do it that way at all.
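A minimal sketch of such a `Deferred` class (its exact shape is an assumption), together with the failure mode it would make visible: a promise settles only once, so a second `resolve()` on the same promise is silently ignored. If the value lost this way is the `null` end marker, the consumer awaits a promise nobody will ever resolve; once the worker exits, nothing keeps the event loop alive and Node exits without printing "DONE". `twoMessagesOneResolver` is a hypothetical name for the demo.

```javascript
// Hypothetical Deferred helper: keeps the "resolve outside the executor"
// pattern in one explicit place instead of a loose resultResolve variable.
class Deferred {
  constructor() {
    // The executor runs synchronously, so resolve/reject are set before
    // the constructor returns.
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

// The failure mode from the question: two 'message' events arrive before the
// consumer has created a new Deferred, so both call the same resolve().
function twoMessagesOneResolver() {
  const d = new Deferred();
  d.resolve(1);    // first message settles the promise
  d.resolve(null); // second message (the end marker) is silently ignored
  return d.promise;
}

twoMessagesOneResolver().then(value => console.log(value)); // logs 1
```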
Second, you have to completely get rid of the race condition. In my mind, that's easiest to do by not using the `resultResolve` mechanism at all. If the `worker.on('message', ...)` code has just one job, which is to grab the data it was sent, store it, and keep track of when there is no more data, then you can write another piece of code that lets other code iterate that data, waiting when it needs to and being told when it's done. You can pick your desired architecture and put iterating the queue behind a generator, an async iterator, or a class/method approach.
I would implement this with a queue. There are plenty of ready-made queue classes you could use, but here's a simple home-grown one that works here:
If you really want to use `for await ...`, then you could put an async iterator on the `Queue` or convert it to a generator. Also, note that there's not a single manually created promise here. It uses a trick I've used several times before: it awaits an event, which lets you await something that some other agent will trigger more cleanly (at least in my opinion).
I added a counter to make sure that all the Worker messages were being processed.
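A minimal sketch of this kind of queue, assuming an `EventEmitter` subclass with an async iterator so it works with `for await`. The names `Queue`, `add()`, `done()`, the `'change'` event and `workerSource` are hypothetical. The consumer waits with Node's `events.once()` when the queue is empty, so no promise is constructed by hand, and the `message` handler only stores data or marks the end. A counter confirms that every message is processed.

```javascript
const { EventEmitter, once } = require('node:events');
const { Worker } = require('node:worker_threads');

// Hypothetical queue: add()/done() only record state; consumers iterate it
// with for await and wait on a 'change' event when nothing is buffered.
class Queue extends EventEmitter {
  constructor() {
    super();
    this.items = [];
    this.finished = false;
  }

  add(item) {
    this.items.push(item);
    this.emit('change');
  }

  done() {
    this.finished = true;
    this.emit('change');
  }

  async *[Symbol.asyncIterator]() {
    while (true) {
      if (this.items.length > 0) {
        yield this.items.shift();
      } else if (this.finished) {
        return;
      } else {
        // Checking the queue and subscribing happen synchronously,
        // so no 'change' event can be missed in between.
        await once(this, 'change');
      }
    }
  }
}

// Worker body as a string (eval: true) so the sketch does not rely on __filename.
const workerSource = `
  const { parentPort } = require('node:worker_threads');
  for (let i = 0; i < 10000; i++) parentPort.postMessage(i);
  parentPort.postMessage(null); // end-of-stream marker
`;

async function main() {
  const queue = new Queue();
  const worker = new Worker(workerSource, { eval: true });
  // The message handler has one job: store the data or mark the end.
  worker.on('message', msg => (msg === null ? queue.done() : queue.add(msg)));
  let count = 0; // counter to confirm every message was processed
  for await (const value of queue) count++;
  console.log('DONE', count);
  return count;
}

main();
```

The end marker could equally be replaced by listening for the worker's `'exit'` event and calling `done()` there; the sentinel is kept here only to mirror the question.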