feat(langchain): add support for streamed calls #10672
base: main
Conversation
Datadog Report — Branch report: ✅ 0 Failed, 2812 Passed, 2729 Skipped, 34m 24.39s total duration (59m 53.06s time saved)

Benchmarks — Benchmark execution time: 2024-09-16 19:16:23, comparing candidate commit aaddc40 in the PR branch. Found 0 performance improvements and 0 performance regressions! Performance is the same for 354 metrics, 46 unstable metrics.
```python
# `stream` may receive a single message or a list of messages, so
# normalize to a list before tagging each message by index.
chat_messages = get_argument_value(args, kwargs, 0, "input")
if not isinstance(chat_messages, list):
    chat_messages = [chat_messages]
for message_idx, message in enumerate(chat_messages):
```
I added all of this logic because, unlike `model.generate`, which just takes in a list of `BaseMessage` types (i.e. `HumanMessage`, `SystemMessage`, etc.), `model.stream` can take in:

- a single string
- a single dict
- a list of strings
- a list of dicts
- a list of `BaseMessage` types
- a `PromptValue` type, which has a `messages` property of `BaseMessage` types

Do we care about all of these different types? Or do we just want to listify and `str()` each element (this logic would also carry over to LLMObs spans in a future PR)? It would make the code a lot simpler, but the view of each tag may not be as nice; see the sketch below.
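For concreteness, a minimal sketch of the "listify and `str()`" option, assuming the `PromptValue` case is detected through its `messages` property; `normalize_stream_input` is a hypothetical name, not an existing ddtrace helper:

```python
def normalize_stream_input(inp):
    # PromptValue inputs expose their chat history via a `messages` property.
    messages = getattr(inp, "messages", None)
    if messages is not None:
        inp = list(messages)
    if not isinstance(inp, list):
        inp = [inp]
    # str() flattens strings, dicts, and BaseMessage instances the same way,
    # at the cost of less structured tag values.
    return [str(item) for item in inp]
```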
What does this PR do?
Adds support for `.(a)stream(...)` calls on LangChain LCEL chains, chat models, and completion (LLM) models. It accomplishes this by tracing each call to a `stream` function.

There is one caveat to this: it's possible the last step in a chain's `stream` call is a `JSONOutputParser`. In this case, the stream is already concatenated for us, and we use that result instead.
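To illustrate the caveat, here is a minimal sketch of how chunk accumulation might look, assuming a generator-based wrapper; `traced_stream` and the tag name are illustrative, not the PR's actual code:

```python
def traced_stream(span, stream, last_step_is_json_parser=False):
    # Accumulate chunks while re-yielding them to the caller, then tag the
    # aggregated output once the stream is exhausted.
    chunks = []
    try:
        for chunk in stream:
            chunks.append(chunk)
            yield chunk
    finally:
        if last_step_is_json_parser and chunks:
            # A JSON output parser step emits progressively larger parses,
            # so the final chunk already holds the concatenated result.
            output = str(chunks[-1])
        else:
            output = "".join(str(chunk) for chunk in chunks)
        span.set_tag_str("langchain.response.content", output)
        span.finish()
```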
A few additional notes:

- The `astream` methods aren't actually async functions; they just return async generators. This is reflected in the shared patching functions and test snapshots.
- Sync and async patching go through a shared helper, `shared_stream`, which returns a compatible iterable for sync and async stream managers. It utilizes `on_span_started` and `on_span_finished` functions to coordinate the tags to add in the different cases (chain, chat, or llm); see the sketch below.
- Since the `stream` methods do not invoke the underlying `generate` methods we trace, there's no easy path for code re-use. Thus, I tried to mimic the tags we add for the relevant `chain.invoke`, `model.generate`, and `llm.invoke` patched functions.
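A rough sketch of the shape the second note describes, assuming the callback names above; the body is illustrative rather than the PR's real implementation:

```python
import sys

from ddtrace import tracer


def shared_stream(op_name, func, args, kwargs, on_span_started, on_span_finished):
    # Sync flavor only; the async variant would wrap an async generator the
    # same way, since `astream` simply returns one.
    span = tracer.trace(op_name)
    on_span_started(span)  # tag the inputs per case: chain, chat, or llm

    def _stream():
        chunks = []
        try:
            for chunk in func(*args, **kwargs):
                chunks.append(chunk)
                yield chunk
        except Exception:
            span.set_exc_info(*sys.exc_info())
            raise
        finally:
            on_span_finished(span, chunks)  # tag the aggregated output
            span.finish()

    return _stream()
```

Passing the callbacks in keeps a single stream-consumption path while letting each integration point decide its own tags.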
Note: the version of `vcrpy` we pinned for reduced flakiness did not like streamed calls made through the LangChain library. As such, I introduced a new fixture that returns a stub for the HTTP transport the `OpenAI` client uses; the client with that transport can then be used by the `langchain_openai` instance. This approach uses text files containing just the response data, which is why the "cassettes" added for these tests aren't in the usual `yaml` format.
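A sketch of the stub-transport idea, assuming pytest and `httpx.MockTransport`; the fixture name and cassette path are hypothetical:

```python
import httpx
import openai
import pytest


@pytest.fixture
def openai_stream_client():
    def handler(request: httpx.Request) -> httpx.Response:
        # Replay raw streamed-response bytes captured in a plain-text
        # "cassette" instead of going through vcrpy.
        with open("tests/cassettes/openai_chat_stream.txt", "rb") as f:
            return httpx.Response(200, content=f.read())

    transport = httpx.MockTransport(handler)
    return openai.OpenAI(
        api_key="<not-a-real-key>",
        http_client=httpx.Client(transport=transport),
    )
```

The `langchain_openai` model can then be constructed against this client, so streamed requests are replayed from the text file rather than hitting the network.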
Checklist
Reviewer Checklist