skip to Main Content
import { of, mergeMap, concatAll, toArray, take } from 'rxjs'

const duplicateValue = log => value => {
  console.log(log, value)
  return Promise.resolve([value, value]);
}

of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  mergeMap(duplicateValue('mapping a'), 1),
  concatAll(),
  mergeMap(duplicateValue('mapping b'), 1),
  concatAll(),
  take(3),
  toArray(),
)
  .subscribe(result => {
    console.log(result);
  });

output:

mapping a
1
mapping b
1
mapping a
2
mapping b
1
mapping a
3
[1, 1, 1]

only 3 values are taken eventually so I would like it to run the mappers as little as possible, if it were truely lazy evaluation then i would expect the first mapper to run once, producing 2 values, then the second mapper to run twice, producing 4 values

the 1 parameter to mergeMap should do something of this sort but does not really

i got a bit close using concatMap but still not perfect

i think it is something subtle about the interoperation of promises and rxjs

2

Answers


  1. There are a few reasons why the mergeMap in your example may not be lazy.

    1. Promise.resolve is already eagerly evaluated, so the duplicateValue function will always return a resolved promise before the mergeMap even runs.
    2. The concatAll operator itself is not lazy. It immediately subscribes to the inner observables and begins emitting values. So even if the mergeMap were lazy, the concatAll will still eagerly consume all the values.
    3. The take operator also eagerly subscribes to the source observable and begins emitting values. So even if the mergeMap and concatAll were lazy, the take will prevent them from being lazily evaluated.

    In order to have lazy evaluation, you would need to use operators like defer or Observable.create to defer the execution until the subscription happens. Additionally, you would have to avoid Promise.resolve and instead use defer or Observable.fromPromise to create a lazy promise.

    Login or Signup to reply.
  2. You have two separate mergeMap operators applied to the source observable of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10). This means that the inner observables created by each mergeMap will run independently, this leads to a scenario where emissions from the first mergeMap are interleaved with emissions from the second mergeMap.

    If you try to return a synchronous value from the duplicateValue function, like this:

    const duplicateValue = log => value => {
      console.log(log, value)
      return of([value, value])
    }
    

    You’ll see that the output is as you expected:

    mapping a
    1
    mapping b
    1
    mapping b
    1
    [1, 1, 1]
    

    This is because returning a promise automatically causes the code to be executed using a different scheduler. The fact that it runs on a different scheduler exposes the interleaving behavior explained above.

    One possible approach is to nest the mergeMap operators like so:

    of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .pipe(
        mergeMap((x) => {
          return from(duplicateValue('mapping a')(x)).pipe(
            concatAll(),
            mergeMap((y) => {
              return from(duplicateValue('mapping b')(x));
            }, 1)
          );
        }, 1),
        concatAll(),
        take(3),
        toArray()
      )
      .subscribe((result) => {
        console.log(result);
      });
    

    This creates a sequential behavior where the inner observable is created and subscribed to for each emission from the source observable

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search