I'm having trouble understanding why I need to pipe the share operator on the outer observable when trying to make a shared observable. I would have thought it sufficient to apply share to the observable returned inside switchMap, but that does not seem to be the case.
const {
  catchError,
  concatMap,
  count,
  debounceTime,
  delay,
  finalize,
  map,
  mapTo,
  mergeMap,
  startWith,
  switchMap,
  tap,
  take,
  toArray,
  takeUntil,
  bufferTime,
  filter,
  bufferWhen,
  share,
  distinctUntilChanged,
} = rxjs.operators;
const {
  fromEventPattern,
  BehaviorSubject,
  Subject,
  of,
  timer,
  interval,
} = rxjs;
const first = new BehaviorSubject(1);
const destroy = new Subject();

const first$ = first.asObservable().pipe(
  map((value) => parseInt(value, 10)),
  distinctUntilChanged(),
  takeUntil(destroy)
);

const createInterval = () =>
  interval(1000).pipe(
    finalize(() => console.log('finalize')),
    share() // here I create multicast observable
  );

const events$ = first$.pipe(
  share(),
  switchMap(() => createInterval())
  //share() // why this is needed outside and the `share` above is not sufficient ?
);

events$
  .pipe(tap((value) => console.log('first', value)))
  .subscribe((value) => console.log('subscription 1', value));

// not subscribing properly without share() above
events$
  .pipe(tap((value) => console.log('second', value)))
  .subscribe((value) => console.log('subscription 2', value));
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
2 Answers
The docs describe share as an operator that multicasts the source observable.
So we can share the output stream with multiple subscribers, without creating new streams!
To demo this, here is some code with no share, but with a tap above the switchMap to show that the interval is created again on each subscription. The first subscription happens immediately and the second after 3 seconds, and you will see that there are two separate streams. In the context of an API call whose response is static, we can use share to reduce the number of calls; this is just an example!
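We do not need multiple shares, because everything above a share in the pipe is already shared. In your case, what I think happened is that you are sharing the BehaviorSubject part (the stream up to and including the share) between two subscribers. The first subscriber connects the shared stream and receives the initial value, so its switchMap starts an interval. The second subscriber joins the already-running shared stream, and because share does not replay earlier values and distinctUntilChanged lets nothing new through until the value changes, its switchMap never receives a value to start from!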
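One way to picture what happens to the second subscriber is the dependency-free toy model below. It is not RxJS's implementation: MiniSubject, MiniBehaviorSubject, and miniShare are hypothetical stand-ins written only for this sketch, on the assumption that share fans values out through a plain subject that does not replay past values (its default behavior).

```javascript
// Toy subject: forwards each value to the listeners registered at that moment.
class MiniSubject {
  constructor() { this.listeners = []; }
  subscribe(listener) { this.listeners.push(listener); }
  next(value) { this.listeners.forEach((listener) => listener(value)); }
}

// Toy BehaviorSubject: additionally hands its current value to every new listener.
class MiniBehaviorSubject extends MiniSubject {
  constructor(initial) { super(); this.value = initial; }
  subscribe(listener) { super.subscribe(listener); listener(this.value); }
  next(value) { this.value = value; super.next(value); }
}

// Toy share: a single upstream subscription, fanned out through a plain MiniSubject.
function miniShare(source) {
  const hub = new MiniSubject();
  let connected = false;
  return {
    subscribe(listener) {
      hub.subscribe(listener);
      if (!connected) {
        connected = true;
        source.subscribe((value) => hub.next(value));
      }
    },
  };
}

const first = new MiniBehaviorSubject(1);
const shared = miniShare(first);

const seenByFirst = [];
const seenBySecond = [];
shared.subscribe((value) => seenByFirst.push(value));  // connects upstream, receives the initial 1
shared.subscribe((value) => seenBySecond.push(value)); // joins after 1 has already gone by

first.next(2); // only now does the second subscriber see a value

console.log(seenByFirst, seenBySecond); // → [ 1, 2 ] [ 2 ]
```

In the question's pipeline the switchMap sits below the share, so a subscriber that never receives a value from the shared part never starts its interval.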
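The fix for this is to have a single share, either in the inner observable createInterval or on events$ (as the last pipe operator). Note that only the share on events$ makes both subscribers use the same interval; a share inside createInterval alone still gives each subscriber its own. Finally, here is a working example, where it works fine!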
The share operator in RxJS creates a multicast observable, which lets multiple subscribers share a single subscription to the underlying source observable. When you use share in an observable pipeline, it multicasts everything above it in that pipeline; operators placed after it still run once per subscriber.
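In your code, the two occurrences of the share operator that matter here are:
Inside the createInterval function, at the end of the interval pipeline.
After the switchMap operator, as the commented-out last operator of events$.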
In the second case, the share operator is applied to the observable resulting from the switchMap. This means that the whole chain above it, including the inner observable created by createInterval, is subscribed to once and shared among all subscribers to events$.
Now, regarding your question about why the additional share is needed at the end of the observable pipeline (events$):
In RxJS, each operator is applied to the observable produced by the operators before it in the pipeline. When you apply share right after switchMap(() => createInterval()) in first$.pipe(...), all subscribers to events$ share a single subscription to everything above it.
Without this additional share, each subscription to events$ would create a new instance of the entire observable chain, including the createInterval part. This could lead to separate intervals being created for each subscriber, and they would not be sharing the same underlying source.
By adding share at the end, you ensure that all subscriptions to events$ share the same multicasted observable, and thus, they share the same underlying source from the createInterval part.
So, to summarize, the share at the end of the observable pipeline (events$) is what gives the desired multicasting behavior and lets all subscribers use the same underlying source. The share inside createInterval is not enough on its own, because every call to createInterval returns a new observable that only its own switchMap subscribes to.