skip to Main Content

I am using node.js and I have a task that runs every second like so:

let queue = ["Sample Data 1", "Sample Data 2"]
const job = schedule.scheduleJob('*/1 * * * * *', function () {
    console.log("Checking the queue...")
    if (queue.length > 0) {
        wss.broadcast(JSON.stringify({
            data: queue[0]
        }));
        setTimeout(() => {
            queue.shift();
        }, queue[0].duration);
    }
});

I am wondering how I can make it so that the timeout must finish before the next queue check. Could I use a de-bounce, or is there a better way?

2

Answers


  1. You could use

    Math.max(queue[0].duration, 1000 - (performance.now() - start))
    

    to make an interval between the jobs at least 1 second and call the schedule recursively.

    In that case returning a job could make not much sense, so you could collect your jobs in an array for example if needed.

    const schedule = {
      scheduleJob(job, cb){
        setTimeout(cb, Math.random()*500);
        return job;
      }
    };
    
    const wss = {
      broadcast(task){  
        console.log(JSON.stringify(task));
      }
    };
    
    let queue = [{title: "Sample Data 1", duration: 2000}, {title: "Sample Data 2", duration: 500}];
    
    const shiftQueue = (stopCb, jobs = []) => {
        if(!queue.length) return stopCb(jobs);
        const start = performance.now();
        const job = schedule.scheduleJob('*/1 * * * * *', function () {
          console.log("Checking the queue...")
          wss.broadcast(JSON.stringify({
              data: queue[0]
          }));
          setTimeout(() => {
              queue.shift();
              shiftQueue(stopCb, jobs);
          }, Math.max(queue[0].duration, 1000 - (performance.now() - start)));
        });
        jobs.push(job);
    }
    
    shiftQueue(jobs => console.log(jobs));

    But I would use an async generator to simplify the logic. A bonus here that you can execute an async code with await in the for await loop and its execution time will be counted also:

    const schedule = {
      scheduleJob(job, cb){
        setTimeout(cb, Math.random()*500);
        return job;
      }
    };
    
    const wss = {
      broadcast(task){  
        console.log(JSON.stringify(task));
      }
    };
    
    let queue = [{title: "Sample Data 1", duration: 2000}, {title: "Sample Data 2", duration: 500}];
    
    const timeout = timeout => new Promise(r => setTimeout(r, timeout));
    
    async function* shiftQueue(queue){
        for(const task of queue){
          const start = performance.now();  
          const job = new Promise(resolve => {
            const job = schedule.scheduleJob('*/1 * * * * *', function () {
              console.log("Checking the queue...")
              wss.broadcast(JSON.stringify({
                  data: task
              }));
              resolve(job);
            });
          });
          yield job;
          await timeout(Math.max(task.duration, 1000 - (performance.now() - start)));
        }
    }
    
    (async () => {
      for await(job of shiftQueue(queue)){
        console.log(job);
        // emulate some async work
        await timeout(100);
      }
      console.log('complete');
    })();
    Login or Signup to reply.
  2. The OP’s problem qualifies perfectly for a solution of mainly 2 combined techniques …

    1. the map based creation of a list of async function/s (expressions), each function representing a delaying broadcast task (delaying and not delayed because the task executes immediately but delays its resolving/returning time).

    2. the creation of an async generator via an async generator-function (expression), where the latter consumes / works upon the created list of delaying tasks, and where the async generator itself will be iterated via the for await...of statement.

    In addition one needs to write kind of a wait function which can be achieved easily via an async function which returns a Promise instance, where the latter resolves the promise via setTimeout and a customizable delay value.

    const queueData =
      ["Sample Data 1", "Sample Data 2", "Sample Data 3"];
    
    // create a list of async function based "delaying tasks".
    const delayingTasks = queueData
      .map(data => async () => {
    
        wss.broadcast(
          JSON.stringify({ data })
        );
        await wait(1500);
    
        return `successful broadcast of "${ data }"`;
      });
    
    // create an async generator from the "delaying tasks".
    const scheduledTasksPool = (async function* (taskList) {
      let task;
      while (task = taskList.shift()) {
    
        yield await task();
      }
    })(delayingTasks);
    
    // utilize the async generator of "delaying tasks".
    (async () => {
      for await (const result of scheduledTasksPool) {
    
        console.log({ result });
      }
    })();
    .as-console-wrapper { min-height: 100%!important; top: 0; }
    <script>
    
    const wss = {
      broadcast(payload) {
        console.log('broadcast of payload ...', payload);
      },
    };
    
    async function wait(timeInMsec = 1_000) {
      return new Promise(resolve =>
        setTimeout(resolve, Math.max(0, Math.min(timeInMsec, 20_000)))
      );
    }
    
    </script>

    And since the approach is two folded, one even can customize each delay in between two tasks … one just slightly has to change the format of the to be queued data and the task generating mapper functionality (2 lines of code are effected) …

    // changed format.
    const queueData = [
      { data: "Sample Data 1", delay: 1000 },
      { data: "Sample Data 2", delay: 3000 },
      { data: "Sample Data 3", delay: 2000 },
      { data: "Sample Data 4" },
    ];
    
    // create a list of async function based "delaying tasks".
    const delayingTasks = queueData
      .map(({ data, delay = 0 }) => async () => { // changed argument.
    
        wss.broadcast(
          JSON.stringify({ data })
        );
        await wait(delay); // changed ... custom delay.
    
        return `successful broadcast of "${ data }"`;
      });
    
    // create an async generator from the "delaying tasks".
    const scheduledTasksPool = (async function* (taskList) {
      let task;
      while (task = taskList.shift()) {
    
        yield await task();
      }
    })(delayingTasks);
    
    // utilize the async generator of "delaying tasks".
    (async () => {
      for await (const result of scheduledTasksPool) {
    
        console.log({ result });
      }
    })();
    .as-console-wrapper { min-height: 100%!important; top: 0; }
    <script>
    
    const wss = {
      broadcast(payload) {
        console.log('broadcast of payload ...', payload);
      },
    };
    
    async function wait(timeInMsec = 1_000) {
      return new Promise(resolve =>
        setTimeout(resolve, Math.max(0, Math.min(timeInMsec, 20_000)))
      );
    }
    
    </script>
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search