I’m writing an HTTP API with expressjs in Node.js and here is what I’m trying to achieve:

  • I have a task that I would like to run regularly, approximately every minute. This task is implemented as an async function named task.
  • In reaction to a call to my API, I would also like that task to run immediately.
  • Two executions of the task function must not be concurrent. Each execution should run to completion before another execution is started.

The code looks like this:

// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

// call task regularly
setIntervalAsync(async () => {
  await task("ticker");
}, 5000) // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
  await task("trigger");
  res.send("ok");
});

I’ve put a full working sample project at https://github.com/piec/question.js

If I were writing this in Go it would be easy to do, but I don’t know how to achieve it with Node.js.


Ideas I have considered or tried:

  • I could apparently put task in a critical section using a mutex from the async-mutex library, but I’m not too fond of adding mutexes to JS code (see the sketch after this list).
  • Many people seem to use message queue libraries with worker processes (bee-queue, bullmq, …), but these usually add a dependency on an external service such as Redis. If I’m correct, the code would also become a bit more complex because I would need a main entrypoint plus an entrypoint for the worker processes, and you can’t share objects with the workers as easily as in a "normal" single-process setup.
  • I have tried an RxJS Subject in order to build a producer/consumer channel, but I was not able to limit the execution of task to one at a time (task is async).
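
For reference, this is roughly what the async-mutex idea from the first bullet would look like; it is only a minimal sketch, assuming a hypothetical wrapper runTaskExclusive around the task function above:

import { Mutex } from "async-mutex";

const taskMutex = new Mutex();

// hypothetical wrapper: runExclusive() queues callers, so only one
// execution of task() is in flight at any time
async function runTaskExclusive(reason: string) {
  return taskMutex.runExclusive(() => task(reason));
}

Both the interval callback and the /task handler would then call runTaskExclusive instead of task.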

Thank you!

2 Answers


  1. Here’s a version using an RxJS Subject that almost works. How to finish it depends on your use case.

    import { Subject, interval } from "rxjs";
    import { concatMap, share, take } from "rxjs/operators";
    // sleep() and the express app are assumed to be defined as in the question

    async function task(reason: string) {
      console.log("do thing because %s...", reason);
      await sleep(1000);
      console.log("done");
    }
    
    const run = new Subject<string>();
    const effect$ = run.pipe(
      // Limit one task at a time
      concatMap(task),
      share()
    );
    const effectSub = effect$.subscribe();
    
    interval(5000).subscribe(_ => 
      run.next("ticker")
    );
    
    // call task immediately
    app.get("/task", async (req, res) => {
      effect$.pipe(
        take(1)
      ).subscribe(_ =>
        res.send("ok")
      );
      run.next("trigger");
    });
    

    The issue here is that res.send("ok") is tied to the effect$ stream’s next emission, which may not be the one generated by the run.next you’re about to call.

    There are many ways to fix this. For example, you can tag each emission with an ID and then wait for the corresponding emission before using res.send("ok").

    There are better ways too if calls distinguish themselves naturally.
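
    As a rough illustration of that last point, here is a minimal sketch (assuming the ticker and the endpoint use distinct reason strings, at most one /task request is in flight at a time, and task and app are as above) that filters on the reason itself instead of an ID:

    import { from, Subject } from "rxjs";
    import { concatMap, filter, map, share, take } from "rxjs/operators";

    const run = new Subject<string>();
    const effect$ = run.pipe(
      // one task at a time; pass the reason through on completion
      concatMap(reason => from(task(reason)).pipe(map(() => reason))),
      share()
    );
    effect$.subscribe();

    app.get("/task", async (req, res) => {
      effect$.pipe(
        // wait for the "trigger" run enqueued just below
        filter(reason => reason === "trigger"),
        take(1)
      ).subscribe(() => res.send("ok"));
      run.next("trigger");
    });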


    A Clunky ID Version

    Generating an ID randomly is a bad idea, but it gets the general idea across. You can generate unique IDs however you like. They can be integrated directly into the task somehow, or kept 100% separate the way they are here (task itself has no knowledge that it has been assigned an ID before being run).

    
    import { from, interval, Observable, Subject } from "rxjs";
    import { concatMap, filter, map, share, take } from "rxjs/operators";

    interface IdTask {
      taskId: number,
      reason: string
    }
    
    interface IdResponse {
      taskId: number,
      response: any
    }
    
    async function task(reason: string) {
      console.log("do thing because %s...", reason);
      await sleep(1000);
      console.log("done");
    }
    
    const run = new Subject<IdTask>();
    const effect$: Observable<IdResponse> = run.pipe(
      // concatMap only allows one observable at a time to run
      concatMap((eTask: IdTask) => from(task(eTask.reason)).pipe(
        map((response: any) => ({
          taskId: eTask.taskId,
          response
        }) as IdResponse)
      )),
      share()
    );
    const effectSub = effect$.subscribe({
      next: v => console.log("This is a shared task emission: ", v)
    });
    
    interval(5000).subscribe(num => 
      run.next({
        taskId: num,
        reason: "ticker"
      })
    );
    
    // call task immediately
    app.get("/task", async (req, res) => {
      const randomId = Math.random();
      effect$.pipe(
        filter(({ taskId }) => taskId === randomId),
        take(1)
      ).subscribe(_ =>
        res.send("ok")
      );
      run.next({
        taskId: randomId,
        reason: "trigger"
      });
    });
    
  2. You can make your own serialized asynchronous queue and run the tasks through that.

    This queue uses a flag to keep track of whether it’s in the middle of running an asynchronous operation already. If so, it just adds the task to the queue and will run it when the current operation is done. If not, it runs it now. Adding it to the queue returns a promise so the caller can know when the task finally got to run.

    If the tasks are asynchronous, they are required to return a promise that is linked to the asynchronous activity. You can mix in non-asynchronous tasks too and they will also be serialized.

    class SerializedAsyncQueue {
        constructor() {
            this.tasks = [];
            this.inProcess = false;
        }
        // adds a promise-returning function and its args to the queue
        // returns a promise that resolves when the function finally gets to run
        add(fn, ...args) {
            let d = new Deferred();
            this.tasks.push({ fn, args, deferred: d });
            this.check();
            return d.promise;
        }
        // runs the next queued task if nothing is currently in progress
        check() {
            if (!this.inProcess && this.tasks.length) {
                // run next task
                this.inProcess = true;
                const nextTask = this.tasks.shift();
                Promise.resolve(nextTask.fn(...nextTask.args)).then(val => {
                    this.inProcess = false;
                    nextTask.deferred.resolve(val);
                    this.check();
                }).catch(err => {
                    console.log(err);
                    this.inProcess = false;
                    nextTask.deferred.reject(err);
                    this.check();
                });
            }
        }
    }
    
    // a Deferred wraps a promise and exposes its resolve/reject functions
    const Deferred = function() {
        if (!(this instanceof Deferred)) {
            return new Deferred();
        }
        const p = this.promise = new Promise((resolve, reject) => {
            this.resolve = resolve;
            this.reject = reject;
        });
        this.then = p.then.bind(p);
        this.catch = p.catch.bind(p);
        if (p.finally) {
            this.finally = p.finally.bind(p);
        }
    }
    
    
    let queue = new SerializedAsyncQueue();
    
    // utility function
    const sleep = function(t) {
        return new Promise(resolve => {
            setTimeout(resolve, t);
        });
    }
    
    // only a single execution of this function is allowed at a time
    // so it is run only via the queue that makes sure it is serialized
    async function task(reason: string) {
        async function runIt() {
            console.log("do thing because %s...", reason);
            await sleep(1000);
            console.log("done");
        }
        return queue.add(runIt);
    }
    
    // call task regularly
    setIntervalAsync(async () => {
        await task("ticker");
    }, 5000) // normally 1min
    
    // call task immediately
    app.get("/task", async (req, res) => {
        await task("trigger");
        res.send("ok");
    });
    