
I want to combine two subjects in a similar way to zip, but with the following important difference.

I want an observable that combines two subjects as follows. Once both have emitted at least one value, emit the latest value from each. After that, both must emit at least once more before the next pair is emitted, again using their latest values. And so on.

Both subjects:

    1-----2---3--4-----------5---

    ---a-----------b--c--d-------

Goal observable:

    ---1a----------4b--------5d--

Simple example: subject1 emits five times, and after that subject2 emits once. With zip, the first emitted value is the pair of both subjects' first emissions. I want (subj1-emit5, subj2-emit1) instead.
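
To make the rule precise, here is a minimal sketch of it as a plain TypeScript state machine, without RxJS. The class name LatestPairGate, its methods pushLeft and pushRight, and the function replayDiagram are hypothetical names made up for this illustration; they are not RxJS APIs. It replays the interleaving from the diagram above (1, a, 2, 3, 4, b, c, d, 5):

```typescript
// Hypothetical helper (not an RxJS API): keeps only the latest value per side
// and emits a pair once BOTH sides have produced a value since the last pair.
class LatestPairGate<A, B> {
  // Values are wrapped in an object so that undefined, null, 0 or ''
  // can be stored without being mistaken for "nothing emitted yet".
  private left: { value: A } | undefined;
  private right: { value: B } | undefined;
  private onPair: (pair: [A, B]) => void;

  constructor(onPair: (pair: [A, B]) => void) {
    this.onPair = onPair;
  }

  pushLeft(value: A): void {
    this.left = { value }; // overwrite: only the latest value is kept
    this.flush();
  }

  pushRight(value: B): void {
    this.right = { value };
    this.flush();
  }

  private flush(): void {
    if (this.left === undefined || this.right === undefined) {
      return; // one side has not emitted since the last pair
    }
    const pair: [A, B] = [this.left.value, this.right.value];
    // Reset both slots so each side must emit again before the next pair.
    this.left = undefined;
    this.right = undefined;
    this.onPair(pair);
  }
}

// Replays the emissions from the diagram in order: 1 a 2 3 4 b c d 5
function replayDiagram(): string[] {
  const out: string[] = [];
  const gate = new LatestPairGate<number, string>(([n, s]) => {
    out.push(`${n}${s}`);
  });
  gate.pushLeft(1);
  gate.pushRight('a');
  gate.pushLeft(2);
  gate.pushLeft(3);
  gate.pushLeft(4);
  gate.pushRight('b');
  gate.pushRight('c');
  gate.pushRight('d');
  gate.pushLeft(5);
  return out;
}

console.log(replayDiagram().join(' '));
```

Replaying the diagram this way yields 1a, 4b, 5d, which is the goal sequence. What I am looking for is an operator, or a combination of operators, that gives the same behaviour on observables.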

2 Answers


  1. How about withLatestFrom?

    import { interval, tap, withLatestFrom } from 'rxjs';

    // Fast source (emits every 1s) and slow source (emits every 5s)
    const obs1$ = interval(1000);
    const obs2$ = interval(5000);

    // Each time the slow source emits, pair its value with the latest value of the fast source
    obs2$
      .pipe(
        withLatestFrom(obs1$),
        tap(([slow, fast]) => {
          console.log(`second source (5s): ${slow} first source (1s): ${fast}`);
        })
      )
      .subscribe();
    

  2. I would build an almostZip function along these lines (comments in the code explain the logic):

    import { Observable, filter, map, merge, tap } from 'rxjs';

    function almostZip(o1$: Observable<any>, o2$: Observable<any>) {
      // we create a variable within the closure to hold the state of the Observable
      // in particular we store the latest values emitted by the 2 Observables received as input
      // (null means "nothing emitted since the last pair", so the sources must not emit null themselves;
      // note that this state is shared by all subscribers of the returned Observable)
      const valuesEmitted: [any, any] = [null, null]
      const _o1$ = o1$.pipe(
        // when o1$ emits, we store the value emitted in the first position of the state variable
        tap(v => valuesEmitted[0] = v)
      )
      const _o2$ = o2$.pipe(
        // when o2$ emits, we store the value emitted in the second position of the state variable
        tap(v => valuesEmitted[1] = v)
      )

      // this is the Observable returned by the function almostZip
      return merge(_o1$, _o2$).pipe(
        // filter out all notifications until both slots of the state variable are filled;
        // compare against null rather than truthiness, so falsy values such as 0 or '' still count
        filter(() => {
          return valuesEmitted[0] !== null && valuesEmitted[1] !== null
        }),
        // notify a copy of the state
        map(() => [...valuesEmitted]),
        // reset the state, so both sources have to emit again before the next pair
        tap(() => {
          valuesEmitted[0] = null
          valuesEmitted[1] = null
        }),
      )
    }
    

    Here is a stackblitz that shows how this works.
