Streaming broken for ChatOpenAI in version 0.0.132 (and before) #2365

Closed
danieldunderfelt opened this issue Aug 22, 2023 · 7 comments
Labels
auto:bug Related to a bug, vulnerability, unexpected error with an existing feature

Comments

@danieldunderfelt

I am trying to use streaming messages from ChatOpenAI, but streaming seems to have broken at some point. I had it working earlier and have all the EventEmitter + WebSocket plumbing set up. The problem is that handleLLMNewToken is not receiving the tokens as they are generated; instead, all tokens come in at once when the response is finished (at the same time as call() returns the response).

This is the code I am using:

const openAiChat = new ChatOpenAI({
  modelName: 'gpt-3.5-turbo-16k',
  openAIApiKey: config.OPENAI_API_KEY,
  temperature: 0.5,
  streaming: true,
})

let messageBuffer = ''

const response = await openAiChat.call(completionMessages, {
  signal: abortController.signal,
  callbacks: [
    {
      handleLLMNewToken: (token: string) => {
        // This is called for all tokens (once per token) when the whole response is finished.
        console.log('Token:', token)
        messageBuffer += token
        eventEmitter.emit('message', messageBuffer)
      },
    },
  ],
})
dosubot added the auto:bug label (Related to a bug, vulnerability, unexpected error with an existing feature) on Aug 22, 2023
@dosubot

dosubot bot commented Aug 22, 2023

🤖

Hello,

Based on the information you've provided and the context from the LangChain repository, it seems like the issue you're experiencing is related to how streaming is handled in the LangChain framework.

In the _generate method of the OpenAI class, when params.stream is true, a new Promise is created that waits for all the data to be received from the OpenAI API before it resolves. This is done by listening for the [DONE] message from the API, which signifies that all data has been sent. Only then does it resolve the Promise and return the data. This is why handleLLMNewToken is receiving all tokens at once when the response is finished.

If you want to handle tokens as they are generated, you would need to modify this code to resolve the Promise or handle the data as soon as each token is received, rather than waiting for the [DONE] message.

Here is the relevant code:

const data = params.stream
  ? await new Promise<CreateCompletionResponse>((resolve, reject) => {
      const choices: CreateCompletionResponseChoicesInner[] = [];
      let response: Omit<CreateCompletionResponse, "usage">;
      let rejected = false;
      let resolved = false;
      this.completionWithRetry(
        {
          ...params,
          prompt: subPrompts[i],
        },
        {
          signal: options.signal,
          ...options.options,
          adapter: fetchAdapter, // default adapter doesn't do streaming
          responseType: "stream",
          onmessage: (event) => {
            if (event.data?.trim?.() === "[DONE]") {
              if (resolved || rejected) {
                return;
              }
              resolved = true;
              resolve({
                ...response,
                choices,
              });
            } else {
              // ... rest of the code
            }
          },
        }
      ).catch((error) => {
        if (!rejected) {
          rejected = true;
          reject(error);
        }
      });
    })
  : await this.completionWithRetry(
      {
        ...params,
        prompt: subPrompts[i],
      },
      {
        signal: options.signal,
        ...options.options,
      }
    );

In the context shared, the _streamResponseChunks method in the ChatOpenAI class is responsible for handling the streaming of responses. It uses the startStream method to initiate the stream and listens for the [DONE] message to close the stream. The handleLLMNewToken function is called within the _streamResponseChunks method for each chunk of data received from the stream. If you want to handle tokens as they are generated, you would need to modify the _streamResponseChunks method to yield each token as it is received, rather than waiting for the [DONE] message.
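As a loose illustration of that change (not the actual LangChain internals), a handler can surface each token the moment its server-sent event arrives instead of buffering until [DONE]. The makeStreamingHandler and onToken names below are hypothetical, and the choices[0].delta.content payload shape is assumed from the OpenAI chat streaming format:

// Illustrative sketch only, not the LangChain source: emit each token as its SSE
// event arrives instead of collecting everything until "[DONE]".
type OnMessage = (event: { data?: string }) => void;

function makeStreamingHandler(onToken: (token: string) => void): OnMessage {
  return (event) => {
    if (event.data?.trim() === "[DONE]") {
      return; // end of stream, nothing further to emit
    }
    // Each SSE event carries one incremental delta of the completion.
    const parsed = JSON.parse(event.data ?? "{}");
    const token: string = parsed.choices?.[0]?.delta?.content ?? "";
    if (token) {
      onToken(token); // forward immediately, e.g. to handleLLMNewToken
    }
  };
}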

I hope this helps! If you have any other questions, feel free to ask.


@msmith-techempower

I can confirm the same issue with .stream() calls in certain cases.

When working with a ConversationChain and calling chain.stream({ input: "Generate 500 words of Lorem Ipsum" }), I can verify that streaming (which is set to true when the ChatOpenAI object is constructed, as in the original example here) does not produce a series of chunks but rather a single chunk:

const model = new ChatOpenAI({
  openAIApiKey: env.OPENAI_API_KEY,
  streaming: true,
});
const chain = new ConversationChain({ llm: model });
const response = await chain.stream({ input: "Generate 500 words of Lorem Ipsum" });
console.log(response); // ReadableStream { locked: false, state: 'readable', supportsBYOB: false }
let reader = response.getReader();
let data = await reader.read();
console.log(data.done); // false
while (!data.done) {
  console.log(data.value.response); // Lorem ipsum dolor sit amet, consectetur adipiscing elit. … (the entire generated response arrives in this one chunk)
  data = await reader.read();
  console.log(data.done); // true
}
console.log(data.done); // true

Oddly, when I simplify the test by cutting out the ConversationChain and calling model.stream() on the model directly, it does correctly produce chunked output via a ReadableStream as expected.
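For reference, here is a minimal sketch of that direct-model setup, consuming the ReadableStream returned by model.stream() with a reader loop. The import paths assume the langchain package layout of this version:

import { HumanMessage } from "langchain/schema";
import { ChatOpenAI } from "langchain/chat_models/openai";

// Minimal sketch of the direct-model case described above: no ConversationChain,
// just model.stream(), read chunk by chunk from the returned ReadableStream.
const model = new ChatOpenAI({
  openAIApiKey: env.OPENAI_API_KEY,
  streaming: true,
});

const stream = await model.stream([new HumanMessage("Generate 500 words of Lorem Ipsum")]);
const reader = stream.getReader();

let result = await reader.read();
while (!result.done) {
  console.log(result.value.content); // tokens arrive here incrementally
  result = await reader.read();
}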

@jacoblee93
Collaborator

This integration test seems to log tokens alright for me - can you share anything else about your setup?

test.only("Test ChatOpenAI in streaming mode", async () => {
  let nrNewTokens = 0;
  let streamedCompletion = "";

  const model = new ChatOpenAI({
    modelName: "gpt-3.5-turbo",
    streaming: true,
    maxTokens: 10,
    callbacks: [
      {
        async handleLLMNewToken(token: string) {
          console.log(token);
          nrNewTokens += 1;
          streamedCompletion += token;
        },
      },
    ],
  });
  const message = new HumanMessage("Hello!");
  const result = await model.call([message]);
  console.log(result);

  expect(nrNewTokens > 0).toBe(true);
  expect(result.content).toBe(streamedCompletion);
}, 10000);

@jacoblee93
Collaborator

@msmith-techempower yeah, unfortunately that's expected at the moment - the .stream() method will not currently work properly for ConversationChain and LLMChain subclasses in general. You'll need to use the old callback approach. We want to deprecate those chains in favor of expression language but are still figuring out the right abstraction for memory.
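For anyone following along, a minimal sketch of that callback approach with ConversationChain: handleLLMNewToken is attached to the model constructor (as in the integration test above) and the chain is invoked with a plain call(). Import paths assume the langchain package layout of this version:

import { ChatOpenAI } from "langchain/chat_models/openai";
import { ConversationChain } from "langchain/chains";

// Callback-based streaming with a chain: tokens surface through handleLLMNewToken
// on the model while the chain itself is invoked with call().
const model = new ChatOpenAI({
  streaming: true,
  callbacks: [
    {
      handleLLMNewToken(token: string) {
        process.stdout.write(token); // tokens stream in here as they are generated
      },
    },
  ],
});

const chain = new ConversationChain({ llm: model });
const response = await chain.call({ input: "Generate 500 words of Lorem Ipsum" });
console.log(response); // the full response still arrives once the call completes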

@jacoblee93
Collaborator

We should definitely update the ConversationChain docs with an expression language variant and mark the existing one as soft-deprecated, to be used only if you need a more exotic type of memory.
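A rough sketch of what such an expression language variant might look like, leaving out memory (the open question above) and assuming the prompt and chat model support .pipe() and .stream() in the installed version:

import { ChatOpenAI } from "langchain/chat_models/openai";
import { PromptTemplate } from "langchain/prompts";

// Expression-language-style sketch (no memory yet): a prompt piped into the model,
// with the output streamed chunk by chunk.
const prompt = PromptTemplate.fromTemplate("You are a helpful assistant.\n\n{input}");
const model = new ChatOpenAI();

const chain = prompt.pipe(model);
const stream = await chain.stream({ input: "Generate 500 words of Lorem Ipsum" });

for await (const chunk of stream) {
  process.stdout.write(String(chunk.content)); // each chunk is a partial AI message
}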

@danieldunderfelt
Author

I am using Bun by @Jarred-Sumner, and apparently that was the issue. It is working perfectly now in Bun 0.8!

@jacoblee93
Collaborator

Ah, that's an important detail! Glad it's resolved.
