feat(llmobs): llmobs-specific context manager #10767

Open · wants to merge 3 commits into main
20 changes: 16 additions & 4 deletions ddtrace/contrib/internal/futures/threading.py
@@ -1,6 +1,8 @@
 from typing import Optional
+from typing import Tuple

 import ddtrace
+from ddtrace import config
 from ddtrace._trace.context import Context


@@ -13,24 +15,34 @@ def _wrap_submit(func, args, kwargs):
     # DEV: Be sure to propagate a Context and not a Span since we are crossing thread boundaries
     current_ctx: Optional[Context] = ddtrace.tracer.current_trace_context()

+    llmobs_ctx: Optional[Context] = None
+    if config._llmobs_enabled:
+        from ddtrace.llmobs import LLMObs
+
+        llmobs_ctx = LLMObs._instance.current_trace_context()
+
     # The target function can be provided as a kwarg argument "fn" or the first positional argument
     self = args[0]
     if "fn" in kwargs:
         fn = kwargs.pop("fn")
         fn_args = args[1:]
     else:
         fn, fn_args = args[1], args[2:]
-    return func(self, _wrap_execution, current_ctx, fn, fn_args, kwargs)
+    return func(self, _wrap_execution, (current_ctx, llmobs_ctx), fn, fn_args, kwargs)


-def _wrap_execution(ctx: Optional[Context], fn, args, kwargs):
+def _wrap_execution(ctx: Tuple[Optional[Context], Optional[Context]], fn, args, kwargs):
     """
     Intermediate target function that is executed in a new thread;
     it receives the original function with arguments and keyword
     arguments, including our tracing `Context`. The current context
     provider sets the Active context in a thread local storage
     variable because it's outside the asynchronous loop.
     """
-    if ctx is not None:
-        ddtrace.tracer.context_provider.activate(ctx)
+    if ctx[0] is not None:
+        ddtrace.tracer.context_provider.activate(ctx[0])
+    if ctx[1] is not None and config._llmobs_enabled:
+        from ddtrace.llmobs import LLMObs
+
+        LLMObs._instance._llmobs_context_provider.activate(ctx[1])
     return fn(*args, **kwargs)
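
For context, a minimal sketch (not taken from this PR) of the behavior the futures change is meant to enable: an LLMObs span active in the submitting thread becomes the parent of LLMObs spans started in the worker thread. The setup calls use public ddtrace APIs; the ml_app and span names are illustrative assumptions, and an agent is assumed to be reachable.

from concurrent.futures import ThreadPoolExecutor

from ddtrace import patch
from ddtrace.llmobs import LLMObs

patch(futures=True)                  # wraps ThreadPoolExecutor.submit via _wrap_submit above
LLMObs.enable(ml_app="example-app")  # illustrative setup


def run_in_worker():
    # The (current_ctx, llmobs_ctx) tuple handed to _wrap_execution re-activates both the
    # APM context and the LLMObs context here, so this task is parented under "parent-workflow".
    with LLMObs.task(name="child-task"):
        return "done"


with LLMObs.workflow(name="parent-workflow"):
    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(run_in_worker).result()
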
58 changes: 32 additions & 26 deletions ddtrace/contrib/trace_utils.py
@@ -1,6 +1,7 @@
 """
 This module contains utility functions for writing ddtrace integrations.
 """
+
 from collections import deque
 import ipaddress
 import re
@@ -545,34 +546,39 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
     """
     if override is False:
         return None
+    if not override and not (int_config and distributed_tracing_enabled(int_config)):
+        return None

-    if override or (int_config and distributed_tracing_enabled(int_config)):
-        context = HTTPPropagator.extract(request_headers)
-
-        # Only need to activate the new context if something was propagated
-        if not context.trace_id:
-            return None
-
-        # Do not reactivate a context with the same trace id
-        # DEV: An example could be nested web frameworks, when one layer already
-        #      parsed request headers and activated them.
-        #
-        # Example::
-        #
-        #     app = Flask(__name__)  # Traced via Flask instrumentation
-        #     app = DDWSGIMiddleware(app)  # Extra layer on top for WSGI
-        current_context = tracer.current_trace_context()
-        if current_context and current_context.trace_id == context.trace_id:
-            log.debug(
-                "will not activate extracted Context(trace_id=%r, span_id=%r), a context with that trace id is already active",  # noqa: E501
-                context.trace_id,
-                context.span_id,
-            )
-            return None
+    context = HTTPPropagator.extract(request_headers)
+
+    # Only need to activate the new context if something was propagated
+    if not context.trace_id:
+        return None
+
+    # Do not reactivate a context with the same trace id
+    # DEV: An example could be nested web frameworks, when one layer already
+    #      parsed request headers and activated them.
+    #
+    # Example::
+    #
+    #     app = Flask(__name__)  # Traced via Flask instrumentation
+    #     app = DDWSGIMiddleware(app)  # Extra layer on top for WSGI
+    current_context = tracer.current_trace_context()
+    if current_context and current_context.trace_id == context.trace_id:
+        log.debug(
+            "will not activate extracted Context(trace_id=%r, span_id=%r), a context with that trace id is already active",  # noqa: E501
+            context.trace_id,
+            context.span_id,
+        )
+        return None

-        # We have parsed a trace id from headers, and we do not already
-        # have a context with the same trace id active
-        tracer.context_provider.activate(context)
+    if config._llmobs_enabled:
+        from ddtrace.llmobs import LLMObs
+
+        # We have parsed a trace id from headers, and we do not already
+        # have a context with the same trace id active
+        tracer.context_provider.activate(context)
+        LLMObs._activate_llmobs_distributed_headers(request_headers, context)


def _flatten(
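
A hedged sketch (not from this PR) of how a server-side integration would call the helper above when a request arrives. The header values are illustrative, and override=True is used only to force activation without passing an integration config.

from ddtrace import tracer
from ddtrace.contrib.trace_utils import activate_distributed_headers

request_headers = {
    "x-datadog-trace-id": "1234567890",  # illustrative upstream trace id
    "x-datadog-parent-id": "987654321",  # illustrative upstream span id
}

# Activates the extracted APM context and, when LLM Observability is enabled, the
# propagated LLMObs parent via LLMObs._activate_llmobs_distributed_headers above.
activate_distributed_headers(tracer, request_headers=request_headers, override=True)

with tracer.trace("web.request") as span:
    # The new span continues the upstream trace.
    print(span.trace_id)
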
3 changes: 2 additions & 1 deletion ddtrace/llmobs/_constants.py
@@ -3,7 +3,6 @@
 METADATA = "_ml_obs.meta.metadata"
 METRICS = "_ml_obs.metrics"
 ML_APP = "_ml_obs.meta.ml_app"
-PROPAGATED_PARENT_ID_KEY = "_dd.p.llmobs_parent_id"
 PARENT_ID_KEY = "_ml_obs.llmobs_parent_id"
 TAGS = "_ml_obs.tags"

@@ -44,3 +43,5 @@

 DROPPED_IO_COLLECTION_ERROR = "dropped_io"
 DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]"
+
+ROOT_PARENT_ID = "undefined"
61 changes: 61 additions & 0 deletions ddtrace/llmobs/_context.py
@@ -0,0 +1,61 @@
+import contextvars
+from typing import Optional
+from typing import Union
+
+from ddtrace._trace.context import Context
+from ddtrace._trace.provider import DefaultContextProvider
+from ddtrace._trace.span import Span
+from ddtrace.ext import SpanTypes
+
+
+ContextTypeValue = Optional[Union[Context, Span]]
+
+
+_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar(
+    "datadog_llmobs_contextvar",
+    default=None,
+)
+
+
+class LLMObsContextProvider(DefaultContextProvider):
+    """Context provider that retrieves contexts from a context variable.
+    It is suitable for synchronous programming and for asynchronous executors
+    that support contextvars.
+    """
+
+    def __init__(self):
+        # type: () -> None
+        super(DefaultContextProvider, self).__init__()
+        _DD_LLMOBS_CONTEXTVAR.set(None)
+
+    def _has_active_context(self):
+        # type: () -> bool
+        """Returns whether there is an active context in the current execution."""
+        ctx = _DD_LLMOBS_CONTEXTVAR.get()
+        return ctx is not None
+
+    def _update_active(self, span: Span) -> Optional[Span]:
+        """Updates the active LLMObs span.
+        The active span is updated to be the span's closest unfinished LLMObs ancestor span.
+        """
+        if not span.finished:
+            return span
+        new_active: Optional[Span] = span
+        while new_active and new_active.finished:
+            new_active = new_active._parent
+            if new_active and not new_active.finished and new_active.span_type == SpanTypes.LLM:
+                break
+        self.activate(new_active)
+        return new_active
+
+    def activate(self, ctx: ContextTypeValue) -> None:
+        """Makes the given context active in the current execution."""
+        _DD_LLMOBS_CONTEXTVAR.set(ctx)
+        super(DefaultContextProvider, self).activate(ctx)
+
+    def active(self) -> ContextTypeValue:
+        """Returns the active span or context for the current execution."""
+        item = _DD_LLMOBS_CONTEXTVAR.get()
+        if isinstance(item, Span):
+            return self._update_active(item)
+        return item
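
A rough usage sketch (not part of the PR) that exercises the new provider directly, using a plain tracer span with span_type="llm" to stand in for an LLMObs span; the span name is illustrative.

from ddtrace import tracer
from ddtrace.llmobs._context import LLMObsContextProvider

provider = LLMObsContextProvider()

with tracer.trace("llm-call", span_type="llm") as span:
    provider.activate(span)
    # While the span is unfinished it stays active.
    assert provider.active() is span

# Once the span finishes, active() walks up to the closest unfinished ancestor
# (None here, since "llm-call" is a root span) and re-activates that instead.
assert provider.active() is None
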
10 changes: 1 addition & 9 deletions ddtrace/llmobs/_integrations/base.py
@@ -16,11 +16,8 @@
 from ddtrace.internal.dogstatsd import get_dogstatsd_client
 from ddtrace.internal.hostname import get_hostname
 from ddtrace.internal.utils.formats import asbool
-from ddtrace.llmobs._constants import PARENT_ID_KEY
-from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
 from ddtrace.llmobs._llmobs import LLMObs
 from ddtrace.llmobs._log_writer import V2LogWriter
-from ddtrace.llmobs._utils import _get_llmobs_parent_id
 from ddtrace.sampler import RateSampler
 from ddtrace.settings import IntegrationConfig

@@ -127,12 +124,7 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k
         span.set_tag(SPAN_MEASURED_KEY)
         self._set_base_span_tags(span, **kwargs)
         if submit_to_llmobs and self.llmobs_enabled:
-            if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
-                # For non-distributed traces or spans in the first service of a distributed trace,
-                # The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now
-                # in these cases to avoid conflicting with the later propagated tags.
-                parent_id = _get_llmobs_parent_id(span) or "undefined"
-                span.set_tag_str(PARENT_ID_KEY, str(parent_id))
+            LLMObs._instance._activate_llmobs_span(span)
         return span

     @classmethod
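
The removed PROPAGATED_PARENT_ID_KEY / _get_llmobs_parent_id bookkeeping appears to be replaced by activating the integration span through the new LLMObs helper. A hedged illustration of the resulting parenting using only the public SDK (names are illustrative, LLMObs.enable() is assumed to succeed against a local agent, and this does not exercise BaseLLMIntegration.trace itself):

from ddtrace.llmobs import LLMObs

LLMObs.enable(ml_app="example-app")  # illustrative setup

with LLMObs.workflow(name="outer") as outer:
    with LLMObs.llm(model_name="example-model", name="inner") as inner:
        # The inner span is parented from the active LLMObs context rather than
        # from parent-id tags written at span start.
        assert inner.parent_id == outer.span_id
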
8 changes: 2 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
@@ -4,6 +4,7 @@

 from ddtrace._trace.span import Span
 from ddtrace.internal.logger import get_logger
+from ddtrace.llmobs import LLMObs
 from ddtrace.llmobs._constants import INPUT_MESSAGES
 from ddtrace.llmobs._constants import INPUT_TOKENS_METRIC_KEY
 from ddtrace.llmobs._constants import METADATA
@@ -12,12 +13,9 @@
 from ddtrace.llmobs._constants import MODEL_PROVIDER
 from ddtrace.llmobs._constants import OUTPUT_MESSAGES
 from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
-from ddtrace.llmobs._constants import PARENT_ID_KEY
-from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
 from ddtrace.llmobs._constants import SPAN_KIND
 from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
 from ddtrace.llmobs._integrations import BaseLLMIntegration
-from ddtrace.llmobs._utils import _get_llmobs_parent_id
 from ddtrace.llmobs._utils import safe_json


@@ -37,9 +35,7 @@ def llmobs_set_tags(
"""Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags."""
if not self.llmobs_enabled:
return
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
parent_id = _get_llmobs_parent_id(span) or "undefined"
span.set_tag(PARENT_ID_KEY, parent_id)
LLMObs._instance._activate_llmobs_span(span)
parameters = {}
if span.get_tag("bedrock.request.temperature"):
parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0)