I’m writing an HTTP API with expressjs in Node.js and here is what I’m trying to achieve:
- I have a task that I would like to run regularly, approximately every minute. This task is implemented with an async function named task.
- In reaction to a call to my API, I would like that task to be run immediately as well.
- Two executions of the task function must not be concurrent. Each execution should run to completion before another execution is started.
The code looks like this:
// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

// call task regularly
setIntervalAsync(async () => {
  await task("ticker");
}, 5000); // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
  await task("trigger");
  res.send("ok");
});
I’ve put a full working sample project at https://github.com/piec/question.js
If I were writing Go, I would do it like this and it would be easy, but I don’t know how to do that in Node.js.
Ideas I have considered or tried:
- I could apparently put task in a critical section using a mutex from the async-mutex library, but I’m not too fond of adding mutexes to JS code.
- Many people seem to use message queue libraries with worker processes (bee-queue, bullmq, …), but this usually adds a dependency on an external service such as Redis. Also, if I’m correct, the code would be a bit more complex because I would need a main entrypoint and a separate entrypoint for the worker processes. You also can’t share objects with the workers as easily as in a "normal" single-process situation.
- I have tried an RxJS Subject in order to build a producer-consumer channel, but I was not able to limit the execution of task to one at a time (task is async).
Thank you!
2 Answers
Here’s a version using an RxJS Subject that is almost working. How to finish it depends on your use case.
The issue here is that res.send("ok") is linked to the effect$ stream’s next emission. This may not be the one generated by the run.next you’re about to call.
There are many ways to fix this. For example, you can tag each emission with an ID and then wait for the corresponding emission before calling res.send("ok").
There are better ways too if calls distinguish themselves naturally.
A Clunky ID Version
Generating an ID randomly is a bad idea, but it gets the general thrust across. You can generate unique IDs however you like. They can be integrated directly into the task somehow or can be kept 100% separate the way they are here (task itself has no knowledge that it’s been assigned an ID before being run).
You can make your own serialized asynchronous queue and run the tasks through that.
This queue uses a flag to keep track of whether it’s in the middle of running an asynchronous operation already. If so, it just adds the task to the queue and will run it when the current operation is done. If not, it runs it now. Adding it to the queue returns a promise so the caller can know when the task finally got to run.
If the tasks are asynchronous, they are required to return a promise that is linked to the asynchronous activity. You can mix in non-asynchronous tasks too and they will also be serialized.
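The queue described above could be sketched roughly as follows. This is a minimal illustration and not the answer's original code: the names SerialQueue, enqueue and drain are hypothetical, and sleep is a small stand-in helper for the one used in the question. It uses a running flag, starts a task immediately when nothing is in flight, and otherwise leaves it in the queue until the current task has finished.

```typescript
// Hypothetical sketch of a serialized async queue (names are assumptions).
type Task<T> = () => T | Promise<T>;

class SerialQueue {
  private pending: Array<() => Promise<void>> = [];
  private running = false; // true while a task is in flight

  // Queues a task. The returned promise settles with the task's own result,
  // so the caller knows when the task finally got to run.
  enqueue<T>(fn: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.pending.push(async () => {
        try {
          resolve(await fn()); // works for sync and async tasks alike
        } catch (err) {
          reject(err); // a failing task does not stop the queue
        }
      });
      void this.drain();
    });
  }

  // Runs queued tasks one at a time until the queue is empty.
  private async drain(): Promise<void> {
    if (this.running) return; // the active loop will pick up the new task
    this.running = true;
    try {
      let job: (() => Promise<void>) | undefined;
      while ((job = this.pending.shift()) !== undefined) {
        await job();
      }
    } finally {
      this.running = false;
    }
  }
}

// Stand-in for the sleep helper used in the question.
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

const queue = new SerialQueue();

async function task(reason: string): Promise<void> {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

// Possible wiring with the question's code (not executed here):
//   setInterval(() => void queue.enqueue(() => task("ticker")), 60_000);
//   app.get("/task", async (req, res) => {
//     await queue.enqueue(() => task("trigger"));
//     res.send("ok");
//   });
```

Because every caller goes through queue.enqueue, the ticker and the HTTP handler can never overlap, and the handler's await only resolves once its own task has completed. One design choice to be aware of: a rejected task rejects only its own promise, so the ticker call should attach a catch handler if task can throw.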