Skip to content

Commit

Permalink
Explicit trace ID propagation for SFN w/o Hashing (#537)
Browse files Browse the repository at this point in the history
* add logic to extract traceID from _datadog header

* rename test

* fix type

* added root arn case

* trigger ci

* Add `http.route` tags for API Gateway (#524)

Add route tags

* feat: [SVLS-5677] DynamoDB Stream event span pointers (#522)

* trigger ci

* use default propagator.extract

* lint

* lint

* updated to use trace/parent hash from _datadog

* lint

* skip is context complete check

* remove unused import

* fix legacy lambda parsing with new header

* using context object instead of pre-hashed values

* fixed trigger tags and tests

* pull sfn trace id generation out into a helper

* added unit tests

* update test data

* rename stepfunctions to states

* update current serverless version to v1

* Update trigger comment

Co-authored-by: kimi <47579703+kimi-p@users.noreply.github.com>

---------

Co-authored-by: Nicholas Hulston <nicholashulston@gmail.com>
Co-authored-by: Aleksandr Pasechnik <aleksandr.pasechnik@datadoghq.com>
Co-authored-by: kimi <47579703+kimi-p@users.noreply.github.com>
  • Loading branch information
4 people authored Nov 18, 2024
1 parent 49df8ee commit c4da90e
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 44 deletions.
4 changes: 2 additions & 2 deletions datadog_lambda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

# Datadog trace sampling priority


# Datadog trace sampling priority
class SamplingPriority(object):
USER_REJECT = -1
AUTO_REJECT = 0
Expand All @@ -18,6 +17,7 @@ class TraceHeader(object):
TRACE_ID = "x-datadog-trace-id"
PARENT_ID = "x-datadog-parent-id"
SAMPLING_PRIORITY = "x-datadog-sampling-priority"
TAGS = "x-datadog-tags"


# X-Ray subsegment to save Datadog trace metadata
Expand Down
96 changes: 75 additions & 21 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,8 @@ def extract_context_from_kinesis_event(event, lambda_context):
return extract_context_from_lambda_context(lambda_context)


def _deterministic_sha256_hash(s: str, part: str) -> (int, int):
def _deterministic_sha256_hash(s: str, part: str) -> int:
sha256_hash = hashlib.sha256(s.encode()).hexdigest()

# First two chars is '0b'. zfill to ensure 256 bits, but we only care about the first 128 bits
binary_hash = bin(int(sha256_hash, 16))[2:].zfill(256)
if part == HIGHER_64_BITS:
Expand All @@ -371,36 +370,88 @@ def _deterministic_sha256_hash(s: str, part: str) -> (int, int):
return result


def _parse_high_64_bits(trace_tags: str) -> str:
"""
Parse a list of trace tags such as [_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0] and return the
value of the _dd.p.tid tag or an empty string if not found.
"""
if trace_tags:
for tag in trace_tags.split(","):
if "_dd.p.tid=" in tag:
return tag.split("=")[1]

return ""


def _generate_sfn_parent_id(context: dict) -> int:
    """
    Derive the parent span ID for a Step Functions state from its context object.

    The execution ID, state name and state entered time are joined with "#",
    and the HIGHER_64_BITS part of the hash of that string is returned.
    """
    execution_id = context.get("Execution").get("Id")
    state = context.get("State")
    name = state.get("Name")
    entered_time = state.get("EnteredTime")

    hash_input = f"{execution_id}#{name}#{entered_time}"
    return _deterministic_sha256_hash(hash_input, HIGHER_64_BITS)


def _generate_sfn_trace_id(execution_id: str, part: str):
    """
    Compute one half of the 128-bit Step Functions trace ID from the SHA-256
    hash of the execution ID.

    For HIGHER_64_BITS the value is returned as a hex string with the leading
    '0x' removed, which is the form used for the _dd.p.tid tag. For any other
    part the integer hash is returned unchanged, for use as the traditional
    trace ID.
    """
    hashed_part = _deterministic_sha256_hash(execution_id, part)
    if part != HIGHER_64_BITS:
        return hashed_part
    # hex() prefixes its output with '0x'; drop those two characters.
    return hex(hashed_part)[2:]


def extract_context_from_step_functions(event, lambda_context):
"""
Only extract datadog trace context when Step Functions Context Object is injected
into lambda's event dict.
If '_datadog' header is present, we have two cases:
1. Root is a Lambda and we use its traceID
2. Root is a SFN, and we use its executionARN to calculate the traceID
We calculate the parentID the same in both cases by using the parent SFN's context object.
Otherwise, we're dealing with the legacy case where we only have the parent SFN's context
object.
"""
try:
execution_id = event.get("Execution").get("Id")
state_name = event.get("State").get("Name")
state_entered_time = event.get("State").get("EnteredTime")
# returning 128 bits since 128bit traceId will be break up into
# traditional traceId and _dd.p.tid tag
# https://github.com/DataDog/dd-trace-py/blob/3e34d21cb9b5e1916e549047158cb119317b96ab/ddtrace/propagation/http.py#L232-L240
trace_id = _deterministic_sha256_hash(execution_id, LOWER_64_BITS)

parent_id = _deterministic_sha256_hash(
f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS
)
meta = {}
dd_data = event.get("_datadog")

if dd_data and dd_data.get("serverless-version") == "v1":
if "x-datadog-trace-id" in dd_data: # lambda root
trace_id = int(dd_data.get("x-datadog-trace-id"))
high_64_bit_trace_id = _parse_high_64_bits(
dd_data.get("x-datadog-tags")
)
if high_64_bit_trace_id:
meta["_dd.p.tid"] = high_64_bit_trace_id
else: # sfn root
root_execution_id = dd_data.get("RootExecutionId")
trace_id = _generate_sfn_trace_id(root_execution_id, LOWER_64_BITS)
meta["_dd.p.tid"] = _generate_sfn_trace_id(
root_execution_id, HIGHER_64_BITS
)

parent_id = _generate_sfn_parent_id(dd_data)
else:
execution_id = event.get("Execution").get("Id")
trace_id = _generate_sfn_trace_id(execution_id, LOWER_64_BITS)
meta["_dd.p.tid"] = _generate_sfn_trace_id(execution_id, HIGHER_64_BITS)
parent_id = _generate_sfn_parent_id(event)

sampling_priority = SamplingPriority.AUTO_KEEP
return Context(
trace_id=trace_id,
span_id=parent_id,
sampling_priority=sampling_priority,
# take the higher 64 bits as _dd.p.tid tag and use hex to encode
# [2:] to remove '0x' in the hex str
meta={
"_dd.p.tid": hex(
_deterministic_sha256_hash(execution_id, HIGHER_64_BITS)
)[2:]
},
meta=meta,
)
except Exception as e:
logger.debug("The Step Functions trace extractor returned with error %s", e)
Expand All @@ -415,7 +466,10 @@ def is_legacy_lambda_step_function(event):
return False

event = event.get("Payload")
return "Execution" in event and "StateMachine" in event and "State" in event
return isinstance(event, dict) and (
"_datadog" in event
or ("Execution" in event and "StateMachine" in event and "State" in event)
)


def extract_context_custom_extractor(extractor, event, lambda_context):
Expand Down
11 changes: 10 additions & 1 deletion datadog_lambda/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def parse_event_source(event: dict) -> _EventSource:
if event.get("source") == "aws.events" or has_event_categories:
event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS)

if "Execution" in event and "StateMachine" in event and "State" in event:
if (
"_datadog" in event and event.get("_datadog").get("serverless-version") == "v1"
) or ("Execution" in event and "StateMachine" in event and "State" in event):
event_source = _EventSource(EventTypes.STEPFUNCTIONS)

event_record = get_first_record(event)
Expand Down Expand Up @@ -254,6 +256,13 @@ def parse_event_source_arn(source: _EventSource, event: dict, context: Any) -> s
if source.event_type == EventTypes.CLOUDWATCH_EVENTS and event.get("resources"):
return event.get("resources")[0]

# Returning state machine arn as event source arn.
if source.event_type == EventTypes.STEPFUNCTIONS:
context = event
if "_datadog" in event:
context = event.get("_datadog")
return context.get("StateMachine").get("Id")


def get_event_source_arn(source: _EventSource, event: dict, context: Any) -> str:
event_source_arn = event.get("eventSourceARN") or event.get("eventSourceArn")
Expand Down
22 changes: 22 additions & 0 deletions tests/event_samples/states.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"_datadog": {
"Execution": {
"Id": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c",
"StartTime": "2024-11-13T16:46:47.715Z",
"Name": "1356a963-42a5-48b0-ba3f-73bde559a50c",
"RoleArn": "arn:aws:iam::425362996713:role/service-role/StepFunctions-MyStateMachine-wsx8chv4d-role-1su0fkfd3",
"RedriveCount": 0
},
"StateMachine": {
"Id": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
"Name": "MyStateMachine-wsx8chv4d"
},
"State": {
"Name": "Lambda Invoke",
"EnteredTime": "2024-11-13T16:46:47.740Z",
"RetryCount": 0
},
"RootExecutionId": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c",
"serverless-version": "v1"
}
}
93 changes: 73 additions & 20 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ def test_with_complete_datadog_trace_headers_with_trigger_tags(self):
@with_trace_propagation_style("datadog")
def test_step_function_trace_data(self):
lambda_ctx = get_mock_context()
sqs_event = {
sfn_event = {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
Expand All @@ -627,7 +627,7 @@ def test_step_function_trace_data(self):
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
},
}
ctx, source, event_source = extract_dd_trace_context(sqs_event, lambda_ctx)
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
self.assertEqual(source, "event")
expected_context = Context(
trace_id=3675572987363469717,
Expand All @@ -642,7 +642,7 @@ def test_step_function_trace_data(self):
TraceHeader.TRACE_ID: "3675572987363469717",
TraceHeader.PARENT_ID: "10713633173203262661",
TraceHeader.SAMPLING_PRIORITY: "1",
"x-datadog-tags": "_dd.p.tid=e987c84b36b11ab",
TraceHeader.TAGS: "_dd.p.tid=e987c84b36b11ab",
},
)
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
Expand All @@ -651,9 +651,11 @@ def test_step_function_trace_data(self):
expected_context,
)

