Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicit trace ID propagation for SFN w/o Hashing #537

Merged
merged 26 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7c127a3
add logic to extract traceID from _datadog header
avedmala Oct 29, 2024
8658dea
rename test
avedmala Oct 29, 2024
3eccb20
fix type
avedmala Oct 29, 2024
97b189a
added root arn case
avedmala Oct 30, 2024
eaa09f3
trigger ci
avedmala Oct 30, 2024
2baec69
Add `http.route` tags for API Gateway (#524)
nhulston Oct 29, 2024
3c4014b
feat: [SVLS-5677] DynamoDB Stream event span pointers (#522)
apiarian-datadog Oct 30, 2024
6f54a00
trigger ci
avedmala Oct 30, 2024
4e0afdb
Merge remote-tracking branch 'origin/main' into avedmala/sfn-span-link
avedmala Oct 31, 2024
25780a8
use default propagator.extract
avedmala Nov 4, 2024
85a157f
lint
avedmala Nov 4, 2024
aad3cd8
lint
avedmala Nov 4, 2024
c3681de
updated to use trace/parent hash from _datadog
avedmala Nov 6, 2024
7d1d475
lint
avedmala Nov 6, 2024
e9a7d46
skip is context complete check
avedmala Nov 6, 2024
a6464b4
remove unused import
avedmala Nov 6, 2024
068c2fc
fix legacy lambda parsing with new header
avedmala Nov 11, 2024
00850d8
using context object instead of pre-hashed values
avedmala Nov 12, 2024
c23911d
Merge branch 'main' into avedmala/sfn-span-link-no-hash
avedmala Nov 12, 2024
832ae1b
fixed trigger tags and tests
avedmala Nov 12, 2024
e2e732b
pull sfn trace id generation out into a helper
avedmala Nov 13, 2024
3bfc1e7
added unit tests
avedmala Nov 13, 2024
6abb3f3
update test data
avedmala Nov 13, 2024
cae62b1
rename stepfunctions to states
avedmala Nov 13, 2024
49dbaf2
update current serverless version to v1
avedmala Nov 14, 2024
c47fc62
Update trigger comment
avedmala Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datadog_lambda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

# Datadog trace sampling priority


# Datadog trace sampling priority
class SamplingPriority(object):
USER_REJECT = -1
AUTO_REJECT = 0
Expand All @@ -18,6 +17,7 @@ class TraceHeader(object):
TRACE_ID = "x-datadog-trace-id"
PARENT_ID = "x-datadog-parent-id"
SAMPLING_PRIORITY = "x-datadog-sampling-priority"
TAGS = "x-datadog-tags"


# X-Ray subsegment to save Datadog trace metadata
Expand Down
96 changes: 75 additions & 21 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,8 @@ def extract_context_from_kinesis_event(event, lambda_context):
return extract_context_from_lambda_context(lambda_context)


def _deterministic_sha256_hash(s: str, part: str) -> (int, int):
def _deterministic_sha256_hash(s: str, part: str) -> int:
sha256_hash = hashlib.sha256(s.encode()).hexdigest()

# First two chars is '0b'. zfill to ensure 256 bits, but we only care about the first 128 bits
binary_hash = bin(int(sha256_hash, 16))[2:].zfill(256)
if part == HIGHER_64_BITS:
Expand All @@ -371,36 +370,88 @@ def _deterministic_sha256_hash(s: str, part: str) -> (int, int):
return result


def _parse_high_64_bits(trace_tags: str) -> str:
"""
Parse a list of trace tags such as [_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0] and return the
value of the _dd.p.tid tag or an empty string if not found.
"""
if trace_tags:
for tag in trace_tags.split(","):
if "_dd.p.tid=" in tag:
return tag.split("=")[1]

return ""


def _generate_sfn_parent_id(context: dict) -> int:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created these helpers to pull hashing and hex related details out of extract_context_from_step_functions as it was getting pretty verbose

execution_id = context.get("Execution").get("Id")
state_name = context.get("State").get("Name")
state_entered_time = context.get("State").get("EnteredTime")

return _deterministic_sha256_hash(
f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS
)


def _generate_sfn_trace_id(execution_id: str, part: str):
"""
Take the SHA-256 hash of the execution_id to calculate the trace ID. If the high 64 bits are
specified, we take those bits and use hex to encode it. We also remove the first two characters
as they will be '0x in the hex string.

We care about full 128 bits because they will break up into traditional traceID and
_dd.p.tid tag.
"""
if part == HIGHER_64_BITS:
return hex(_deterministic_sha256_hash(execution_id, part))[2:]
return _deterministic_sha256_hash(execution_id, part)


def extract_context_from_step_functions(event, lambda_context):
"""
Only extract datadog trace context when Step Functions Context Object is injected
into lambda's event dict.

If '_datadog' header is present, we have two cases:
1. Root is a Lambda and we use its traceID
2. Root is a SFN, and we use its executionARN to calculate the traceID
We calculate the parentID the same in both cases by using the parent SFN's context object.

Otherwise, we're dealing with the legacy case where we only have the parent SFN's context
object.
"""
try:
execution_id = event.get("Execution").get("Id")
state_name = event.get("State").get("Name")
state_entered_time = event.get("State").get("EnteredTime")
# returning 128 bits since 128bit traceId will be break up into
# traditional traceId and _dd.p.tid tag
# https://github.com/DataDog/dd-trace-py/blob/3e34d21cb9b5e1916e549047158cb119317b96ab/ddtrace/propagation/http.py#L232-L240
trace_id = _deterministic_sha256_hash(execution_id, LOWER_64_BITS)

parent_id = _deterministic_sha256_hash(
f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS
)
meta = {}
dd_data = event.get("_datadog")

if dd_data and dd_data.get("serverless-version") == "v2":

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between v1 and v2? Should we start from version v2 or v1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no v1, I think we can start from v1 instead if we want

I think @kimi-p suggested v2, maybe because the "legacy" context is implied to be v1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, "V2" may be somewhat confusing to customers because there is no explicit reference to "V1" at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm!

if "x-datadog-trace-id" in dd_data: # lambda root
trace_id = int(dd_data.get("x-datadog-trace-id"))
high_64_bit_trace_id = _parse_high_64_bits(
dd_data.get("x-datadog-tags")
)
if high_64_bit_trace_id:
meta["_dd.p.tid"] = high_64_bit_trace_id
else: # sfn root
root_execution_id = dd_data.get("RootExecutionId")
trace_id = _generate_sfn_trace_id(root_execution_id, LOWER_64_BITS)
meta["_dd.p.tid"] = _generate_sfn_trace_id(
root_execution_id, HIGHER_64_BITS
)

parent_id = _generate_sfn_parent_id(dd_data)
else:
execution_id = event.get("Execution").get("Id")
trace_id = _generate_sfn_trace_id(execution_id, LOWER_64_BITS)
meta["_dd.p.tid"] = _generate_sfn_trace_id(execution_id, HIGHER_64_BITS)
parent_id = _generate_sfn_parent_id(event)

sampling_priority = SamplingPriority.AUTO_KEEP
return Context(
trace_id=trace_id,
span_id=parent_id,
sampling_priority=sampling_priority,
# take the higher 64 bits as _dd.p.tid tag and use hex to encode
# [2:] to remove '0x' in the hex str
meta={
"_dd.p.tid": hex(
_deterministic_sha256_hash(execution_id, HIGHER_64_BITS)
)[2:]
},
meta=meta,
)
except Exception as e:
logger.debug("The Step Functions trace extractor returned with error %s", e)
Expand All @@ -415,7 +466,10 @@ def is_legacy_lambda_step_function(event):
return False

event = event.get("Payload")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Code Quality Violation

variable name is the same as a function parameter (...read more)

A function parameter should only be read and not be modified. If your intent is to modify the value of the parameter, return the value in the function and handle the new value in the caller of the function.

View in Datadog  Leave us feedback  Documentation

return "Execution" in event and "StateMachine" in event and "State" in event
return isinstance(event, dict) and (
kimi-p marked this conversation as resolved.
Show resolved Hide resolved
"_datadog" in event
or ("Execution" in event and "StateMachine" in event and "State" in event)
)


def extract_context_custom_extractor(extractor, event, lambda_context):
Expand Down
11 changes: 10 additions & 1 deletion datadog_lambda/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def parse_event_source(event: dict) -> _EventSource:
if event.get("source") == "aws.events" or has_event_categories:
event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS)

if "Execution" in event and "StateMachine" in event and "State" in event:
if (
"_datadog" in event and event.get("_datadog").get("serverless-version") == "v2"
) or ("Execution" in event and "StateMachine" in event and "State" in event):
event_source = _EventSource(EventTypes.STEPFUNCTIONS)

event_record = get_first_record(event)
Expand Down Expand Up @@ -254,6 +256,13 @@ def parse_event_source_arn(source: _EventSource, event: dict, context: Any) -> s
if source.event_type == EventTypes.CLOUDWATCH_EVENTS and event.get("resources"):
return event.get("resources")[0]

# e.g. arn:aws:states:us-east-1:123456789012:stateMachine:stateMachineName
avedmala marked this conversation as resolved.
Show resolved Hide resolved
if source.event_type == EventTypes.STEPFUNCTIONS:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will parse the ARN so we we get trigger tags like this
Screenshot 2024-11-13 at 1 11 23 PM

context = event
if "_datadog" in event:
context = event.get("_datadog")
return context.get("StateMachine").get("Id")


def get_event_source_arn(source: _EventSource, event: dict, context: Any) -> str:
event_source_arn = event.get("eventSourceARN") or event.get("eventSourceArn")
Expand Down
22 changes: 22 additions & 0 deletions tests/event_samples/states.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"_datadog": {
"Execution": {
"Id": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c",
"StartTime": "2024-11-13T16:46:47.715Z",
"Name": "1356a963-42a5-48b0-ba3f-73bde559a50c",
"RoleArn": "arn:aws:iam::425362996713:role/service-role/StepFunctions-MyStateMachine-wsx8chv4d-role-1su0fkfd3",
"RedriveCount": 0
},
"StateMachine": {
"Id": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
"Name": "MyStateMachine-wsx8chv4d"
},
"State": {
"Name": "Lambda Invoke",
"EnteredTime": "2024-11-13T16:46:47.740Z",
"RetryCount": 0
},
"RootExecutionId": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c",
"serverless-version": "v2"
}
}
93 changes: 73 additions & 20 deletions tests/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ def test_with_complete_datadog_trace_headers_with_trigger_tags(self):
@with_trace_propagation_style("datadog")
def test_step_function_trace_data(self):
lambda_ctx = get_mock_context()
sqs_event = {
sfn_event = {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
Expand All @@ -627,7 +627,7 @@ def test_step_function_trace_data(self):
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
},
}
ctx, source, event_source = extract_dd_trace_context(sqs_event, lambda_ctx)
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
self.assertEqual(source, "event")
expected_context = Context(
trace_id=3675572987363469717,
Expand All @@ -642,7 +642,7 @@ def test_step_function_trace_data(self):
TraceHeader.TRACE_ID: "3675572987363469717",
TraceHeader.PARENT_ID: "10713633173203262661",
TraceHeader.SAMPLING_PRIORITY: "1",
"x-datadog-tags": "_dd.p.tid=e987c84b36b11ab",
TraceHeader.TAGS: "_dd.p.tid=e987c84b36b11ab",
},
)
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
Expand All @@ -651,9 +651,11 @@ def test_step_function_trace_data(self):
expected_context,
)

