diff --git a/ddapm_test_agent/agent.py b/ddapm_test_agent/agent.py
index b1e7792..60d2c75 100644
--- a/ddapm_test_agent/agent.py
+++ b/ddapm_test_agent/agent.py
@@ -695,6 +695,7 @@ async def handle_info(self, request: Request) -> web.Response:
                 "client_drop_p0s": True,
                 # Just a random selection of some peer_tags to aggregate on for testing, not exhaustive
                 "peer_tags": ["db.name", "mongodb.db", "messaging.system"],
+                "span_events": True,  # Advertise support for span events as a top-level Span field
             },
             headers={"Datadog-Agent-State": "03e868b3ecdd62a91423cc4c3917d0d151fb9fa486736911ab7f5a0750c63824"},
         )
diff --git a/ddapm_test_agent/trace.py b/ddapm_test_agent/trace.py
index c661b2d..308f494 100644
--- a/ddapm_test_agent/trace.py
+++ b/ddapm_test_agent/trace.py
@@ -57,6 +57,12 @@ class SpanLink(TypedDict):
     flags: NotRequired[Optional[int]]
 
 
+class SpanEvent(TypedDict):
+    time_unix_nano: int
+    name: str
+    attributes: NotRequired[Dict[str, Any]]
+
+
 class Span(TypedDict):
     name: str
     span_id: SpanId
@@ -71,6 +77,7 @@ class Span(TypedDict):
     meta: NotRequired[Dict[str, str]]
     metrics: NotRequired[Dict[str, MetricType]]
     span_links: NotRequired[List[SpanLink]]
+    span_events: NotRequired[List[SpanEvent]]
     meta_struct: NotRequired[Dict[str, Dict[str, Any]]]
 
 
@@ -88,9 +95,20 @@
     "meta",
     "metrics",
     "span_links",
+    "span_events",
     "meta_struct",
 ]
-TopLevelSpanValue = Union[None, SpanId, TraceId, int, str, Dict[str, str], Dict[str, MetricType], List[SpanLink]]
+TopLevelSpanValue = Union[
+    None,
+    SpanId,
+    TraceId,
+    int,
+    str,
+    Dict[str, str],
+    Dict[str, MetricType],
+    List[SpanLink],
+    List[SpanEvent],
+]
 Trace = List[Span]
 v04TracePayload = List[List[Span]]
 TraceMap = OrderedDict[int, Trace]
@@ -176,6 +194,32 @@ def verify_span(d: Any) -> Span:
                     assert isinstance(link["flags"], int), "Expected flags to be of type: 'int', got: " + str(
                         type(link["flags"])
                     )
+        if "span_events" in d:
+            assert isinstance(d["span_events"], list)
+            for event in d["span_events"]:
+                assert isinstance(event, dict), f"Expected all span_events to be of type: 'dict', got: {type(event)}"
+                required_attrs = ["time_unix_nano", "name"]
+                for attr in required_attrs:
+                    assert attr in event, f"'{attr}' required in span event"
+                assert isinstance(
+                    event["time_unix_nano"], int
+                ), "Expected 'time_unix_nano' to be of type: 'int', got: " + str(type(event["time_unix_nano"]))
+                assert isinstance(event["name"], str), "Expected 'name' to be of type: 'str', got: " + str(
+                    type(event["name"])
+                )
+                if "attributes" in event:
+                    assert isinstance(event["attributes"], dict)
+                    for k, v in event["attributes"].items():
+                        assert isinstance(k, str), f"Expected key 'attributes.{k}' to be of type: 'str', got: {type(k)}"
+                        assert isinstance(
+                            v, (str, int, float, bool, list)
+                        ), f"Expected value of key 'attributes.{k}' to be of type: 'str/int/float/bool/list', got: {type(v)}"
+                        if isinstance(v, list) and v:  # Attribute lists must be homogeneous
+                            first_type = type(v[0])
+                            for elem in v:
+                                assert isinstance(
+                                    elem, first_type
+                                ), f"Expected all elements in list to be of the same type: '{first_type}', got: {type(elem)}"
         return cast(Span, d)
     except AssertionError as e:
         raise TypeError(*e.args) from e
