
I am trying to set up a Node/React app that streams results from OpenAI. I found an example project that does this, but it uses Next.js. I am successfully making the call and the results are returning as they should; the issue is how to return the stream to the client. Here is the code that works in Next.js:

import {
  GetServerSidePropsContext,
} from 'next';
import { DEFAULT_SYSTEM_PROMPT, DEFAULT_TEMPERATURE } from '@/utils/app/const';
import { OpenAIError, OpenAIStream } from '@/utils/server';
import { ChatBody, Message } from '@/types/chat';
// @ts-expect-error
import wasm from '../../node_modules/@dqbd/tiktoken/lite/tiktoken_bg.wasm?module';
import tiktokenModel from '@dqbd/tiktoken/encoders/cl100k_base.json';
import { Tiktoken, init } from '@dqbd/tiktoken/lite/init';

const handler = async (
  req: GetServerSidePropsContext['req'],
  res: GetServerSidePropsContext['res'],
): Promise<Response> => {
  try {
    const { model, messages, key, prompt, temperature } = (await (
      req as unknown as Request
    ).json()) as ChatBody;
    await init((imports) => WebAssembly.instantiate(wasm, imports));
    console.log({ model, messages, key, prompt, temperature });
    const encoding = new Tiktoken(
      tiktokenModel.bpe_ranks,
      tiktokenModel.special_tokens,
      tiktokenModel.pat_str,
    );

    let promptToSend = prompt;
    if (!promptToSend) {
      promptToSend = DEFAULT_SYSTEM_PROMPT;
    }

    let temperatureToUse = temperature;
    if (temperatureToUse == null) {
      temperatureToUse = DEFAULT_TEMPERATURE;
    }

    const prompt_tokens = encoding.encode(promptToSend);

    let tokenCount = prompt_tokens.length;
    let messagesToSend: Message[] = [];

    for (let i = messages.length - 1; i >= 0; i--) {
      const message = messages[i];
      const tokens = encoding.encode(message.content);

      if (tokenCount + tokens.length + 1000 > model.tokenLimit) {
        break;
      }
      tokenCount += tokens.length;
      messagesToSend = [message, ...messagesToSend];
    }

    encoding.free();

    const stream = await OpenAIStream(
      model,
      promptToSend,
      temperatureToUse,
      key,
      messagesToSend,
    );

    return new Response(stream);
  } catch (error) {
    console.error(error);
    if (error instanceof OpenAIError) {
      return new Response('Error', { status: 500, statusText: error.message });
    } else {
      return new Response('Error', { status: 500 });
    }
  }
};

export default handler;

OpenAIStream.ts

import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser';

  // ...inside OpenAIStream:
  const res = await fetch(url, {...});

  const encoder = new TextEncoder();
  const decoder = new TextDecoder();

  const stream = new ReadableStream({
    async start(controller) {
      const onParse = (event: ParsedEvent | ReconnectInterval) => {
        if (event.type === 'event') {
          const data = event.data;

          try {
            const json = JSON.parse(data);
            if (json.choices[0].finish_reason != null) {
              controller.close();
              return;
            }
            const text = json.choices[0].delta.content;
            const queue = encoder.encode(text);
            controller.enqueue(queue);
          } catch (e) {
            controller.error(e);
          }
        }
      };

      const parser = createParser(onParse);

      for await (const chunk of res.body as any) {
        parser.feed(decoder.decode(chunk));
      }
    },
  });

  return stream;

When trying to set this up in Node, the first issue I ran into is that ReadableStream is undefined. I solved it using a polyfill:

import { ReadableStream } from 'web-streams-polyfill/ponyfill/es2018';
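
(As a side note, depending on your Node version the polyfill may be avoidable: Node 16.5+ ships a web streams implementation in the built-in stream/web module. A minimal alternative, assuming a recent Node version:)

// Possible alternative on Node 16.5+: the built-in web streams implementation
import { ReadableStream } from 'stream/web';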

When I log

const text = json.choices[0].delta.content;

it shows that the streamed chunks from the API are coming back correctly.

Instead of returning the data using new Response, I am using:

import { toJSON } from 'flatted';

export const fetchChatOpenAI = async (
  req: AuthenticatedRequest,
  res: Response
) => {
  try {
    const stream = await OpenAIStream(
      model,
      promptToSend,
      temperatureToUse,
      key,
      messagesToSend
    );

    res.status(200).send(toJSON(stream));
  } catch (error) {
    if (error instanceof OpenAIError) {
      console.error(error);
      res.status(500).json({ statusText: error.message });
    } else {
      res.status(500).json({ statusText: 'ERROR' });
    }
  }
};

On the client, here is how the response is being handled.

 const controller = new AbortController();
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      signal: controller.signal,
      body: JSON.stringify(chatBody),
    });
    if (!response.ok) {
      console.log(response.statusText);
    } else {
      const data = response.body;

      if (data) {
        const reader = data.getReader();
        const decoder = new TextDecoder();
        let done = false;
        let text = '';
        while (!done) {
          const { value, done: doneReading } = await reader.read();
          console.log(value);
          done = doneReading;
          const chunkValue = decoder.decode(value);
          console.log(chunkValue);
          text += chunkValue;
        }
      }
    }

When running the Next.js project, here is a sample output from those logs:

[screenshot: WORKING_DEMO]

In my Node version, here is what those logs look like:

[screenshot: NOT_WORKING_APP]


Answers


  1. The first issue is that ReadableStream is undefined in Node.js. You've already resolved this by using the web-streams-polyfill package, which provides a ReadableStream implementation for Node.js.

    • On the client, the fetch API's response.body is a ReadableStream; you consume it by calling response.body.getReader(), which returns a ReadableStreamDefaultReader.

      const controller = new AbortController();
      const response = await fetch(endpoint, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        signal: controller.signal,
        body: JSON.stringify(chatBody),
      });

      if (!response.ok) {
        console.log(response.statusText);
      } else {
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let done = false;
        let text = '';

        const readStream = async () => {
          while (!done) {
            const { value, done: doneReading } = await reader.read();
            done = doneReading;
            const chunkValue = decoder.decode(value);
            console.log(chunkValue);
            text += chunkValue;
          }
        };

        await readStream();
      }
      
    • In this updated code, response.body.getReader() is used to obtain the ReadableStreamDefaultReader. An asynchronous function, readStream, then loops over the chunks of data, decodes them with TextDecoder, and appends them to text; it is awaited so reading starts immediately (see the small refinement below).
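
    • One small refinement (not in the original answer, but standard TextDecoder behavior): passing { stream: true } to decode() buffers multi-byte UTF-8 sequences that are split across chunk boundaries instead of mangling them:

      // Streaming decode: safe when a multi-byte character spans two chunks
      const chunkValue = decoder.decode(value, { stream: true });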

    This should help you handle the response from the server and receive the streamed data correctly on the client — with one server-side caveat, noted below.
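
    The caveat: this client loop only receives incremental chunks if the server actually streams bytes into the response, rather than serializing the stream object (which is what the toJSON call in the question does). If OpenAIStream returns a web ReadableStream, one option on Node 17+ is to bridge it with Readable.fromWeb and pipe it into the Express response. A minimal sketch, assuming an Express handler and the question's OpenAIStream helper with its arguments in scope:

      const { Readable } = require('stream');

      // Hypothetical Express handler; OpenAIStream and its arguments come from
      // the question and OpenAIStream is assumed to return a web ReadableStream.
      const fetchChatOpenAI = async (req, res) => {
        const stream = await OpenAIStream(model, promptToSend, temperatureToUse, key, messagesToSend);
        res.writeHead(200, { 'Content-Type': 'text/plain; charset=utf-8' });
        // Readable.fromWeb (Node 17+) bridges a web ReadableStream to a Node
        // stream; a polyfilled stream may need a cast to satisfy the types.
        Readable.fromWeb(stream).pipe(res);
      };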

  2. Here is an approach for handling streaming responses from the OpenAI API in a Node.js environment:

    1. Import the necessary modules: Import the required modules such as stream and node-fetch to work with streams and make HTTP requests, respectively.

    2. Create a readable stream: Use the Readable class from the stream module to create a readable stream. You can customize the read method based on your specific needs. This stream will be used to push the received data chunks from the streaming endpoint.

    3. Make an HTTP request to the streaming endpoint: Use the fetch function from node-fetch to make an HTTP request to the streaming endpoint provided by the OpenAI API. This will give you access to the response object.

    4. Handle the response stream: Attach event listeners to the response’s body stream to handle the received data and signal the end of the stream. The 'data' event is triggered when a new data chunk is received, and the 'end' event is triggered when the stream ends.

    5. Push data into the readable stream: Inside the 'data' event listener, push the received data chunks into the previously created readable stream using the push method. This will allow you to consume the data from the stream later.

    6. Signal the end of the stream: Inside the 'end' event listener, call the push(null) method on the readable stream to signal that there is no more data to be pushed.

    7. Consume the data from the stream: Attach event listeners to the readable stream to consume the data. Use the 'data' event to receive the data chunks and process them as needed. Use the 'end' event to determine when the stream has ended.

    By following these steps, you can effectively handle streaming responses from the OpenAI API in your Node.js application. Remember to handle errors, close the stream when necessary, and adapt the code to fit your specific requirements.

    Regarding the ReadableStream issue, in Node.js you can use the stream module to work with streams. You can create a readable stream using the Readable class from the stream module. Here’s an example of how you can create a readable stream and push data into it:

    const { Readable } = require('stream');
    
    const createStream = () => {
      const stream = new Readable({
        read() {},
      });
    
      // Push data into the stream
      stream.push('Hello,');
      stream.push('world!');
      stream.push(null); // Signal the end of the stream
    
      return stream;
    };
    
    const stream = createStream();
    
    stream.on('data', (chunk) => {
      console.log(chunk.toString());
    });
    
    stream.on('end', () => {
      console.log('Stream ended');
    });
    

    In the above example, the createStream function creates a readable stream and pushes two chunks of data into it: "Hello," and "world!". The stream is then consumed via its 'data' and 'end' events.

    Now, to handle the streaming response from the OpenAI API in your Node.js application, you can create a readable stream and push the received data into it as it arrives. Here’s an example to illustrate this concept:

    const { Readable } = require('stream');
    const fetch = require('node-fetch');
    
    const openAIStream = async () => {
      const response = await fetch('https://example.com/streaming-endpoint');
      // Provide a no-op read() so the stream can be driven entirely by push()
      const stream = new Readable({
        read() {},
      });
    
      // Push received data into the stream
      response.body.on('data', (chunk) => {
        stream.push(chunk);
      });
    
      // Signal the end of the stream
      response.body.on('end', () => {
        stream.push(null);
      });
    
      return stream;
    };
    
    // openAIStream is async, so resolve the promise before attaching listeners
    openAIStream().then((stream) => {
      stream.on('data', (chunk) => {
        console.log(chunk.toString());
      });

      stream.on('end', () => {
        console.log('Stream ended');
      });
    });
    

    In the openAIStream function, you make a request to the streaming endpoint and create a readable stream. Then, you listen for the 'data' event of the response’s body stream and push the received data chunks into the created stream. Finally, you signal the end of the stream when the response’s body emits the 'end' event.

    You can adjust this example to fit your specific requirements and integrate it into your existing codebase. Remember to handle errors and close the stream properly in case of failures or when you no longer need it.
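
    Finally, since the original question is about returning the stream from an Express route, note that a Node Readable can be piped straight into the Express response object. A minimal sketch under the same assumptions (openAIStream as defined above, a hypothetical /api/chat route):

    const express = require('express');
    const app = express();

    // Hypothetical route: pipe the Node Readable straight into the response
    // so the client receives each chunk as soon as it arrives.
    app.post('/api/chat', async (req, res) => {
      try {
        const stream = await openAIStream();
        res.writeHead(200, { 'Content-Type': 'text/plain; charset=utf-8' });
        stream.pipe(res);
      } catch (error) {
        console.error(error);
        res.status(500).json({ statusText: 'ERROR' });
      }
    });

    app.listen(3000);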

    Note: Make sure to install the required dependencies (node-fetch, web-streams-polyfill, etc.).
