skip to Main Content

I’m completely new to rxjs and trying to accomplish one task – essentially I need a Promise-like API with the following differences:

  1. Unlike the standard Promise my API should be able to resolve multiple times.
  2. When calling await/then the last resolved value should be used.
  3. If my API was never resolved the await/then should hung up, just like the standard Promise.
  4. Additionally I need a reset method which will be resetting my API object so the following await/then calls will hung up like in the point 3.

How can I implement the needed behaviour? I think rxjs Observable and lastValueFrom is what I need, but I just can’t figure out how to use it.

Pseudocode example usage:

const myAuthObservable = new MyObservableApi<Auth>((resolve) =>
  subscribe(
    (s) => s.auth,
    (auth) => {
      if (auth) {
        resolve(auth); // this can be called multiple times
      }
    }
  )
);

emitter.on("start", async () => {
  // always the last auth will be returned
  // or the api will hung up until the first auth is emitted
  const auth = await myAuthObservable;
  myAuthObservable.reset();
});

2

Answers


  1. Chosen as BEST ANSWER

    after reading @Barmar comment I eventually came up with a simple non-rxjs solution

    It sounds like what you want is an object that caches the value that the promise resolves to, then returns that cached value on future uses

    export class MultiResolve<T> {
      private readonly initialSymbol: symbol = Symbol();
      private currentValue: T | symbol = this.initialSymbol;
      private readonly resolvers: Array<(value: T) => void> = [];
    
      resolve(value: T): void {
        this.currentValue = value;
        for (const resolver of this.resolvers) {
          resolver(value);
        }
      }
    
      get promise(): Promise<T> {
        return new Promise(resolve => {
          if (this.currentValue !== this.initialSymbol) {
            resolve(this.currentValue as T);
          }
          this.resolvers.push(resolve);
        });
      }
    
      reset(): void {
        this.currentValue = this.initialSymbol;
      }
    }
    

    usage:

    const multiResolve = new MultiResolve<number>();
    
    async function test() {
      console.log(await multiResolve.promise); // Will wait until the first resolve
    
      multiResolve.resolve(1);
      console.log(await multiResolve.promise); // Will immediately get 1
    
      multiResolve.resolve(undefined as any); // TypeScript requires an explicit cast to 'any' for undefined
      console.log(await multiResolve.promise); // Will immediately get undefined
    
      multiResolve.resolve(2);
      console.log(await multiResolve.promise); // Will immediately get 2
    }
    
    test();
    
    setTimeout(() => {
      multiResolve.resolve(3); // Will immediately get 3 in the next await
    }, 1000);
    

  2. I believe I have what you’re looking for – a custom subject that behaves like a single buffer ReplaySubject, but can be reset. This code has been adapted from that exact class to provide the functionality you seek.

    export class ResettableSubject<T> extends Subject<T> {
      /** We can't just rely on value to be undefined to know if it is stored since the type could be undefined. */
      private hasValue = false;
      private value?: T | undefined;
    
      constructor() {
        super();
      }
    
      /** Updates the subject's value and caches it. */
      override next(value: T): void {
        this.value = value;
        this.hasValue = true;
        super.next(value);
      }
    
      /** Resets the cached value. */
      reset() {
        this.value = undefined;
        this.hasValue = false;
      }
    
      protected  _subscribe(subscriber: Subscriber<T>): Subscription {
        const subscription = this._innerSubscribe(subscriber);
        if (this.hasValue) {
          subscriber.next(this.value);
        }
        return subscription;
      }
    }
    

    StackBlitz

    In the example below, the subscription will emit all values, but the promises created from firstValueFrom will only emit if the subject has a value and reset hasn’t cleared the cached value.

    const source$ = new ResettableSubject<number>();
    // displays all values.
    source$.subscribe(x => console.log(`from subscription ${x}`));
    // will only display the first value once set.
    firstValueFrom(source$).then(x => console.log(`first value before first next: ${x}`));
    source$.next(1);
    // will only display the first value.
    firstValueFrom(source$).then(x => console.log(`first value displayed immediately: ${x}`));
    source$.next(2); // the original subscription should update.
    source$.reset(); // no future subscribers will see 2.
    firstValueFrom(source$).then(x => console.log(`first value after reset ${x}`));
    source$.next(3); // 3 is output by subscription and last promise.
    

    Your usage would be like the example below. Whatever sets myAuthObservable will have to make sure Auth isn’t undefined, or you could update the ResettableSubject class to ignore undefined values passed to the next method.

    const myAuthObservable = new ResettableSubject<Auth>();
    
    emitter.on("start", async () => {
      // Only emits when myAuthObservable gets updated after reset below was called.
      const auth = await firstValueFrom(myAuthObservable);
      myAuthObservable.reset();
    });
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search