Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: otel baggage support initial PR #10389

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ddtrace/_trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,19 @@ def _get_baggage_item(self, key: str) -> Optional[Any]:
"""Gets a baggage item in this span context."""
return self._baggage.get(key, None)

def _get_all_baggage_items(self) -> Dict[str, Any]:
"""Returns all baggage items in this span context."""
return self._baggage

def _remove_baggage_item(self, key: str) -> None:
"""Remove a baggage item from this span context."""
if key in self._baggage:
del self._baggage[key]

def _remove_all_baggage_items(self) -> None:
"""Removes all baggage items from this span context."""
self._baggage.clear()

def __eq__(self, other: Any) -> bool:
if isinstance(other, Context):
with self._lock:
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import math
import pprint
import sys
Expand Down Expand Up @@ -508,6 +509,15 @@ def _get_baggage_item(self, key: str) -> Optional[Any]:
"""Gets a baggage item from the span context of this span."""
return self.context._get_baggage_item(key)

def _get_all_baggage_items(self) -> str:
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
return json.dumps(self.context._get_all_baggage_items())

def _remove_baggage_item(self, key: str) -> None:
self.context._remove_baggage_item(key)

def _remove_all_baggage_items(self) -> None:
self.context._remove_all_baggage_items()

def get_metrics(self) -> _MetricDictType:
"""Return all metrics."""
return self._metrics.copy()
Expand Down
4 changes: 3 additions & 1 deletion ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
PROPAGATION_STYLE_B3_SINGLE = "b3"
_PROPAGATION_STYLE_W3C_TRACECONTEXT = "tracecontext"
_PROPAGATION_STYLE_NONE = "none"
_PROPAGATION_STYLE_DEFAULT = "datadog,tracecontext"
_PROPAGATION_STYLE_DEFAULT = "datadog,tracecontext,baggage"
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
_PROPAGATION_STYLE_BAGGAGE = "baggage"
PROPAGATION_STYLE_ALL = (
_PROPAGATION_STYLE_W3C_TRACECONTEXT,
PROPAGATION_STYLE_DATADOG,
PROPAGATION_STYLE_B3_MULTI,
PROPAGATION_STYLE_B3_SINGLE,
_PROPAGATION_STYLE_NONE,
_PROPAGATION_STYLE_BAGGAGE,
)
W3C_TRACESTATE_KEY = "tracestate"
W3C_TRACEPARENT_KEY = "traceparent"
Expand Down
27 changes: 26 additions & 1 deletion ddtrace/internal/opentelemetry/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def attach(self, otel_context):
Datadog representation in the Global DDtrace Trace Context Provider.
"""
# Inline opentelemetry imports to avoid circular imports.
from opentelemetry.baggage import get_baggage
from opentelemetry.trace import Span as OtelSpan
from opentelemetry.trace import get_current_span

Expand All @@ -30,18 +31,28 @@ def attach(self, otel_context):
if otel_span:
if isinstance(otel_span, Span):
self._ddcontext_provider.activate(otel_span._ddspan)
ddcontext = otel_span._ddspan.context
elif isinstance(otel_span, OtelSpan):
trace_id, span_id, _, tf, ts, _ = otel_span.get_span_context()
trace_state = ts.to_header() if ts else None
ddcontext = _TraceContext._get_context(trace_id, span_id, tf, trace_state)
self._ddcontext_provider.activate(ddcontext)
else:
ddcontext = None
log.error(
"Programming ERROR: ddtrace does not support activiting spans with the type: %s. Please open a "
"github issue at: https://github.com/Datadog/dd-trace-py and set DD_TRACE_OTEL_ENABLED=True.",
type(otel_span),
)

# get current open telemetry baggage and store it on the datadog context object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add test coverage for baggage to tests/opentelemetry/test_context.py

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also make sure the opentelemetry baggage api getters and setters work as expected and create/set/remove values on datadog context objects: https://github.com/open-telemetry/opentelemetry-python/blob/bd51fcb7a3afb08aec975e7302c9cc36060bcacc/opentelemetry-api/src/opentelemetry/baggage/__init__.py#L53-#L117

otel_baggage = get_baggage(otel_context)
if ddcontext and otel_baggage:
for key, value in otel_baggage.items():
# value is from opentelemetry and can be any object.
# make sure this object is compatible w/ datadog internals
ddcontext._baggage[key] = value # potentially convert to json

# A return value with the type `object` is required by the otel api to remove/deactivate spans.
# Since manually deactivating spans is not supported by ddtrace this object will never be used.
return object()
Expand All @@ -52,6 +63,7 @@ def get_current(self):
in a format that can be parsed by the OpenTelemetry API.
"""
# Inline opentelemetry imports to avoid circular imports.
from opentelemetry.baggage import set_baggage
from opentelemetry.context.context import Context as OtelContext
from opentelemetry.trace import NonRecordingSpan as OtelNonRecordingSpan
from opentelemetry.trace import SpanContext as OtelSpanContext
Expand All @@ -61,8 +73,9 @@ def get_current(self):

