Add a streaming RAG method #967

Merged
merged 21 commits into master from stream-ai
Sep 23, 2024

Conversation

scotttrinh
Collaborator

Adds an SSE-style streaming response method along with a lower-level async generator.


Example output from httpie:

http http://localhost:3004 message=="Tell me something about pluto"
HTTP/1.1 200 OK
Connection: keep-alive
Content-Type: text/event-stream
Date: Wed, 17 Apr 2024 02:14:59 GMT
Keep-Alive: timeout=5
Transfer-Encoding: chunked

event: message_start
data: {
    "message": {
        "id": "chatcmpl-9Ep76xGnUpwaq8Xz8bSq8DAVRbFQ2",
        "model": "gpt-4-0125-preview",
        "role": "assistant"
    },
    "type": "message_start"
}

event: content_block_start
data: {
    "content_block": {
        "text": "",
        "type": "text"
    },
    "index": 0,
    "type": "content_block_start"
}

event: content_block_delta
data: {
    "delta": {
        "text": "Pl",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": "uto",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": "'s",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " surface",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " is",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " primarily",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " made",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " up",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " of",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " nitrogen",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " ice",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": ",",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " methane",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": ",",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " and",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " carbon",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": " mon",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": "oxide",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_delta
data: {
    "delta": {
        "text": ".",
        "type": "text_delta"
    },
    "index": 0,
    "type": "content_block_delta"
}

event: content_block_stop
data: {
    "index": 0,
    "type": "content_block_stop"
}

event: message_delta
data: {
    "delta": {
        "stop_reason": "stop"
    },
    "type": "message_delta"
}

event: message_stop
data: {
    "type": "message_stop"
}
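
For reference, a handler shaped like the one below could produce output like the above. This is only a sketch: it assumes the final API discussed later in this thread, where awaiting `streamRag` resolves to a fetch-style `Response` whose body is the SSE byte stream.

import { createServer } from "node:http";
import { createClient } from "edgedb";
import { createAI } from "@edgedb/ai";

const ai = createAI(createClient(), { model: "gpt-4-0125-preview" });

createServer(async (req, res) => {
  const url = new URL(req.url ?? "/", "http://localhost:3004");
  const message = url.searchParams.get("message") ?? "";
  // Awaiting the streaming call resolves to a fetch-style Response.
  const ragResponse = await ai.streamRag(message);
  res.writeHead(200, { "Content-Type": "text/event-stream" });
  // Pipe the SSE bytes straight through to the client.
  for await (const chunk of ragResponse.body!) {
    res.write(chunk);
  }
  res.end();
}).listen(3004);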


@scotttrinh scotttrinh force-pushed the stream-ai branch 2 times, most recently from de92509 to f2519df Compare April 18, 2024 04:38
@scotttrinh scotttrinh marked this pull request as draft April 18, 2024 04:38
Base automatically changed from package-ai to master April 18, 2024 08:44
@scotttrinh scotttrinh marked this pull request as ready for review April 30, 2024 15:51
@scotttrinh scotttrinh force-pushed the stream-ai branch 2 times, most recently from 0704c99 to 8f53c24 Compare May 3, 2024 16:47
@CarsonF
Collaborator

CarsonF commented May 17, 2024

@scotttrinh what do you think about an intermediate object to decouple input/output?

await queryRag(message, context).text()
await queryRag(message, context).response()
for await (const part of queryRag(message, context)) { ... }

@scotttrinh
Collaborator Author

@CarsonF

what do you think about an intermediate object to decouple input/output?

I'm not sure I understand the suggestion here 🤔

@CarsonF
Collaborator

CarsonF commented May 17, 2024

@CarsonF

what do you think about an intermediate object to decouple input/output?

I'm not sure I understand the suggestion here 🤔

Instead of three top-level query functions to vary the output shape, just have one; the output shape is then picked independently.
Also, the async-iterable symbol can be used instead of a third method name.
The snippet above is what the user call sites would look like.
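
To make the proposal concrete, the intermediate object might look something like this (a hypothetical sketch using the `StreamingMessage` and `QueryContext` names that appear later in this thread, not the merged API):

interface RagRequest extends AsyncIterable<StreamingMessage> {
  // Buffer the whole answer and return it as a single string.
  text(): Promise<string>;
  // Return a fetch-style Response that streams the SSE bytes.
  response(): Promise<Response>;
}

declare function queryRag(message: string, context?: QueryContext): RagRequest;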

@scotttrinh
Collaborator Author

Ahh, I understand now. Seems reasonable, so we'll have to answer the question of which is better along some other axes:

  1. What is more intuitive for the end user?
  2. What do other SDKs (like OpenAI and Anthropic) do?
  3. What has the lowest maintenance overhead?
  4. What is easiest to evolve in the future?
  5. What is easiest to document?
  6. What does the Python version of this same functionality do?

Some of those questions are research for me; others will require a bit of thinking and trying to intuit an answer. Feel free to state the case for a single function that returns an object exposing the different interfaces vs. separate functions.

@CarsonF
Collaborator

CarsonF commented May 17, 2024

As far as #1 goes, fetch has the same API:

await fetch(...).json()
await fetch(...).arrayBuffer()

I don't see any problems here.

I can't speak to 2 & 6.

As far as 3 & 4 go, the inputs and outputs obviously have to work together in the implementation. But this decoupling allows modifying the args or the output methods without needing to adjust multiple signatures. Giving the EdgeDBAI class a single entry point for RAG would also play nicely with other AI strategies in the future.

I did a first pass here stream-ai...CarsonF:edgedb-js:stream-ai

@scotttrinh
Collaborator Author

As far as # 1 goes, fetch has the same API

Yeah, and while I agree that works as a primitive, since it allows you to have one function with lots of different behavior, I think some people find that API annoying as an end-user API. It's great for a primitive, though, and maybe that's really what we're building here.

As far as 3 & 4 both the inputs & outputs have to work together in implementation obviously. But this decoupling allows modifying args or output methods without needing to adjust multiple signatures. Giving the EdgeDBAI class a single entrypoint for RAG would play nice for other AI strategies in the future.

Yeah, I think that's a compelling argument! Somehow it makes me feel even more like this is a primitive on which higher-level functionality should be built (like our soon-to-come Vercel AI SDK integration), and that we should therefore optimize for a flexible low-level primitive.

Lemme shop the idea around a bit more. Thanks for taking the time to make an example implementation; that makes it nice and concrete for discussion. 🙏

Instead of using the async generator and converting the text back into a
byte stream, just pipe the response from the RAG request directly to the
response in `streamRag`.

Ensures that the chunks are properly parsed and emitted as they come in,
splitting multiple events that arrive together in one chunk, and
combining partial events from multiple chunks into a single emitted
value.
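
A minimal sketch of that buffering logic (illustrative only; it relies on the SSE convention that a blank line terminates each event):

async function* sseEvents(stream: AsyncIterable<Uint8Array>): AsyncGenerator<string> {
  const decoder = new TextDecoder();
  let buffer = "";
  for await (const chunk of stream) {
    buffer += decoder.decode(chunk, { stream: true });
    // A blank line ends an SSE event; whatever follows the last separator
    // is a partial event, so it stays in the buffer for the next chunk.
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? "";
    for (const event of events) {
      if (event.trim()) yield event;
    }
  }
}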
Collaborator Author

@scotttrinh scotttrinh left a comment

Yeah! Think this looks pretty good. One small comment.

Comment on lines 133 to 139
const res = await this.fetchRag({
model: this.model,
prompt: this.prompt,
context: this.context,
query: this.query,
stream: true,
});
Collaborator Author

Since we always pass `this.model`, etc., I think we can simplify `fetchRag` to just take `stream: boolean`.
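
In other words, something like this (a sketch of the suggested signature, not the code as merged):

// Before: every call site repeats the instance state.
fetchRag(request: {
  model: string;
  prompt?: unknown;
  context?: QueryContext;
  query: string;
  stream: boolean;
}): Promise<Response>;

// After: the instance already holds everything except the flag.
fetchRag(stream: boolean): Promise<Response>;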

@scotttrinh
Copy link
Collaborator Author

OK, so @diksipav and I have been going back and forth a bunch about the API here, and I think I have some more concrete thoughts on `await queryRag(message, context).text()` vs. `await queryRag(message, context)` / `await streamRag(message, context)`:

fetch-like API

One major difference here is that, as a consumer of `fetch`, you need to inspect the `Response` object to even know which method is appropriate, based on headers like `content-type` and `transfer-encoding`. We don't have that restriction: the caller gets to tell the producer how it wants the data back, as a string or as a stream.
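
Concretely, with separate entry points the call site states the shape it wants up front (a sketch using this PR's names; `queryRag` resolving to the answer string is assumed here):

// Buffered: resolve the whole answer to a string.
const answer = await ai.queryRag("Tell me something about pluto");

// Streaming: consume parsed events as they arrive.
for await (const msg of ai.streamRag("Tell me something about pluto")) {
  if (msg.type === "content_block_delta") {
    process.stdout.write(msg.delta.text);
  }
}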

Intermediate object

One possible advantage of an API with an intermediate object returned from `queryRag` is that you can refetch the same query without having to redefine it. I haven't yet thought of a good reason or use case for this, though. Perhaps it would make sense like it does for the query builder, where you have fixed messages and context but might have different providers or config; but I don't think that's common enough to make it a first-class design choice, given that the "query" we're talking about here is a pretty simple data structure: a string.

Consistency

This is not a deal breaker, but in my mind it might be a good tie-breaker: consistency with the Python package. There we have separate methods and no intermediate object.

@CarsonF
Copy link
Collaborator

CarsonF commented Sep 19, 2024

Maybe the Python library should change too 😛

Collaborator Author

@scotttrinh scotttrinh left a comment

Great job, and thanks for throwing in the embeddings method as a bonus here.

Comment on lines 44 to 46
- `streamRag(message: string, context?: QueryContext): AsyncIterable<StreamingMessage> & PromiseLike<Response>`

Used either when you want to handle streaming data as it arrives or when you need a `Response` object that streams the results.
Collaborator Author

Might be worth documenting these separately, almost like two overloads?
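
Something along these lines, for example (illustrative only; `ai`, `question`, and `context` are assumed to be in scope):

// 1. As an async iterable of parsed StreamingMessage events:
for await (const msg of ai.streamRag(question, context)) {
  console.log(msg.type);
}

// 2. As a PromiseLike<Response> that streams the SSE bytes, e.g. to hand
//    straight back to a framework route handler:
const response: Response = await ai.streamRag(question, context);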

Contributor

Hey, I updated the README and merged this; if you have time, you can check the updated README.

Comment on lines 91 to 95
if (!response.headers.get("content-type")?.includes("application/json")) {
throw new Error(
"expected response to have content-type: application/json",
);
}
Collaborator Author

Did we take this out because we just want to surface the existing exceptions that the `json` method throws?
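
(For context: the standard `Response.prototype.json()` already rejects when the body isn't valid JSON, so dropping the guard still surfaces an error, just a less descriptive one. For example:)

// Standard fetch behavior, unrelated to this PR:
const res = new Response("<!doctype html>", {
  headers: { "content-type": "text/html" },
});
await res.json(); // rejects with a SyntaxError about an unexpected token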

@diksipav diksipav merged commit a6de246 into master Sep 23, 2024
10 checks passed
@diksipav diksipav deleted the stream-ai branch September 23, 2024 10:02