I am trying to make a slightly complex queue system in rxjs as follows:
- any user can request a title at any time as long as they have no request or timeout pending
- any user with a title can cancel their timeout
- titles are hold only for 30 seconds tops
- a title can be hold only by one user at any time
- multiple titles can be hold by a user once at the same time
- titles are delivered when nobody else is using them
- if a title is not being used is delivered immediately when asked for
- if a title is in use is delivered at the moment the current user gives it back or their time runs out
import {
Subject,
groupBy,
mergeMap,
switchMap,
delayWhen,
timer,
takeUntil,
share,
} from "rxjs";
type Snowflake = string;
enum Title {
A = "A",
B = "B",
}
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
// This is an external function, cannot modify
const startTitleReq = (item: { title: Title; user: Snowflake }) => {
console.log(`Starting request for ${item.user}, requested ${item.title}`);
// Can take longer in reality
return sleep(10_000).then(() => ({ success: true }));
};
const queue = new Subject<{ title: Title; user: Snowflake }>();
const cancel = new Subject();
queue
.pipe(
groupBy((item) => item.title),
mergeMap((group$) =>
group$.pipe(
switchMap((item) => startTitleReq(item)),
delayWhen((result) =>
timer(result.success ? 10_000 : 0).pipe(takeUntil(cancel))
)
)
),
share()
)
.subscribe(({ success }) => {
console.log({ success });
});
queue.next({ user: "1", title: Title.A });
queue.next({ user: "2", title: Title.A });
queue.next({ user: "3", title: Title.B });
So the expected results would be:
queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds
queue.next({ user: "2", title: Title.A }); // 3, success log after 40 seconds
queue.next({ user: "3", title: Title.B }); // 2, success log after 20 seconds
Now let’s say this were the start:
queue.next({ user: "1", title: Title.A }); // 1, success log after 10 seconds
// After 5 seconds..
cancel.next();
queue.next({ user: "2", title: Title.A }); // 2, success log after 25 seconds
2
Answers
Good thinking with the mocks and trying to simplify the issue to be as minimal as possible.
I’ve taken a pick at what I think is what you want:
When I give a go with a random delay for the
getTitle
method, here’s what we get:(Note that I’ve lowered down the timeout to 5s instead of 30 and made the random delay go between 1 and 10s)
Here’s the live demo with the mocks.
EDIT: Based on the comments it looks like I did not understood the issue the first time. Here’s the live demo for my second take at it.
Bulk of the code:
To manage individual job queues while allowing only one job to run at a time with RxJS, you can modify the existing code as follows:
Explanation of the changes:
1. Introduced a new interface QueueItem to represent items in the queue.
2. Added a timeoutId property to QueueItem to store the timeout identifier.
3. Added a step to clear the timeout using clearTimeout before making a new
request. This cancels any pending timeouts for the same item.
4. Added takeUntil(cancel) to cancel the request if it is canceled externally.
5. Added a new `switchMap
I did my best for solve the Problem.