Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Span Events #200

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ async def handle_info(self, request: Request) -> web.Response:
"client_drop_p0s": True,
# Just a random selection of some peer_tags to aggregate on for testing, not exhaustive
"peer_tags": ["db.name", "mongodb.db", "messaging.system"],
"span_events": True, # Advertise support for the top-level Span field for Span Events
},
headers={"Datadog-Agent-State": "03e868b3ecdd62a91423cc4c3917d0d151fb9fa486736911ab7f5a0750c63824"},
)
Expand Down
73 changes: 72 additions & 1 deletion ddapm_test_agent/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class SpanLink(TypedDict):
flags: NotRequired[Optional[int]]


class SpanEvent(TypedDict):
time_unix_nano: int
name: str
attributes: NotRequired[Dict[str, Any]]


class Span(TypedDict):
name: str
span_id: SpanId
Expand All @@ -71,6 +77,7 @@ class Span(TypedDict):
meta: NotRequired[Dict[str, str]]
metrics: NotRequired[Dict[str, MetricType]]
span_links: NotRequired[List[SpanLink]]
span_events: NotRequired[List[SpanEvent]]
meta_struct: NotRequired[Dict[str, Dict[str, Any]]]


Expand All @@ -88,9 +95,20 @@ class Span(TypedDict):
"meta",
"metrics",
"span_links",
"span_events",
"meta_struct",
]
TopLevelSpanValue = Union[None, SpanId, TraceId, int, str, Dict[str, str], Dict[str, MetricType], List[SpanLink]]
TopLevelSpanValue = Union[
None,
SpanId,
TraceId,
int,
str,
Dict[str, str],
Dict[str, MetricType],
List[SpanLink],
List[SpanEvent],
]
Trace = List[Span]
v04TracePayload = List[List[Span]]
TraceMap = OrderedDict[int, Trace]
Expand Down Expand Up @@ -176,6 +194,32 @@ def verify_span(d: Any) -> Span:
assert isinstance(link["flags"], int), "Expected flags to be of type: 'int', got: " + str(
type(link["flags"])
)
if "span_events" in d:
assert isinstance(d["span_events"], list)
for event in d["span_events"]:
assert isinstance(event, dict), f"Expected all span_events to be of type: 'dict', got: {type(event)}"
required_attrs = ["time_unix_nano", "name"]
for attr in required_attrs:
assert attr in event, f"'{attr}' required in span event"
assert isinstance(
event["time_unix_nano"], int
), "Expected 'time_unix_nano' to be of type: 'int', got: " + str(type(event["time_unix_nano"]))
assert isinstance(event["name"], str), "Expected 'name' to be of type: 'str', got: " + str(
type(event["name"])
)
if "attributes" in event:
assert isinstance(event["attributes"], dict)
for k, v in event["attributes"].items():
assert isinstance(k, str), f"Expected key 'attributes.{k}' to be of type: 'str', got: {type(k)}"
assert isinstance(
v, (str, int, float, bool, list)
), f"Expected value of key 'attributes.{k}' to be of type: 'str/int/float/bool/list', got: {type(v)}"
if isinstance(v, list) and v: # Check if list is homogeneous
first_type = type(v[0])
i = None
assert all(
isinstance(i, first_type) for i in v
), f"Expected all elements in list to be of the same type: '{first_type}', got: {type(i)}"
return cast(Span, d)
except AssertionError as e:
raise TypeError(*e.args) from e
Expand Down Expand Up @@ -304,17 +348,32 @@ def copy_span_links(s: SpanLink) -> SpanLink:
return copy


def copy_span_events(s: SpanEvent) -> SpanEvent:
attributes = s["attributes"].copy() if "attributes" in s else None
copy = s.copy()
if attributes is not None:
# Copy arrays inside attributes
for k, v in attributes.items():
if isinstance(v, list):
attributes[k] = v.copy()
copy["attributes"] = attributes
return copy


def copy_span(s: Span) -> Span:
meta = s["meta"].copy() if "meta" in s else None
metrics = s["metrics"].copy() if "metrics" in s else None
links = s["span_links"].copy() if "span_links" in s else None
events = s["span_events"].copy() if "span_events" in s else None
copy = s.copy()
if meta is not None:
copy["meta"] = meta
if metrics is not None:
copy["metrics"] = metrics
if links is not None:
copy["span_links"] = [copy_span_links(link) for link in links]
if events is not None:
copy["span_events"] = [copy_span_events(event) for event in events]
return copy


