
I am trying to build a slightly complex queue system in RxJS with the following rules:

  • any user can request a title at any time as long as they have no request or timeout pending
  • any user holding a title can cancel their timeout
  • titles are held for 30 seconds at most
  • a title can be held by only one user at a time
  • a user can hold multiple titles at the same time
  • titles are delivered when nobody else is using them
    • if a title is not in use, it is delivered immediately on request
    • if a title is in use, it is delivered as soon as the current user gives it back or their time runs out
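
To pin the rules above down, here is a minimal sketch in plain TypeScript (no RxJS) that replays a list of timestamped requests and cancels and computes when each user would hold each title. Everything in it is an assumption made for illustration: the names `simulate`, `TitleRequest`, `TitleCancel` and `Delivery` are hypothetical, cancels are modelled per user and title, the latency of the request itself is ignored, and the "no request or timeout pending" rule is not enforced.

```typescript
type Title = string;
type User = string;

interface TitleRequest { at: number; title: Title; user: User }
interface TitleCancel { at: number; title: Title; user: User }
interface Delivery { title: Title; user: User; from: number; until: number }

// Maximum hold time taken from the rules: 30 seconds
const HOLD_MS = 30_000;

function simulate(
  requests: TitleRequest[],
  cancels: TitleCancel[] = [],
  holdMs: number = HOLD_MS
): Delivery[] {
  // Time at which each title becomes free again
  const freeAt: Record<Title, number> = {};
  const deliveries: Delivery[] = [];

  // Serve requests in arrival order (first come, first served per title)
  const ordered = requests.slice().sort((a, b) => a.at - b.at);
  for (const req of ordered) {
    // Delivered immediately if free, otherwise when the current holder is done
    const from = Math.max(req.at, freeAt[req.title] ?? 0);
    let until = from + holdMs;
    // A cancel only counts while this user actually holds the title
    for (const c of cancels) {
      const mine = c.user === req.user && c.title === req.title;
      if (mine && c.at >= from && c.at < until) until = c.at;
    }
    freeAt[req.title] = until;
    deliveries.push({ title: req.title, user: req.user, from, until });
  }
  return deliveries;
}

// Three requests at t = 0; user 1 gives title A back after 5 seconds
const timeline = simulate(
  [
    { at: 0, title: "A", user: "1" },
    { at: 0, title: "A", user: "2" },
    { at: 0, title: "B", user: "3" },
  ],
  [{ at: 5_000, title: "A", user: "1" }]
);
console.log(timeline);
```

Under these assumptions user 1 holds A from 0 to 5000 ms, user 2 holds A from 5000 to 35000 ms, and user 3 holds B from 0 to 30000 ms, since titles do not block each other.
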
import {
  Subject,
  groupBy,
  mergeMap,
  switchMap,
  delayWhen,
  timer,
  takeUntil,
  share,
} from "rxjs";

type Snowflake = string;

enum Title {
  A = "A",
  B = "B",
}

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));

// This is an external function, cannot modify
const startTitleReq = (item: { title: Title; user: Snowflake }) => {
  console.log(`Starting request for ${item.user}, requested ${item.title}`);

  // Can take longer in reality
  return sleep(10_000).then(() => ({ success: true }));
};

const queue = new Subject<{ title: Title; user: Snowflake }>();
// Typed as void so that cancel.next() can be called without an argument
const cancel = new Subject<void>();

queue
  .pipe(
    groupBy((item) => item.title),
    mergeMap((group$) =>
      group$.pipe(
        switchMap((item) => startTitleReq(item)),
        delayWhen((result) =>
          timer(result.success ? 10_000 : 0).pipe(takeUntil(cancel))
        )
      )
    ),
    share()
  )
  .subscribe(({ success }) => {
    console.log({ success });
  });

queue.next({ user: "1", title: Title.A });
queue.next({ user: "2", title: Title.A });
queue.next({ user: "3", title: Title.B });

So the expected results would be:

queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds
queue.next({ user: "2", title: Title.A }); // 3, success log after 40 seconds
queue.next({ user: "3", title: Title.B }); // 2, success log after 20 seconds

Now let’s say this were the start:

queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds

// After 5 seconds..
cancel.next();

queue.next({ user: "2", title: Title.A }); // 2, success log after 25 seconds

Link to playground

2 Answers


  1. Good thinking with the mocks and trying to simplify the issue to be as minimal as possible.

    I’ve taken a stab at what I think you want:

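    import { Subject, catchError, concatMap, map, of, timeout } from "rxjs";
    
    // getTitle(title) is assumed to be the mock from the demo, returning an Observable
    const request$$ = new Subject<string>();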
    
    const queue$ = request$$.pipe(
      concatMap((title) =>
        getTitle(title).pipe(
          map((_) => ({ title, success: true })),
          timeout(30_000),
          catchError((e) => {
            return of({ title, success: false });
          })
        )
      )
    );
    

    When I give it a go with a random delay in the getTitle mock, here’s what we get:

    (Note that I’ve lowered the timeout to 5s instead of 30s and made the random delay range between 1 and 10s.)

    Starting request for title Title 0 (delay 5006)
    {title: "Title 0", success: false}
    
    Starting request for title Title 1 (delay 9117)
    {title: "Title 1", success: false}
    
    Starting request for title Title 2 (delay 1160)
    {title: "Title 2", success: true}
    
    Starting request for title Title 3 (delay 1521)
    {title: "Title 3", success: true}
    
    Starting request for title Title 4 (delay 5109)
    {title: "Title 4", success: false}
    
    Starting request for title Title 5 (delay 8671)
    {title: "Title 5", success: false}
    
    Starting request for title Title 6 (delay 6618)
    {title: "Title 6", success: false}
    
    Starting request for title Title 7 (delay 3804)
    {title: "Title 7", success: true}
    
    Starting request for title Title 8 (delay 2827)
    {title: "Title 8", success: true}
    
    Starting request for title Title 9 (delay 6142)
    {title: "Title 9", success: false}
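
The log above is consistent with a simple rule: a request succeeds only if its delay is shorter than the timeout. Here is a minimal sketch in plain TypeScript (no RxJS) that replays the logged delays against the 5-second timeout. The helper name `outcome` is made up for illustration and is not part of the demo.

```typescript
interface Outcome { title: string; success: boolean }

// A request succeeds only if it finishes before the timeout fires
function outcome(title: string, delayMs: number, timeoutMs: number): Outcome {
  return { title, success: delayMs < timeoutMs };
}

const TIMEOUT_MS = 5_000;

// Delays copied from the log above
const delays = [5006, 9117, 1160, 1521, 5109, 8671, 6618, 3804, 2827, 6142];

const results = delays.map((d, i) => outcome(`Title ${i}`, d, TIMEOUT_MS));

// With one request at a time, each one occupies the queue for
// min(delay, timeout) before the next one can start
const totalMs = delays.reduce((sum, d) => sum + Math.min(d, TIMEOUT_MS), 0);

console.log(results);
console.log(totalMs);
```

Because concatMap only starts the next request once the previous one has completed or timed out, the whole run takes roughly the sum of min(delay, timeout), which is about 39 seconds for the delays logged above.
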
    

    Here’s the live demo with the mocks.


    EDIT: Based on the comments, it looks like I did not understand the issue the first time. Here’s the live demo for my second take at it.

    Bulk of the code:

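    import { Subject, catchError, concatMap, from, of, takeUntil, timeout } from "rxjs";
    
    const queue$$ = new Subject<{ title: Title; user: Snowflake }>();
    const cancel$$ = new Subject<void>();
    
    const queue$ = queue$$.pipe(
      // concatMap runs one request at a time, in arrival order
      concatMap((item) =>
        // startTitleReq in the question returns a Promise, so wrap it with from() before piping
        from(startTitleReq(item)).pipe(
          // give up on the request after 10 seconds
          timeout(10_000),
          catchError(() => {
            return of({ success: false });
          }),
          // drop the current request when a cancel arrives
          takeUntil(cancel$$)
        )
      )
    );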
    
  2. To manage individual job queues while allowing only one job to run at a time with RxJS, you can modify the existing code as follows:

    import {
      Subject,
      groupBy,
      mergeMap,
      switchMap,
      delayWhen,
      timer,
      takeUntil,
      share,
    } from "rxjs";
    
    type Snowflake = string;
    
    enum Title {
      A = "A",
      B = "B",
    }
    
    interface QueueItem {
      title: Title;
      user: Snowflake;
      timeoutId?: NodeJS.Timeout;
    }
    
    const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
    
    // This is an external function, cannot modify
    const startTitleReq = (item: { title: Title; user: Snowflake }) => {
      console.log(`Starting request for ${item.user}, requested ${item.title}`);
    
      // Can take longer in reality
      return sleep(10_000).then(() => ({ success: true }));
    };
    
    const queue = new Subject<QueueItem>();
    const cancel = new Subject<void>();
    
    const queueItems: { [key in Title]?: QueueItem } = {};
    
    queue
      .pipe(
        groupBy((item) => item.title),
        mergeMap((group$) =>
          group$.pipe(
            switchMap((item) => {
              // Clear the timeout if it exists
              if (item.timeoutId) {
                clearTimeout(item.timeoutId);
              }
    
              // Check if the title is already in use
              if (queueItems[item.title]) {
                console.log(`Title ${item.title} is currently in use`);
                return [];
              }
    
              // Add the item to the queueItems object
              queueItems[item.title] = item;
    
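              // Carry title and user along with the result; the steps below read them
              return startTitleReq(item).then((result) => ({ ...item, ...result }));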
            }),
            delayWhen((result) =>
              timer(result.success ? 10_000 : 0).pipe(takeUntil(cancel))
            ),
            takeUntil(cancel), // Cancel the request if canceled externally
            // Store the timeout ID to be able to cancel it later
            switchMap((result) => {
              // Remove the item from queueItems
              delete queueItems[result.title];
    
              const timeoutId = setTimeout(() => {
                console.log(`Timeout for ${result.user}, requested ${result.title}`);
                queue.next(result); // Add the item back to the queue after timeout
              }, 30_000);
    
              return [{ ...result, timeoutId }];
            })
          )
        ),
        share()
      )
      .subscribe(({ success }) => {
        console.log({ success });
      });
    
    queue.next({ user: "1", title: Title.A });
    queue.next({ user: "2", title: Title.A });
    queue.next({ user: "3", title: Title.B });
    
    // After 5 seconds
    cancel.next();
    
    queue.next({ user: "2", title: Title.A });

    Explanation of the changes:

    1. Introduced a new interface QueueItem to represent items in the queue.
    2. Added a timeoutId property to QueueItem to store the timeout identifier.
    3. Added a step to clear the timeout using clearTimeout before making a new
    request. This cancels any pending timeouts for the same item.
    4. Added takeUntil(cancel) to cancel the request if it is canceled externally.
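    5. Added a new `switchMap` step that removes the item from queueItems and starts a 30-second timeout, after which the item is put back on the queue.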

    I did my best to solve the problem.
