I have two observables. Each of them emits a value and then completes (they don't complete at the moment the value is emitted, but some time after, so their marble diagram would be something like ---1---| ). I'd like to create an observable from those two observables. This observable should complete when either:

1- one of the two provided observables completes and the other one has not emitted any value, or

2- both provided observables have emitted a value and then completed.

How can I achieve this behavior?

Thanks

This is my starting point

const obsC$ = merge(obsA$, obsB$).pipe( ... ) 

Its marble diagram would be something like one of these:

----obsAEmission---|(a completes)

----obsBEmission---|(b completes)

----obsA---obsB----|(a and b complete)

3 Answers


  1. One way to approach this, which may be a bit heavy-handed, is to materialize your source observables and dematerialize your merged observable. Once materialized, a completion arrives as an ordinary notification that you can react to.

    There are probably other solutions, but you can give this a try:

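    import { merge } from 'rxjs';
    import { dematerialize, materialize, takeWhile } from 'rxjs/operators';

    const obsC$ = merge(
        obsA$.pipe(materialize()),
        obsB$.pipe(materialize())
    ).pipe(
        // Stop at the first "complete" notification from either source
        takeWhile(v => v.hasValue),
        dematerialize()
    );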
    

    UPDATE: Wait for both source observables to complete if both have emitted a value.

    It's the same idea, I just count the number of values and make sure not to act on a complete too soon.

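    import { merge } from 'rxjs';
    import { dematerialize, filter, materialize, scan, takeWhile } from 'rxjs/operators';

    const obsC$ = merge(
        obsA$.pipe(materialize()),
        obsB$.pipe(materialize())
    ).pipe(
        // Count only "next" notifications; counting completions as well would
        // keep the stream open after a lone source completes
        scan(({valueCount}, curr) => ({
            ...curr,
            valueCount: valueCount + (curr.hasValue ? 1 : 0)
        }), {valueCount: 0}),
        // A completion ends the stream unless both sources have already emitted
        takeWhile(v => v.hasValue || v.valueCount > 1),
        // Drop the completions that were let through
        filter(v => v.hasValue),
        dematerialize()
    );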
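
    To make the counting rule easier to check by hand, here is a dependency-free sketch that replays a merged timeline of notifications through the same steps: count the values, end at the first completion that arrives while fewer than two values have been seen, and swallow completions otherwise. The `Note` type and the `replay` function are hypothetical names for illustration only; they are not part of RxJS.

    ```typescript
    // Hypothetical stand-in for a materialized notification: "N" = next, "C" = complete.
    type Note<T> = { kind: "N"; value: T } | { kind: "C" };

    // Replays a merged timeline and returns the values that would be forwarded,
    // plus whether the result ended before the timeline ran out.
    function replay<T>(timeline: Note<T>[]): { values: T[]; stoppedEarly: boolean } {
      const values: T[] = [];
      let valueCount = 0;
      for (const note of timeline) {
        if (note.kind === "N") {
          valueCount += 1;
          values.push(note.value);
        } else if (valueCount < 2) {
          // A completion with fewer than two values seen ends the result here.
          return { values, stoppedEarly: true };
        }
        // Otherwise both sources have emitted: ignore the completion and go on.
      }
      return { values, stoppedEarly: false };
    }

    // a emits and completes before b has emitted anything
    const onlyA = replay<string>([
      { kind: "N", value: "a" },
      { kind: "C" },
      { kind: "N", value: "b" },
      { kind: "C" },
    ]);

    // both emit first, then both complete
    const both = replay<string>([
      { kind: "N", value: "a" },
      { kind: "N", value: "b" },
      { kind: "C" },
      { kind: "C" },
    ]);
    ```

    In the first timeline only "a" is forwarded and the result ends at a's completion; in the second both values are forwarded and the result ends with the timeline itself.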
    
  2. I feel like what you need is something like this:

    import { combineLatest, Observable, ObservableNotification } from 'rxjs';
    import { materialize, scan, startWith, takeWhile } from 'rxjs/operators';

    interface State {
      hasEmittedValue: boolean;
      hasCompleted: boolean;
    }

    const initialState: State = {
      hasEmittedValue: false,
      hasCompleted: false,
    };

    const mapToState =
      () =>
      <T>(obs$: Observable<T>): Observable<State> =>
        obs$.pipe(
          materialize(),
          scan<ObservableNotification<any>, State>(
            (acc, curr) => {
              switch (curr.kind) {
                // next
                case 'N':
                  return { ...acc, hasEmittedValue: true };
                // complete
                case 'C':
                  return { ...acc, hasCompleted: true };
                // error
                case 'E': {
                  // @todo: I don't know what behavior you want if the obs
                  // errors, I'll let you implement that bit
                }
              }
              return acc;
            },
            initialState
          ),
          // Emit the initial state right away; otherwise combineLatest stays
          // silent until both sources have produced a notification
          startWith(initialState)
        );

    const treatObservables = <T, U>(
      obs1$: Observable<T>,
      obs2$: Observable<U>
    ): Observable<unknown> =>
      combineLatest([obs1$.pipe(mapToState()), obs2$.pipe(mapToState())]).pipe(
        takeWhile(([value1, value2]) => {
          // Stop as soon as one source has completed while the other one
          // has not emitted anything
          if (
            (value1.hasCompleted && !value2.hasEmittedValue) ||
            (value2.hasCompleted && !value1.hasEmittedValue)
          ) {
            return false;
          }

          // Otherwise keep going: combineLatest completes by itself once
          // both sources have completed
          return true;
        })
      );
    

    Code on Stackblitz

    That said, you haven't specified whether you only need to know when the new observable ends or whether you also want the values in between merged, etc. What I've written above doesn't forward any of the source values: I had to guess, so I assumed you only want to be aware of the final complete.
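
    If it helps to sanity-check the stop condition separately from the stream plumbing, the rule from the question can be written as a pure function over two state objects. This is only a sketch under that reading of the question; `SourceState` and `shouldStop` are hypothetical names, not RxJS APIs.

    ```typescript
    // Hypothetical per-source state: has it emitted yet, has it completed yet.
    interface SourceState {
      hasEmittedValue: boolean;
      hasCompleted: boolean;
    }

    // True when the combined observable should complete.
    function shouldStop(a: SourceState, b: SourceState): boolean {
      // Rule 1: one source completed while the other has not emitted anything.
      const oneDoneOtherSilent =
        (a.hasCompleted && !b.hasEmittedValue) ||
        (b.hasCompleted && !a.hasEmittedValue);
      // Rule 2: both sources have completed.
      const bothDone = a.hasCompleted && b.hasCompleted;
      return oneDoneOtherSilent || bothDone;
    }

    const silent: SourceState = { hasEmittedValue: false, hasCompleted: false };
    const emitted: SourceState = { hasEmittedValue: true, hasCompleted: false };
    const done: SourceState = { hasEmittedValue: true, hasCompleted: true };

    const stopWhenOtherSilent = shouldStop(done, silent);  // rule 1 applies
    const waitWhenOtherEmitted = shouldStop(done, emitted); // neither rule applies yet
    const stopWhenBothDone = shouldStop(done, done);        // rule 2 applies
    ```

    Keeping the rule in one small function like this makes it easy to adjust if, for example, an error should count as a completion.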

  3. You could create a function that returns an observable with your custom logic (maybe choose a better name than I did 😀):

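    import { Observable } from 'rxjs';

    function firstCompletedOrBoth<T>(a$: Observable<T>, b$: Observable<T>) {

      return new Observable<T>(emit => {

        let aEmitted = false;
        let bEmitted = false;
        // Track completion with flags instead of reading `subA.closed` /
        // `subB.closed`, so the checks also work when a source completes
        // synchronously, before the other subscription exists
        let aCompleted = false;
        let bCompleted = false;

        const subA = a$.subscribe({
          next: a => {
            aEmitted = true;
            emit.next(a);
          },
          error: err => emit.error(err),
          complete: () => {
            aCompleted = true;
            if (bCompleted || !bEmitted) {
              emit.complete();
            }
          }
        });

        // TODO: don't repeat the exact same observer code!
        const subB = b$.subscribe({
          next: b => {
            bEmitted = true;
            emit.next(b);
          },
          error: err => emit.error(err),
          complete: () => {
            bCompleted = true;
            if (aCompleted || !aEmitted) {
              emit.complete();
            }
          }
        });

        return () => {
          subA.unsubscribe();
          subB.unsubscribe();
        };
      });
    }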
    

    The idea here is that you subscribe to each observable and keep track of whether each of them has emitted. When either of them completes, you evaluate whether or not you want to complete the resulting observable.

    Here’s a StackBlitz demonstrating this behavior.
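
    One possible way to avoid writing the same observer twice is a small factory that builds both observers from shared state. The sketch below is dependency-free so it can be tried without RxJS; `Sink` and `createTrackedObservers` are hypothetical names, and passing the returned objects to `subscribe` is an assumption based on them having `next` and `complete` methods.

    ```typescript
    // Minimal output shape: something that can receive values and a completion.
    interface Sink<T> {
      next(value: T): void;
      complete(): void;
    }

    type Side = "a" | "b";

    function createTrackedObservers<T>(sink: Sink<T>) {
      const emitted = { a: false, b: false };
      const completed = { a: false, b: false };
      let finished = false;

      // Builds the observer for one side, parameterised by which side is "other".
      const observerFor = (self: Side, other: Side) => ({
        next(value: T): void {
          if (finished) return;
          emitted[self] = true;
          sink.next(value);
        },
        complete(): void {
          if (finished) return;
          completed[self] = true;
          // Finish if the other side is already done or has never emitted.
          if (completed[other] || !emitted[other]) {
            finished = true;
            sink.complete();
          }
        },
      });

      return { a: observerFor("a", "b"), b: observerFor("b", "a") };
    }

    // Usage with a sink that just records what it receives.
    const log: string[] = [];
    const observers = createTrackedObservers<number>({
      next: value => { log.push(`next ${value}`); },
      complete: () => { log.push("complete"); },
    });

    observers.a.next(1);
    observers.b.next(2);
    observers.a.complete(); // b has emitted but not completed: keep waiting
    observers.b.complete(); // a already completed: finish
    ```

    After these calls `log` holds the two values followed by a single completion entry.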
