From b34e3bdcfb3bed166ec6599ccb7913021044211b Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 23 Sep 2024 16:50:58 -0400 Subject: [PATCH 1/8] Use our own context manager --- ddtrace/contrib/internal/futures/threading.py | 20 +- ddtrace/contrib/trace_utils.py | 5 + ddtrace/llmobs/_constants.py | 2 + ddtrace/llmobs/_context.py | 61 ++++ ddtrace/llmobs/_integrations/base.py | 10 +- ddtrace/llmobs/_integrations/bedrock.py | 8 +- ddtrace/llmobs/_llmobs.py | 113 +++++--- ddtrace/llmobs/_trace_processor.py | 4 +- ddtrace/llmobs/_utils.py | 37 +-- ddtrace/propagation/http.py | 4 +- tests/llmobs/test_llmobs_service.py | 24 +- tests/llmobs/test_llmobs_trace_processor.py | 41 ++- tests/llmobs/test_propagation.py | 272 ++++++++++-------- 13 files changed, 357 insertions(+), 244 deletions(-) create mode 100644 ddtrace/llmobs/_context.py diff --git a/ddtrace/contrib/internal/futures/threading.py b/ddtrace/contrib/internal/futures/threading.py index deea68e2c1..bd8635f76c 100644 --- a/ddtrace/contrib/internal/futures/threading.py +++ b/ddtrace/contrib/internal/futures/threading.py @@ -1,6 +1,8 @@ from typing import Optional +from typing import Tuple import ddtrace +from ddtrace import config from ddtrace._trace.context import Context @@ -13,6 +15,12 @@ def _wrap_submit(func, args, kwargs): # DEV: Be sure to propagate a Context and not a Span since we are crossing thread boundaries current_ctx: Optional[Context] = ddtrace.tracer.current_trace_context() + llmobs_ctx: Optional[Context] = None + if config._llmobs_enabled: + from ddtrace.llmobs import LLMObs + + llmobs_ctx = LLMObs._instance.current_trace_context() + # The target function can be provided as a kwarg argument "fn" or the first positional argument self = args[0] if "fn" in kwargs: @@ -20,10 +28,10 @@ def _wrap_submit(func, args, kwargs): fn_args = args[1:] else: fn, fn_args = args[1], args[2:] - return func(self, _wrap_execution, current_ctx, fn, fn_args, kwargs) + return func(self, _wrap_execution, (current_ctx, 
llmobs_ctx), fn, fn_args, kwargs) -def _wrap_execution(ctx: Optional[Context], fn, args, kwargs): +def _wrap_execution(ctx: Tuple[Optional[Context], Optional[Context]], fn, args, kwargs): """ Intermediate target function that is executed in a new thread; it receives the original function with arguments and keyword @@ -31,6 +39,10 @@ def _wrap_execution(ctx: Optional[Context], fn, args, kwargs): provider sets the Active context in a thread local storage variable because it's outside the asynchronous loop. """ - if ctx is not None: - ddtrace.tracer.context_provider.activate(ctx) + if ctx[0] is not None: + ddtrace.tracer.context_provider.activate(ctx[0]) + if ctx[1] is not None and config._llmobs_enabled: + from ddtrace.llmobs import LLMObs + + LLMObs._instance._llmobs_context_provider.activate(ctx[1]) return fn(*args, **kwargs) diff --git a/ddtrace/contrib/trace_utils.py b/ddtrace/contrib/trace_utils.py index 3cc0cbdac4..dbc0ae167e 100644 --- a/ddtrace/contrib/trace_utils.py +++ b/ddtrace/contrib/trace_utils.py @@ -1,6 +1,7 @@ """ This module contains utility functions for writing ddtrace integrations. 
""" + from collections import deque import ipaddress import re @@ -573,6 +574,10 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None, # We have parsed a trace id from headers, and we do not already # have a context with the same trace id active tracer.context_provider.activate(context) + if config._llmobs_enabled: + from ddtrace.llmobs import LLMObs + + LLMObs._activate_distributed_headers(request_headers, context) def _flatten( diff --git a/ddtrace/llmobs/_constants.py b/ddtrace/llmobs/_constants.py index af022c5bc4..504bbf4c9f 100644 --- a/ddtrace/llmobs/_constants.py +++ b/ddtrace/llmobs/_constants.py @@ -44,3 +44,5 @@ DROPPED_IO_COLLECTION_ERROR = "dropped_io" DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]" + +ROOT_PARENT_ID = "undefined" diff --git a/ddtrace/llmobs/_context.py b/ddtrace/llmobs/_context.py new file mode 100644 index 0000000000..33768e4e4d --- /dev/null +++ b/ddtrace/llmobs/_context.py @@ -0,0 +1,61 @@ +import contextvars +from typing import Optional +from typing import Union + +from ddtrace._trace.context import Context +from ddtrace._trace.provider import DefaultContextProvider +from ddtrace._trace.span import Span +from ddtrace.ext import SpanTypes + + +ContextTypeValue = Optional[Union[Context, Span]] + + +_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar( + "datadog_llmobs_contextvar", + default=None, +) + + +class LLMObsContextProvider(DefaultContextProvider): + """Context provider that retrieves contexts from a context variable. + It is suitable for synchronous programming and for asynchronous executors + that support contextvars. 
+ """ + + def __init__(self): + # type: () -> None + super(DefaultContextProvider, self).__init__() + _DD_LLMOBS_CONTEXTVAR.set(None) + + def _has_active_context(self): + # type: () -> bool + """Returns whether there is an active context in the current execution.""" + ctx = _DD_LLMOBS_CONTEXTVAR.get() + return ctx is not None + + def _update_active(self, span: Span) -> Optional[Span]: + """Updates the active LLMObs span. + The active span is updated to be the span's closest unfinished LLMObs ancestor span. + """ + if not span.finished: + return span + new_active: Optional[Span] = span + while new_active and new_active.finished: + new_active = new_active._parent + if new_active and not new_active.finished and new_active.span_type == SpanTypes.LLM: + break + self.activate(new_active) + return new_active + + def activate(self, ctx: ContextTypeValue) -> None: + """Makes the given context active in the current execution.""" + _DD_LLMOBS_CONTEXTVAR.set(ctx) + super(DefaultContextProvider, self).activate(ctx) + + def active(self) -> ContextTypeValue: + """Returns the active span or context for the current execution.""" + item = _DD_LLMOBS_CONTEXTVAR.get() + if isinstance(item, Span): + return self._update_active(item) + return item diff --git a/ddtrace/llmobs/_integrations/base.py b/ddtrace/llmobs/_integrations/base.py index c4186c50a5..6082c65430 100644 --- a/ddtrace/llmobs/_integrations/base.py +++ b/ddtrace/llmobs/_integrations/base.py @@ -16,11 +16,8 @@ from ddtrace.internal.dogstatsd import get_dogstatsd_client from ddtrace.internal.hostname import get_hostname from ddtrace.internal.utils.formats import asbool -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._llmobs import LLMObs from ddtrace.llmobs._log_writer import V2LogWriter -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.sampler import RateSampler from ddtrace.settings import IntegrationConfig @@ 
-127,12 +124,7 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k span.set_tag(SPAN_MEASURED_KEY) self._set_base_span_tags(span, **kwargs) if submit_to_llmobs and self.llmobs_enabled: - if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None: - # For non-distributed traces or spans in the first service of a distributed trace, - # The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now - # in these cases to avoid conflicting with the later propagated tags. - parent_id = _get_llmobs_parent_id(span) or "undefined" - span.set_tag_str(PARENT_ID_KEY, str(parent_id)) + LLMObs._instance._activate_llmobs_span(span) return span @classmethod diff --git a/ddtrace/llmobs/_integrations/bedrock.py b/ddtrace/llmobs/_integrations/bedrock.py index 82aa0ff6a0..e3741efabc 100644 --- a/ddtrace/llmobs/_integrations/bedrock.py +++ b/ddtrace/llmobs/_integrations/bedrock.py @@ -4,6 +4,7 @@ from ddtrace._trace.span import Span from ddtrace.internal.logger import get_logger +from ddtrace.llmobs import LLMObs from ddtrace.llmobs._constants import INPUT_MESSAGES from ddtrace.llmobs._constants import INPUT_TOKENS_METRIC_KEY from ddtrace.llmobs._constants import METADATA @@ -12,12 +13,9 @@ from ddtrace.llmobs._constants import MODEL_PROVIDER from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY from ddtrace.llmobs._integrations import BaseLLMIntegration -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import safe_json @@ -37,9 +35,7 @@ def llmobs_set_tags( """Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags.""" if not self.llmobs_enabled: return - if 
span.get_tag(PROPAGATED_PARENT_ID_KEY) is None: - parent_id = _get_llmobs_parent_id(span) or "undefined" - span.set_tag(PARENT_ID_KEY, parent_id) + LLMObs._instance._activate_llmobs_span(span) parameters = {} if span.get_tag("bedrock.request.temperature"): parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0) diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index c8d07cec93..37948e354c 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -10,6 +10,7 @@ from ddtrace import Span from ddtrace import config from ddtrace import patch +from ddtrace._trace.context import Context from ddtrace.ext import SpanTypes from ddtrace.internal import atexit from ddtrace.internal import forksafe @@ -35,17 +36,16 @@ from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_VALUE from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS +from ddtrace.llmobs._context import LLMObsContextProvider from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor from ddtrace.llmobs._utils import AnnotationContext -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_ml_app from ddtrace.llmobs._utils import _get_session_id -from ddtrace.llmobs._utils import _inject_llmobs_parent_id from ddtrace.llmobs._utils import safe_json from ddtrace.llmobs._utils import validate_prompt from ddtrace.llmobs._writer import LLMObsEvalMetricWriter @@ -75,6 +75,7 @@ def __init__(self, tracer=None): super(LLMObs, self).__init__() self.tracer = tracer or ddtrace.tracer self._llmobs_span_writer = None + self._llmobs_context_provider = 
LLMObsContextProvider() self._llmobs_span_writer = LLMObsSpanWriter( is_agentless=config._llmobs_agentless_enabled, @@ -266,23 +267,46 @@ def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan """Returns a simple representation of a span to export its span and trace IDs. If no span is provided, the current active LLMObs-type span will be used. """ - if span: - try: - if span.span_type != SpanTypes.LLM: - log.warning("Span must be an LLMObs-generated span.") - return None - return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) - except (TypeError, AttributeError): - log.warning("Failed to export span. Span must be a valid Span object.") - return None - span = cls._instance.tracer.current_span() if span is None: - log.warning("No span provided and no active LLMObs-generated span found.") - return None - if span.span_type != SpanTypes.LLM: - log.warning("Span must be an LLMObs-generated span.") + span = cls._instance.current_span() + if span is None: + log.warning("No span provided and no active LLMObs-generated span found.") + return None + return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) + try: + if span.span_type != SpanTypes.LLM: + log.warning("Span must be an LLMObs-generated span.") + return None + return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) + except (TypeError, AttributeError): + log.warning("Failed to export span. Span must be a valid Span object.") return None - return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) + + def current_span(self) -> Optional[Span]: + """Returns the currently active LLMObs-generated span. + Note that there may be an active span represented by a context object + (i.e. a distributed trace) which will not be returned by this method. 
+ """ + active = self._llmobs_context_provider.active() + return active if isinstance(active, Span) else None + + def current_trace_context(self) -> Optional[Context]: + """Returns the context for the current LLMObs trace.""" + active = self._llmobs_context_provider.active() + if isinstance(active, Context): + return active + elif isinstance(active, Span): + return active.context + return None + + def _activate_llmobs_span(self, span: Span) -> None: + """Propagate the llmobs parent span's ID as the new span's parent ID and activate the new span.""" + llmobs_parent = self._llmobs_context_provider.active() + if llmobs_parent: + span.set_tag_str(PARENT_ID_KEY, str(llmobs_parent.span_id)) + else: + span.set_tag_str(PARENT_ID_KEY, ROOT_PARENT_ID) + self._llmobs_context_provider.activate(span) def _start_span( self, @@ -296,6 +320,7 @@ def _start_span( if name is None: name = operation_kind span = self.tracer.trace(name, resource=operation_kind, span_type=SpanTypes.LLM) + self._activate_llmobs_span(span) span.set_tag_str(SPAN_KIND, operation_kind) if model_name is not None: span.set_tag_str(MODEL_NAME, model_name) @@ -307,12 +332,6 @@ def _start_span( if ml_app is None: ml_app = _get_ml_app(span) span.set_tag_str(ML_APP, ml_app) - if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None: - # For non-distributed traces or spans in the first service of a distributed trace, - # The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now - # in these cases to avoid conflicting with the later propagated tags. - parent_id = _get_llmobs_parent_id(span) or "undefined" - span.set_tag_str(PARENT_ID_KEY, str(parent_id)) return span @classmethod @@ -514,7 +533,7 @@ def annotate( such as `{prompt,completion,total}_tokens`. 
""" if span is None: - span = cls._instance.tracer.current_span() + span = cls._instance.current_span() if span is None: log.warning("No span provided and no active LLMObs-generated span found.") return @@ -792,6 +811,16 @@ def submit_evaluation( } ) + @classmethod + def _inject_llmobs_context(cls, request_headers: Dict[str, str]) -> Dict[str, str]: + active_ctx = cls._instance.current_trace_context() + if active_ctx is None: + parent_id = ROOT_PARENT_ID + else: + parent_id = str(active_ctx.span_id) + request_headers[PARENT_ID_KEY] = parent_id + return request_headers + @classmethod def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optional[Span] = None) -> Dict[str, str]: """Injects the span's distributed context into the given request headers.""" @@ -805,14 +834,38 @@ def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optio log.warning("request_headers must be a dictionary of string key-value pairs.") return request_headers if span is None: - span = cls._instance.tracer.current_span() + span = cls._instance.current_span() if span is None: log.warning("No span provided and no currently active span found.") return request_headers - _inject_llmobs_parent_id(span.context) + if not isinstance(span, Span): + log.warning("span must be a valid Span object. 
Distributed context will not be injected.") + return request_headers + cls._inject_llmobs_context(request_headers) HTTPPropagator.inject(span.context, request_headers) return request_headers + @classmethod + def _activate_llmobs_distributed_headers( + cls, request_headers: Dict[str, str], context: Optional[Context] = None + ) -> None: + if not context: + context = HTTPPropagator.extract(request_headers) + if not context.trace_id or not context.span_id: + log.warning("Failed to extract trace/span ID from request headers.") + return + if PARENT_ID_KEY not in request_headers: + log.warning("Failed to extract LLMObs parent ID from request headers.") + parent_id = request_headers.get(PARENT_ID_KEY) + if parent_id is None: + return + try: + parent_id = int(parent_id) + except ValueError: + return + llmobs_context = Context(trace_id=context.trace_id, span_id=parent_id) + cls._instance._llmobs_context_provider.activate(llmobs_context) + @classmethod def activate_distributed_headers(cls, request_headers: Dict[str, str]) -> None: """ @@ -827,12 +880,8 @@ def activate_distributed_headers(cls, request_headers: Dict[str, str]) -> None: ) return context = HTTPPropagator.extract(request_headers) - if context.trace_id is None or context.span_id is None: - log.warning("Failed to extract trace ID or span ID from request headers.") - return - if PROPAGATED_PARENT_ID_KEY not in context._meta: - log.warning("Failed to extract LLMObs parent ID from request headers.") cls._instance.tracer.context_provider.activate(context) + cls._instance._activate_llmobs_distributed_headers(request_headers, context) # initialize the default llmobs instance diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index 5a654a8fb9..0d709d9148 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -27,10 +27,10 @@ from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_VALUE from 
ddtrace.llmobs._constants import PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import TAGS -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_ml_app from ddtrace.llmobs._utils import _get_session_id from ddtrace.llmobs._utils import _get_span_name @@ -100,7 +100,7 @@ def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: ml_app = _get_ml_app(span) span.set_tag_str(ML_APP, ml_app) - parent_id = str(_get_llmobs_parent_id(span) or "undefined") + parent_id = span.get_tag(PARENT_ID_KEY) or ROOT_PARENT_ID span._meta.pop(PARENT_ID_KEY, None) llmobs_span_event = { diff --git a/ddtrace/llmobs/_utils.py b/ddtrace/llmobs/_utils.py index 7dd17ea94f..59dd766224 100644 --- a/ddtrace/llmobs/_utils.py +++ b/ddtrace/llmobs/_utils.py @@ -3,7 +3,6 @@ from typing import Optional from typing import Union -import ddtrace from ddtrace import Span from ddtrace import config from ddtrace.ext import SpanTypes @@ -12,8 +11,6 @@ from ddtrace.llmobs._constants import LANGCHAIN_APM_SPAN_NAME from ddtrace.llmobs._constants import ML_APP from ddtrace.llmobs._constants import OPENAI_APM_SPAN_NAME -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._constants import SESSION_ID @@ -84,18 +81,6 @@ def _get_nearest_llmobs_ancestor(span: Span) -> Optional[Span]: return None -def _get_llmobs_parent_id(span: Span) -> Optional[str]: - """Return the span ID of the nearest LLMObs-type span in the span's ancestor tree. - In priority order: manually set parent ID tag, nearest LLMObs ancestor, local root's propagated parent ID tag. 
- """ - if span.get_tag(PARENT_ID_KEY): - return span.get_tag(PARENT_ID_KEY) - nearest_llmobs_ancestor = _get_nearest_llmobs_ancestor(span) - if nearest_llmobs_ancestor: - return str(nearest_llmobs_ancestor.span_id) - return span.get_tag(PROPAGATED_PARENT_ID_KEY) - - def _get_span_name(span: Span) -> str: if span.name in (LANGCHAIN_APM_SPAN_NAME, GEMINI_APM_SPAN_NAME) and span.resource != "": return span.resource @@ -120,10 +105,7 @@ def _get_ml_app(span: Span) -> str: def _get_session_id(span: Span) -> Optional[str]: - """ - Return the session ID for a given span, by checking the span's nearest LLMObs span ancestor. - Default to the span's trace ID. - """ + """Return the session ID for a given span, by checking the span's nearest LLMObs span ancestor.""" session_id = span.get_tag(SESSION_ID) if session_id: return session_id @@ -133,23 +115,6 @@ def _get_session_id(span: Span) -> Optional[str]: return session_id -def _inject_llmobs_parent_id(span_context): - """Inject the LLMObs parent ID into the span context for reconnecting distributed LLMObs traces.""" - span = ddtrace.tracer.current_span() - if span is None: - log.warning("No active span to inject LLMObs parent ID info.") - return - if span.context is not span_context: - log.warning("The current active span and span_context do not match. Not injecting LLMObs parent ID.") - return - - if span.span_type == SpanTypes.LLM: - llmobs_parent_id = str(span.span_id) - else: - llmobs_parent_id = _get_llmobs_parent_id(span) - span_context._meta[PROPAGATED_PARENT_ID_KEY] = llmobs_parent_id or "undefined" - - def _unserializable_default_repr(obj): default_repr = "[Unserializable object: {}]".format(repr(obj)) log.warning("I/O object is not JSON serializable. 
Defaulting to placeholder value instead.") diff --git a/ddtrace/propagation/http.py b/ddtrace/propagation/http.py index de3c2a9132..6293cf0204 100644 --- a/ddtrace/propagation/http.py +++ b/ddtrace/propagation/http.py @@ -1008,9 +1008,9 @@ def parent_call(): headers[_HTTP_BAGGAGE_PREFIX + key] = span_context._baggage[key] if config._llmobs_enabled: - from ddtrace.llmobs._utils import _inject_llmobs_parent_id + from ddtrace.llmobs import LLMObs - _inject_llmobs_parent_id(span_context) + LLMObs._inject_llmobs_context(headers) if PROPAGATION_STYLE_DATADOG in config._propagation_style_inject: _DatadogMultiHeader._inject(span_context, headers) diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index b49bc4298c..686a1eb656 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -23,7 +23,9 @@ from ddtrace.llmobs._constants import OUTPUT_DOCUMENTS from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_VALUE +from ddtrace.llmobs._constants import PARENT_ID_KEY from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING @@ -981,7 +983,7 @@ def test_export_span_no_specified_span_no_active_span_raises_warning(LLMObs, moc def test_export_span_active_span_not_llmobs_span_raises_warning(LLMObs, mock_logs): with LLMObs._instance.tracer.trace("non_llmobs_span"): LLMObs.export_span() - mock_logs.warning.assert_called_once_with("Span must be an LLMObs-generated span.") + mock_logs.warning.assert_called_once_with("No span provided and no active LLMObs-generated span found.") def test_export_span_no_specified_span_returns_exported_active_span(LLMObs): @@ -1358,15 +1360,15 @@ def 
test_inject_distributed_headers_span_calls_httppropagator_inject(LLMObs, moc with mock.patch("ddtrace.propagation.http.HTTPPropagator.inject") as mock_inject: LLMObs.inject_distributed_headers({}, span=span) assert mock_inject.call_count == 1 - mock_inject.assert_called_once_with(span.context, {}) + mock_inject.assert_called_once_with(span.context, {PARENT_ID_KEY: ROOT_PARENT_ID}) def test_inject_distributed_headers_current_active_span_injected(LLMObs, mock_logs): - span = LLMObs._instance.tracer.trace("test_span") + _ = LLMObs._instance.tracer.trace("test_span") with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.inject") as mock_inject: LLMObs.inject_distributed_headers({}, span=None) - assert mock_inject.call_count == 1 - mock_inject.assert_called_once_with(span.context, {}) + assert mock_inject.call_count == 0 + mock_logs.warning.assert_called_once_with("No span provided and no currently active span found.") def test_activate_distributed_headers_llmobs_disabled_does_nothing(LLMObs, mock_logs): @@ -1387,23 +1389,23 @@ def test_activate_distributed_headers_calls_httppropagator_extract(LLMObs, mock_ def test_activate_distributed_headers_no_trace_id_does_nothing(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - mock_extract.return_value = Context(span_id="123", meta={PROPAGATED_PARENT_ID_KEY: "123"}) + mock_extract.return_value = Context(span_id=123, meta={PROPAGATED_PARENT_ID_KEY: "123"}) LLMObs.activate_distributed_headers({}) assert mock_extract.call_count == 1 - mock_logs.warning.assert_called_once_with("Failed to extract trace ID or span ID from request headers.") + mock_logs.warning.assert_called_once_with("Failed to extract trace/span ID from request headers.") def test_activate_distributed_headers_no_span_id_does_nothing(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - mock_extract.return_value = Context(trace_id="123", 
meta={PROPAGATED_PARENT_ID_KEY: "123"}) + mock_extract.return_value = Context(trace_id=123, meta={PROPAGATED_PARENT_ID_KEY: "123"}) LLMObs.activate_distributed_headers({}) assert mock_extract.call_count == 1 - mock_logs.warning.assert_called_once_with("Failed to extract trace ID or span ID from request headers.") + mock_logs.warning.assert_called_once_with("Failed to extract trace/span ID from request headers.") def test_activate_distributed_headers_no_llmobs_parent_id_does_nothing(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - dummy_context = Context(trace_id="123", span_id="456") + dummy_context = Context(trace_id=123, span_id=456) mock_extract.return_value = dummy_context with mock.patch("ddtrace.llmobs.LLMObs._instance.tracer.context_provider.activate") as mock_activate: LLMObs.activate_distributed_headers({}) @@ -1414,7 +1416,7 @@ def test_activate_distributed_headers_no_llmobs_parent_id_does_nothing(LLMObs, m def test_activate_distributed_headers_activates_context(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - dummy_context = Context(trace_id="123", span_id="456", meta={PROPAGATED_PARENT_ID_KEY: "789"}) + dummy_context = Context(trace_id=123, span_id=456, meta={PROPAGATED_PARENT_ID_KEY: "789"}) mock_extract.return_value = dummy_context with mock.patch("ddtrace.llmobs.LLMObs._instance.tracer.context_provider.activate") as mock_activate: LLMObs.activate_distributed_headers({}) diff --git a/tests/llmobs/test_llmobs_trace_processor.py b/tests/llmobs/test_llmobs_trace_processor.py index c0a199391d..0849e8526c 100644 --- a/tests/llmobs/test_llmobs_trace_processor.py +++ b/tests/llmobs/test_llmobs_trace_processor.py @@ -17,10 +17,11 @@ from ddtrace.llmobs._constants import MODEL_PROVIDER from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_VALUE +from ddtrace.llmobs._constants import PARENT_ID_KEY +from 
ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_session_id from tests.llmobs._utils import _expected_llmobs_llm_span_event from tests.utils import DummyTracer @@ -83,35 +84,25 @@ def test_processor_only_creates_llmobs_span_event(): with dummy_tracer.trace("root_llm_span", span_type=SpanTypes.LLM) as root_span: root_span.set_tag_str(SPAN_KIND, "llm") with dummy_tracer.trace("child_span") as child_span: - with dummy_tracer.trace("llm_span", span_type=SpanTypes.LLM) as grandchild_span: - grandchild_span.set_tag_str(SPAN_KIND, "llm") - trace = [root_span, child_span, grandchild_span] - expected_grandchild_llmobs_span = _expected_llmobs_llm_span_event(grandchild_span, "llm") - expected_grandchild_llmobs_span["parent_id"] = str(root_span.span_id) + pass + trace = [root_span, child_span] trace_filter.process_trace(trace) - assert mock_llmobs_span_writer.enqueue.call_count == 2 - mock_llmobs_span_writer.assert_has_calls( - [ - mock.call.enqueue(_expected_llmobs_llm_span_event(root_span, "llm")), - mock.call.enqueue(expected_grandchild_llmobs_span), - ] - ) + assert mock_llmobs_span_writer.enqueue.call_count == 1 + mock_llmobs_span_writer.assert_has_calls([mock.call.enqueue(_expected_llmobs_llm_span_event(root_span, "llm"))]) -def test_set_correct_parent_id(): +def test_set_correct_parent_id(LLMObs): """Test that the parent_id is set as the span_id of the nearest LLMObs span in the span's ancestor tree.""" - dummy_tracer = DummyTracer() - with dummy_tracer.trace("root"): - with dummy_tracer.trace("llm_span", span_type=SpanTypes.LLM) as llm_span: + with LLMObs._instance.tracer.trace("root"): + with LLMObs.workflow("llmobs span") as llm_span: pass - assert _get_llmobs_parent_id(llm_span) is None - with 
dummy_tracer.trace("root_llm_span", span_type=SpanTypes.LLM) as root_span: - with dummy_tracer.trace("child_span") as child_span: - with dummy_tracer.trace("llm_span", span_type=SpanTypes.LLM) as grandchild_span: - pass - assert _get_llmobs_parent_id(root_span) is None - assert _get_llmobs_parent_id(child_span) == str(root_span.span_id) - assert _get_llmobs_parent_id(grandchild_span) == str(root_span.span_id) + assert llm_span.get_tag(PARENT_ID_KEY) is None + with LLMObs.workflow("root llmobs span") as root_span: + assert root_span.get_tag(PARENT_ID_KEY) == ROOT_PARENT_ID + with LLMObs._instance.tracer.trace("child_span") as child_span: + assert child_span.get_tag(PARENT_ID_KEY) is None + with LLMObs.task("llmobs span") as grandchild_span: + assert grandchild_span.get_tag(PARENT_ID_KEY) == str(root_span.span_id) def test_propagate_session_id_from_ancestors(): diff --git a/tests/llmobs/test_propagation.py b/tests/llmobs/test_propagation.py index d892c6b98a..39c1b99c00 100644 --- a/tests/llmobs/test_propagation.py +++ b/tests/llmobs/test_propagation.py @@ -1,55 +1,55 @@ import json import os -from ddtrace.ext import SpanTypes -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY -from ddtrace.llmobs._utils import _get_llmobs_parent_id -from ddtrace.llmobs._utils import _inject_llmobs_parent_id -from ddtrace.propagation.http import HTTPPropagator -from tests.utils import DummyTracer +import pytest + +from ddtrace.llmobs._constants import PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID +from ddtrace.contrib.internal.futures.patch import patch as patch_futures +from ddtrace.contrib.internal.futures.patch import unpatch as unpatch_futures -def test_inject_llmobs_parent_id_no_llmobs_span(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span"): - with dummy_tracer.trace("Non-LLMObs span") as child_span: - _inject_llmobs_parent_id(child_span.context) - assert child_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == 
"undefined" +def test_inject_llmobs_parent_id_no_llmobs_span(LLMObs): + request_headers = {} + with LLMObs._instance.tracer.trace("Non-LLMObs span"): + with LLMObs._instance.tracer.trace("Non-LLMObs span"): + LLMObs._inject_llmobs_context(request_headers) + assert request_headers.get(PARENT_ID_KEY) == ROOT_PARENT_ID -def test_inject_llmobs_parent_id_simple(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: - _inject_llmobs_parent_id(root_span.context) - assert root_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(root_span.span_id) +def test_inject_llmobs_parent_id_simple(LLMObs): + request_headers = {} + with LLMObs.workflow("LLMObs span") as root_span: + LLMObs._inject_llmobs_context(request_headers) + assert request_headers.get(PARENT_ID_KEY) == str(root_span.span_id) -def test_inject_llmobs_parent_id_nested_llmobs_non_llmobs(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: - with dummy_tracer.trace("Non-LLMObs span") as child_span: - _inject_llmobs_parent_id(child_span.context) - assert child_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(root_span.span_id) +def test_inject_llmobs_parent_id_nested_llmobs_non_llmobs(LLMObs): + request_headers = {} + with LLMObs.workflow("LLMObs span") as root_span: + with LLMObs._instance.tracer.trace("Non-LLMObs span"): + LLMObs._inject_llmobs_context(request_headers) + assert request_headers.get(PARENT_ID_KEY) == str(root_span.span_id) -def test_inject_llmobs_parent_id_non_llmobs_root_span(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span"): - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as child_span: - _inject_llmobs_parent_id(child_span.context) - assert child_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(child_span.span_id) +def test_inject_llmobs_parent_id_non_llmobs_root_span(LLMObs): + request_headers = {} + with 
LLMObs._instance.tracer.trace("Non-LLMObs span"): + with LLMObs.workflow("LLMObs span") as child_span: + LLMObs._inject_llmobs_context(request_headers) + assert request_headers.get(PARENT_ID_KEY) == str(child_span.span_id) -def test_inject_llmobs_parent_id_nested_llmobs_spans(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("LLMObs child span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("Last LLMObs child span", span_type=SpanTypes.LLM) as last_llmobs_span: - _inject_llmobs_parent_id(last_llmobs_span.context) - assert last_llmobs_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(last_llmobs_span.span_id) +def test_inject_llmobs_parent_id_nested_llmobs_spans(LLMObs): + request_headers = {} + with LLMObs.workflow("LLMObs span"): + with LLMObs.workflow("LLMObs child span"): + with LLMObs.workflow("Last LLMObs child span") as last_llmobs_span: + LLMObs._inject_llmobs_context(request_headers) + assert request_headers.get(PARENT_ID_KEY) == str(last_llmobs_span.span_id) -def test_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess): +def test_propagate_correct_llmobs_parent_id_simple(ddtrace_run_python_code_in_subprocess, LLMObs): """Test that the correct LLMObs parent ID is propagated in the headers in a simple distributed scenario. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a LLMObs span. 
@@ -60,32 +60,30 @@ def test_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess from ddtrace import tracer from ddtrace.ext import SpanTypes +from ddtrace.llmobs import LLMObs from ddtrace.propagation.http import HTTPPropagator -with tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: - with tracer.trace("Non-LLMObs span") as child_span: +with LLMObs.workflow("LLMObs span") as root_span: + with tracer.trace("Non-LLMObs span"): headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} - HTTPPropagator.inject(child_span.context, headers) + LLMObs.inject_distributed_headers(headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) - context = HTTPPropagator.extract(headers) - dummy_tracer = DummyTracer() - dummy_tracer.context_provider.activate(context) - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as span: + LLMObs.activate_distributed_headers(headers) + with LLMObs.workflow("LLMObs span") as span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span.get_tag(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess): +def test_propagate_llmobs_parent_id_complex(ddtrace_run_python_code_in_subprocess, LLMObs): """Test that the correct LLMObs parent ID is propagated in the headers in a more complex trace. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. 
Service B (outside subprocess) has a non-LLMObs local root span and a LLMObs child span. @@ -96,34 +94,32 @@ def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess): from ddtrace import tracer from ddtrace.ext import SpanTypes +from ddtrace.llmobs import LLMObs from ddtrace.propagation.http import HTTPPropagator -with tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: - with tracer.trace("Non-LLMObs span") as child_span: +with LLMObs.workflow("LLMObs span") as root_span: + with tracer.trace("Non-LLMObs span"): headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} - HTTPPropagator.inject(child_span.context, headers) + LLMObs.inject_distributed_headers(headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) - context = HTTPPropagator.extract(headers) - dummy_tracer = DummyTracer() - dummy_tracer.context_provider.activate(context) - with dummy_tracer.trace("Non-LLMObs span") as span: - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as llm_span: + LLMObs.activate_distributed_headers(headers) + with LLMObs._instance.tracer.trace("Non-LLMObs span") as span: + with LLMObs.workflow("LLMObs span") as llm_span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] - assert _get_llmobs_parent_id(llm_span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span.get_tag(PARENT_ID_KEY) is None + assert llm_span.get_tag(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def 
test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(run_python_code_in_subprocess): +def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(ddtrace_run_python_code_in_subprocess, LLMObs): """Test that the correct LLMObs parent ID (None) is extracted from the headers in a simple distributed scenario. Service A (subprocess) has spans, but none are LLMObs spans. Service B (outside subprocess) has a LLMObs span. @@ -133,63 +129,59 @@ def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(run_python_code_in_su import json from ddtrace import tracer +from ddtrace.llmobs import LLMObs from ddtrace.propagation.http import HTTPPropagator -with tracer.trace("Non-LLMObs span") as root_span: +with tracer.trace("Non-LLMObs span") as span: headers = {} - HTTPPropagator.inject(root_span.context, headers) + LLMObs.inject_distributed_headers(headers) + HTTPPropagator.inject(span.context, headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) - assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) - context = HTTPPropagator.extract(headers) - dummy_tracer = DummyTracer() - dummy_tracer.context_provider.activate(context) - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as span: - assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == "undefined" + LLMObs.activate_distributed_headers(headers) + with LLMObs.workflow("LLMObs span") as span: + assert str(span.parent_id) == headers.get("x-datadog-parent-id") + assert span.get_tag(PARENT_ID_KEY) == ROOT_PARENT_ID def test_inject_distributed_headers_simple(LLMObs): - dummy_tracer = DummyTracer() - 
with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: + with LLMObs.workflow("LLMObs span") as root_span: request_headers = LLMObs.inject_distributed_headers({}, span=root_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert PARENT_ID_KEY in request_headers def test_inject_distributed_headers_nested_llmobs_non_llmobs(LLMObs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("Non-LLMObs span") as child_span: + with LLMObs.workflow("LLMObs span"): + with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span: request_headers = LLMObs.inject_distributed_headers({}, span=child_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert PARENT_ID_KEY in request_headers def test_inject_distributed_headers_non_llmobs_root_span(LLMObs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span"): - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as child_span: + with LLMObs._instance.tracer.trace("Non-LLMObs span"): + with LLMObs.workflow("LLMObs span") as child_span: request_headers = LLMObs.inject_distributed_headers({}, span=child_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert PARENT_ID_KEY in request_headers def test_inject_distributed_headers_nested_llmobs_spans(LLMObs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("LLMObs child span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("Last LLMObs child span", span_type=SpanTypes.LLM) as last_llmobs_span: + with LLMObs.workflow("LLMObs span"): + with LLMObs.workflow("LLMObs child span"): + with LLMObs.workflow("LLMObs grandchild span") as last_llmobs_span: request_headers = LLMObs.inject_distributed_headers({}, span=last_llmobs_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + 
assert PARENT_ID_KEY in request_headers -def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess, LLMObs): +def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( + ddtrace_run_python_code_in_subprocess, LLMObs +): """Test that the correct LLMObs parent ID is propagated in the headers in a simple distributed scenario. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a LLMObs span. @@ -202,8 +194,6 @@ def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( from ddtrace.ext import SpanTypes from ddtrace.llmobs import LLMObs -LLMObs.enable(ml_app="test-app", api_key="") - with LLMObs.workflow("LLMObs span") as root_span: with tracer.trace("Non-LLMObs span") as child_span: headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} @@ -212,9 +202,8 @@ def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) @@ -222,14 +211,14 @@ def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( LLMObs.activate_distributed_headers(headers) with LLMObs.workflow("LLMObs span") as span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span.get_tag(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_activate_distributed_headers_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess, LLMObs): +def 
test_activate_distributed_headers_propagate_llmobs_parent_id_complex(ddtrace_run_python_code_in_subprocess, LLMObs): """Test that the correct LLMObs parent ID is propagated in the headers in a more complex trace. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a non-LLMObs local root span and a LLMObs child span. - Both of service B's spans should have the same LLMObs parent ID (Root LLMObs span from service A). + Service B's LLMObs span should have the LLMObs parent ID (Root LLMObs span from service A). """ code = """ import json @@ -238,33 +227,31 @@ def test_activate_distributed_headers_propagate_llmobs_parent_id_complex(run_pyt from ddtrace.ext import SpanTypes from ddtrace.llmobs import LLMObs -LLMObs.enable(ml_app="test-app", api_key="") - with LLMObs.workflow("LLMObs span") as root_span: with tracer.trace("Non-LLMObs span") as child_span: headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} - headers = LLMObs.inject_distributed_headers(headers, span=child_span) + headers = LLMObs.inject_distributed_headers(headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) LLMObs.activate_distributed_headers(headers) - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span") as span: + with LLMObs._instance.tracer.trace("Non-LLMObs span") as span: with LLMObs.llm(model_name="llm_model", name="LLMObs span") as llm_span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] - assert 
_get_llmobs_parent_id(llm_span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span.get_tag(PARENT_ID_KEY) is None + assert llm_span.get_tag(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_python_code_in_subprocess, LLMObs): +def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans( + ddtrace_run_python_code_in_subprocess, LLMObs +): """Test that the correct LLMObs parent ID (None) is extracted from the headers in a simple distributed scenario. Service A (subprocess) has spans, but none are LLMObs spans. Service B (outside subprocess) has a LLMObs span. @@ -276,8 +263,6 @@ def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_ from ddtrace import tracer from ddtrace.llmobs import LLMObs -LLMObs.enable(ml_app="test-app", api_key="") - with tracer.trace("Non-LLMObs span") as root_span: headers = {} headers = LLMObs.inject_distributed_headers(headers, span=root_span) @@ -285,9 +270,8 @@ def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_ print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) @@ -295,4 +279,58 @@ def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_ LLMObs.activate_distributed_headers(headers) with LLMObs.task("LLMObs span") as span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == "undefined" + assert span.get_tag(PARENT_ID_KEY) == ROOT_PARENT_ID + + +@pytest.mark.parametrize("ddtrace_global_config", [dict(_llmobs_enabled=True, 
_llmobs_ml_app="test_app_name")]) +def test_threading_submit_propagation(LLMObs, mock_llmobs_span_writer, ddtrace_global_config): + # Assert the threading patch propagates the llmobs tracing context if llmobs is enabled. + patch_futures() + import concurrent.futures + + def fn(): + with LLMObs.task("executor.thread"): + return 42 + + with LLMObs.workflow("main.thread"): + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(fn) + result = future.result() + assert result == 42 + assert mock_llmobs_span_writer.enqueue.call_count == 2 + main_thread_span, executor_thread_span = None, None + for span in mock_llmobs_span_writer.enqueue.call_args_list: + if span[0][0]["name"] == "main.thread": + main_thread_span = span[0][0] + else: + executor_thread_span = span[0][0] + assert main_thread_span["parent_id"] == ROOT_PARENT_ID + assert executor_thread_span["parent_id"] == main_thread_span["span_id"] + unpatch_futures() + + +@pytest.mark.parametrize("ddtrace_global_config", [dict(_llmobs_enabled=True, _llmobs_ml_app="test_app_name")]) +def test_threading_map_propagation(LLMObs, mock_llmobs_span_writer, ddtrace_global_config): + # Assert the threading patch propagates the llmobs tracing context if llmobs is enabled. 
+ patch_futures() + import concurrent.futures + + def fn(value): + with LLMObs.task("executor.thread"): + return value + + with LLMObs.workflow("main.thread"): + with concurrent.futures.ThreadPoolExecutor() as executor: + _ = executor.map(fn, (1, 2)) + assert mock_llmobs_span_writer.enqueue.call_count == 3 + main_thread_span = None + executor_thread_spans = [] + for span in mock_llmobs_span_writer.enqueue.call_args_list: + if span[0][0]["name"] == "main.thread": + main_thread_span = span[0][0] + else: + executor_thread_spans.append(span[0][0]) + assert main_thread_span["parent_id"] == ROOT_PARENT_ID + assert executor_thread_spans[0]["parent_id"] == main_thread_span["span_id"] + assert executor_thread_spans[1]["parent_id"] == main_thread_span["span_id"] + unpatch_futures() From c99c0bce44d0cc07e46308cc109164637a2d0679 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 23 Sep 2024 17:29:28 -0400 Subject: [PATCH 2/8] Fix tests, refactor trace_utils --- ddtrace/contrib/trace_utils.py | 59 ++++++++++++++++---------------- tests/llmobs/test_propagation.py | 16 ++++----- tests/tracer/test_propagation.py | 12 +++---- 3 files changed, 44 insertions(+), 43 deletions(-) diff --git a/ddtrace/contrib/trace_utils.py b/ddtrace/contrib/trace_utils.py index dbc0ae167e..f451becd1c 100644 --- a/ddtrace/contrib/trace_utils.py +++ b/ddtrace/contrib/trace_utils.py @@ -546,38 +546,39 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None, """ if override is False: return None + if not override and not (int_config and distributed_tracing_enabled(int_config)): + return None - if override or (int_config and distributed_tracing_enabled(int_config)): - context = HTTPPropagator.extract(request_headers) - - # Only need to activate the new context if something was propagated - if not context.trace_id: - return None - - # Do not reactivate a context with the same trace id - # DEV: An example could be nested web frameworks, when one layer already - # parsed request 
headers and activated them. - # - # Example:: - # - # app = Flask(__name__) # Traced via Flask instrumentation - # app = DDWSGIMiddleware(app) # Extra layer on top for WSGI - current_context = tracer.current_trace_context() - if current_context and current_context.trace_id == context.trace_id: - log.debug( - "will not activate extracted Context(trace_id=%r, span_id=%r), a context with that trace id is already active", # noqa: E501 - context.trace_id, - context.span_id, - ) - return None + context = HTTPPropagator.extract(request_headers) + + # Only need to activate the new context if something was propagated + if not context.trace_id: + return None + + # Do not reactivate a context with the same trace id + # DEV: An example could be nested web frameworks, when one layer already + # parsed request headers and activated them. + # + # Example:: + # + # app = Flask(__name__) # Traced via Flask instrumentation + # app = DDWSGIMiddleware(app) # Extra layer on top for WSGI + current_context = tracer.current_trace_context() + if current_context and current_context.trace_id == context.trace_id: + log.debug( + "will not activate extracted Context(trace_id=%r, span_id=%r), a context with that trace id is already active", # noqa: E501 + context.trace_id, + context.span_id, + ) + return None - # We have parsed a trace id from headers, and we do not already - # have a context with the same trace id active - tracer.context_provider.activate(context) - if config._llmobs_enabled: - from ddtrace.llmobs import LLMObs + # We have parsed a trace id from headers, and we do not already + # have a context with the same trace id active + tracer.context_provider.activate(context) + if config._llmobs_enabled: + from ddtrace.llmobs import LLMObs - LLMObs._activate_distributed_headers(request_headers, context) + LLMObs._activate_llmobs_distributed_headers(request_headers, context) def _flatten( diff --git a/tests/llmobs/test_propagation.py b/tests/llmobs/test_propagation.py index 
39c1b99c00..d9024927ec 100644 --- a/tests/llmobs/test_propagation.py +++ b/tests/llmobs/test_propagation.py @@ -3,10 +3,10 @@ import pytest -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.contrib.internal.futures.patch import patch as patch_futures from ddtrace.contrib.internal.futures.patch import unpatch as unpatch_futures +from ddtrace.llmobs._constants import PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID def test_inject_llmobs_parent_id_no_llmobs_span(LLMObs): @@ -71,7 +71,7 @@ def test_propagate_correct_llmobs_parent_id_simple(ddtrace_run_python_code_in_su print(json.dumps(headers)) """ env = os.environ.copy() - env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) @@ -105,7 +105,7 @@ def test_propagate_llmobs_parent_id_complex(ddtrace_run_python_code_in_subproces print(json.dumps(headers)) """ env = os.environ.copy() - env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) @@ -140,7 +140,7 @@ def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(ddtrace_run_python_co print(json.dumps(headers)) """ env = os.environ.copy() - env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) @@ -202,7 
+202,7 @@ def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( print(json.dumps(headers)) """ env = os.environ.copy() - env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) @@ -235,7 +235,7 @@ def test_activate_distributed_headers_propagate_llmobs_parent_id_complex(ddtrace print(json.dumps(headers)) """ env = os.environ.copy() - env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) @@ -270,7 +270,7 @@ def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans( print(json.dumps(headers)) """ env = os.environ.copy() - env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1"}) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) assert stderr == b"", (stdout, stderr) diff --git a/tests/tracer/test_propagation.py b/tests/tracer/test_propagation.py index ca3341d6a8..bfac0b3d0f 100644 --- a/tests/tracer/test_propagation.py +++ b/tests/tracer/test_propagation.py @@ -3021,22 +3021,22 @@ def test_DD_TRACE_PROPAGATION_STYLE_INJECT_overrides_DD_TRACE_PROPAGATION_STYLE( def test_llmobs_enabled_injects_llmobs_parent_id(): with override_global_config(dict(_llmobs_enabled=True)): - with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject: + with mock.patch("ddtrace.llmobs.LLMObs") as 
mock_llmobs: context = Context(trace_id=1, span_id=2) HTTPPropagator.inject(context, {}) - mock_llmobs_inject.assert_called_once_with(context) + mock_llmobs._inject_llmobs_context.assert_called_once_with({}) def test_llmobs_disabled_does_not_inject_parent_id(): with override_global_config(dict(_llmobs_enabled=False)): - with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject: + with mock.patch("ddtrace.llmobs.LLMObs") as mock_llmobs: context = Context(trace_id=1, span_id=2) HTTPPropagator.inject(context, {}) - mock_llmobs_inject.assert_not_called() + mock_llmobs._inject_llmobs_context.assert_not_called() def test_llmobs_parent_id_not_injected_by_default(): - with mock.patch("ddtrace.llmobs._utils._inject_llmobs_parent_id") as mock_llmobs_inject: + with mock.patch("ddtrace.llmobs.LLMObs") as mock_llmobs: context = Context(trace_id=1, span_id=2) HTTPPropagator.inject(context, {}) - mock_llmobs_inject.assert_not_called() + mock_llmobs._inject_llmobs_context.assert_not_called() From 191a0d078e3b5adfe9625d89815c70bb5bd21374 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Mon, 23 Sep 2024 18:09:14 -0400 Subject: [PATCH 3/8] Remove propagated parent ID key --- ddtrace/llmobs/_constants.py | 1 - tests/llmobs/test_llmobs_service.py | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ddtrace/llmobs/_constants.py b/ddtrace/llmobs/_constants.py index 504bbf4c9f..8b25fcc7a5 100644 --- a/ddtrace/llmobs/_constants.py +++ b/ddtrace/llmobs/_constants.py @@ -3,7 +3,6 @@ METADATA = "_ml_obs.meta.metadata" METRICS = "_ml_obs.metrics" ML_APP = "_ml_obs.meta.ml_app" -PROPAGATED_PARENT_ID_KEY = "_dd.p.llmobs_parent_id" PARENT_ID_KEY = "_ml_obs.llmobs_parent_id" TAGS = "_ml_obs.tags" diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 686a1eb656..7954a3dc19 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -1389,7 +1389,7 @@ def 
test_activate_distributed_headers_calls_httppropagator_extract(LLMObs, mock_ def test_activate_distributed_headers_no_trace_id_does_nothing(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - mock_extract.return_value = Context(span_id=123, meta={PROPAGATED_PARENT_ID_KEY: "123"}) + mock_extract.return_value = Context(span_id=123) LLMObs.activate_distributed_headers({}) assert mock_extract.call_count == 1 mock_logs.warning.assert_called_once_with("Failed to extract trace/span ID from request headers.") @@ -1397,7 +1397,7 @@ def test_activate_distributed_headers_no_trace_id_does_nothing(LLMObs, mock_logs def test_activate_distributed_headers_no_span_id_does_nothing(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - mock_extract.return_value = Context(trace_id=123, meta={PROPAGATED_PARENT_ID_KEY: "123"}) + mock_extract.return_value = Context(trace_id=123) LLMObs.activate_distributed_headers({}) assert mock_extract.call_count == 1 mock_logs.warning.assert_called_once_with("Failed to extract trace/span ID from request headers.") @@ -1416,7 +1416,7 @@ def test_activate_distributed_headers_no_llmobs_parent_id_does_nothing(LLMObs, m def test_activate_distributed_headers_activates_context(LLMObs, mock_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - dummy_context = Context(trace_id=123, span_id=456, meta={PROPAGATED_PARENT_ID_KEY: "789"}) + dummy_context = Context(trace_id=123, span_id=456) mock_extract.return_value = dummy_context with mock.patch("ddtrace.llmobs.LLMObs._instance.tracer.context_provider.activate") as mock_activate: LLMObs.activate_distributed_headers({}) From bd9b5390104fc111b1f1852146ec255fcf624237 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Tue, 24 Sep 2024 14:16:34 -0400 Subject: [PATCH 4/8] Fix tracer dummy tests, removed propagation constant --- tests/llmobs/test_llmobs_service.py | 1 - 
tests/tracer/test_propagation.py | 11 ++++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 42cbdf7925..485a8f7340 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -24,7 +24,6 @@ from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_VALUE from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND diff --git a/tests/tracer/test_propagation.py b/tests/tracer/test_propagation.py index bfac0b3d0f..ffa161ad83 100644 --- a/tests/tracer/test_propagation.py +++ b/tests/tracer/test_propagation.py @@ -3022,21 +3022,18 @@ def test_DD_TRACE_PROPAGATION_STYLE_INJECT_overrides_DD_TRACE_PROPAGATION_STYLE( def test_llmobs_enabled_injects_llmobs_parent_id(): with override_global_config(dict(_llmobs_enabled=True)): with mock.patch("ddtrace.llmobs.LLMObs") as mock_llmobs: - context = Context(trace_id=1, span_id=2) - HTTPPropagator.inject(context, {}) - mock_llmobs._inject_llmobs_context.assert_called_once_with({}) + HTTPPropagator.inject(Context(trace_id=1, span_id=2), {}) + mock_llmobs._inject_llmobs_context.assert_called_once() def test_llmobs_disabled_does_not_inject_parent_id(): with override_global_config(dict(_llmobs_enabled=False)): with mock.patch("ddtrace.llmobs.LLMObs") as mock_llmobs: - context = Context(trace_id=1, span_id=2) - HTTPPropagator.inject(context, {}) + HTTPPropagator.inject(Context(trace_id=1, span_id=2), {}) mock_llmobs._inject_llmobs_context.assert_not_called() def test_llmobs_parent_id_not_injected_by_default(): with mock.patch("ddtrace.llmobs.LLMObs") as mock_llmobs: - context = Context(trace_id=1, span_id=2) - HTTPPropagator.inject(context, {}) + 
HTTPPropagator.inject(Context(trace_id=1, span_id=2), {}) mock_llmobs._inject_llmobs_context.assert_not_called() From dd0a23a81fe1fff2db1cae9aa6a0331a7fd6d705 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Tue, 24 Sep 2024 14:31:40 -0400 Subject: [PATCH 5/8] Add dd apm span/trace IDs as separate fields --- ddtrace/llmobs/_trace_processor.py | 1 + ddtrace/llmobs/_writer.py | 1 + tests/llmobs/_utils.py | 1 + 3 files changed, 3 insertions(+) diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py index 0d709d9148..8dfcd0edd7 100644 --- a/ddtrace/llmobs/_trace_processor.py +++ b/ddtrace/llmobs/_trace_processor.py @@ -113,6 +113,7 @@ def _llmobs_span_event(self, span: Span) -> Dict[str, Any]: "status": "error" if span.error else "ok", "meta": meta, "metrics": metrics, + "_dd": {"span_id": str(span.span_id), "trace_id": "{:x}".format(span.trace_id)}, } session_id = _get_session_id(span) if session_id is not None: diff --git a/ddtrace/llmobs/_writer.py b/ddtrace/llmobs/_writer.py index 0b79176b51..d82ab95a83 100644 --- a/ddtrace/llmobs/_writer.py +++ b/ddtrace/llmobs/_writer.py @@ -51,6 +51,7 @@ class LLMObsSpanEvent(TypedDict): meta: Dict[str, Any] metrics: Dict[str, Any] collection_errors: List[str] + _dd: Dict[str, str] class LLMObsEvaluationMetricEvent(TypedDict, total=False): diff --git a/tests/llmobs/_utils.py b/tests/llmobs/_utils.py index 308e420ddf..193f4243f6 100644 --- a/tests/llmobs/_utils.py +++ b/tests/llmobs/_utils.py @@ -193,6 +193,7 @@ def _llmobs_base_span_event( "meta": {"span.kind": span_kind}, "metrics": {}, "tags": _expected_llmobs_tags(span, tags=tags, error=error, session_id=session_id), + "_dd": {"span_id": str(span.span_id), "trace_id": "{:x}".format(span.trace_id)}, } if session_id: span_event["session_id"] = session_id From 0e4182648c473ca121fa51aaa0abf6286e0d181d Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Tue, 24 Sep 2024 14:45:02 -0400 Subject: [PATCH 6/8] typing --- ddtrace/llmobs/_llmobs.py | 6 +++--- 1 file 
changed, 3 insertions(+), 3 deletions(-) diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 08daedd36b..e608dd8d5b 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -858,11 +858,11 @@ def _activate_llmobs_distributed_headers( return if PARENT_ID_KEY not in request_headers: log.warning("Failed to extract LLMObs parent ID from request headers.") - parent_id = request_headers.get(PARENT_ID_KEY) - if parent_id is None: + _parent_id = request_headers.get(PARENT_ID_KEY) + if _parent_id is None: return try: - parent_id = int(parent_id) + parent_id = int(_parent_id) except ValueError: return llmobs_context = Context(trace_id=context.trace_id, span_id=parent_id) From 304bbe7237f6d4704e02884fdb94b2c47d9edeaa Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Tue, 15 Oct 2024 18:14:33 -0400 Subject: [PATCH 7/8] Address review comments --- ddtrace/llmobs/_context.py | 6 ++---- ddtrace/llmobs/_llmobs.py | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/ddtrace/llmobs/_context.py b/ddtrace/llmobs/_context.py index 33768e4e4d..c6496954af 100644 --- a/ddtrace/llmobs/_context.py +++ b/ddtrace/llmobs/_context.py @@ -23,13 +23,11 @@ class LLMObsContextProvider(DefaultContextProvider): that support contextvars. 
""" - def __init__(self): - # type: () -> None + def __init__(self) -> None: super(DefaultContextProvider, self).__init__() _DD_LLMOBS_CONTEXTVAR.set(None) - def _has_active_context(self): - # type: () -> bool + def _has_active_context(self) -> bool: """Returns whether there is an active context in the current execution.""" ctx = _DD_LLMOBS_CONTEXTVAR.get() return ctx is not None diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 3a97c2431a..0e13ccf6a0 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -891,10 +891,9 @@ def _activate_llmobs_distributed_headers( if not context.trace_id or not context.span_id: log.warning("Failed to extract trace/span ID from request headers.") return - if PARENT_ID_KEY not in request_headers: - log.warning("Failed to extract LLMObs parent ID from request headers.") _parent_id = request_headers.get(PARENT_ID_KEY) if _parent_id is None: + log.warning("Failed to extract LLMObs parent ID from request headers.") return try: parent_id = int(_parent_id) From 80eced2d9b6addf7ecf80d5a91878632efb93012 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 16 Oct 2024 15:53:05 -0400 Subject: [PATCH 8/8] Add release note --- .../notes/feat-llmobs-context-cf709480b30ed0a5.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 releasenotes/notes/feat-llmobs-context-cf709480b30ed0a5.yaml diff --git a/releasenotes/notes/feat-llmobs-context-cf709480b30ed0a5.yaml b/releasenotes/notes/feat-llmobs-context-cf709480b30ed0a5.yaml new file mode 100644 index 0000000000..5bf7961f49 --- /dev/null +++ b/releasenotes/notes/feat-llmobs-context-cf709480b30ed0a5.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + LLM Observability: Introduces an improved automated context management system for LLM Observability-specific spans. + Also introduces ``LLMObs.current_span()`` and ``LLMObs.current_trace_context()`` which return the currently active LLM Observability-specific ``Span`` and ``Context`` objects, respectively. 
+ Also modifies ``LLMObs.export_span()``, ``LLMObs.inject_distributed_headers()``, and ``LLMObs.annotate()`` to default to the currently active LLM Observability-specific span if ``span`` is not provided. + - | + LLM Observability: Introduces automated distributed tracing support for LLM Observability traces involving the ``concurrent.futures.thread`` module.