Expand Down Expand Up @@ -379,6 +438,18 @@ def add_span_link(
return s


def add_span_event(
s: Span, time_unix_nano: int = 1730405656000000000, name: str = "event", attributes: Optional[Dict[str, Any]] = None
) -> Span:
if "span_events" not in s:
s["span_events"] = []
new_event = SpanEvent(time_unix_nano=time_unix_nano, name=name)
if attributes is not None:
new_event["attributes"] = attributes
s["span_events"].append(new_event)
return s


def _trace_decoder_flexible(json_string: bytes) -> Dict[str, Any]:
"""Parse Trace JSON and accounts for meta that may contain numbers such as ports. Converts these meta correctly to strings.
Also ensures that any valid integers/floats are correctly parsed, to prevent ids from being decoded as strings incorrectly.
Expand Down
92 changes: 83 additions & 9 deletions ddapm_test_agent/trace_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
log = logging.getLogger(__name__)


DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,meta.pathway.hash,meta._dd.p.tid"
DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,span_events.time_unix_nano,meta.pathway.hash,meta._dd.p.tid"


def _key_match(d1: Dict[str, Any], d2: Dict[str, Any], key: str) -> bool:
Expand Down Expand Up @@ -198,13 +198,13 @@ def _diff_spans(
results = []
s1_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in s1.items() if k not in ("meta", "metrics", "span_links")},
{k: v for k, v in s1.items() if k not in ("meta", "metrics", "span_links", "span_events")},
)
s2_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in s2.items() if k not in ("meta", "metrics", "span_links")},
{k: v for k, v in s2.items() if k not in ("meta", "metrics", "span_links", "span_events")},
)
for d1, d2, ignored in [
for d1, d2, ign in [
(s1_no_tags, s2_no_tags, ignored),
(s1["meta"], s2["meta"], set(i[5:] for i in ignored if i.startswith("meta."))),
(
Expand All @@ -216,7 +216,7 @@ def _diff_spans(
d1 = cast(Dict[str, Any], d1)
d2 = cast(Dict[str, Any], d2)
diffs = []
for k in (set(d1.keys()) | set(d2.keys())) - ignored:
for k in (set(d1.keys()) | set(d2.keys())) - ign:
if not _key_match(d1, d2, k):
diffs.append(k)
results.append(diffs)
Expand All @@ -235,7 +235,7 @@ def _diff_spans(
{k: v for k, v in l2.items() if k != "attributes"},
)
link_diff = []
for d1, d2, ignored in [
for d1, d2, ign in [
(l1_no_tags, l2_no_tags, set(i[11:] for i in ignored if i.startswith("span_links."))),
(
l1.get("attributes") or {},
Expand All @@ -246,14 +246,47 @@ def _diff_spans(
d1 = cast(Dict[str, Any], d1)
d2 = cast(Dict[str, Any], d2)
diffs = []
for k in (set(d1.keys()) | set(d2.keys())) - ignored:
for k in (set(d1.keys()) | set(d2.keys())) - ign:
if not _key_match(d1, d2, k):
diffs.append(k)
link_diff.append(diffs)

link_diffs.append(link_diff)
results.append(link_diffs) # type: ignore

event_diffs = []
if len(s1.get("span_events") or []) != len(s2.get("span_events") or []):
results[0].append("span_events")
else:
for e1, e2 in zip(s1.get("span_events") or [], s2.get("span_events") or []):
l1_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in e1.items() if k != "attributes"},
)
l2_no_tags = cast(
Dict[str, TopLevelSpanValue],
{k: v for k, v in e2.items() if k != "attributes"},
)
event_diff = []
for d1, d2, ign in [
(l1_no_tags, l2_no_tags, set(i[12:] for i in ignored if i.startswith("span_events."))),
(
e1.get("attributes") or {},
e2.get("attributes") or {},
set(i[23:] for i in ignored if i.startswith("span_events.attributes.")),
),
]:
d1 = cast(Dict[str, Any], d1)
d2 = cast(Dict[str, Any], d2)
diffs = []
for k in (set(d1.keys()) | set(d2.keys())) - ign:
if not _key_match(d1, d2, k):
diffs.append(k)
event_diff.append(diffs)

event_diffs.append(event_diff)
results.append(event_diffs) # type: ignore

return cast(Tuple[List[str], List[str], List[str], List[Tuple[List[str], List[str]]]], tuple(results))


Expand All @@ -279,7 +312,9 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
) as frame:
frame.add_item(f"Expected span:\n{pprint.pformat(s_exp)}")
frame.add_item(f"Received span:\n{pprint.pformat(s_rec)}")
top_level_diffs, meta_diffs, metrics_diffs, span_link_diffs = _diff_spans(s_exp, s_rec, ignored)
top_level_diffs, meta_diffs, metrics_diffs, span_link_diffs, span_event_diffs = _diff_spans(
s_exp, s_rec, ignored
)

for diffs, diff_type, d_exp, d_rec in [
(top_level_diffs, "span", s_exp, s_rec),
Expand All @@ -295,7 +330,7 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
raise AssertionError(
f"Span{' ' + diff_type if diff_type != 'span' else ''} value '{diff_key}' in expected span but is not in the received span."
)
elif diff_key == "span_links":
elif diff_key in ["span_links", "span_events"]:
raise AssertionError(
f"{diff_type} mismatch on '{diff_key}': got {len(d_rec[diff_key])} values for {diff_key} which does not match expected {len(d_exp[diff_key])}." # type: ignore
)
Expand Down Expand Up @@ -328,6 +363,30 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
f"Span link {diff_type} mismatch on '{diff_key}': got '{d_rec[diff_key]}' which does not match expected '{d_exp[diff_key]}'."
)

for i, (event_level_diffs, attribute_diffs) in enumerate(span_event_diffs):
for diffs, diff_type, d_exp, d_rec in [
(event_level_diffs, f"{i}", s_exp["span_events"][i], s_rec["span_events"][i]),
(
attribute_diffs,
f"{i} attributes",
s_exp["span_events"][i].get("attributes") or {},
s_rec["span_events"][i].get("attributes") or {},
),
]:
for diff_key in diffs:
if diff_key not in d_exp:
raise AssertionError(
f"Span event {diff_type} value '{diff_key}' in received span event but is not in the expected span event."
)
elif diff_key not in d_rec:
raise AssertionError(
f"Span event {diff_type} value '{diff_key}' in expected span event but is not in the received span event."
)
else:
raise AssertionError(
f"Span event {diff_type} mismatch on '{diff_key}': got '{d_rec[diff_key]}' which does not match expected '{d_exp[diff_key]}'."
)
Comment on lines +377 to +388

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Code Quality Violation

too many nesting levels (...read more)

Avoid to nest too many loops together. Having too many loops make your code harder to understand.
Prefer to organize your code in functions and unit of code you can clearly understand.

Learn More

View in Datadog  Leave us feedback  Documentation



class SnapshotFailure(Exception):
pass
Expand Down Expand Up @@ -367,6 +426,7 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
"meta",
"metrics",
"span_links",
"span_events",
]
for k in order:
if k in s:
Expand All @@ -385,6 +445,11 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
if "attributes" in link:
link["attributes"] = OrderedDict(sorted(link["attributes"].items(), key=operator.itemgetter(0)))

if "span_events" in d:
for event in d["span_events"]:
if "attributes" in event:
event["attributes"] = OrderedDict(sorted(event["attributes"].items(), key=operator.itemgetter(0)))

for k in ["meta", "metrics"]:
if k in d and len(d[k]) == 0:
del d[k]
Expand Down Expand Up @@ -414,6 +479,15 @@ def _snapshot_trace_str(trace: Trace, removed: Optional[List[str]] = None) -> st
if "span_links" in span:
for link in span["span_links"]:
link.pop(key[11:], None) # type: ignore
elif key.startswith("span_events.attributes."):
if "span_events" in span:
for event in span["span_events"]:
if "attributes" in event:
event["attributes"].pop(key[23:], None)
marcotc marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +483 to +486

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Code Quality Violation

too many nesting levels (...read more)

Avoid to nest too many loops together. Having too many loops make your code harder to understand.
Prefer to organize your code in functions and unit of code you can clearly understand.

Learn More

View in Datadog  Leave us feedback  Documentation

elif key.startswith("span_events."):
if "span_events" in span:
for event in span["span_events"]:
event.pop(key[12:], None)
marcotc marked this conversation as resolved.
Show resolved Hide resolved
else:
span.pop(key, None) # type: ignore

Expand Down
4 changes: 4 additions & 0 deletions releasenotes/notes/add-span-events-306e697168373899.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Add agent support for Span Events.
3 changes: 3 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ async def test_info(agent):
"endpoints": [
"/v0.4/traces",
"/v0.5/traces",
"/v0.7/traces",
"/v0.6/stats",
"/telemetry/proxy/",
"/v0.7/config",
Expand All @@ -94,6 +95,8 @@ async def test_info(agent):
"feature_flags": [],
"config": {},
"client_drop_p0s": True,
"peer_tags": ["db.name", "mongodb.db", "messaging.system"],
"span_events": True,
}


Expand Down
Loading
Loading