skip to Main Content

I have some problem trying to understand why do I need to pipe share operator at the outer observable when trying to make shared observable. I would think that it will be sufficient to pass share as part of observable returned inside switchMap, but it looks like it is not the case.

const {
  catchError,
  concatMap,
  count,
  debounceTime,
  delay,
  finalize,
  map,
  mapTo,
  mergeMap,
  startWith,
  switchMap,
  tap,
  take,
  toArray,
  takeUntil,
  bufferTime,
  filter,
  bufferWhen,
  share,
  distinctUntilChanged,
} = rxjs.operators;
const {
  fromEventPattern,
  BehaviorSubject,
  Subject,
  of,
  timer,
  interval,
} = rxjs;

const first = new BehaviorSubject(1);
const destroy = new Subject();
const first$ = first.asObservable().pipe(
  map((value) => parseInt(value, 10)),
  distinctUntilChanged(),
  takeUntil(destroy)
);

const createInterval = () =>
  interval(1000).pipe(
    finalize(() => console.log('finalize')),
    share() // here I create multicast observable
  );

const events$ = first$.pipe(
  share(),
  switchMap(() => createInterval())
  //share() // why this is needed outside and the `share` above is not sufficient ?
);

events$
  .pipe(tap((value) => console.log('first', value)))
  .subscribe((value) => console.log('subscription 1', value));

// not subscribing properly without share() above
events$
  .pipe(tap((value) => console.log('second', value)))
  .subscribe((value) => console.log('subscription 2', value));
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

2

Answers


  1. In the Docs its says:

    Returns a new Observable that multicasts (shares) the original
    Observable. As long as there is at least one Subscriber this
    Observable will be subscribed and emitting data. When all subscribers
    have unsubscribed it will unsubscribe from the source Observable.
    Because the Observable is multicasting it makes the stream hot. This
    is an alias for multicast(() => new Subject()), refCount().

    So we can share the output stream observable to multiple subscribers, without creating new streams!

    To demo this, here I have a code, where there is no share, but a tap above the switchMap, to indicate the interval is getting executed on each subscription! The first subscribe happens immediately, but the second happens after 3 seconds, you will see there are two streams here, so in the context of an API, one which is static, we can use share to reduce the number of calls, this is just an example!

    const {
      catchError,
      concatMap,
      count,
      debounceTime,
      delay,
      finalize,
      map,
      mapTo,
      mergeMap,
      startWith,
      switchMap,
      tap,
      take,
      toArray,
      takeUntil,
      bufferTime,
      filter,
      bufferWhen,
      share,
      distinctUntilChanged,
    } = rxjs.operators;
    const {
      fromEventPattern,
      BehaviorSubject,
      Subject,
      of ,
      timer,
      interval,
    } = rxjs;
    
    const first = new BehaviorSubject(1);
    const destroy = new Subject();
    const first$ = first.asObservable().pipe(
      map((value) => parseInt(value, 10)),
      distinctUntilChanged(),
      takeUntil(destroy)
    );
    
    const createInterval = () =>
      interval(1000).pipe(
        finalize(() => console.log('finalize')),
      );
    
    const events$ = first$.pipe(
      tap(() => console.log('calling')),
      switchMap(() => createInterval()),
      //share() // why this is needed outside and the `share` above is not sufficient ?
    );
    
    events$
      .pipe(tap((value) => console.log('first', value)))
      .subscribe((value) => console.log('subscription 1', value));
    
    setTimeout(() => {
      // not subscribing properly without share() above
      events$
        .pipe(tap((value) => console.log('second', value)))
        .subscribe((value) => console.log('subscription 2', value));
    
    }, 3000);
    <script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

    We do not need multiple shares, because the code above it will be shared, in your case, what I think happened was you are sharing the behavioursubject part (upto the stream above the share!) with two subscribers, since you have distinct until unchanged here, the first subscriber gets subscribed, since the value has changed, but the second does not because it has the same value in the stream!

    The fix for this will be to have a single share, either in the inner observable createInterval or on events$ (as the last pipe operator).

    Finally here is a working example, where it works fine!

    const {
      catchError,
      concatMap,
      count,
      debounceTime,
      delay,
      finalize,
      map,
      mapTo,
      mergeMap,
      startWith,
      switchMap,
      tap,
      take,
      toArray,
      takeUntil,
      bufferTime,
      filter,
      bufferWhen,
      share,
      distinctUntilChanged,
    } = rxjs.operators;
    const {
      fromEventPattern,
      BehaviorSubject,
      Subject,
      of ,
      timer,
      interval,
    } = rxjs;
    
    const first = new BehaviorSubject(1);
    const destroy = new Subject();
    const first$ = first.asObservable().pipe(
      map((value) => parseInt(value, 10)),
      distinctUntilChanged(),
      takeUntil(destroy)
    );
    
    const createInterval = () =>
      interval(1000).pipe(
        finalize(() => console.log('finalize')),
        // share(),
      );
    
    const events$ = first$.pipe(
      tap(() => console.log('calling')),
      switchMap(() => createInterval()),
      share(),
    );
    
    events$
      .pipe(tap((value) => console.log('first', value)))
      .subscribe((value) => console.log('subscription 1', value));
    
    // not subscribing properly without share() above
    events$
      .pipe(tap((value) => console.log('second', value)))
      .subscribe((value) => console.log('subscription 2', value));
    <script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
    Login or Signup to reply.
  2. The share operator in RxJS is used to create a multicast observable, meaning that it allows multiple subscribers to share the same underlying source observable. When you use share within an observable pipeline, it applies to the observable that it is immediately called on and the subsequent operators in that pipeline.

    In your code, you have two occurrences of the share operator:

    Inside the createInterval function:

    const createInterval = () =>
      interval(1000).pipe(
        finalize(() => console.log('finalize')),
        share() // here I create multicast observable
      );

    After the switchMap operator:

    const events$ = first$.pipe(
      share(),
      switchMap(() => createInterval())
    );

    In the second case, the share operator is applied to the observable resulting from the switchMap. This means that the multicasting is applied to the inner observable created by createInterval, and it’s shared among all subscribers to events$.

    Now, regarding your question about why the additional share is needed at the end of the observable pipeline (events$):

    In RxJS, operators are typically applied to the observable that they immediately follow in the pipeline. When you apply share immediately after first$.pipe(switchMap(() => createInterval())), it ensures that the multicasting is shared among all subscribers to the events$ observable itself.

    Without this additional share, each subscription to events$ would create a new instance of the entire observable chain, including the createInterval part. This could lead to separate intervals being created for each subscriber, and they would not be sharing the same underlying source.

    By adding share at the end, you ensure that all subscriptions to events$ share the same multicasted observable, and thus, they share the same underlying source from the createInterval part.

    So, to summarize, the share operator is needed both where you create the interval (createInterval) and at the end of the observable pipeline (events$) to achieve the desired multicasting behavior and share the same underlying source among all subscribers.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search