Skip to content

Commit

Permalink
Merge branch 'main' into avara1986/iast_error_metrics_refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
avara1986 authored Sep 23, 2024
2 parents bdd3a40 + 5546c23 commit c33988c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
2 changes: 1 addition & 1 deletion ddtrace/contrib/internal/botocore/services/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,
parent_ctx,
params,
time_estimate,
data_obj.get("_datadog"),
data_obj.get("_datadog") if data_obj else None,
record,
result,
config.botocore.propagation_enabled,
Expand Down
5 changes: 2 additions & 3 deletions ddtrace/contrib/internal/botocore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ def get_json_from_str(data_str: str) -> Tuple[str, Optional[Dict[str, Any]]]:
return None, data_obj


def get_kinesis_data_object(data: str) -> Tuple[str, Optional[Dict[str, Any]]]:
def get_kinesis_data_object(data: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
"""
:data: the data from a kinesis stream
The data from a kinesis stream comes as a string (could be json, base64 encoded, etc.)
We support injecting our trace context in the following three cases:
- json string
- byte encoded json string
- base64 encoded json string
If it's none of these, then we leave the message as it is.
If it's none of these, then we return None
"""

# check if data is a json string
try:
return get_json_from_str(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
kinesis: This fix resolves an issue where unparsable data in a Kinesis record would cause a NoneType error.
13 changes: 11 additions & 2 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2846,7 +2846,9 @@ def _test_kinesis_put_record_trace_injection(self, test_name, data, client=None,

return decoded_record_data

def _test_kinesis_put_records_trace_injection(self, test_name, data, client=None, enable_stream_arn=False):
def _test_kinesis_put_records_trace_injection(
self, test_name, data, client=None, enable_stream_arn=False, verify=True
):
if not client:
client = self.session.create_client("kinesis", region_name="us-east-1")

Expand All @@ -2858,7 +2860,8 @@ def _test_kinesis_put_records_trace_injection(self, test_name, data, client=None
client.put_records(StreamName=stream_name, Records=data, StreamARN=stream_arn)
else:
client.put_records(StreamName=stream_name, Records=data)

if not verify:
return None
# assert commons for span
span = self._kinesis_assert_spans()

Expand Down Expand Up @@ -3249,6 +3252,12 @@ def test_kinesis_put_records_newline_json_trace_injection(self):

assert decoded_record_data.endswith("\n")

@mock_kinesis
def test_kinesis_put_records_unparsable_data_object_avoid_nonetype_error(self):
# If the data is unparsable we should not error in tracer code
records = [{"Data": b"", "PartitionKey": "1234"}]
self._test_kinesis_put_records_trace_injection("unparsable_data_obj", records, verify=False)

@mock_kinesis
def test_kinesis_put_records_newline_bytes_trace_injection(self):
# (dict -> json string -> bytes + new line)[]
Expand Down

0 comments on commit c33988c

Please sign in to comment.