from .span import Span

# getting current active span
ddactive = self._ddcontext_provider.active()
context = OtelContext()
context = OtelContext() # store current datadog baggage into here
if isinstance(ddactive, DDSpan):
span = Span(ddactive)
context = set_span_in_context(span, context)
Expand All @@ -72,6 +85,18 @@ def get_current(self):
span_context = OtelSpanContext(ddactive.trace_id or 0, ddactive.span_id or 0, True, tf, ts)
span = OtelNonRecordingSpan(span_context)
context = set_span_in_context(span, context)

if isinstance(ddactive, DDSpan):
dd_baggage = ddactive.context._baggage
elif isinstance(ddactive, DDContext):
dd_baggage = ddactive._baggage
else:
dd_baggage = {}

# getting current active baggage
for key, value in dd_baggage.items():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here we should add test coverage for this

set_baggage(key, value, context)

return context

def detach(self, token):
Expand Down
67 changes: 67 additions & 0 deletions ddtrace/propagation/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Text # noqa:F401
from typing import Tuple # noqa:F401
from typing import cast # noqa:F401
import urllib.parse

import ddtrace
from ddtrace._trace.span import Span # noqa:F401
Expand Down Expand Up @@ -38,6 +39,7 @@
from ..internal._tagset import decode_tagset_string
from ..internal._tagset import encode_tagset_values
from ..internal.compat import ensure_text
from ..internal.constants import _PROPAGATION_STYLE_BAGGAGE
from ..internal.constants import _PROPAGATION_STYLE_NONE
from ..internal.constants import _PROPAGATION_STYLE_W3C_TRACECONTEXT
from ..internal.constants import HIGHER_ORDER_TRACE_ID_BITS as _HIGHER_ORDER_TRACE_ID_BITS
Expand Down Expand Up @@ -885,12 +887,57 @@ def _inject(span_context, headers):
return headers


class _BaggageHeader:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Class _BaggageHeader should have an init method (...read more)

Ensure that a class has an __init__ method. This check is bypassed when the class is a data class (annotated with @DataClass).

View in Datadog  Leave us feedback  Documentation

@staticmethod
def _encode_key(key):
safe_characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789" "!#$%&'*+-.^_`|~"
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
encoded = urllib.parse.quote(key, safe=safe_characters)
return encoded

def _encode_value(value):
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
safe_characters = (
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789" "!#$%&'()*+-./:<>?@[]^_`{|}~"
)
encoded = urllib.parse.quote(value, safe=safe_characters)
return encoded

@staticmethod
def _inject(span_context, headers):
baggage_items = span_context._baggage.items()
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
if not baggage_items:
return

header_value = ",".join(
f"{_BaggageHeader._encode_key(str(key).strip())}={_BaggageHeader._encode_value(str(value).strip())}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we raise an unhandled exception if value can not be converted to a string? How should we handle dicts,arrays,booleans,etc. The format needs to be consistent across languages (ex: encoding True vs true)

Copy link
Contributor Author

@rachelyangdog rachelyangdog Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When "extracting baggage from propagation headers, they may encounter malformed header contents." When this occurs, we "should ignore the entire header." (RFC) So instead of an error, we could potentially just ignore it? As of now, I think it would just do something like a type error

Copy link
Member

@lucaspimentel lucaspimentel Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we raise an unhandled exception if value can not be converted to a string?

I don't know who errors are handles in Python, but an invalid header should not break the service we are instrumenting. We could definitely log a warning or whatever you usually do in cases like this.

"should ignore the entire header." (RFC)

The main point here was that we should not try to extract individual values while ignore that bad ones. If something is wrong, don't try to extract anything. I clear this up in the RFC, thanks.

rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
for key, value in baggage_items
)

headers["baggage"] = header_value

