skip to Main Content

I don’t know if this is possible! I want some generic code. This should act similar to a ReplaySubject but, on subscribing, should emit each most recent message with a unique name. In the code below, the actual output is …

I want this to be logged once ... received the latest bbb 5

And I understand why I am getting this output.

The desired output is (order not important) …

I want this to be logged once ... received the latest aaa 3
I want this to be logged once ... received the latest bbb 5

But I don’t know how to achieve this. I know that I could create a second ReplaySubject, one to deal with aaa messages and one to deal with bbb messages. However in my actual code, I have 100+ message names and I don’t want to create 100+ ReplaySubject’s (one for each). Is there a more generic/scalable solution to this problem?

// import { ReplaySubject } from 'rxjs'
// import { filter } from 'rxjs/operators'

replaySource = new rxjs.ReplaySubject(1)

replaySource.next({name: 'aaa', payload: 1})
replaySource.next({name: 'bbb', payload: 2})
replaySource.next({name: 'aaa', payload: 3})
replaySource.next({name: 'bbb', payload: 4})
replaySource.next({name: 'bbb', payload: 5})

replaySource.pipe(
  rxjs.filter(x => x.name === 'aaa')
).subscribe((x) => {
  console.log('I want this to be logged once ... received the latest aaa ' + x.payload)
})

replaySource.pipe(
  rxjs.filter(x => x.name === 'bbb')
).subscribe((x) => {
  console.log('I want this to be logged once ... received the latest bbb ' + x.payload)
})
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>

10