def test_is_legacy_lambda_step_function(self):
sf_event = {
"Payload": {
@with_trace_propagation_style("datadog")
def test_step_function_trace_data_lambda_root(self):
lambda_ctx = get_mock_context()
sfn_event = {
"_datadog": {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
Expand All @@ -662,24 +664,75 @@ def test_is_legacy_lambda_step_function(self):
"Name": "my-awesome-state",
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
},
"x-datadog-trace-id": "5821803790426892636",
"x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=672a7cb100000000",
"serverless-version": "v1",
}
}
self.assertTrue(is_legacy_lambda_step_function(sf_event))

sf_event = {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
"StateMachine": {},
"State": {
"Name": "my-awesome-state",
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
self.assertEqual(source, "event")
expected_context = Context(
trace_id=5821803790426892636,
span_id=6880978411788117524,
sampling_priority=1,
meta={"_dd.p.tid": "672a7cb100000000"},
)
self.assertEqual(ctx, expected_context)
self.assertEqual(
get_dd_trace_context(),
{
TraceHeader.TRACE_ID: "5821803790426892636",
TraceHeader.PARENT_ID: "10713633173203262661",
TraceHeader.SAMPLING_PRIORITY: "1",
TraceHeader.TAGS: "_dd.p.tid=672a7cb100000000",
},
}
self.assertFalse(is_legacy_lambda_step_function(sf_event))
)
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
self.mock_send_segment.assert_called_with(
XraySubsegment.TRACE_KEY,
expected_context,
)

other_event = ["foo", "bar"]
self.assertFalse(is_legacy_lambda_step_function(other_event))
@with_trace_propagation_style("datadog")
def test_step_function_trace_data_sfn_root(self):
    """
    Step Functions root case: `_datadog` carries no x-datadog-trace-id, so the
    trace ID and _dd.p.tid are expected to come from RootExecutionId.
    """
    lambda_ctx = get_mock_context()
    datadog_header = {
        "Execution": {
            "Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
        },
        "StateMachine": {},
        "State": {
            "Name": "my-awesome-state",
            "EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
        },
        "RootExecutionId": "4875aba4-ae31-4a4c-bf8a-63e9eee31dad",
        "serverless-version": "v1",
    }
    sfn_event = {"_datadog": datadog_header}

    ctx, source, _ = extract_dd_trace_context(sfn_event, lambda_ctx)

    self.assertEqual(source, "event")
    expected_context = Context(
        trace_id=4521899030418994483,
        span_id=6880978411788117524,
        sampling_priority=1,
        meta={"_dd.p.tid": "12d1270d99cc5e03"},
    )
    self.assertEqual(ctx, expected_context)

    expected_headers = {
        TraceHeader.TRACE_ID: "4521899030418994483",
        TraceHeader.PARENT_ID: "10713633173203262661",
        TraceHeader.SAMPLING_PRIORITY: "1",
        TraceHeader.TAGS: "_dd.p.tid=12d1270d99cc5e03",
    }
    self.assertEqual(get_dd_trace_context(), expected_headers)

    # The extracted context must also be what gets sent to the X-Ray subsegment.
    create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
    self.mock_send_segment.assert_called_with(
        XraySubsegment.TRACE_KEY,
        expected_context,
    )


class TestXRayContextConversion(unittest.TestCase):
Expand Down
29 changes: 29 additions & 0 deletions tests/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ def test_event_source_sqs(self):
"arn:aws:sqs:eu-west-1:601427279990:InferredSpansQueueNode",
)

def test_event_source_stepfunctions(self):
    """The `states` sample resolves to the "states" source and its state machine ARN."""
    sample_name = "states"
    sample_path = event_samples + sample_name + ".json"
    with open(sample_path, "r") as sample_file:
        event = json.load(sample_file)

    mock_context = get_mock_context()
    source = parse_event_source(event)
    source_arn = get_event_source_arn(source, event, mock_context)

    self.assertEqual(source.to_string(), sample_name)
    self.assertEqual(
        source_arn,
        "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
    )

def test_event_source_unsupported(self):
event_sample_source = "custom"
test_file = event_samples + event_sample_source + ".json"
Expand Down Expand Up @@ -485,6 +499,21 @@ def test_extract_trigger_tags_sqs(self):
},
)

def test_extract_trigger_tags_stepfunctions(self):
    """The `states` sample yields the event source and state machine ARN trigger tags."""
    sample_name = "states"
    sample_path = event_samples + sample_name + ".json"
    mock_context = get_mock_context()
    with open(sample_path, "r") as sample_file:
        event = json.load(sample_file)

    expected_tags = {
        "function_trigger.event_source": "states",
        "function_trigger.event_source_arn": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
    }
    self.assertEqual(extract_trigger_tags(event, mock_context), expected_tags)

def test_extract_trigger_tags_unsupported(self):
event_sample_source = "custom"
test_file = event_samples + event_sample_source + ".json"
Expand Down

0 comments on commit c4da90e

Please sign in to comment.