I have a Node.js application that users can interact with. Users can request a title (A or B). It takes some time for the application to hand out the title. After the title is given, the user gets to keep it for 60 seconds or they can abort manually to allow the next user that requested the same title to get it.

The application can only process one request at a time, so I thought I needed a main queue plus an individual queue for each title. That way, two users who request different titles only have to wait for each other's request to be processed, not for the 60 seconds or an abort signal.

I tried to make use of the p-queue library to build this functionality and used it like below:

import PQueue from "p-queue";

// Main queue to make sure app handles 1 job at a time
const queue = new PQueue({ concurrency: 1 });

// Function to add job to the main queue
const addJobToQueue = async (
  cb: (...args: unknown[]) => Promise<unknown>,
  options?: Partial<QueueAddOptions>
) => {
  await queue.add(cb, options);
};

enum Title {
  A = "a",
  B = "b"
}

const titleRequestQueues = {
  [Title.A]: new PQueue({ concurrency: 1 }),
  [Title.B]: new PQueue({ concurrency: 1 }),
};

const controller = new AbortController();

// User 1 requests title A
// User 2 requests title A
// User 3 requests title B

// User 2 has to wait for user 1 `promptForCancel` (see `addTitle`)
// User 3 only has to wait for User 1 `fetchTitleForUser` (see `addTitle`)
// So given these requests, the order should be 1, 3, 2
const requestTitle = async (title: Title) => {
  try {
    await titleRequestQueues[title].add(
      () => addJobToQueue(
        () => addTitle(title, controller)
      ),
      { signal: controller.signal }
    );
  } catch (error) {
    if (!(error instanceof AbortError)) {
      throw error;
    }
  }
}

const addTitle = async (_title, controller) => {
  // Some code that takes time to run
  await fetchTitleForUser()

  try {
    // `promptForCancel` throws an error when the time has expired.
    // Otherwise it resolves.
    await promptForCancel(60 * 1000)

    // Resolved. Send abort signal to main queue to allow
    // the next job to run.
    controller.abort();
  } catch {
    // Time expired, send abort signal to main queue to allow
    // the next job to run.
    controller.abort();
  }
}

Currently, one job runs at a time as desired, but when User 3 sends a request, it still has to wait for User 1’s promptForCancel, which should not be the case.

2 Answers


  1. Instead of managing queues and holding an HTTP/TCP connection open for up to 60 seconds, you could store the current holder of each title in Redis and replace the request-reply pattern with polling (a fire-and-forget request followed by periodic checks). HTTP has a default timeout, and waiting requests will eventually hit it as more users queue up. With polling, the user checks every x seconds whether they have access yet. When a user makes a request, record their position in a separate per-title sequence, alongside the key for the user who currently holds the title.
    How does the next user get access when the key expires in Redis? On every access-check poll, check whether the key is empty or expired; if it is, find the next user in the queue and set the key for that user. This approach scales to a large number of users.
    Manual abort is the easiest part: delete the key from Redis, and it will be set for the next user on the next poll.
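
    The polling flow described above could look roughly like the sketch below. It is only an illustration: two in-memory Maps stand in for Redis (an assumption, so the snippet runs without a server), and the names requestTitle, pollAccess, releaseTitle and HOLD_MS are hypothetical. A real setup would presumably use a key with a TTL per title and a Redis list per title, and would still need the single handout step, which this sketch does not model.

    ```javascript
    // In-memory stand-in for the Redis state (assumption: not a real Redis client).
    const HOLD_MS = 60_000;    // how long a user may keep a title
    const holders = new Map(); // title -> { user, expiresAt }
    const waiting = new Map(); // title -> array of users in request order

    // Called once when a user requests a title; returns immediately.
    function requestTitle(user, title) {
        if (!waiting.has(title)) waiting.set(title, []);
        waiting.get(title).push(user);
    }

    // Called on every poll. If the title is free or its holder's time has
    // expired, promote the next waiting user. Then report whether `user`
    // currently holds the title.
    function pollAccess(user, title, now = Date.now()) {
        const holder = holders.get(title);
        if (!holder || holder.expiresAt <= now) {
            const next = (waiting.get(title) ?? []).shift();
            if (next === undefined) holders.delete(title);
            else holders.set(title, { user: next, expiresAt: now + HOLD_MS });
        }
        return holders.get(title)?.user === user;
    }

    // Manual abort: drop the holder so the next poll promotes the next user.
    function releaseTitle(user, title) {
        if (holders.get(title)?.user === user) holders.delete(title);
    }
    ```

    With users 1 and 2 requesting title A and user 3 requesting title B, user 3's first poll succeeds right away, while user 2's polls only succeed after user 1 releases the title or the 60 seconds have passed.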

  2. I think you can do this with promises all the way: consider that a queue in this context is nothing more than promises that are then-chained. For "aborting" I would use Promise.race, but I suppose you could also use AbortController.

    The snippet below uses the HTML document to show a cancel button for each handed-out title, so there is one class, CancelButton, with code specific to managing the DOM.

    The request for the handout is mocked by a delay of 2 seconds.

    The rest is plain promise-based code:

    // Code specific to this HTML user interface:
    class CancelButton {
        button;
        clicked;
        
        constructor(prompt) {
            // Create and show an HTML button
            this.button = document.createElement("button");
            this.button.textContent = prompt;
            document.body.append(this.button);
            // A promise that will resolve when user clicks the button
            this.clicked = new Promise(resolve =>
                this.button.addEventListener("click", resolve, { once: true })
            );
        }
        
        remove() {
            this.button.remove();
        }
    };
    
    // Class for managing the execution of async jobs in a queued fashion.
    class Chain {
        tailPromise = Promise.resolve();
        
        // Method that returns a promise that resolves when the given asyncJob has executed 
        //    and that job's returned promise resolved
        add(asyncJob) { 
            const promise = this.tailPromise.then(asyncJob);
            this.tailPromise = promise; // chain this promise to this "queue"
            return promise;
        }
    }
    
    // enum 
    const Title = { A: "a", B: "b" };
    
    // Main promise chain to make sure app handles 1 title-handout job at a time
    const titleHandoutChain = new Chain;
    
    // Promise chain per title to ensure only one user can own a title at a time
    const titleUseChains = {
        [Title.A]: new Chain,
        [Title.B]: new Chain,
    };
    
    // Function to mock an asynchronous API call
    const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));
    
    const handoutTitle = (title) => delay(2_000);
    
    const promptForCancel = async (user, title, timeout) => {
        const cancelButton = new CancelButton(`Cancel user ${user}'s use of title ${title}`); 
        // await whatever happens first: click on cancel button or timeout expired
        await Promise.race([cancelButton.clicked, delay(timeout)]); 
        cancelButton.remove();
    }
    
    const requestAndUseTitle = async (user, title) => {
        console.log(`User ${user}: Waiting for title ${title} to become available.`);
        // First wait for the moment that the title is free...
        await titleUseChains[title].add(async () => {
            console.log(`User ${user}: Title ${title} is available. Waiting for handout mechanism to become available.`);
            await titleHandoutChain.add(async () => {
                console.log(`User ${user}: Handout mechanism is available. Handing out title ${title} (takes 2 seconds).`);
                await handoutTitle(title);
            });
            console.log(`User ${user}: Ownership of title ${title} started - user may cancel.`);
            await promptForCancel(user, title, 60_000);
        });
        console.log(`User ${user}: No more ownership of title ${title}.`);
    }
    
    // Main:
    // User 1 requests title A
    requestAndUseTitle(1, Title.A);
    // User 2 requests title A
    requestAndUseTitle(2, Title.A);
    // User 3 requests title B
    requestAndUseTitle(3, Title.B);
    // User 2 has to wait for user 1's `promptForCancel` (see `requestAndUseTitle`)
    // User 3 only has to wait for user 1's `handoutTitle` (see `requestAndUseTitle`)
    // So given these requests, the order should be 1, 3, 2
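
    Since the question is about Node.js, where there is no DOM button, here is a rough sketch of the AbortController variant mentioned at the start of this answer. It reuses the same then-chaining idea. The helper holdUntil, the events array and the shortened delays (20 ms handout, 100 ms hold) are assumptions made only to keep the demo quick. Each user gets their own AbortController, so one user's abort does not affect the others.

    ```javascript
    // Same then-chaining idea as the Chain class above.
    class Chain {
        tailPromise = Promise.resolve();
        add(asyncJob) {
            const promise = this.tailPromise.then(asyncJob);
            this.tailPromise = promise;
            return promise;
        }
    }

    const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));

    // Resolves when `ms` has elapsed or `signal` is aborted, whichever is first.
    const holdUntil = (signal, ms) => new Promise(resolve => {
        if (signal.aborted) return resolve();
        const timer = setTimeout(resolve, ms);
        signal.addEventListener("abort", () => {
            clearTimeout(timer);
            resolve();
        }, { once: true });
    });

    const handoutChain = new Chain();                     // one handout at a time
    const useChains = { a: new Chain(), b: new Chain() }; // one owner per title
    const events = [];                                    // order in which ownership starts

    // handoutMs and holdMs are shortened, hypothetical values for the demo.
    const requestAndUseTitle = (user, title, controller, handoutMs = 20, holdMs = 100) =>
        useChains[title].add(async () => {
            await handoutChain.add(() => delay(handoutMs));
            events.push(user);
            await holdUntil(controller.signal, holdMs);
        });

    const c1 = new AbortController();
    const c2 = new AbortController();
    const c3 = new AbortController();

    const done = Promise.all([
        requestAndUseTitle(1, "a", c1),
        requestAndUseTitle(2, "a", c2),
        requestAndUseTitle(3, "b", c3),
    ]);

    // User 1 gives up title a manually after 50 ms instead of waiting for the timeout.
    setTimeout(() => c1.abort(), 50);

    done.then(() => console.log(events)); // ownership starts in the order 1, 3, 2
    ```

    User 3 is queued on the handout chain before user 2 ever reaches it, because user 2's job only starts once user 1 no longer owns title a.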