[statsd] Add caching to tag normalization on Python3.2+
Since tag normalization is still the biggest bottleneck in metrics
submission latency, this change adds a small cache (512 entries) to
that method's calls via the built-in `@lru_cache` decorator where it
is available (Python 3.2+). When the cache is hit, we skip the
expensive `re.sub` operation entirely and improve submission
performance.
sgnn7 committed Jun 30, 2021
1 parent 1a90bea commit 3a054f2
Showing 5 changed files with 95 additions and 4 deletions.
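For context, a minimal sketch of the effect the cache is expected to have (not part of this commit; the helper names, tag values, and iteration count below are made up for illustration): once a given tag tuple has been seen, repeat submissions skip the `re.sub` pass entirely.

```python
# Illustrative only: compare uncached vs. lru_cache-backed tag normalization.
import re
import timeit
from functools import lru_cache  # Python 3.2+

TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)


def normalize_uncached(tags):
    return [TAG_INVALID_CHARS_RE.sub("_", tag) for tag in tags]


@lru_cache(maxsize=512)  # same cache size the commit uses
def normalize_cached(tags):  # expects a tuple so the argument is hashable
    return [TAG_INVALID_CHARS_RE.sub("_", tag) for tag in tags]


tags = ("env:prod", "service:my app", "version:1.2.3!")
print("uncached:", timeit.timeit(lambda: normalize_uncached(tags), number=100000))
print("cached:  ", timeit.timeit(lambda: normalize_cached(tags), number=100000))
```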
27 changes: 27 additions & 0 deletions datadog/util/compat.py
@@ -10,6 +10,9 @@
import socket
import sys

# Logging
log = logging.getLogger("datadog.util")

# Note: using `sys.version_info` instead of the helper functions defined here
# so that mypy detects version-specific code paths. Currently, mypy doesn't
# support try/except imports for version-specific code paths either.
@@ -94,6 +97,13 @@ def is_p3k():
    return _is_py_version_higher_than(3)


def is_higher_py32():
    """
    Assert that Python is version 3.2 or higher.
    """
    return _is_py_version_higher_than(3, 2)


def is_higher_py35():
"""
Assert that Python is version 3.5 or higher.
@@ -106,3 +116,20 @@ def is_pypy():
    Assert that PyPy is being used (regardless of 2 or 3)
    """
    return "__pypy__" in sys.builtin_module_names


def conditional_lru_cache(func):
"""
A decorator that conditionally enables a lru_cache of size 512 if
the version of Python can support it (>3.2) and otherwise returns
the original function
"""
if not is_higher_py32():
return func

log.debug("Enabling LRU cache for function %s", func.__name__)

# pylint: disable=import-outside-toplevel
from functools import lru_cache

return lru_cache(maxsize=512)(func)
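A quick usage sketch of the new decorator (not part of the diff; `double` is a made-up example function): on Python 3.2+ the wrapped function picks up the standard `functools.lru_cache` statistics, while on older interpreters it is returned untouched.

```python
from datadog.util.compat import conditional_lru_cache


@conditional_lru_cache
def double(value):
    return value * 2


double(21)
double(21)

if hasattr(double, "cache_info"):  # only present when lru_cache was applied
    print(double.cache_info())  # e.g. CacheInfo(hits=1, misses=1, maxsize=512, currsize=1)
else:
    print("Python < 3.2: no caching, plain function returned")
```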
12 changes: 10 additions & 2 deletions datadog/util/format.py
@@ -7,10 +7,11 @@
import json
import re

from datadog.util.compat import conditional_lru_cache

TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
TAG_INVALID_CHARS_SUBS = "_"


def pretty_json(obj):
    return json.dumps(obj, sort_keys=True, indent=2)

@@ -29,5 +30,12 @@ def force_to_epoch_seconds(epoch_sec_or_dt):
    return epoch_sec_or_dt


def normalize_tags(tag_list):
@conditional_lru_cache
def _normalize_tags_with_cache(tag_list):
    return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]


def normalize_tags(tag_list):
    # We have to turn our input tag list into a non-mutable tuple for it to
    # be hashable (and thus usable) by the @lru_cache decorator.
    return _normalize_tags_with_cache(tuple(tag_list))
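A small sketch of the public API after this change (again, not part of the diff; the example tags are taken from the unit test data below): callers still pass a plain list, and the tuple conversion exists only to make the arguments hashable for the cache.

```python
from datadog.util.format import normalize_tags

# The first call misses the cache and runs re.sub; an identical list later maps
# to the same tuple key and is served from the cache on Python 3.2+.
print(normalize_tags(["this is a tag", "abc!@#"]))  # ['this_is_a_tag', 'abc___']
print(normalize_tags(["this is a tag", "abc!@#"]))  # cache hit, same result
```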
8 changes: 6 additions & 2 deletions tests/performance/test_statsd_throughput.py
@@ -147,12 +147,14 @@ def one_line_warning(message, category, filename, lineno, *_):
    # pylint: disable=too-many-locals
    def test_statsd_performance(self):
        print(
            "Starting: {} run(s), {} threads, {} points/thread via {} (profiling: {})...".format(
            "Starting: {} run(s), {} thread(s), {} points/thread via {} (profiling: {}) on Python{}.{} ...".format(
                self.num_runs,
                self.num_threads,
                self.num_datapoints,
                self.transport,
                str(self.profiling_enabled).lower(),
                sys.version_info[0],
                sys.version_info[1],
            )
        )

@@ -209,7 +211,7 @@ def test_statsd_performance(self):
            run_latencies.append(float(avg_latency))
            received_packet_pcts.append(received_packet_pct)

        result_msg = "\nTotal for {} run(s), {} threads, {} points/thread via {}:\n"
        result_msg = "\nTotal for {} run(s), {} thread(s), {} points/thread via {} on Python{}.{}:\n"
        result_msg += "\tDuration:\t\t{:.4f}s\n"
        result_msg += "\tLatency:\t\t{:.2f}μs\n"
        result_msg += "\tCPU:\t\t\t{:.4f}\n"
@@ -221,6 +223,8 @@
                self.num_threads,
                self.num_datapoints,
                self.transport,
                sys.version_info[0],
                sys.version_info[1],
                sum(run_durations) / len(run_durations),
                sum(run_latencies) / len(run_latencies),
                sum(run_cpu_stats) / len(run_cpu_stats),
50 changes: 50 additions & 0 deletions tests/unit/util/test_compat.py
@@ -0,0 +1,50 @@
# coding: utf8
# Unless explicitly stated otherwise all files in this repository are licensed under the BSD-3-Clause License.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2015-Present Datadog, Inc
import logging
import unittest

from mock import patch

from datadog.util.compat import conditional_lru_cache, is_higher_py32

class TestConditionalLRUCache(unittest.TestCase):
    def test_normal_usage(self):
        @conditional_lru_cache
        def test_function(some_string, num1, num2, num3):
            return (some_string, num1 + num2 + num3)

        for idx in range(600):
            self.assertEqual(
                test_function("abc", idx, idx * 2, idx * 3),
                ("abc", idx + idx * 2 + idx * 3),
            )

    def test_var_args(self):
        @conditional_lru_cache
        def test_function(*args):
            return sum(list(args))

        args = []
        for idx in range(100):
            args.append(idx)
            self.assertEqual(
                test_function(*args),
                sum(args),
            )

    # pylint: disable=no-self-use
    def test_debug_log(self):
        test_object_logger = logging.getLogger('datadog.util')
        with patch.object(test_object_logger, 'debug') as mock_debug:
            @conditional_lru_cache
            def test_function():
                pass

            test_function()

        if is_higher_py32():
            mock_debug.assert_called_once()
        else:
            mock_debug.assert_not_called()
2 changes: 2 additions & 0 deletions tests/unit/util/test_format.py
@@ -39,6 +39,8 @@ class TestNormalizeTags:
    Test of the format's `normalize_tags` functionality
    """
    test_data = [
        ([], []),
        ([''], ['']),
        (['this is a tag'], ['this_is_a_tag']),
        (['abc!@#$%^&*()0987654321{}}{'], ['abc__________0987654321____']),
        (['abc!@#', '^%$#3456#'], ['abc___', '____3456_']),
