diff --git a/ddtrace/contrib/internal/langchain/patch.py b/ddtrace/contrib/internal/langchain/patch.py
index dc8b211de03..9ca2d4ac7ec 100644
--- a/ddtrace/contrib/internal/langchain/patch.py
+++ b/ddtrace/contrib/internal/langchain/patch.py
@@ -50,6 +50,7 @@
 from ddtrace.contrib.internal.langchain.constants import agent_output_parser_classes
 from ddtrace.contrib.internal.langchain.constants import text_embedding_models
 from ddtrace.contrib.internal.langchain.constants import vectorstore_classes
+from ddtrace.contrib.internal.langchain.utils import shared_stream
 from ddtrace.contrib.trace_utils import unwrap
 from ddtrace.contrib.trace_utils import with_traced_module
 from ddtrace.contrib.trace_utils import wrap
@@ -975,6 +976,165 @@ def traced_similarity_search(langchain, pin, func, instance, args, kwargs):
     return documents
 
 
+# TODO refactor some of these on_span_started/on_span_finished functions
+# that are used in other patched methods in this file into the utils module
+@with_traced_module
+def traced_chain_stream(langchain, pin, func, instance, args, kwargs):
+    integration: LangChainIntegration = langchain._datadog_integration
+
+    def _on_span_started(span: Span):
+        inputs = get_argument_value(args, kwargs, 0, "input")
+        if integration.is_pc_sampled_span(span):
+            if not isinstance(inputs, list):
+                inputs = [inputs]
+            for idx, inp in enumerate(inputs):
+                if not isinstance(inp, dict):
+                    span.set_tag_str("langchain.request.inputs.%d" % idx, integration.trunc(str(inp)))
+                else:
+                    for k, v in inp.items():
+                        span.set_tag_str("langchain.request.inputs.%d.%s" % (idx, k), integration.trunc(str(v)))
+
+    def _on_span_finished(span: Span, streamed_chunks, error: Optional[bool] = None):
+        if not error and integration.is_pc_sampled_span(span):
+            if langchain_core and isinstance(instance.steps[-1], langchain_core.output_parsers.JsonOutputParser):
+                # it's possible that the chain has a json output parser,
+                # which will have already concatenated the chunks into a json object
+
+                # it's also possible the json output parser isn't the last step,
+                # but one of the last steps, in which case we won't act on it here
+                # TODO (sam.brenner) make this more robust
+                content = json.dumps(streamed_chunks[-1])
+            else:
+                # best effort to join chunks together
+                content = "".join([str(chunk) for chunk in streamed_chunks])
+            span.set_tag_str("langchain.response.content", integration.trunc(content))
+
+    return shared_stream(
+        integration=integration,
+        pin=pin,
+        func=func,
+        instance=instance,
+        args=args,
+        kwargs=kwargs,
+        interface_type="chain",
+        on_span_started=_on_span_started,
+        on_span_finished=_on_span_finished,
+    )
+
+
+@with_traced_module
+def traced_chat_stream(langchain, pin, func, instance, args, kwargs):
+    integration: LangChainIntegration = langchain._datadog_integration
+    llm_provider = instance._llm_type
+
+    def _on_span_started(span: Span):
+        chat_messages = get_argument_value(args, kwargs, 0, "input")
+        if not isinstance(chat_messages, list):
+            chat_messages = [chat_messages]
+        for message_idx, message in enumerate(chat_messages):
+            if integration.is_pc_sampled_span(span):
+                if isinstance(message, dict):
+                    span.set_tag_str(
+                        "langchain.request.messages.%d.content" % (message_idx),
+                        integration.trunc(str(message.get("content", ""))),
+                    )
+                    span.set_tag_str(
+                        "langchain.request.messages.%d.role" % (message_idx),
+                        str(message.get("role", "")),
+                    )
+                elif isinstance(message, langchain_core.prompt_values.PromptValue):
+                    for langchain_message_idx, langchain_message in enumerate(message.messages):
+                        span.set_tag_str(
+                            "langchain.request.messages.%d.%d.content" % (message_idx, langchain_message_idx),
+                            integration.trunc(str(langchain_message.content)),
+                        )
+                        span.set_tag_str(
+                            "langchain.request.messages.%d.%d.role" % (message_idx, langchain_message_idx),
+                            str(langchain_message.__class__.__name__),
+                        )
+                elif isinstance(message, langchain_core.messages.BaseMessage):
+                    span.set_tag_str(
+                        "langchain.request.messages.%d.content" % (message_idx), integration.trunc(str(message.content))
+                    )
+                    span.set_tag_str(
+                        "langchain.request.messages.%d.role" % (message_idx), str(message.__class__.__name__)
+                    )
+                else:
+                    span.set_tag_str(
+                        "langchain.request.messages.%d.content" % (message_idx), integration.trunc(message)
+                    )
+
+        for param, val in getattr(instance, "_identifying_params", {}).items():
+            if isinstance(val, dict):
+                for k, v in val.items():
+                    span.set_tag_str("langchain.request.%s.parameters.%s.%s" % (llm_provider, param, k), str(v))
+            else:
+                span.set_tag_str("langchain.request.%s.parameters.%s" % (llm_provider, param), str(val))
+
+    def _on_span_finished(span: Span, streamed_chunks, error: Optional[bool] = None):
+        if not error and integration.is_pc_sampled_span(span):
+            content = "".join([str(chunk.content) for chunk in streamed_chunks])
+            span.set_tag_str("langchain.response.content", integration.trunc(content))
+
+            usage = getattr(streamed_chunks[-1], "usage_metadata", None)
+            if usage:
+                for k, v in usage.items():
+                    span.set_tag_str("langchain.response.usage_metadata.%s" % k, str(v))
+
+    return shared_stream(
+        integration=integration,
+        pin=pin,
+        func=func,
+        instance=instance,
+        args=args,
+        kwargs=kwargs,
+        interface_type="chat_model",
+        on_span_started=_on_span_started,
+        on_span_finished=_on_span_finished,
+        api_key=_extract_api_key(instance),
+        provider=llm_provider,
+    )
+
+
+@with_traced_module
+def traced_llm_stream(langchain, pin, func, instance, args, kwargs):
+    integration: LangChainIntegration = langchain._datadog_integration
+    llm_provider = instance._llm_type
+
+    def _on_span_started(span: Span):
+        inp = get_argument_value(args, kwargs, 0, "input")
+        if not isinstance(inp, list):
+            inp = [inp]
+        if integration.is_pc_sampled_span(span):
+            for idx, prompt in enumerate(inp):
+                span.set_tag_str("langchain.request.prompts.%d" % idx, integration.trunc(str(prompt)))
+        for param, val in getattr(instance, "_identifying_params", {}).items():
+            if isinstance(val, dict):
+                for k, v in val.items():
+                    span.set_tag_str("langchain.request.%s.parameters.%s.%s" % (llm_provider, param, k), str(v))
+            else:
+                span.set_tag_str("langchain.request.%s.parameters.%s" % (llm_provider, param), str(val))
+
+    def _on_span_finished(span: Span, streamed_chunks, error: Optional[bool] = None):
+        if not error and integration.is_pc_sampled_span(span):
+            content = "".join([str(chunk) for chunk in streamed_chunks])
+            span.set_tag_str("langchain.response.content", integration.trunc(content))
+
+    return shared_stream(
+        integration=integration,
+        pin=pin,
+        func=func,
+        instance=instance,
+        args=args,
+        kwargs=kwargs,
+        interface_type="llm",
+        on_span_started=_on_span_started,
+        on_span_finished=_on_span_finished,
+        api_key=_extract_api_key(instance),
+        provider=llm_provider,
+    )
+
+
 @with_traced_module
 def traced_base_tool_invoke(langchain, pin, func, instance, args, kwargs):
     integration = langchain._datadog_integration
@@ -1203,6 +1363,9 @@ def patch():
         wrap("langchain", "embeddings.OpenAIEmbeddings.embed_documents", traced_embedding(langchain))
     else:
         from langchain.chains.base import Chain  # noqa:F401
+        from langchain_core import messages  # noqa: F401
+        from langchain_core import output_parsers  # noqa: F401
+        from langchain_core import prompt_values  # noqa: F401
         from langchain_core.tools import BaseTool  # noqa:F401
 
         wrap("langchain_core", "language_models.llms.BaseLLM.generate", traced_llm_generate(langchain))
@@ -1225,6 +1388,23 @@ def patch():
         )
         wrap("langchain_core", "runnables.base.RunnableSequence.batch", traced_lcel_runnable_sequence(langchain))
         wrap("langchain_core", "runnables.base.RunnableSequence.abatch", traced_lcel_runnable_sequence_async(langchain))
+
+        # streaming
+        wrap("langchain_core", "runnables.base.RunnableSequence.stream", traced_chain_stream(langchain))
+        wrap("langchain_core", "runnables.base.RunnableSequence.astream", traced_chain_stream(langchain))
+        wrap(
+            "langchain_core",
+            "language_models.chat_models.BaseChatModel.stream",
+            traced_chat_stream(langchain),
+        )
+        wrap(
+            "langchain_core",
+            "language_models.chat_models.BaseChatModel.astream",
+            traced_chat_stream(langchain),
+        )
+        wrap("langchain_core", "language_models.llms.BaseLLM.stream", traced_llm_stream(langchain))
+        wrap("langchain_core", "language_models.llms.BaseLLM.astream", traced_llm_stream(langchain))
+
         wrap("langchain_core", "tools.BaseTool.invoke", traced_base_tool_invoke(langchain))
         wrap("langchain_core", "tools.BaseTool.ainvoke", traced_base_tool_ainvoke(langchain))
     if langchain_openai:
@@ -1275,6 +1455,12 @@ def unpatch():
         unwrap(langchain_core.runnables.base.RunnableSequence, "ainvoke")
         unwrap(langchain_core.runnables.base.RunnableSequence, "batch")
         unwrap(langchain_core.runnables.base.RunnableSequence, "abatch")
+        unwrap(langchain_core.runnables.base.RunnableSequence, "stream")
+        unwrap(langchain_core.runnables.base.RunnableSequence, "astream")
+        unwrap(langchain_core.language_models.chat_models.BaseChatModel, "stream")
+        unwrap(langchain_core.language_models.chat_models.BaseChatModel, "astream")
+        unwrap(langchain_core.language_models.llms.BaseLLM, "stream")
+        unwrap(langchain_core.language_models.llms.BaseLLM, "astream")
         unwrap(langchain_core.tools.BaseTool, "invoke")
         unwrap(langchain_core.tools.BaseTool, "ainvoke")
     if langchain_openai:
diff --git a/ddtrace/contrib/internal/langchain/utils.py b/ddtrace/contrib/internal/langchain/utils.py
new file mode 100644
index 00000000000..330c890875b
--- /dev/null
+++ b/ddtrace/contrib/internal/langchain/utils.py
@@ -0,0 +1,79 @@
+import inspect
+import sys
+
+
+class BaseTracedLangChainStreamResponse:
+    def __init__(self, generator, integration, span, on_span_finish):
+        self._generator = generator
+        self._dd_integration = integration
+        self._dd_span = span
+        self._on_span_finish = on_span_finish
+        self._chunks = []
+
+
+class TracedLangchainStreamResponse(BaseTracedLangChainStreamResponse):
+    def __iter__(self):
+        try:
+            for chunk in self._generator.__iter__():
+                self._chunks.append(chunk)
+                yield chunk
+        except Exception:
+            self._dd_span.set_exc_info(*sys.exc_info())
+            self._dd_integration.metric(self._dd_span, "incr", "request.error", 1)
+            raise
+        finally:
+            self._on_span_finish(self._dd_span, self._chunks, error=bool(self._dd_span.error))
+            self._dd_span.finish()
+
+
+class TracedLangchainAsyncStreamResponse(BaseTracedLangChainStreamResponse):
+    async def __aiter__(self):
+        try:
+            async for chunk in self._generator.__aiter__():
+                self._chunks.append(chunk)
+                yield chunk
+        except Exception:
+            self._dd_span.set_exc_info(*sys.exc_info())
+            self._dd_integration.metric(self._dd_span, "incr", "request.error", 1)
+            raise
+        finally:
+            self._on_span_finish(self._dd_span, self._chunks, error=bool(self._dd_span.error))
+            self._dd_span.finish()
+
+
+def shared_stream(
+    integration,
+    pin,
+    func,
+    instance,
+    args,
+    kwargs,
+    interface_type,
+    on_span_started,
+    on_span_finished,
+    **extra_options,
+):
+    options = {
+        "pin": pin,
+        "operation_id": f"{instance.__module__}.{instance.__class__.__name__}",
+        "interface_type": interface_type,
+    }
+
+    options.update(extra_options)
+
+    span = integration.trace(**options)
+    span.set_tag("langchain.request.stream", True)
+    on_span_started(span)
+
+    try:
+        resp = func(*args, **kwargs)
+        cls = TracedLangchainAsyncStreamResponse if inspect.isasyncgen(resp) else TracedLangchainStreamResponse
+
+        return cls(resp, integration, span, on_span_finished)
+    except Exception:
+        # error with the method call itself
+        span.set_exc_info(*sys.exc_info())
+        span.finish()
+        integration.metric(span, "incr", "request.error", 1)
+        integration.metric(span, "dist", "request.duration", span.duration_ns)
+        raise
diff --git a/releasenotes/notes/langchain-lcel-stream-calls-bff85c974a72cceb.yaml b/releasenotes/notes/langchain-lcel-stream-calls-bff85c974a72cceb.yaml
new file mode 100644
index 00000000000..1b2ce64e68f
--- /dev/null
+++ b/releasenotes/notes/langchain-lcel-stream-calls-bff85c974a72cceb.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    langchain: Adds support for tracing ``stream`` calls on LCEL chains, chat completion models, and completion models.
diff --git a/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_chat_streamed_response.txt b/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_chat_streamed_response.txt
new file mode 100644
index 00000000000..56cc9060b39
--- /dev/null
+++ b/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_chat_streamed_response.txt
@@ -0,0 +1,18 @@
+data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
+
+data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Python"},"logprobs":null,"finish_reason":null}]}
+
+data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" is"},"logprobs":null,"finish_reason":null}]}
+
+data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"\n\n"},"logprobs":null,"finish_reason":null}]}
+
+data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"the"},"logprobs":null,"finish_reason":null}]}
+
+data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" be"},"logprobs":null,"finish_reason":null}]}
+
{"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"st!"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A75i9IDlLLFDI6n75COTQV8IsQ91c","object":"chat.completion.chunk","created":1726253613,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"length"}]} + +data: [DONE] + diff --git a/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_chat_streamed_response_json_output_parser.txt b/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_chat_streamed_response_json_output_parser.txt new file mode 100644 index 00000000000..32cbb9677f4 --- /dev/null +++ b/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_chat_streamed_response_json_output_parser.txt @@ -0,0 +1,36 @@ +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"Here"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":":\n\n"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"```"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"json"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"\n"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"{\n"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":" "},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":" \""},"logprobs":null,"finish_reason":null}]} + +data: 
{"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"countries"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"\":"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"\"France"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":" is"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":" a"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":" country!\""},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"\n}"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-A87AG6I7sUCYtwCx9oIF0UK8vJLkl","object":"chat.completion.chunk","created":1726497528,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_992d1ea92d","choices":[{"index":0,"delta":{"content":"\n```"},"logprobs":null,"finish_reason":null}]} + +data: [DONE] + diff --git a/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_llm_streamed_response.txt b/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_llm_streamed_response.txt new file mode 100644 index 00000000000..e9be5532760 --- /dev/null +++ b/tests/contrib/langchain/cassettes/langchain_community/lcel_openai_llm_streamed_response.txt @@ -0,0 +1,14 @@ +data: {"id":"cmpl-A76h6hDhraI7ckuzJD4IqzfSVfqJV","object":"text_completion","created":1726257392,"choices":[{"text":"\n\n","index":0,"logprobs":null,"finish_reason":null}],"model":"gpt-3.5-turbo-instruct"} + +data: {"id":"cmpl-A76h6hDhraI7ckuzJD4IqzfSVfqJV","object":"text_completion","created":1726257392,"choices":[{"text":"Python","index":0,"logprobs":null,"finish_reason":null}],"model":"gpt-3.5-turbo-instruct"} + +data: {"id":"cmpl-A76h6hDhraI7ckuzJD4IqzfSVfqJV","object":"text_completion","created":1726257392,"choices":[{"text":" is","index":0,"logprobs":null,"finish_reason":null}],"model":"gpt-3.5-turbo-instruct"} + +data: {"id":"cmpl-A76h6hDhraI7ckuzJD4IqzfSVfqJV","object":"text_completion","created":1726257392,"choices":[{"text":" coo","index":0,"logprobs":null,"finish_reason":null}],"model":"gpt-3.5-turbo-instruct"} + +data: {"id":"cmpl-A76h6hDhraI7ckuzJD4IqzfSVfqJV","object":"text_completion","created":1726257392,"choices":[{"text":"l!","index":0,"logprobs":null,"finish_reason":null}],"model":"gpt-3.5-turbo-instruct"} 
+ +data: {"id":"cmpl-A76h6hDhraI7ckuzJD4IqzfSVfqJV","object":"text_completion","created":1726257392,"choices":[{"text":"","index":0,"logprobs":null,"finish_reason":"length"}],"model":"gpt-3.5-turbo-instruct"} + +data: [DONE] + diff --git a/tests/contrib/langchain/conftest.py b/tests/contrib/langchain/conftest.py index b9debeca991..d8d41d923df 100644 --- a/tests/contrib/langchain/conftest.py +++ b/tests/contrib/langchain/conftest.py @@ -162,3 +162,81 @@ def langchain_pinecone(ddtrace_config_langchain, mock_logs, mock_metrics, langch yield langchain_pinecone except ImportError: yield + + +@pytest.fixture +def streamed_response_responder(): + # TODO (sam.brenner): clean this up a bit, make it more generic + try: + import importlib + import os + + import httpx + + class CustomTransport(httpx.BaseTransport): + def __init__(self, file: str): + super().__init__() + self.file = file + + def handle_request(self, request: httpx.Request) -> httpx.Response: + with open( + os.path.join(os.path.dirname(__file__), f"cassettes/langchain_community/{self.file}"), + "r", + encoding="utf-8", + ) as f: + content = f.read() + return httpx.Response(200, request=request, content=content) + + def responder(module, client_class_key, http_client_key, client_path: list[str], file: str): + clientModule = importlib.import_module(module) # openai, anthropic, etc. + client_class = getattr(clientModule, client_class_key) + client = client_class(**{http_client_key: httpx.Client(transport=CustomTransport(file=file))}) + + for prop in client_path: + client = getattr(client, prop) + + return client + + yield responder + + except ImportError: + yield + + +@pytest.fixture +def async_streamed_response_responder(): + # TODO (sam.brenner): clean this up a bit, make it more generic + try: + import importlib + import os + + import httpx + + class CustomTransport(httpx.AsyncBaseTransport): + def __init__(self, file: str): + super().__init__() + self.file = file + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + with open( + os.path.join(os.path.dirname(__file__), f"cassettes/langchain_community/{self.file}"), + "r", + encoding="utf-8", + ) as f: + content = f.read() + return httpx.Response(200, request=request, content=content) + + def responder(module, client_class_key, http_client_key, client_path: list[str], file: str): + clientModule = importlib.import_module(module) # openai, anthropic, etc. 
+            client_class = getattr(client_module, client_class_key)
+            client = client_class(**{http_client_key: httpx.AsyncClient(transport=CustomTransport(file=file))})
+
+            for prop in client_path:
+                client = getattr(client, prop)
+
+            return client
+
+        yield responder
+
+    except ImportError:
+        yield
diff --git a/tests/contrib/langchain/test_langchain_community.py b/tests/contrib/langchain/test_langchain_community.py
index c075d043246..1a0d0acbc53 100644
--- a/tests/contrib/langchain/test_langchain_community.py
+++ b/tests/contrib/langchain/test_langchain_community.py
@@ -23,6 +23,7 @@
     "meta.openai.request.logprobs",  # langchain-openai llm call now includes logprobs as param
     "meta.error.stack",
     "meta.http.useragent",
+    "meta.langchain.request.openai-chat.parameters.logprobs",
     "meta.langchain.request.openai.parameters.seed",  # langchain-openai llm call now includes seed as param
     "meta.langchain.request.openai.parameters.logprobs",  # langchain-openai llm call now includes seed as param
     "metrics.langchain.tokens.total_cost",  # total_cost depends on if tiktoken is installed
@@ -1276,6 +1277,149 @@ def test_faiss_vectorstore_retrieval(langchain_community, langchain_openai, requ
     retriever.invoke("What was the message of the last test query?")
 
 
+@pytest.mark.snapshot(ignores=IGNORE_FIELDS)
+def test_streamed_chain(langchain_core, langchain_openai, streamed_response_responder):
+    client = streamed_response_responder(
+        module="openai",
+        client_class_key="OpenAI",
+        http_client_key="http_client",
+        client_path=["chat", "completions"],
+        file="lcel_openai_chat_streamed_response.txt",
+    )
+
+    prompt = langchain_core.prompts.ChatPromptTemplate.from_messages(
+        [("system", "You are world class technical documentation writer."), ("user", "{input}")]
+    )
+    llm = langchain_openai.ChatOpenAI(client=client)
+    parser = langchain_core.output_parsers.StrOutputParser()
+
+    chain = prompt | llm | parser
+    for _ in chain.stream({"input": "how can langsmith help with testing?"}):
+        pass
+
+
+@pytest.mark.snapshot(ignores=IGNORE_FIELDS)
+def test_streamed_chat(langchain_openai, streamed_response_responder):
+    client = streamed_response_responder(
+        module="openai",
+        client_class_key="OpenAI",
+        http_client_key="http_client",
+        client_path=["chat", "completions"],
+        file="lcel_openai_chat_streamed_response.txt",
+    )
+    model = langchain_openai.ChatOpenAI(client=client)
+
+    for _ in model.stream(input="how can langsmith help with testing?"):
+        pass
+
+
+@pytest.mark.snapshot(ignores=IGNORE_FIELDS)
+def test_streamed_llm(langchain_openai, streamed_response_responder):
+    client = streamed_response_responder(
+        module="openai",
+        client_class_key="OpenAI",
+        http_client_key="http_client",
+        client_path=["completions"],
+        file="lcel_openai_llm_streamed_response.txt",
+    )
+
+    llm = langchain_openai.OpenAI(client=client)
+
+    for _ in llm.stream(input="How do I write technical documentation?"):
+        pass
+
+
+@pytest.mark.snapshot(
+    ignores=IGNORE_FIELDS,
+    token="tests.contrib.langchain.test_langchain_community.test_streamed_chain",
+)
+async def test_astreamed_chain(langchain_core, langchain_openai, async_streamed_response_responder):
+    client = async_streamed_response_responder(
+        module="openai",
+        client_class_key="AsyncOpenAI",
+        http_client_key="http_client",
+        client_path=["chat", "completions"],
+        file="lcel_openai_chat_streamed_response.txt",
+    )
+
+    prompt = langchain_core.prompts.ChatPromptTemplate.from_messages(
writer."), ("user", "{input}")] + ) + llm = langchain_openai.ChatOpenAI(async_client=client) + parser = langchain_core.output_parsers.StrOutputParser() + + chain = prompt | llm | parser + async for _ in chain.astream({"input": "how can langsmith help with testing?"}): + pass + + +@pytest.mark.snapshot( + ignores=IGNORE_FIELDS, + token="tests.contrib.langchain.test_langchain_community.test_streamed_chat", +) +async def test_astreamed_chat(langchain_openai, async_streamed_response_responder): + client = async_streamed_response_responder( + module="openai", + client_class_key="AsyncOpenAI", + http_client_key="http_client", + client_path=["chat", "completions"], + file="lcel_openai_chat_streamed_response.txt", + ) + + model = langchain_openai.ChatOpenAI(async_client=client) + + async for _ in model.astream(input="how can langsmith help with testing?"): + pass + + +@pytest.mark.snapshot( + ignores=IGNORE_FIELDS, + token="tests.contrib.langchain.test_langchain_community.test_streamed_llm", +) +async def test_astreamed_llm(langchain_openai, async_streamed_response_responder): + client = async_streamed_response_responder( + module="openai", + client_class_key="AsyncOpenAI", + http_client_key="http_client", + client_path=["completions"], + file="lcel_openai_llm_streamed_response.txt", + ) + + llm = langchain_openai.OpenAI(async_client=client) + + async for _ in llm.astream(input="How do I write technical documentation?"): + pass + + +@pytest.mark.snapshot(ignores=IGNORE_FIELDS) +def test_streamed_json_output_parser(langchain, langchain_core, langchain_openai, streamed_response_responder): + client = streamed_response_responder( + module="openai", + client_class_key="OpenAI", + http_client_key="http_client", + client_path=["chat", "completions"], + file="lcel_openai_chat_streamed_response_json_output_parser.txt", + ) + + model = langchain_openai.ChatOpenAI(model="gpt-4o", max_tokens=50, client=client) + parser = langchain_core.output_parsers.JsonOutputParser() + + chain = model | parser + inp = ( + "output a list of the country france their population in JSON format. " + 'Use a dict with an outer key of "countries" which contains a list of countries. 
' + "Each country should have the key `name` and `population`" + ) + + messages = [ + langchain.schema.SystemMessage(content="You know everything about the world."), + langchain.schema.HumanMessage(content=inp), + ] + + for _ in chain.stream(input=messages): + pass + + @pytest.mark.snapshot( # tool description is generated differently is some langchain_core versions ignores=["meta.langchain.request.tool.description"], diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json new file mode 100644 index 00000000000..39efc4affeb --- /dev/null +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chain.json @@ -0,0 +1,61 @@ +[[ + { + "name": "langchain.request", + "service": "", + "resource": "langchain_core.runnables.base.RunnableSequence", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "66e8388f00000000", + "langchain.request.inputs.0.input": "how can langsmith help with testing?", + "langchain.request.stream": "True", + "langchain.request.type": "chain", + "langchain.response.content": "Python is\\n\\nthe best!", + "language": "python", + "runtime-id": "b3018660b0904cbfbe25aa695ac32c27" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 44915 + }, + "duration": 49336000, + "start": 1726494863602561000 + }, + { + "name": "langchain.request", + "service": "", + "resource": "langchain_openai.chat_models.base.ChatOpenAI", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "type": "", + "error": 0, + "meta": { + "langchain.request.api_key": "...key>", + "langchain.request.messages.0.0.content": "You are world class technical documentation writer.", + "langchain.request.messages.0.0.role": "SystemMessage", + "langchain.request.messages.0.1.content": "how can langsmith help with testing?", + "langchain.request.messages.0.1.role": "HumanMessage", + "langchain.request.openai-chat.parameters.model": "gpt-3.5-turbo", + "langchain.request.openai-chat.parameters.model_name": "gpt-3.5-turbo", + "langchain.request.openai-chat.parameters.n": "1", + "langchain.request.openai-chat.parameters.stream": "False", + "langchain.request.openai-chat.parameters.temperature": "0.7", + "langchain.request.provider": "openai-chat", + "langchain.request.stream": "True", + "langchain.request.type": "chat_model", + "langchain.response.content": "Python is\\n\\nthe best!" 
+ }, + "metrics": { + "_dd.measured": 1 + }, + "duration": 42281000, + "start": 1726494863608857000 + }]] diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json new file mode 100644 index 00000000000..b32c4fe1cf6 --- /dev/null +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_chat.json @@ -0,0 +1,37 @@ +[[ + { + "name": "langchain.request", + "service": "", + "resource": "langchain_openai.chat_models.base.ChatOpenAI", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "66e8388f00000000", + "langchain.request.api_key": "...key>", + "langchain.request.messages.0.content": "how can langsmith help with testing?", + "langchain.request.openai-chat.parameters.model": "gpt-3.5-turbo", + "langchain.request.openai-chat.parameters.model_name": "gpt-3.5-turbo", + "langchain.request.openai-chat.parameters.n": "1", + "langchain.request.openai-chat.parameters.stream": "False", + "langchain.request.openai-chat.parameters.temperature": "0.7", + "langchain.request.provider": "openai-chat", + "langchain.request.stream": "True", + "langchain.request.type": "chat_model", + "langchain.response.content": "Python is\\n\\nthe best!", + "language": "python", + "runtime-id": "b3018660b0904cbfbe25aa695ac32c27" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 44915 + }, + "duration": 9681000, + "start": 1726494863731905000 + }]] diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json new file mode 100644 index 00000000000..9fc49c983e3 --- /dev/null +++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_json_output_parser.json @@ -0,0 +1,63 @@ +[[ + { + "name": "langchain.request", + "service": "", + "resource": "langchain_core.runnables.base.RunnableSequence", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "", + "error": 0, + "meta": { + "_dd.p.dm": "-0", + "_dd.p.tid": "66e868dd00000000", + "langchain.request.inputs.0": "content='You know everything about the world.'", + "langchain.request.inputs.1": "content='output a list of the country france their population in JSON format. Use a dict with an outer key of \"countries\" which ...", + "langchain.request.stream": "True", + "langchain.request.type": "chain", + "langchain.response.content": "{\"countries\": \"France is a country!\"}", + "language": "python", + "runtime-id": "4dfebe356bef4570b2d56e91c011025d" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 56307 + }, + "duration": 47006000, + "start": 1726507229333366000 + }, + { + "name": "langchain.request", + "service": "", + "resource": "langchain_openai.chat_models.base.ChatOpenAI", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "type": "", + "error": 0, + "meta": { + "langchain.request.api_key": "...key>", + "langchain.request.messages.0.content": "You know everything about the world.", + "langchain.request.messages.0.role": "SystemMessage", + "langchain.request.messages.1.content": "output a list of the country france their population in JSON format. 
+      "langchain.request.messages.1.content": "output a list of the country france their population in JSON format. Use a dict with an outer key of \"countries\" which contains ...",
+      "langchain.request.messages.1.role": "HumanMessage",
+      "langchain.request.openai-chat.parameters.max_tokens": "50",
+      "langchain.request.openai-chat.parameters.model": "gpt-4o",
+      "langchain.request.openai-chat.parameters.model_name": "gpt-4o",
+      "langchain.request.openai-chat.parameters.n": "1",
+      "langchain.request.openai-chat.parameters.stream": "False",
+      "langchain.request.openai-chat.parameters.temperature": "0.7",
+      "langchain.request.provider": "openai-chat",
+      "langchain.request.stream": "True",
+      "langchain.request.type": "chat_model",
+      "langchain.response.content": "Here:\\n\\n```json\\n{\\n \"countries\":\"France is a country!\"\\n}\\n```"
+    },
+    "metrics": {
+      "_dd.measured": 1
+    },
+    "duration": 42208000,
+    "start": 1726507229337869000
+  }]]
diff --git a/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json
new file mode 100644
index 00000000000..487f657e09a
--- /dev/null
+++ b/tests/snapshots/tests.contrib.langchain.test_langchain_community.test_streamed_llm.json
@@ -0,0 +1,39 @@
+[[
+  {
+    "name": "langchain.request",
+    "service": "",
+    "resource": "langchain_openai.llms.base.OpenAI",
+    "trace_id": 0,
+    "span_id": 1,
+    "parent_id": 0,
+    "type": "",
+    "error": 0,
+    "meta": {
+      "_dd.p.dm": "-0",
+      "_dd.p.tid": "66e8388f00000000",
+      "langchain.request.api_key": "...key>",
+      "langchain.request.openai.parameters.frequency_penalty": "0",
+      "langchain.request.openai.parameters.max_tokens": "256",
+      "langchain.request.openai.parameters.model_name": "gpt-3.5-turbo-instruct",
+      "langchain.request.openai.parameters.n": "1",
+      "langchain.request.openai.parameters.presence_penalty": "0",
+      "langchain.request.openai.parameters.temperature": "0.7",
+      "langchain.request.openai.parameters.top_p": "1",
+      "langchain.request.prompts.0": "How do I write technical documentation?",
+      "langchain.request.provider": "openai",
+      "langchain.request.stream": "True",
+      "langchain.request.type": "llm",
+      "langchain.response.content": "\\n\\nPython is cool!",
+      "language": "python",
+      "runtime-id": "b3018660b0904cbfbe25aa695ac32c27"
+    },
+    "metrics": {
+      "_dd.measured": 1,
+      "_dd.top_level": 1,
+      "_dd.tracer_kr": 1.0,
+      "_sampling_priority_v1": 1,
+      "process_id": 44915
+    },
+    "duration": 9120000,
+    "start": 1726494863700015000
+  }]]
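For reference, a minimal usage sketch of the call pattern this patch traces. This is an illustration, not part of the patch: it assumes langchain-openai is installed and OPENAI_API_KEY is set, and it mirrors the test_streamed_chain flow above.

    # Hypothetical example: patch langchain, then consume a streamed LCEL chain.
    # The traced wrapper buffers each consumed chunk; when the generator is
    # exhausted, a "langchain.request" span is finished with
    # langchain.request.stream=True and the joined chunks tagged as
    # langchain.response.content.
    from ddtrace import patch

    patch(langchain=True)

    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_openai import ChatOpenAI

    prompt = ChatPromptTemplate.from_messages(
        [("system", "You are a technical documentation writer."), ("user", "{input}")]
    )
    chain = prompt | ChatOpenAI() | StrOutputParser()

    for chunk in chain.stream({"input": "how can langsmith help with testing?"}):
        print(chunk, end="", flush=True)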