From c4da90eca378d791201f8a0325d824848ec11709 Mon Sep 17 00:00:00 2001 From: Abhinav Vedmala Date: Mon, 18 Nov 2024 15:15:03 -0500 Subject: [PATCH] Explicit trace ID propagation for SFN w/o Hashing (#537) * add logic to extract traceID from _datadog header * rename test * fix type * added root arn case * trigger ci * Add `http.route` tags for API Gateway (#524) Add route tags * feat: [SVLS-5677] DynamoDB Stream event span pointers (#522) * trigger ci * use default propagator.extract * lint * lint * updated to use trace/parent hash from _datadog * lint * skip is context complete check * remove unused import * fix legacy lambda parsing with new header * using context object instead of pre-hashed values * fixed trigger tags and tests * pull sfn trace id generation out into a helper * added unit tests * update test data * rename stepfunctions to states * update current serverless version to v1 * Update trigger comment Co-authored-by: kimi <47579703+kimi-p@users.noreply.github.com> --------- Co-authored-by: Nicholas Hulston Co-authored-by: Aleksandr Pasechnik Co-authored-by: kimi <47579703+kimi-p@users.noreply.github.com> --- datadog_lambda/constants.py | 4 +- datadog_lambda/tracing.py | 96 +++++++++++++++++++++++++-------- datadog_lambda/trigger.py | 11 +++- tests/event_samples/states.json | 22 ++++++++ tests/test_tracing.py | 93 +++++++++++++++++++++++++------- tests/test_trigger.py | 29 ++++++++++ 6 files changed, 211 insertions(+), 44 deletions(-) create mode 100644 tests/event_samples/states.json diff --git a/datadog_lambda/constants.py b/datadog_lambda/constants.py index fd8afb3e..6ab62738 100644 --- a/datadog_lambda/constants.py +++ b/datadog_lambda/constants.py @@ -3,9 +3,8 @@ # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2019 Datadog, Inc. -# Datadog trace sampling priority - +# Datadog trace sampling priority class SamplingPriority(object): USER_REJECT = -1 AUTO_REJECT = 0 @@ -18,6 +17,7 @@ class TraceHeader(object): TRACE_ID = "x-datadog-trace-id" PARENT_ID = "x-datadog-parent-id" SAMPLING_PRIORITY = "x-datadog-sampling-priority" + TAGS = "x-datadog-tags" # X-Ray subsegment to save Datadog trace metadata diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index dfb08dd2..5ed306a3 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -356,9 +356,8 @@ def extract_context_from_kinesis_event(event, lambda_context): return extract_context_from_lambda_context(lambda_context) -def _deterministic_sha256_hash(s: str, part: str) -> (int, int): +def _deterministic_sha256_hash(s: str, part: str) -> int: sha256_hash = hashlib.sha256(s.encode()).hexdigest() - # First two chars is '0b'. zfill to ensure 256 bits, but we only care about the first 128 bits binary_hash = bin(int(sha256_hash, 16))[2:].zfill(256) if part == HIGHER_64_BITS: @@ -371,36 +370,88 @@ def _deterministic_sha256_hash(s: str, part: str) -> (int, int): return result +def _parse_high_64_bits(trace_tags: str) -> str: + """ + Parse a list of trace tags such as [_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0] and return the + value of the _dd.p.tid tag or an empty string if not found. + """ + if trace_tags: + for tag in trace_tags.split(","): + if "_dd.p.tid=" in tag: + return tag.split("=")[1] + + return "" + + +def _generate_sfn_parent_id(context: dict) -> int: + execution_id = context.get("Execution").get("Id") + state_name = context.get("State").get("Name") + state_entered_time = context.get("State").get("EnteredTime") + + return _deterministic_sha256_hash( + f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS + ) + + +def _generate_sfn_trace_id(execution_id: str, part: str): + """ + Take the SHA-256 hash of the execution_id to calculate the trace ID. If the high 64 bits are + specified, we take those bits and use hex to encode it. We also remove the first two characters + as they will be '0x in the hex string. + + We care about full 128 bits because they will break up into traditional traceID and + _dd.p.tid tag. + """ + if part == HIGHER_64_BITS: + return hex(_deterministic_sha256_hash(execution_id, part))[2:] + return _deterministic_sha256_hash(execution_id, part) + + def extract_context_from_step_functions(event, lambda_context): """ Only extract datadog trace context when Step Functions Context Object is injected into lambda's event dict. + + If '_datadog' header is present, we have two cases: + 1. Root is a Lambda and we use its traceID + 2. Root is a SFN, and we use its executionARN to calculate the traceID + We calculate the parentID the same in both cases by using the parent SFN's context object. + + Otherwise, we're dealing with the legacy case where we only have the parent SFN's context + object. """ try: - execution_id = event.get("Execution").get("Id") - state_name = event.get("State").get("Name") - state_entered_time = event.get("State").get("EnteredTime") - # returning 128 bits since 128bit traceId will be break up into - # traditional traceId and _dd.p.tid tag - # https://github.com/DataDog/dd-trace-py/blob/3e34d21cb9b5e1916e549047158cb119317b96ab/ddtrace/propagation/http.py#L232-L240 - trace_id = _deterministic_sha256_hash(execution_id, LOWER_64_BITS) - - parent_id = _deterministic_sha256_hash( - f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS - ) + meta = {} + dd_data = event.get("_datadog") + + if dd_data and dd_data.get("serverless-version") == "v1": + if "x-datadog-trace-id" in dd_data: # lambda root + trace_id = int(dd_data.get("x-datadog-trace-id")) + high_64_bit_trace_id = _parse_high_64_bits( + dd_data.get("x-datadog-tags") + ) + if high_64_bit_trace_id: + meta["_dd.p.tid"] = high_64_bit_trace_id + else: # sfn root + root_execution_id = dd_data.get("RootExecutionId") + trace_id = _generate_sfn_trace_id(root_execution_id, LOWER_64_BITS) + meta["_dd.p.tid"] = _generate_sfn_trace_id( + root_execution_id, HIGHER_64_BITS + ) + + parent_id = _generate_sfn_parent_id(dd_data) + else: + execution_id = event.get("Execution").get("Id") + trace_id = _generate_sfn_trace_id(execution_id, LOWER_64_BITS) + meta["_dd.p.tid"] = _generate_sfn_trace_id(execution_id, HIGHER_64_BITS) + parent_id = _generate_sfn_parent_id(event) sampling_priority = SamplingPriority.AUTO_KEEP return Context( trace_id=trace_id, span_id=parent_id, sampling_priority=sampling_priority, - # take the higher 64 bits as _dd.p.tid tag and use hex to encode - # [2:] to remove '0x' in the hex str - meta={ - "_dd.p.tid": hex( - _deterministic_sha256_hash(execution_id, HIGHER_64_BITS) - )[2:] - }, + meta=meta, ) except Exception as e: logger.debug("The Step Functions trace extractor returned with error %s", e) @@ -415,7 +466,10 @@ def is_legacy_lambda_step_function(event): return False event = event.get("Payload") - return "Execution" in event and "StateMachine" in event and "State" in event + return isinstance(event, dict) and ( + "_datadog" in event + or ("Execution" in event and "StateMachine" in event and "State" in event) + ) def extract_context_custom_extractor(extractor, event, lambda_context): diff --git a/datadog_lambda/trigger.py b/datadog_lambda/trigger.py index 64eff1a0..11759a0a 100644 --- a/datadog_lambda/trigger.py +++ b/datadog_lambda/trigger.py @@ -146,7 +146,9 @@ def parse_event_source(event: dict) -> _EventSource: if event.get("source") == "aws.events" or has_event_categories: event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS) - if "Execution" in event and "StateMachine" in event and "State" in event: + if ( + "_datadog" in event and event.get("_datadog").get("serverless-version") == "v1" + ) or ("Execution" in event and "StateMachine" in event and "State" in event): event_source = _EventSource(EventTypes.STEPFUNCTIONS) event_record = get_first_record(event) @@ -254,6 +256,13 @@ def parse_event_source_arn(source: _EventSource, event: dict, context: Any) -> s if source.event_type == EventTypes.CLOUDWATCH_EVENTS and event.get("resources"): return event.get("resources")[0] + # Returning state machine arn as event source arn. + if source.event_type == EventTypes.STEPFUNCTIONS: + context = event + if "_datadog" in event: + context = event.get("_datadog") + return context.get("StateMachine").get("Id") + def get_event_source_arn(source: _EventSource, event: dict, context: Any) -> str: event_source_arn = event.get("eventSourceARN") or event.get("eventSourceArn") diff --git a/tests/event_samples/states.json b/tests/event_samples/states.json new file mode 100644 index 00000000..778fe437 --- /dev/null +++ b/tests/event_samples/states.json @@ -0,0 +1,22 @@ +{ + "_datadog": { + "Execution": { + "Id": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c", + "StartTime": "2024-11-13T16:46:47.715Z", + "Name": "1356a963-42a5-48b0-ba3f-73bde559a50c", + "RoleArn": "arn:aws:iam::425362996713:role/service-role/StepFunctions-MyStateMachine-wsx8chv4d-role-1su0fkfd3", + "RedriveCount": 0 + }, + "StateMachine": { + "Id": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d", + "Name": "MyStateMachine-wsx8chv4d" + }, + "State": { + "Name": "Lambda Invoke", + "EnteredTime": "2024-11-13T16:46:47.740Z", + "RetryCount": 0 + }, + "RootExecutionId": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c", + "serverless-version": "v1" + } +} diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 22ac7049..d0db05cd 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -617,7 +617,7 @@ def test_with_complete_datadog_trace_headers_with_trigger_tags(self): @with_trace_propagation_style("datadog") def test_step_function_trace_data(self): lambda_ctx = get_mock_context() - sqs_event = { + sfn_event = { "Execution": { "Id": "665c417c-1237-4742-aaca-8b3becbb9e75", }, @@ -627,7 +627,7 @@ def test_step_function_trace_data(self): "EnteredTime": "Mon Nov 13 12:43:33 PST 2023", }, } - ctx, source, event_source = extract_dd_trace_context(sqs_event, lambda_ctx) + ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx) self.assertEqual(source, "event") expected_context = Context( trace_id=3675572987363469717, @@ -642,7 +642,7 @@ def test_step_function_trace_data(self): TraceHeader.TRACE_ID: "3675572987363469717", TraceHeader.PARENT_ID: "10713633173203262661", TraceHeader.SAMPLING_PRIORITY: "1", - "x-datadog-tags": "_dd.p.tid=e987c84b36b11ab", + TraceHeader.TAGS: "_dd.p.tid=e987c84b36b11ab", }, ) create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) @@ -651,9 +651,11 @@ def test_step_function_trace_data(self): expected_context, ) - def test_is_legacy_lambda_step_function(self): - sf_event = { - "Payload": { + @with_trace_propagation_style("datadog") + def test_step_function_trace_data_lambda_root(self): + lambda_ctx = get_mock_context() + sfn_event = { + "_datadog": { "Execution": { "Id": "665c417c-1237-4742-aaca-8b3becbb9e75", }, @@ -662,24 +664,75 @@ def test_is_legacy_lambda_step_function(self): "Name": "my-awesome-state", "EnteredTime": "Mon Nov 13 12:43:33 PST 2023", }, + "x-datadog-trace-id": "5821803790426892636", + "x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=672a7cb100000000", + "serverless-version": "v1", } } - self.assertTrue(is_legacy_lambda_step_function(sf_event)) - - sf_event = { - "Execution": { - "Id": "665c417c-1237-4742-aaca-8b3becbb9e75", - }, - "StateMachine": {}, - "State": { - "Name": "my-awesome-state", - "EnteredTime": "Mon Nov 13 12:43:33 PST 2023", + ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx) + self.assertEqual(source, "event") + expected_context = Context( + trace_id=5821803790426892636, + span_id=6880978411788117524, + sampling_priority=1, + meta={"_dd.p.tid": "672a7cb100000000"}, + ) + self.assertEqual(ctx, expected_context) + self.assertEqual( + get_dd_trace_context(), + { + TraceHeader.TRACE_ID: "5821803790426892636", + TraceHeader.PARENT_ID: "10713633173203262661", + TraceHeader.SAMPLING_PRIORITY: "1", + TraceHeader.TAGS: "_dd.p.tid=672a7cb100000000", }, - } - self.assertFalse(is_legacy_lambda_step_function(sf_event)) + ) + create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) + self.mock_send_segment.assert_called_with( + XraySubsegment.TRACE_KEY, + expected_context, + ) - other_event = ["foo", "bar"] - self.assertFalse(is_legacy_lambda_step_function(other_event)) + @with_trace_propagation_style("datadog") + def test_step_function_trace_data_sfn_root(self): + lambda_ctx = get_mock_context() + sfn_event = { + "_datadog": { + "Execution": { + "Id": "665c417c-1237-4742-aaca-8b3becbb9e75", + }, + "StateMachine": {}, + "State": { + "Name": "my-awesome-state", + "EnteredTime": "Mon Nov 13 12:43:33 PST 2023", + }, + "RootExecutionId": "4875aba4-ae31-4a4c-bf8a-63e9eee31dad", + "serverless-version": "v1", + } + } + ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx) + self.assertEqual(source, "event") + expected_context = Context( + trace_id=4521899030418994483, + span_id=6880978411788117524, + sampling_priority=1, + meta={"_dd.p.tid": "12d1270d99cc5e03"}, + ) + self.assertEqual(ctx, expected_context) + self.assertEqual( + get_dd_trace_context(), + { + TraceHeader.TRACE_ID: "4521899030418994483", + TraceHeader.PARENT_ID: "10713633173203262661", + TraceHeader.SAMPLING_PRIORITY: "1", + TraceHeader.TAGS: "_dd.p.tid=12d1270d99cc5e03", + }, + ) + create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) + self.mock_send_segment.assert_called_with( + XraySubsegment.TRACE_KEY, + expected_context, + ) class TestXRayContextConversion(unittest.TestCase): diff --git a/tests/test_trigger.py b/tests/test_trigger.py index c1e7f5d7..be028a23 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -230,6 +230,20 @@ def test_event_source_sqs(self): "arn:aws:sqs:eu-west-1:601427279990:InferredSpansQueueNode", ) + def test_event_source_stepfunctions(self): + event_sample_source = "states" + test_file = event_samples + event_sample_source + ".json" + with open(test_file, "r") as event: + event = json.load(event) + ctx = get_mock_context() + event_source = parse_event_source(event) + event_source_arn = get_event_source_arn(event_source, event, ctx) + self.assertEqual(event_source.to_string(), event_sample_source) + self.assertEqual( + event_source_arn, + "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d", + ) + def test_event_source_unsupported(self): event_sample_source = "custom" test_file = event_samples + event_sample_source + ".json" @@ -485,6 +499,21 @@ def test_extract_trigger_tags_sqs(self): }, ) + def test_extract_trigger_tags_stepfunctions(self): + event_sample_source = "states" + test_file = event_samples + event_sample_source + ".json" + ctx = get_mock_context() + with open(test_file, "r") as event: + event = json.load(event) + tags = extract_trigger_tags(event, ctx) + self.assertEqual( + tags, + { + "function_trigger.event_source": "states", + "function_trigger.event_source_arn": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d", + }, + ) + def test_extract_trigger_tags_unsupported(self): event_sample_source = "custom" test_file = event_samples + event_sample_source + ".json"