@staticmethod
def _extract(headers):
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
header_value = headers.get("baggage")
if not header_value:
return None

baggage = {}
baggages = header_value.split(",")
for key_value in baggages:
key, value = key_value.split("=", 1)
rachelyangdog marked this conversation as resolved.
Show resolved Hide resolved
key = urllib.parse.unquote(key.strip())
value = urllib.parse.unquote(value.strip())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should this deconding/unquoting handle errors?

baggage[key] = value

return Context(baggage=baggage)


_PROP_STYLES = {
PROPAGATION_STYLE_DATADOG: _DatadogMultiHeader,
PROPAGATION_STYLE_B3_MULTI: _B3MultiHeader,
PROPAGATION_STYLE_B3_SINGLE: _B3SingleHeader,
_PROPAGATION_STYLE_W3C_TRACECONTEXT: _TraceContext,
_PROPAGATION_STYLE_NONE: _NOP_Propagator,
_PROPAGATION_STYLE_BAGGAGE: _BaggageHeader,
}


Expand All @@ -915,8 +962,19 @@ def _extract_configured_contexts_avail(normalized_headers):
def _resolve_contexts(contexts, styles_w_ctx, normalized_headers):
primary_context = contexts[0]
links = []

for context in contexts[1:]:
style_w_ctx = styles_w_ctx[contexts.index(context)]

# special case where baggage is first in propagation styles
# why would you do that? I don't know, but it's possible
if style_w_ctx[0] == _PROPAGATION_STYLE_BAGGAGE:
baggage_context = contexts[0]
contexts.append(baggage_context)
del contexts[0]
styles_w_ctx.append(_PROPAGATION_STYLE_BAGGAGE)
del style_w_ctx[0]
Comment on lines +970 to +975
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we handle this outside of the for-loop? baggage should never be the primary_context

Copy link
Contributor Author

@rachelyangdog rachelyangdog Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the RFC it says that users can also disable other propagation styles (tracecontext or datadog) and enable only the baggage propagator which means baggage could be the primary_context in certain cases. Considering this, I think we can leave the code inside the for-loop. Let me know your thoughts because I think there are may be better ways to go about this.


# encoding expects at least trace_id and span_id
if context.span_id and context.trace_id and context.trace_id != primary_context.trace_id:
links.append(
Expand Down Expand Up @@ -952,6 +1010,11 @@ def _resolve_contexts(contexts, styles_w_ctx, normalized_headers):
primary_context._meta[LAST_DD_PARENT_ID_KEY] = "{:016x}".format(dd_context.span_id)
# the span_id in tracecontext takes precedence over the first extracted propagation style
primary_context.span_id = context.span_id

# baggage is always merged into the primary context
if style_w_ctx == _PROPAGATION_STYLE_BAGGAGE:
primary_context._baggage.update(context._baggage)
Comment on lines +1014 to +1015
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Code Quality Violation

too many nesting levels (...read more)

Avoid to nest too many loops together. Having too many loops make your code harder to understand.
Prefer to organize your code in functions and unit of code you can clearly understand.

Learn More

View in Datadog  Leave us feedback  Documentation


primary_context._span_links = links
return primary_context

Expand Down Expand Up @@ -998,6 +1061,10 @@ def parent_call():
else:
log.error("ddtrace.tracer.sample is not available, unable to sample span.")

# baggage should be injected regardless of existing span or trace id
if _PROPAGATION_STYLE_BAGGAGE in config._propagation_style_inject:
_BaggageHeader._inject(span_context, headers)

# Not a valid context to propagate
if span_context.trace_id is None or span_context.span_id is None:
log.debug("tried to inject invalid context %r", span_context)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/settings/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,11 @@ def _parse_propagation_styles(name, default):
- "b3" (formerly 'b3 single header')
- "b3 single header (deprecated for 'b3')"
- "tracecontext"
- "baggage"
- "none"


The default value is ``"datadog,tracecontext"``.
The default value is ``"datadog,tracecontext,baggage"``.


Examples::
Expand Down
104 changes: 104 additions & 0 deletions tests/tracer/test_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ddtrace.constants import AUTO_REJECT
from ddtrace.constants import USER_KEEP
from ddtrace.constants import USER_REJECT
from ddtrace.internal.constants import _PROPAGATION_STYLE_BAGGAGE
from ddtrace.internal.constants import _PROPAGATION_STYLE_NONE
from ddtrace.internal.constants import _PROPAGATION_STYLE_W3C_TRACECONTEXT
from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
Expand Down Expand Up @@ -2915,6 +2916,60 @@ def test_span_links_set_on_root_span_not_child(fastapi_client, tracer, fastapi_t
_HTTP_HEADER_TRACESTATE: "",
},
),
(
"only_baggage",
[
_PROPAGATION_STYLE_BAGGAGE,
],
{
"baggage": {"foo": "bar"},
},
{
"baggage": "foo=bar",
},
),
(
"baggage_and_datadog",
[
PROPAGATION_STYLE_DATADOG,
_PROPAGATION_STYLE_BAGGAGE,
],
{
"trace_id": VALID_DATADOG_CONTEXT["trace_id"],
"span_id": VALID_DATADOG_CONTEXT["span_id"],
"baggage": {"foo": "bar"},
},
{
HTTP_HEADER_TRACE_ID: "13088165645273925489",
HTTP_HEADER_PARENT_ID: "8185124618007618416",
"baggage": "foo=bar",
},
),
(
"baggage_order",
[
_PROPAGATION_STYLE_BAGGAGE,
PROPAGATION_STYLE_DATADOG,
PROPAGATION_STYLE_B3_MULTI,
PROPAGATION_STYLE_B3_SINGLE,
_PROPAGATION_STYLE_W3C_TRACECONTEXT,
],
{
"baggage": {"foo": "bar"},
"trace_id": VALID_DATADOG_CONTEXT["trace_id"],
"span_id": VALID_DATADOG_CONTEXT["span_id"],
},
{
"baggage": "foo=bar",
HTTP_HEADER_TRACE_ID: "13088165645273925489",
HTTP_HEADER_PARENT_ID: "8185124618007618416",
_HTTP_HEADER_B3_TRACE_ID: "b5a2814f70060771",
_HTTP_HEADER_B3_SPAN_ID: "7197677932a62370",
_HTTP_HEADER_B3_SINGLE: "b5a2814f70060771-7197677932a62370",
_HTTP_HEADER_TRACEPARENT: "00-0000000000000000b5a2814f70060771-7197677932a62370-00",
_HTTP_HEADER_TRACESTATE: "",
},
),
]


