I don’t know if this is possible! I want some generic code that acts like a ReplaySubject but, on subscription, emits the most recent message for each unique name. In the code below, the actual output is …
I want this to be logged once ... received the latest bbb 5
And I understand why I am getting this output.
The desired output is (order not important) …
I want this to be logged once ... received the latest aaa 3
I want this to be logged once ... received the latest bbb 5
But I don’t know how to achieve this. I know that I could create a second ReplaySubject, one to deal with aaa messages and one to deal with bbb messages. However, in my actual code I have 100+ message names and I don’t want to create 100+ ReplaySubjects (one for each). Is there a more generic/scalable solution to this problem?
// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'
replaySource = new rxjs.ReplaySubject(1)
replaySource.next({name: 'aaa', payload: 1})
replaySource.next({name: 'bbb', payload: 2})
replaySource.next({name: 'aaa', payload: 3})
replaySource.next({name: 'bbb', payload: 4})
replaySource.next({name: 'bbb', payload: 5})
replaySource.pipe(
  rxjs.filter(x => x.name === 'aaa')
).subscribe((x) => {
  console.log('I want this to be logged once ... received the latest aaa ' + x.payload)
})
replaySource.pipe(
  rxjs.filter(x => x.name === 'bbb')
).subscribe((x) => {
  console.log('I want this to be logged once ... received the latest bbb ' + x.payload)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
10 Answers
Here's an answer showing how I implemented this, based on the answer by @KArslan (his answer has since been deleted by a moderator). I'm not overly happy with the implementation, though: it does not clean up a ReplaySubject when it is no longer in use, and the solution does not feel like a good, clean RxJS approach.

Leave the default buffer for your subject (infinite buffer), and pipe distinct (use the argument to specify that only the name property must be distinct).

As far as I’m aware, what you are attempting to do isn’t possible using a ReplaySubject. A ReplaySubject doesn’t complete when its next function is called, and the observable has no way of knowing which value will be the final one until it completes, so operators such as last, takeLast, etc., won’t work. The stream doesn’t end, so these operators never know when to stop.
When you subscribe to a ReplaySubject, you get each value in sequence, not all at once, so any operator you use can only operate on one value at a time. This is also true for callbacks passed to the subscribe function; you won’t be able to do it within the subscribe block either.
The only way to accomplish what you’re looking to do using a single observable is to create a stream of values in an observable that completes.
With an observable that completes, you can use the reduce operator to get what you’re looking for.

Based on the answer given by the OP, I’d like to make the following suggestion, which I think makes better use of ReplaySubject’s functionality.
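
A minimal sketch of the accumulator behind the reduce suggestion above, not taken from any of the answers. keepLatestByName is a hypothetical name. It is plain JavaScript and is exercised here with Array.prototype.reduce so that it runs without dependencies; the assumption is that the same function could be passed to the RxJS reduce operator on an observable that completes.

```javascript
// Hypothetical accumulator: keeps only the most recent message per name.
function keepLatestByName(latest, message) {
  latest.set(message.name, message); // overwrite any older message with the same name
  return latest;
}

// The messages from the question, as a finite (completed) sequence.
const messages = [
  { name: 'aaa', payload: 1 },
  { name: 'bbb', payload: 2 },
  { name: 'aaa', payload: 3 },
  { name: 'bbb', payload: 4 },
  { name: 'bbb', payload: 5 },
];

// Fold the whole sequence into a Map of name => latest message.
const latest = messages.reduce(keepLatestByName, new Map());

for (const { name, payload } of latest.values()) {
  console.log('received the latest ' + name + ' ' + payload);
}
// received the latest aaa 3
// received the latest bbb 5
```

The fold only produces a result once the input has ended, which is why this approach needs a completing source and cannot be applied to a subject that stays open.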

Here is a solution that uses only one subject and pipes. Internally, the subject keeps a map of object.name => object with scan, so it can always update the corresponding name with the latest object it gets. Then it emits each value of the map.
The subject needs to be connected right away so it can start running its logic before the first subscription.
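
The map idea can be sketched without RxJS to show the behaviour in isolation. LatestByName below is a hypothetical class, not this answer's code and not an RxJS Subject: it records the latest message per name as soon as next is called (so it tracks messages before anyone subscribes), replays one message per name to each new subscriber, and then forwards live messages.

```javascript
// Hypothetical, dependency-free sketch of "latest message per name" replay.
class LatestByName {
  constructor() {
    this.latestByName = new Map(); // name => most recent message
    this.listeners = new Set();    // currently subscribed callbacks
  }

  next(message) {
    this.latestByName.set(message.name, message);
    // Iterate over a copy so a listener may unsubscribe during dispatch.
    for (const listener of [...this.listeners]) listener(message);
  }

  subscribe(listener) {
    // Replay the latest message for every name seen so far.
    for (const message of [...this.latestByName.values()]) listener(message);
    this.listeners.add(listener);
    return () => this.listeners.delete(listener); // call to unsubscribe
  }
}

const store = new LatestByName();
store.next({ name: 'aaa', payload: 1 });
store.next({ name: 'bbb', payload: 2 });
store.next({ name: 'aaa', payload: 3 });
store.next({ name: 'bbb', payload: 4 });
store.next({ name: 'bbb', payload: 5 });

const received = [];
const unsubscribe = store.subscribe((m) => received.push(m.name + ' ' + m.payload));
store.next({ name: 'aaa', payload: 6 }); // delivered live
unsubscribe();
store.next({ name: 'aaa', payload: 7 }); // not delivered

console.log(received); // [ 'aaa 3', 'bbb 5', 'aaa 6' ]
```

One Map entry per name replaces the 100+ separate subjects from the question, at the cost of keeping every name's last message in memory for the lifetime of the object.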
If you want notifications for only a specific object, just pipe filter on replaySource before subscribing.

You could make your own pipe operator with a BehaviorSubject?
You can also specify whether you want to listen once or keep listening.

To emit each most recent message with a unique name, you can use a single ReplaySubject and filter the messages based on their name. I have kept the buffer size at 1. If you want, you can omit the buffer size, but in that case it will keep all emitted messages.
Please let me know if this helps you.

You can implement your own class that inherits from Subject. Here is example code in TypeScript (just remove the type annotations if you are using JavaScript).

This works for RxJS 6+ (I needed it working for RxJS 6). If using this in RxJS 7, you may want to tweak it to use connectable instead of publishReplay.
It uses distinctUntilChanged to avoid the contents of the map being fully dumped after each incoming message.
NB: lodash isEqual is used, but this can easily be swapped out for a non-lodash equality check.
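
As a sketch of what a non-lodash check might look like, assuming messages have the { name, payload } shape from the question and that payloads are primitive values (nested payloads would need a recursive comparison). isSameMessage is a hypothetical name; a two-argument function of this shape is what distinctUntilChanged accepts as its comparator.

```javascript
// Hypothetical stand-in for lodash isEqual, valid only for flat
// { name, payload } messages whose payload is a primitive value.
function isSameMessage(previous, current) {
  return previous.name === current.name && previous.payload === current.payload;
}

console.log(isSameMessage({ name: 'aaa', payload: 3 }, { name: 'aaa', payload: 3 })); // true
console.log(isSameMessage({ name: 'aaa', payload: 3 }, { name: 'aaa', payload: 4 })); // false
console.log(isSameMessage({ name: 'aaa', payload: 3 }, { name: 'bbb', payload: 3 })); // false
```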