Answers


  1. Chosen as BEST ANSWER

    Here's an answer showing how I have implemented this based on the answer by @KArslan (his answer now deleted by moderator) - not overly happy with the implementation though as it does not clean up a ReplaySubject when it is no longer in use and the solution does not feel like a good clean RxJS approach.

    const messageReplaySources = {}
    
    const messageReplay$ = name => {
      // create ReplaySubject (if it does not exist) on subscription
      createReplay(name)
      return messageReplaySources[name].asObservable()
    }
    
    const sendReplay = (name, payload) => {
      // create ReplaySubject (if it does not exist) when message is sent
      createReplay(name)
      const message = { name, payload }
      messageReplaySources[name].next(message)
    }
    
    const createReplay = name => {
      if (messageReplaySources[name] == null) messageReplaySources[name] = new rxjs.ReplaySubject(1)
    }
    
    sendReplay('aaa', 1)
    sendReplay('bbb', 2)
    sendReplay('aaa', 3)
    sendReplay('bbb', 4)
    sendReplay('bbb', 5)
    
    messageReplay$('aaa').subscribe(message => {
      console.log(`I want this to be logged once ... received the latest ${message.name} ${message.payload}`)
    })
    
    messageReplay$('bbb').subscribe(message => {
      console.log(`I want this to be logged once ... received the latest ${message.name} ${message.payload}`)
    })
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>


  2. Leave the default buffer for your subject (infinite buffer), and pipe distinct (use the arg to specify that only the name property must be distinct).

    // import { ReplaySubject } from 'rxjs'
    // import { filter } from 'rxjs/operators'
    
    replaySource = new rxjs.ReplaySubject().pipe(rxjs.distinct(x => x.name))
    
    replaySource.next({name: 'aaa', payload: 1});
    replaySource.next({name: 'bbb', payload: 2});
    replaySource.next({name: 'aaa', payload: 3});
    replaySource.next({name: 'bbb', payload: 4});
    replaySource.next({name: 'bbb', payload: 5});
    
    replaySource.subscribe(x => {
      console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
    })
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
    Login or Signup to reply.
  3. As far as I’m aware, what you are attempting to do isn’t possible using a ReplaySubject. ReplaySubject doesn’t close when its next function is called, and the observable has no way of knowing which will be the final value until the observable is complete, so operators such as last, takeLast, etc., won’t work. This is because the stream doesn’t end, so the operators don’t know when to stop operating.

    When you subscribe to a ReplaySubject, you get each value in sequence, not at once, so any operator you use can only operate on one value at a time. This is also true for callbacks passed to the subscribe function; you won’t be able to do it within the subscribe block either.

    The only way to accomplish what you’re looking to do using a single observable is to create a stream of values in an observable that completes.

    const myObservable = new Observable((subscriber) => {
        subscriber.next({ name: 'aaa', payload: 1 });
        subscriber.next({ name: 'bbb', payload: 2 });
        subscriber.next({ name: 'aaa', payload: 3 });
        subscriber.next({ name: 'bbb', payload: 4 });
        subscriber.next({ name: 'bbb', payload: 5 });
        subscriber.complete();
    });
    

    With an observable that completes, you can use the reduce operator to get what you’re looking for.

    myObservable
        .filter(x => x.name === 'aaa')
        .reduce((acc, val) => val)
        .subscribe(x => {
            console.log('I want this to be logged once … received the latest aaa ' + x.payload);
        });
    
    myObservable
        .filter(x => x.name === 'bbb')
        .reduce((acc, val) => val)
        .subscribe(x => {
            console.log('I want this to be logged once … received the latest bbb ' + x.payload);
        });
    
    Login or Signup to reply.
  4. Based on the answer given by OP, i’d like to make following suggestion which i think can leverage the advantage of replaySubject’s functionality better

    const setReplay = new rxjs.Subject(1) // the setter
    const replayWatcher = new rxjs.ReplaySubject(1) // where the full Replay is stored
    
    // append the stored Replay with new value
    // to stop listening for new value, just unsubscribe replayListner
    const replayListner = setReplay.pipe(
      rxjs.scan((acc, value, index) => {
        acc[value.name] = value.payload
        return acc
      }, {})
    ).subscribe(replayWatcher)
    
    setReplay.next({name: 'aaa', payload: 1});
    setReplay.next({name: 'bbb', payload: 2});
    setReplay.next({name: 'aaa', payload: 3});
    setReplay.next({name: 'bbb', payload: 4});
    setReplay.next({name: 'bbb', payload: 5});
    
    // u can create any kind of parsing method using pipe method of Observable
    function getAllReplayInArrayFormat(){
      return replayWatcher.pipe(
        rxjs.map(allReplay => Object.keys(allReplay).map(
          keys => `I want this to be logged once ... received the latest ${keys} ${allReplay[keys]}`
        )),
        rxjs.take(1),
      )
    }
    function getFilteredReplay(name){
      return replayWatcher.pipe(
        rxjs.map(allReplay => allReplay[name]),
        rxjs.map(filteredReplay => `I want this to be logged once ... received the latest ${name} ${filteredReplay}`),
        rxjs.take(1),
      )
    }
    
    replayWatcher.subscribe(console.log).unsubscribe();
    getAllReplayInArrayFormat().subscribe(console.log);
    getFilteredReplay("aaa").subscribe(console.log);
    getFilteredReplay("bbb").subscribe(console.log);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
    Login or Signup to reply.
  5. Here is a solution that only use one subject and pipes. Internally the subject keeps a map of object.name => object with scan, so it can always update the corresponding name with the latest object it gets. Then, it emit for each value of the map.
    The subject needs to be connected right away so it can start running its logic before the first subscription.

    If you want the notification from only a specific object, just pipe filter on replaySource before subscribing

    replaySource = new rxjs.ReplaySubject(1).pipe(
      rxjs.scan((acc, item) => {
        acc[item.name] = item
        return acc
      }, {}),
      rxjs.switchMap(obj => rxjs.from(Object.values(obj)))
    )
    rxjs.connectable(replaySource).connect()
    
    replaySource.next({ name: 'aaa', payload: 1 })
    replaySource.next({ name: 'bbb', payload: 2 })
    replaySource.next({ name: 'aaa', payload: 3 })
    replaySource.next({ name: 'bbb', payload: 4 })
    replaySource.next({ name: 'bbb', payload: 5 })
    
    replaySource.subscribe(x => {
      console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
    })
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
    Login or Signup to reply.
  6. You could make your own pipe operator with a BehaviorSubject ?

    You can also specify if you want to listen once, or keep listening

    const source = new rxjs.BehaviorSubject('');
    
    function monitor(name, listenOnce = true) {
       const operators = [rxjs.filter(v => v.name === name)];
       if (listenOnce) operators.push(rxjs.first());
       return rxjs.pipe(...operators);
    }
    
    function log({ name, payload }) { console.log(`Name is ${name}, value is ${payload}`); }
    
    const aaaSub = source.pipe(monitor('aaa')).subscribe(log);
    const bbbSub = source.pipe(monitor('bbb', false)).subscribe(log);
    
    source.next({name: 'aaa', payload: 1})
    source.next({name: 'bbb', payload: 2})
    source.next({name: 'aaa', payload: 3})
    source.next({name: 'bbb', payload: 4})
    source.next({name: 'bbb', payload: 5})
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
    Login or Signup to reply.
  7. To emit each most recent message with a unique name, you can use a single ReplaySubject and filter the messages based on their name. I have kept buffer size to 1. If you want then you can use without any buffer size but in that case it will keep all emitted messages.

    Please let me know if this helps you.

    import { ReplaySubject, from } from 'rxjs';
    import { groupBy, mergeMap, map } from 'rxjs/operators';
    
    interface Message {
      name: string;
      payload: number;
    }
    
    const messages: Message[] = [
      { name: 'aaa', payload: 1 },
      { name: 'bbb', payload: 2 },
      { name: 'ccc', payload: 1 },
      { name: 'aaa', payload: 3 },
      { name: 'bbb', payload: 4 },
      { name: 'bbb', payload: 5 },
      // more messages
    ];
    
    const replaySubject = new ReplaySubject<Message>(1);
    
    from(messages)
      .pipe(
        groupBy((message) => message.name),
        mergeMap((group$) => group$.pipe(map((message) => ({ name: group$.key, payload: message.payload })))),
      )
      .subscribe((message) => {
        console.log(`I want this to be logged once ... received the latest ${message.name} with ${message.payload}`);
      });
    
    Login or Signup to reply.
  8. const subject = new rxjs.Subject();
    const replay = subject.asObservable().pipe(
      rxjs.scan((map, value) => map.set(value.name, value), new Map()),
      rxjs.shareReplay(1),
      rxjs.switchMap((map) => rxjs.from(map.values()))
    );
    
    replay.subscribe() // You need to call subscribe before any next() methods to activate the replay Observable or use
    // connectable(replay).connect()
    
    subject.next({name: 'aaa', payload: 1})
    subject.next({name: 'bbb', payload: 2})
    subject.next({name: 'aaa', payload: 3})
    subject.next({name: 'bbb', payload: 4})
    subject.next({name: 'bbb', payload: 5})
    
    replay.subscribe((x) => {
      console.log(`I want this to be logged once ... received the latest ${x.name} ${x.payload}`)
    })
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
    Login or Signup to reply.
  9. You can implement your class inherited from Subject. Here is an example code in Typescript (just remove type annotations if you are using JavaScript)

    import { PartialObserver, Subject, Subscription } from "rxjs";
    import { filter } from "rxjs/operators";
    
    class ReplayLatestPayloadSubject<T> extends Subject<T> {
      // Array of keys used to replay the events for a new subscriber in the order they arrived.
      // If order doesn't matter look on the keys in _cached dictionary instead in replayOldMessages
      private _keys: string[] = [];
      // A dictionary to map a key to the latest payload.
      private _cached: Record<string, T> = {};
    
      // construtor having a getKey message to get the key for the object (in the example the name property)
      constructor(private getKey: (value: T) => string) {
        super();
      }
    
      // Override the subscribe method (all overloads)
      public subscribe(observer?: PartialObserver<T>): Subscription;
      public subscribe(
        next?: (value: T) => void,
        error?: (error: any) => void,
        complete?: () => void
      ): Subscription;
      public subscribe(
        observerOrNext?: PartialObserver<T> | ((value: T) => void),
        error?: (error: any) => void,
        complete?: () => void
      ): Subscription {
        if (typeof observerOrNext === "function") {
          // The overload with a next method is used
          this.replayOldMessages(observerOrNext);
        } else if (observerOrNext?.next) {
          // the overload with an observer is used.
          this.replayOldMessages((v) => observerOrNext.next(v));
        }
        // Call base class
        return super.subscribe(observerOrNext as any, error, complete);
      }
    
      // replay messages when a new subscriber is called
      private replayOldMessages(next: (value: T) => void) {
        for (let key of this._keys) {
          next(this._cached[key]);
        }
      }
    
      // override the next function to store the values to be replayed later.
      public next(value?: T): void {
        if (value) {
          const key = this.getKey(value);
          if (!this._cached[key]) {
            this._keys.push(key);
          }
          this._cached[key] = value;
        }
        super.next(value);
      }
    }
    
    // Example
    
    const replaySource = new ReplayLatestPayloadSubject<{
      name: string;
      payload: number;
    }>((v) => v.name);
    
    replaySource.next({ name: "aaa", payload: 1 });
    replaySource.next({ name: "bbb", payload: 2 });
    replaySource.next({ name: "aaa", payload: 3 });
    replaySource.next({ name: "bbb", payload: 4 });
    replaySource.next({ name: "bbb", payload: 5 });
    
    replaySource.pipe(filter((x) => x.name === "aaa")).subscribe((x) => {
      console.log(
        "I want this to be logged once ... received the latest aaa " + x.payload
      );
    });
    
    replaySource.pipe(filter((x) => x.name === "bbb")).subscribe((x) => {
      console.log(
        "I want this to be logged once ... received the latest bbb " + x.payload
      );
    });
    
    Login or Signup to reply.
  10. This works for RxJS 6+ (I needed it working for RxJS 6). If using this in RxJS 7 you may want to tweak it to use connectable instead of publishReplay

    It uses distinctUntilChanged to:

    • avoid duplicate processing
    • fix the bug spoken of by Ákos Vandra-Meyer wherein … messages sent after subscribing result in the contents of the map being fully dumped after each incoming message

    NB: lodash isEqual is used but this can easily be swapped out for a non-lodash equality check

    const AAA = 'aaa'
    const BBB = 'bbb'
    
    const RS1 = new rxjs.Subject()
    
    const RS2 = RS1.pipe(
      rxjs.scan((acc, item) => ({...acc, [item.name]: item}), {}),
      rxjs.distinctUntilChanged(_.isEqual),
      rxjs.publishReplay(1)
    )
    RS2.connect()
    
    const replay$ = RS2.pipe(
      rxjs.switchMap(obj => rxjs.from(Object.values(obj)))
    ).asObservable()
    
    console.log('before sub')
    
    RS1.next({name: AAA, payload: 1})
    RS1.next({name: BBB, payload: 2})
    RS1.next({name: AAA, payload: 3})
    RS1.next({name: BBB, payload: 4})
    RS1.next({name: BBB, payload: 5})
    
    replay$.pipe(
      rxjs.filter(x => x.name === AAA),
      rxjs.distinctUntilChanged(_.isEqual)
    ).subscribe(x => {
      console.log(`${x.name} ${x.payload}`)
    })
    
    replay$.pipe(
      rxjs.filter(x => x.name === BBB),
      rxjs.distinctUntilChanged(_.isEqual)
    ).subscribe(x => {
      console.log(`${x.name} ${x.payload}`)
    })
    
    console.log('after sub')
    
    RS1.next({name: BBB, payload: 5}) // does nothing coz of distinctUntilChanged
    RS1.next({name: BBB, payload: 6})
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/lodash.js/4.17.21/lodash.min.js"></script>
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search