import { of, mergeMap, concatAll, toArray, take } from 'rxjs'
const duplicateValue = log => value => {
console.log(log, value)
return Promise.resolve([value, value]);
}
of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
mergeMap(duplicateValue('mapping a'), 1),
concatAll(),
mergeMap(duplicateValue('mapping b'), 1),
concatAll(),
take(3),
toArray(),
)
.subscribe(result => {
console.log(result);
});
output:
mapping a
1
mapping b
1
mapping a
2
mapping b
1
mapping a
3
[1, 1, 1]
only 3 values are taken eventually so I would like it to run the mappers as little as possible, if it were truely lazy evaluation then i would expect the first mapper to run once, producing 2 values, then the second mapper to run twice, producing 4 values
the 1
parameter to mergeMap should do something of this sort but does not really
i got a bit close using concatMap
but still not perfect
i think it is something subtle about the interoperation of promises and rxjs
2
Answers
There are a few reasons why the
mergeMap
in your example may not be lazy.Promise.resolve
is already eagerly evaluated, so theduplicateValue
function will always return a resolved promise before themergeMap
even runs.concatAll
operator itself is not lazy. It immediately subscribes to the inner observables and begins emitting values. So even if themergeMap
were lazy, theconcatAll
will still eagerly consume all the values.take
operator also eagerly subscribes to the source observable and begins emitting values. So even if themergeMap
andconcatAll
were lazy, thetake
will prevent them from being lazily evaluated.In order to have lazy evaluation, you would need to use operators like
defer
orObservable.create
to defer the execution until the subscription happens. Additionally, you would have to avoidPromise.resolve
and instead usedefer
orObservable.fromPromise
to create a lazy promise.You have two separate
mergeMap
operators applied to the source observableof(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
. This means that the inner observables created by each mergeMap will run independently, this leads to a scenario where emissions from the first mergeMap are interleaved with emissions from the second mergeMap.If you try to return a synchronous value from the
duplicateValue
function, like this:You’ll see that the output is as you expected:
This is because returning a promise automatically causes the code to be executed using a different scheduler. The fact that it runs on a different scheduler exposes the interleaving behavior explained above.
One possible approach is to nest the
mergeMap
operators like so:This creates a sequential behavior where the inner observable is created and subscribed to for each emission from the source observable