Skip to content

Commit

Permalink
Fix lossiness when submitting timestamped custom metrics (#527)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhulston authored Oct 31, 2024
1 parent 02cc8c6 commit 59e6e7c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 7 deletions.
8 changes: 7 additions & 1 deletion datadog_lambda/thread_stats_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ def flush(self, tags=None):
Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
to gain better control over exception handling.
"""
original_constant_tags = self.thread_stats.constant_tags.copy()
if tags:
self.thread_stats.constant_tags = self.thread_stats.constant_tags + tags
# Temporarily add tags for this flush
self.thread_stats.constant_tags = original_constant_tags + tags

_, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
count_dists = len(dists)
if not count_dists:
Expand Down Expand Up @@ -62,6 +65,9 @@ def flush(self, tags=None):
logger.debug(
"Flush #%s failed", self.thread_stats.flush_count, exc_info=True
)
finally:
# Reset constant_tags to its original state
self.thread_stats.constant_tags = original_constant_tags

def stop(self):
self.thread_stats.stop()
68 changes: 62 additions & 6 deletions tests/test_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,69 @@ def test_retry_on_remote_disconnected(self):

def test_flush_stats_with_tags(self):
lambda_stats = ThreadStatsWriter(True)
original_constant_tags = lambda_stats.thread_stats.constant_tags.copy()
tags = ["tag1:value1", "tag2:value2"]
lambda_stats.flush(tags)
self.mock_threadstats_flush_distributions.assert_called_once_with(
lambda_stats.thread_stats._get_aggregate_metrics_and_dists(float("inf"))[1]
)
for tag in tags:
self.assertTrue(tag in lambda_stats.thread_stats.constant_tags)

# Add a metric to be flushed
lambda_stats.distribution("test.metric", 1, tags=["metric:tag"])

with patch.object(
lambda_stats.thread_stats.reporter, "flush_distributions"
) as mock_flush_distributions:
lambda_stats.flush(tags)
mock_flush_distributions.assert_called_once()
# Verify that after flush, constant_tags is reset to original
self.assertEqual(
lambda_stats.thread_stats.constant_tags, original_constant_tags
)

def test_flush_temp_constant_tags(self):
lambda_stats = ThreadStatsWriter(flush_in_thread=True)
lambda_stats.thread_stats.constant_tags = ["initial:tag"]
original_constant_tags = lambda_stats.thread_stats.constant_tags.copy()

lambda_stats.distribution("test.metric", 1, tags=["metric:tag"])
flush_tags = ["flush:tag1", "flush:tag2"]

with patch.object(
lambda_stats.thread_stats.reporter, "flush_distributions"
) as mock_flush_distributions:
lambda_stats.flush(tags=flush_tags)
mock_flush_distributions.assert_called_once()
flushed_dists = mock_flush_distributions.call_args[0][0]

# Expected tags: original constant_tags + flush_tags + metric tags
expected_tags = original_constant_tags + flush_tags + ["metric:tag"]

# Verify the tags on the metric
self.assertEqual(len(flushed_dists), 1)
metric = flushed_dists[0]
self.assertEqual(sorted(metric["tags"]), sorted(expected_tags))

# Verify that constant_tags is reset after flush
self.assertEqual(
lambda_stats.thread_stats.constant_tags, original_constant_tags
)

# Repeat to ensure tags do not accumulate over multiple flushes
new_flush_tags = ["flush:tag3"]
lambda_stats.distribution("test.metric2", 2, tags=["metric2:tag"])

with patch.object(
lambda_stats.thread_stats.reporter, "flush_distributions"
) as mock_flush_distributions:
lambda_stats.flush(tags=new_flush_tags)
mock_flush_distributions.assert_called_once()
flushed_dists = mock_flush_distributions.call_args[0][0]
# Expected tags for the new metric
expected_tags = original_constant_tags + new_flush_tags + ["metric2:tag"]

self.assertEqual(len(flushed_dists), 1)
metric = flushed_dists[0]
self.assertEqual(sorted(metric["tags"]), sorted(expected_tags))
self.assertEqual(
lambda_stats.thread_stats.constant_tags, original_constant_tags
)

def test_flush_stats_without_context(self):
flush_stats(lambda_context=None)
Expand Down

0 comments on commit 59e6e7c

Please sign in to comment.