
Reading the first n bytes of a byte stream (in the form of an AsyncIterable) feels cumbersome and error-prone.

Is there a better way to implement this?

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);

  let offset = 0;

  const iterator = stream[Symbol.asyncIterator]();

  while (true) {
    const { done, value } = await iterator.next();

    if (done) {
      throw new Error("Buffer underflow");
    } else {
      const chunk = value;
      if (chunk.length < length - offset) {
        prefix.set(chunk, offset);
        offset += chunk.length;
      } else {
        const slice = chunk.slice(0, length - offset);
        prefix.set(slice, offset);

        return [prefix, prepend(chunk.slice(slice.length), stream)];
      }
    }
  }
}

async function* prepend(
  prefix: Uint8Array,
  stream: AsyncIterable<Uint8Array>
) {
  yield prefix;
  yield* stream;
}


Answers


  1. stream primitives

    We’ll start by defining stream primitives –

    flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
    take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
    skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
    toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
    
    async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
      for await (const a of t) {
        yield *a
      }
    }
    
    async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
      if (n <= 0) return
      for await (const v of t) {
        yield v
        // stop right after the n-th value instead of pulling one extra value
        if (--n <= 0) return
      }
      throw Error("buffer underflow")
    }
    
    async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
      for await (const v of t) {
        if (n-- > 0) continue
        yield v
      }
      if (n > 0) throw Error("buffer underflow")
    }
    
    async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
      const r = []
      for await (const v of t) r.push(v)
      return r
    }
    

    shift

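    Using these stream primitives, we can write shift comfortably. It iterates stream twice (take reads the prefix, then skip starts a new pass from the beginning), so it relies on stream producing a fresh iterator each time it is read, as the mock buffer below does –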

    shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
    
    async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
      return [
        new Uint8Array(await toArray(take(flatten(stream), count))),
        skip(flatten(stream), count)
      ] as const
    }
    

    Let’s create a mock buffer and test it –

    const buffer: AsyncIterable<Uint8Array> = {
      async *[Symbol.asyncIterator]() {
        for (const v of [[0,1,2],[3,4],[5,6,7,8],[9]]) {
          yield new Uint8Array(v)
          await new Promise(r => setTimeout(r, 100))
        }
      }
    }
    
    async function main() {
      const [first, rest] = await shift(buffer, 4)
      console.log({
        first: Array.from(first),
        rest: await toArray(rest)
      })
    }
    
    main().catch(console.error)
    
    {
      first: [0, 1, 2, 3],
      rest: [4, 5, 6, 7, 8, 9]
    }
    

    demo

    Run and verify the result on the TypeScript playground

  2. I think the iterator logic itself can be simplified by using a notClosing helper and normal iteration:

    async function shift(
      length: number,
      stream: AsyncIterable<Uint8Array>
    ): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
      const prefix = new Uint8Array(length);
      const iter = stream[Symbol.asyncIterator]();
      let offset = 0;
      for await (const chunk of notClosing(iter)) {
        if (chunk.length < length - offset) {
          prefix.set(chunk, offset);
          offset += chunk.length;
        } else {
          const slice = chunk.slice(0, length - offset);
          prefix.set(slice, offset);
          return [prefix, prepend(chunk.slice(slice.length), iter)];
        }
      }
      throw new Error("Buffer underflow");
    }
    

    Unless you want to convert the stream from an iterator of chunks into a much less efficient iterator of individual bytes, there's nothing further to simplify in the offset logic.

    const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype)) as AsyncIterator<any>;
    function prepend<T>(val: T, iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
      return Object.assign(Object.create(AsyncIteratorPrototype), {
        first: true,
        async next() { // async, so next() always returns a Promise as AsyncIterator requires
          if (this.first) {
            const res = {done: false, value: val};
            val = undefined!; // GC
            this.first = false;
            return res;
          }
          return iter.next();
        },
        return: iter.return ? () => iter.return!() : undefined,
      });
    }
    function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
      return Object.assign(Object.create(AsyncIteratorPrototype), {
        next: iter.next.bind(iter),
      });
    }
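Why notClosing matters: leaving a for await loop early (with break or return) calls the iterator's return() method, and for a generator that finishes it for good. The sketch below, assuming a small hypothetical numbers generator, breaks out of a loop after the first value with and without the wrapper and then tries to read what is left. AsyncIteratorPrototype and notClosing are copied from the answer so the block runs on its own.

```typescript
// Copied from the answer: %AsyncIteratorPrototype% supplies [Symbol.asyncIterator]() { return this }.
const AsyncIteratorPrototype = Object.getPrototypeOf(
  Object.getPrototypeOf(async function* () {}.prototype)
) as AsyncIterator<any>;

// Copied from the answer: exposes next() only, so there is no return() to call.
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    next: iter.next.bind(iter),
  });
}

// Hypothetical source used only for this demonstration.
async function* numbers(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

// Breaks out of a loop after the first value, then collects what the generator still has.
async function firstThenRest(wrap: boolean): Promise<number[]> {
  const iter = numbers();
  const source: AsyncIterable<number> = wrap ? notClosing(iter) : iter;
  for await (const n of source) {
    if (n === 1) break; // early exit: for await calls return() if the iterator has one
  }
  const rest: number[] = [];
  for await (const n of iter) rest.push(n);
  return rest;
}

firstThenRest(false).then((rest) => console.log("without notClosing:", rest));
firstThenRest(true).then((rest) => console.log("with notClosing:", rest));
```

Without the wrapper the break closes the generator and firstThenRest(false) resolves to an empty array; with it, the generator stays suspended and firstThenRest(true) resolves to [2, 3]. That is what lets shift return from inside its loop and still hand iter on to prepend.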
    
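For comparison, here is roughly what the byte-at-a-time alternative mentioned in the second answer could look like. This is a sketch with illustrative names (bytes, shiftBytes and chunks are not from the answer): the chunk stream is flattened into single bytes, so filling the prefix needs no offset arithmetic, but every byte costs its own await and the remainder comes back as numbers rather than Uint8Array chunks.

```typescript
// Hypothetical helper: flattens a chunked stream into single bytes
// (one await per byte, which is what makes this approach slower).
async function* bytes(stream: AsyncIterable<Uint8Array>): AsyncGenerator<number> {
  for await (const chunk of stream) yield* chunk;
}

// Hypothetical byte-level shift: the loop index doubles as the offset.
async function shiftBytes(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<number>]> {
  const prefix = new Uint8Array(length);
  const iter = bytes(stream);
  for (let i = 0; i < length; i++) {
    const { done, value } = await iter.next();
    if (done) throw new Error("Buffer underflow");
    prefix[i] = value;
  }
  // next() was called by hand (no for await), so the generator is still open
  // and resumes right after the last prefix byte.
  return [prefix, iter];
}

// Example input: the same chunks as the mock buffer in the first answer.
async function* chunks(): AsyncGenerator<Uint8Array> {
  for (const v of [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]]) yield new Uint8Array(v);
}

async function demoBytes() {
  const [first, rest] = await shiftBytes(4, chunks());
  const tail: number[] = [];
  for await (const b of rest) tail.push(b);
  console.log({ first: Array.from(first), rest: tail });
}

demoBytes().catch(console.error);
```

Pulling with iter.next() instead of a for await loop is the design choice that keeps the generator open without needing a notClosing-style wrapper.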
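The first answer's shift iterates stream twice: when take stops, for await calls return() on the flattened iterator, which closes the source, and skip then starts a new pass from the beginning. That works with the mock buffer, whose [Symbol.asyncIterator]() creates a fresh generator on every call, but an async generator object returns itself from [Symbol.asyncIterator](), so a second pass over it finds nothing and skip throws "buffer underflow". A minimal sketch, assuming a one-shot source like the hypothetical oneShot generator below, keeps the primitives but shares one flattened iterator between both halves, hidden behind a wrapper without a return() method (the same idea as notClosing). shiftOnce and oneShot are illustrative names, and take is adjusted to return right after the n-th value; as originally written it pulls one more value before stopping, and with a shared iterator that value would be lost.

```typescript
async function* flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) yield* a;
}

// Adjusted take: returns right after the n-th value instead of pulling
// (and dropping) one more value before it stops.
async function* take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  if (n <= 0) return;
  for await (const v of t) {
    yield v;
    if (--n <= 0) return;
  }
  throw Error("buffer underflow");
}

async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
  const r: T[] = [];
  for await (const v of t) r.push(v);
  return r;
}

// Hypothetical helper: reads the source only once. take and the caller share
// one flattened iterator; the wrapper has no return() method, so take's early
// exit leaves that iterator open.
async function shiftOnce(stream: AsyncIterable<Uint8Array>, count: number) {
  const bytes = flatten(stream)[Symbol.asyncIterator]();
  const shared: AsyncIterable<number> = {
    [Symbol.asyncIterator]: () => ({ next: () => bytes.next() }),
  };
  const first = new Uint8Array(await toArray(take(shared, count)));
  return [first, shared] as const;
}

// Hypothetical one-shot source: an async generator object returns itself
// from [Symbol.asyncIterator](), so it cannot be restarted.
async function* oneShot(): AsyncGenerator<Uint8Array> {
  for (const v of [[0, 1, 2], [3, 4], [5, 6, 7, 8], [9]]) yield new Uint8Array(v);
}

async function demoOnce() {
  const [first, rest] = await shiftOnce(oneShot(), 4);
  console.log({ first: Array.from(first), rest: await toArray(rest) });
}

demoOnce().catch(console.error);
```

With the re-iterable mock buffer, shiftOnce yields the same first and rest values as the original shift; it simply never asks the source for a second iterator.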