Expand Down Expand Up @@ -3037,3 +3092,52 @@ def test_llmobs_parent_id_not_injected_by_default():
context = Context(trace_id=1, span_id=2)
HTTPPropagator.inject(context, {})
mock_llmobs_inject.assert_not_called()


@pytest.mark.parametrize(
"span_context,expected_headers",
[
(Context(baggage={"key1": "val1"}), {"baggage": "key1=val1"}),
(Context(baggage={"key1": "val1", "key2": "val2"}), {"baggage": "key1=val1,key2=val2"}),
(Context(baggage={"serverNode": "DF 28"}), {"baggage": "serverNode=DF%2028"}),
(Context(baggage={"userId": "Amélie"}), {"baggage": "userId=Am%C3%A9lie"}),
(Context(baggage={"user!d(me)": "false"}), {"baggage": "user!d%28me%29=false"}),
],
ids=[
"single_key_value",
"multiple_key_value_pairs",
"space_in_value",
"special_characters_in_value",
"special_characters_in_key",
],
)
def test_baggageheader_inject(span_context, expected_headers):
from ddtrace.propagation.http import _BaggageHeader

headers = {}
_BaggageHeader._inject(span_context, headers)
assert headers == expected_headers


@pytest.mark.parametrize(
"headers,expected_baggage",
[
({"baggage": "key1=val1"}, {"key1": "val1"}),
({"baggage": "key1=val1,key2=val2,foo=bar,x=y"}, {"key1": "val1", "key2": "val2", "foo": "bar", "x": "y"}),
({"baggage": "user!d%28me%29=false"}, {"user!d(me)": "false"}),
({"baggage": "userId=Am%C3%A9lie"}, {"userId": "Amélie"}),
({"baggage": "serverNode=DF%2028"}, {"serverNode": "DF 28"}),
],
ids=[
"single_key_value",
"multiple_key_value_pairs",
"special_characters_in_key",
"special_characters_in_value",
"space_in_value",
],
)
def test_baggageheader_extract(headers, expected_baggage):
from ddtrace.propagation.http import _BaggageHeader

context = _BaggageHeader._extract(headers)
assert context._baggage == expected_baggage
Loading