@@ -304,10 +348,23 @@ def copy_span_links(s: SpanLink) -> SpanLink:
     return copy
 
 
+def copy_span_events(s: SpanEvent) -> SpanEvent:
+    attributes = s["attributes"].copy() if "attributes" in s else None
+    copy = s.copy()
+    if attributes is not None:
+        # Copy arrays inside attributes
+        for k, v in attributes.items():
+            if isinstance(v, list):
+                attributes[k] = v.copy()
+        copy["attributes"] = attributes
+    return copy
+
+
 def copy_span(s: Span) -> Span:
     meta = s["meta"].copy() if "meta" in s else None
     metrics = s["metrics"].copy() if "metrics" in s else None
     links = s["span_links"].copy() if "span_links" in s else None
+    events = s["span_events"].copy() if "span_events" in s else None
     copy = s.copy()
     if meta is not None:
         copy["meta"] = meta
@@ -315,6 +372,8 @@
         copy["metrics"] = metrics
     if links is not None:
         copy["span_links"] = [copy_span_links(link) for link in links]
+    if events is not None:
+        copy["span_events"] = [copy_span_events(event) for event in events]
     return copy
 
 
@@ -379,6 +438,18 @@ def add_span_link(
     return s
 
 
+def add_span_event(
+    s: Span, time_unix_nano: int = 1730405656000000000, name: str = "event", attributes: Optional[Dict[str, Any]] = None
+) -> Span:
+    if "span_events" not in s:
+        s["span_events"] = []
+    new_event = SpanEvent(time_unix_nano=time_unix_nano, name=name)
+    if attributes is not None:
+        new_event["attributes"] = attributes
+    s["span_events"].append(new_event)
+    return s
+
+
 def _trace_decoder_flexible(json_string: bytes) -> Dict[str, Any]:
     """Parse Trace JSON and accounts for meta that may contain numbers such as ports.
     Converts these meta correctly to strings. Also ensures that any valid integers/floats are correctly parsed, to prevent ids from being decoded as strings incorrectly.
diff --git a/ddapm_test_agent/trace_snapshot.py b/ddapm_test_agent/trace_snapshot.py
index d3fb207..d1216b8 100644
--- a/ddapm_test_agent/trace_snapshot.py
+++ b/ddapm_test_agent/trace_snapshot.py
@@ -30,7 +30,7 @@
 log = logging.getLogger(__name__)
 
 
-DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,meta.pathway.hash,meta._dd.p.tid"
+DEFAULT_SNAPSHOT_IGNORES = "span_id,trace_id,parent_id,duration,start,metrics.system.pid,metrics.system.process_id,metrics.process_id,metrics._dd.tracer_kr,meta.runtime-id,span_links.trace_id_high,span_events.time_unix_nano,meta.pathway.hash,meta._dd.p.tid"
 
 
 def _key_match(d1: Dict[str, Any], d2: Dict[str, Any], key: str) -> bool:
@@ -198,13 +198,13 @@ def _diff_spans(
     results = []
     s1_no_tags = cast(
         Dict[str, TopLevelSpanValue],
-        {k: v for k, v in s1.items() if k not in ("meta", "metrics", "span_links")},
+        {k: v for k, v in s1.items() if k not in ("meta", "metrics", "span_links", "span_events")},
     )
     s2_no_tags = cast(
         Dict[str, TopLevelSpanValue],
-        {k: v for k, v in s2.items() if k not in ("meta", "metrics", "span_links")},
+        {k: v for k, v in s2.items() if k not in ("meta", "metrics", "span_links", "span_events")},
     )
-    for d1, d2, ignored in [
+    for d1, d2, ign in [
         (s1_no_tags, s2_no_tags, ignored),
         (s1["meta"], s2["meta"], set(i[5:] for i in ignored if i.startswith("meta."))),
         (
@@ -216,7 +216,7 @@
         d1 = cast(Dict[str, Any], d1)
         d2 = cast(Dict[str, Any], d2)
         diffs = []
-        for k in (set(d1.keys()) | set(d2.keys())) - ignored:
+        for k in (set(d1.keys()) | set(d2.keys())) - ign:
             if not _key_match(d1, d2, k):
                 diffs.append(k)
         results.append(diffs)
@@ -235,7 +235,7 @@
                 {k: v for k, v in l2.items() if k != "attributes"},
             )
             link_diff = []
-            for d1, d2, ignored in [
+            for d1, d2, ign in [
                 (l1_no_tags, l2_no_tags, set(i[11:] for i in ignored if i.startswith("span_links."))),
                 (
                     l1.get("attributes") or {},
@@ -246,7 +246,7 @@ def _diff_spans(
                 d1 = cast(Dict[str, Any], d1)
                 d2 = cast(Dict[str, Any], d2)
                 diffs = []
-                for k in (set(d1.keys()) | set(d2.keys())) - ignored:
+                for k in (set(d1.keys()) | set(d2.keys())) - ign:
                     if not _key_match(d1, d2, k):
                         diffs.append(k)
                 link_diff.append(diffs)
@@ -254,6 +254,39 @@
             link_diffs.append(link_diff)
     results.append(link_diffs)  # type: ignore
 
+    event_diffs = []
+    if len(s1.get("span_events") or []) != len(s2.get("span_events") or []):
+        results[0].append("span_events")
+    else:
+        for e1, e2 in zip(s1.get("span_events") or [], s2.get("span_events") or []):
+            e1_no_tags = cast(
+                Dict[str, TopLevelSpanValue],
+                {k: v for k, v in e1.items() if k != "attributes"},
+            )
+            e2_no_tags = cast(
+                Dict[str, TopLevelSpanValue],
+                {k: v for k, v in e2.items() if k != "attributes"},
+            )
+            event_diff = []
+            for d1, d2, ign in [
+                (e1_no_tags, e2_no_tags, set(i[12:] for i in ignored if i.startswith("span_events."))),
+                (
+                    e1.get("attributes") or {},
+                    e2.get("attributes") or {},
+                    set(i[23:] for i in ignored if i.startswith("span_events.attributes.")),
+                ),
+            ]:
+                d1 = cast(Dict[str, Any], d1)
+                d2 = cast(Dict[str, Any], d2)
+                diffs = []
+                for k in (set(d1.keys()) | set(d2.keys())) - ign:
+                    if not _key_match(d1, d2, k):
+                        diffs.append(k)
+                event_diff.append(diffs)
+
+            event_diffs.append(event_diff)
+    results.append(event_diffs)  # type: ignore
+
     return cast(Tuple[List[str], List[str], List[str], List[Tuple[List[str], List[str]]]], tuple(results))
 
 
@@ -279,7 +312,9 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
         ) as frame:
             frame.add_item(f"Expected span:\n{pprint.pformat(s_exp)}")
             frame.add_item(f"Received span:\n{pprint.pformat(s_rec)}")
-            top_level_diffs, meta_diffs, metrics_diffs, span_link_diffs = _diff_spans(s_exp, s_rec, ignored)
+            top_level_diffs, meta_diffs, metrics_diffs, span_link_diffs, span_event_diffs = _diff_spans(
+                s_exp, s_rec, ignored
+            )
 
             for diffs, diff_type, d_exp, d_rec in [
                 (top_level_diffs, "span", s_exp, s_rec),
@@ -295,7 +330,7 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
                         raise AssertionError(
                             f"Span{' ' + diff_type if diff_type != 'span' else ''} value '{diff_key}' in expected span but is not in the received span."
                         )
-                    elif diff_key == "span_links":
+                    elif diff_key in ["span_links", "span_events"]:
                         raise AssertionError(
                             f"{diff_type} mismatch on '{diff_key}': got {len(d_rec[diff_key])} values for {diff_key} which does not match expected {len(d_exp[diff_key])}."  # type: ignore
                         )
@@ -328,6 +363,30 @@ def _compare_traces(expected: Trace, received: Trace, ignored: Set[str]) -> None
                                 f"Span link {diff_type} mismatch on '{diff_key}': got '{d_rec[diff_key]}' which does not match expected '{d_exp[diff_key]}'."
                             )
 
+            for i, (event_level_diffs, attribute_diffs) in enumerate(span_event_diffs):
+                for diffs, diff_type, d_exp, d_rec in [
+                    (event_level_diffs, f"{i}", s_exp["span_events"][i], s_rec["span_events"][i]),
+                    (
+                        attribute_diffs,
+                        f"{i} attributes",
+                        s_exp["span_events"][i].get("attributes") or {},
+                        s_rec["span_events"][i].get("attributes") or {},
+                    ),
+                ]:
+                    for diff_key in diffs:
+                        if diff_key not in d_exp:
+                            raise AssertionError(
+                                f"Span event {diff_type} value '{diff_key}' in received span event but is not in the expected span event."
+                            )
+                        elif diff_key not in d_rec:
+                            raise AssertionError(
+                                f"Span event {diff_type} value '{diff_key}' in expected span event but is not in the received span event."
+                            )
+                        else:
+                            raise AssertionError(
+                                f"Span event {diff_type} mismatch on '{diff_key}': got '{d_rec[diff_key]}' which does not match expected '{d_exp[diff_key]}'."
+                            )
+
 
 class SnapshotFailure(Exception):
     pass
@@ -367,6 +426,7 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
         "meta",
         "metrics",
         "span_links",
+        "span_events",
     ]
     for k in order:
         if k in s:
@@ -385,6 +445,11 @@ def _ordered_span(s: Span) -> OrderedDictType[str, TopLevelSpanValue]:
             if "attributes" in link:
                 link["attributes"] = OrderedDict(sorted(link["attributes"].items(), key=operator.itemgetter(0)))
 
+    if "span_events" in d:
+        for event in d["span_events"]:
+            if "attributes" in event:
+                event["attributes"] = OrderedDict(sorted(event["attributes"].items(), key=operator.itemgetter(0)))
+
     for k in ["meta", "metrics"]:
         if k in d and len(d[k]) == 0:
             del d[k]
@@ -414,6 +479,15 @@ def _snapshot_trace_str(trace: Trace, removed: Optional[List[str]] = None) -> st
                 if "span_links" in span:
                     for link in span["span_links"]:
                         link.pop(key[11:], None)  # type: ignore
+            elif key.startswith("span_events.attributes."):
+                if "span_events" in span:
+                    for event in span["span_events"]:
+                        if "attributes" in event:
+                            event["attributes"].pop(key[23:], None)
+            elif key.startswith("span_events."):
+                if "span_events" in span:
+                    for event in span["span_events"]:
+                        event.pop(key[12:], None)
             else:
                 span.pop(key, None)  # type: ignore
diff --git a/releasenotes/notes/add-span-events-306e697168373899.yaml b/releasenotes/notes/add-span-events-306e697168373899.yaml
new file mode 100644
index 0000000..0537a45
--- /dev/null
+++ b/releasenotes/notes/add-span-events-306e697168373899.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Add agent support for span events sent as the top-level ``span_events`` span field, including validation and snapshot comparison.
diff --git a/tests/test_agent.py b/tests/test_agent.py
index e74bdbf..d4637d8 100644
--- a/tests/test_agent.py
+++ b/tests/test_agent.py
@@ -86,6 +86,7 @@ async def test_info(agent):
         "endpoints": [
             "/v0.4/traces",
             "/v0.5/traces",
+            "/v0.7/traces",
             "/v0.6/stats",
             "/telemetry/proxy/",
             "/v0.7/config",
@@ -94,6 +95,8 @@
         "feature_flags": [],
         "config": {},
         "client_drop_p0s": True,
+        "peer_tags": ["db.name", "mongodb.db", "messaging.system"],
+        "span_events": True,
     }
diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py
index c4eb36d..d9ade5a 100644
--- a/tests/test_snapshot.py
+++ b/tests/test_snapshot.py
@@ -5,6 +5,7 @@
 from ddapm_test_agent import trace_snapshot
 from ddapm_test_agent import tracestats_snapshot
+from ddapm_test_agent.trace import add_span_event
 from ddapm_test_agent.trace import add_span_link
 from ddapm_test_agent.trace import copy_span
 from ddapm_test_agent.trace import set_attr
@@ -667,6 +668,34 @@ async def test_removed_attributes_metrics(agent, tmp_path, snapshot_removed_attr
             "",
             {"start"},
         ),
+        # Mismatching span events count
+        (
+            [TWO_SPAN_TRACE_NO_START],
+            [[TWO_SPAN_TRACE_NO_START[0], add_span_event(copy_span(TWO_SPAN_TRACE_NO_START[1]))]],
+            "Span value 'span_events' in received span but is not in the expected span.",
+            {"start"},
+        ),
+        # Mismatching span event name
+        (
+            [[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), name="expected_name")]],
+            [[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), name="got_name")]],
+            "Span event 0 mismatch on 'name': got 'got_name' which does not match expected 'expected_name'.",
+            {"start"},
+        ),
+        # Mismatching span event attributes
+        (
+            [[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": "2"})]],
+            [[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": "0"})]],
+            "Span event 0 attributes mismatch on 'b': got '0' which does not match expected '2'.",
+            {"start"},
+        ),
+        # Matching span event
+        (
+            [[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": 2, "c": [3]})]],
+            [[add_span_event(copy_span(ONE_SPAN_TRACE_NO_START[0]), attributes={"a": "1", "b": 2, "c": [3]})]],
+            "",
+            {"start"},
+        ),
         # Default ignored fields
         (
             [
@@ -682,6 +711,12 @@ async def test_removed_attributes_metrics(agent, tmp_path, snapshot_removed_attr
                     "error": 0,
                     "meta": {},
                     "metrics": {},
+                    "span_events": [
+                        {
+                            "time_unix_nano": 123,
+                            "name": "event_name",
+                        },
+                    ],
                 }
             ]
         ],
@@ -698,6 +733,12 @@ async def test_removed_attributes_metrics(agent, tmp_path, snapshot_removed_attr
                     "error": 0,
                     "meta": {},
                     "metrics": {},
+                    "span_events": [
+                        {
+                            "time_unix_nano": 456,
+                            "name": "event_name",
+                        },
+                    ],
                 }
             ]
         ],
diff --git a/tests/test_trace.py b/tests/test_trace.py
index 74f6609..873b793 100644
--- a/tests/test_trace.py
+++ b/tests/test_trace.py
@@ -59,6 +59,9 @@ def test_trace_chunk():
                     "span_id": 1234,
                     "trace_id": 321,
                     "meta_struct": {"key": msgpack.packb({"subkey": "value"})},
+                    "span_events": [
+                        {"time_unix_nano": 1, "name": "event", "attributes": {"a": "1", "b": [2, 3]}}
+                    ],
                 }
             ]
         ]
@@ -120,6 +123,21 @@ def test_decode_v04(content_type, payload):
                 ]
             ),
         ),
+        (
+            "application/msgpack",
+            msgpack.packb(
+                [
+                    [
+                        {
+                            "name": "span",
+                            "span_id": 1234,
+                            "trace_id": 321,
+                            "span_events": [{"time_unix_nano": 1, "name": "event", "attributes": {"b": ["2", 3]}}],
+                        }
+                    ]
+                ]
+            ),
+        ),
     ],
 )
 def test_decode_v04_bad(content_type, payload):