def test_is_legacy_lambda_step_function(self):
sf_event = {
"Payload": {
@with_trace_propagation_style("datadog")
def test_step_function_trace_data_lambda_root(self):
lambda_ctx = get_mock_context()
sfn_event = {
"_datadog": {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
Expand All @@ -662,24 +664,75 @@ def test_is_legacy_lambda_step_function(self):
"Name": "my-awesome-state",
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
},
"x-datadog-trace-id": "5821803790426892636",
"x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=672a7cb100000000",
"serverless-version": "v2",
}
}
self.assertTrue(is_legacy_lambda_step_function(sf_event))

sf_event = {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
"StateMachine": {},
"State": {
"Name": "my-awesome-state",
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
self.assertEqual(source, "event")
expected_context = Context(
trace_id=5821803790426892636,
span_id=6880978411788117524,
sampling_priority=1,
meta={"_dd.p.tid": "672a7cb100000000"},
)
self.assertEqual(ctx, expected_context)
self.assertEqual(
get_dd_trace_context(),
{
TraceHeader.TRACE_ID: "5821803790426892636",
TraceHeader.PARENT_ID: "10713633173203262661",
TraceHeader.SAMPLING_PRIORITY: "1",
TraceHeader.TAGS: "_dd.p.tid=672a7cb100000000",
},
}
self.assertFalse(is_legacy_lambda_step_function(sf_event))
)
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
self.mock_send_segment.assert_called_with(
XraySubsegment.TRACE_KEY,
expected_context,
)

other_event = ["foo", "bar"]
self.assertFalse(is_legacy_lambda_step_function(other_event))
@with_trace_propagation_style("datadog")
def test_step_function_trace_data_sfn_root(self):
lambda_ctx = get_mock_context()
sfn_event = {
"_datadog": {
"Execution": {
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
},
"StateMachine": {},
"State": {
"Name": "my-awesome-state",
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
},
"RootExecutionId": "4875aba4-ae31-4a4c-bf8a-63e9eee31dad",
"serverless-version": "v2",
}
}
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
self.assertEqual(source, "event")
expected_context = Context(
trace_id=4521899030418994483,
span_id=6880978411788117524,
sampling_priority=1,
meta={"_dd.p.tid": "12d1270d99cc5e03"},
)
self.assertEqual(ctx, expected_context)
self.assertEqual(
get_dd_trace_context(),
{
TraceHeader.TRACE_ID: "4521899030418994483",
TraceHeader.PARENT_ID: "10713633173203262661",
TraceHeader.SAMPLING_PRIORITY: "1",
TraceHeader.TAGS: "_dd.p.tid=12d1270d99cc5e03",
},
)
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
self.mock_send_segment.assert_called_with(
XraySubsegment.TRACE_KEY,
expected_context,
)


class TestXRayContextConversion(unittest.TestCase):
Expand Down
29 changes: 29 additions & 0 deletions tests/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ def test_event_source_sqs(self):
"arn:aws:sqs:eu-west-1:601427279990:InferredSpansQueueNode",
)

def test_event_source_stepfunctions(self):
event_sample_source = "states"
test_file = event_samples + event_sample_source + ".json"
with open(test_file, "r") as event:
event = json.load(event)
ctx = get_mock_context()
event_source = parse_event_source(event)
event_source_arn = get_event_source_arn(event_source, event, ctx)
self.assertEqual(event_source.to_string(), event_sample_source)
self.assertEqual(
event_source_arn,
"arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
)

def test_event_source_unsupported(self):
event_sample_source = "custom"
test_file = event_samples + event_sample_source + ".json"
Expand Down Expand Up @@ -485,6 +499,21 @@ def test_extract_trigger_tags_sqs(self):
},
)

def test_extract_trigger_tags_stepfunctions(self):
event_sample_source = "states"
test_file = event_samples + event_sample_source + ".json"
ctx = get_mock_context()
with open(test_file, "r") as event:
event = json.load(event)
tags = extract_trigger_tags(event, ctx)
self.assertEqual(
tags,
{
"function_trigger.event_source": "states",
"function_trigger.event_source_arn": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
},
)

def test_extract_trigger_tags_unsupported(self):
event_sample_source = "custom"
test_file = event_samples + event_sample_source + ".json"
Expand Down
Loading