
[statsd] Add ability to toggle statsd.disable_buffering state during runtime #700

Merged (2 commits, Nov 17, 2021)
7 changes: 7 additions & 0 deletions datadog/__init__.py
@@ -37,6 +37,7 @@ def initialize(
api_host=None, # type: Optional[str]
statsd_host=None, # type: Optional[str]
statsd_port=None, # type: Optional[int]
statsd_disable_buffering=True, # type: bool
Contributor:
Is it named disable_buffering because the plan is to have the buffering enabled by default in the future? If that's the case, don't mind the following remark.
Having a parameter "disabling" something (and defaulting to True) is always weird to think about, as opposed to a parameter "enabling" a feature.

Contributor Author:
> Is it named disable_buffering because the plan is to have the buffering enabled by default in the future

Correct. The long story is that it's currently a bit of an awkward flag name, like you mention, because we already enabled and then disabled buffering-by-default, and the flag that was already in the API was disable_buffering. If/when we re-enable buffering-by-default, this flag will be a better indication of the functionality; it's just that for now we're a bit stuck mid-way through a change in architectural direction, and the flag was already named in the published API.
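
For context, a minimal usage sketch of the initialization-time flag; the host/port values and metric names are placeholders for illustration, not part of this PR:

```python
from datadog import initialize, statsd

# Current default (statsd_disable_buffering=True): each metric is sent to the
# server in its own packet as soon as it is reported.
initialize(statsd_host="localhost", statsd_port=8125)
statsd.gauge("example.unbuffered.gauge", 1)

# Opting in to buffering at initialization time: metrics are aggregated in the
# client-side buffer and flushed periodically by the background flush thread.
initialize(statsd_host="localhost", statsd_port=8125, statsd_disable_buffering=False)
statsd.gauge("example.buffered.gauge", 1)
```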

statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
@@ -71,6 +72,10 @@ def initialize(
:param statsd_port: Port of DogStatsd server or statsd daemon
:type statsd_port: port

:param statsd_disable_buffering: Enable/disable statsd client buffering support
(default: True).
:type statsd_disable_buffering: boolean

:param statsd_use_default_route: Dynamically set the statsd host to the default route
(Useful when running the client in a container)
:type statsd_use_default_route: boolean
@@ -122,6 +127,8 @@ def initialize(
if statsd_constant_tags:
statsd.constant_tags += statsd_constant_tags

statsd.disable_buffering = statsd_disable_buffering

api._return_raw_response = return_raw_response

# HTTP client and API options
92 changes: 71 additions & 21 deletions datadog/dogstatsd/base.py
@@ -304,59 +304,109 @@ def __init__(

self._reset_buffer()

# This lock is used for backwards compatibility to prevent concurrent
# changes to the buffer when the user is managing the buffer themselves
# via deprecated `open_buffer()` and `close_buffer()` functions.
self._manual_buffer_lock = RLock()
# This lock is used for all cases where buffering functionality is
# being toggled (by `open_buffer()`, `close_buffer()`, or
# `self._disable_buffering` calls).
self._buffering_toggle_lock = RLock()

# If buffering is disabled, we bypass the buffer function.
self._send = self._send_to_buffer
if disable_buffering:
log.info("Statsd buffering is disabled")
self._disable_buffering = disable_buffering
if self._disable_buffering:
self._send = self._send_to_server
log.info("Statsd buffering is disabled")

# Start the flush thread if buffering is enabled and the interval is above
# a reasonable range. This both prevents thrashing and allow us to use "0.0"
# as a value for disabling the automatic flush timer as well.
if not disable_buffering and flush_interval >= MIN_FLUSH_INTERVAL:
self._register_flush_thread(flush_interval)
log.debug(
"Statsd flush thread registered with period of %s",
flush_interval,
)
else:
log.info("Statsd periodic buffer flush is disabled")
self._flush_interval = flush_interval
self._flush_thread_stop = threading.Event()
self._start_flush_thread(self._flush_interval)

def disable_telemetry(self):
self._telemetry = False

def enable_telemetry(self):
self._telemetry = True

def _register_flush_thread(self, sleep_duration):
def _flush_thread_loop(self, sleep_duration):
while True:
time.sleep(sleep_duration)
# Note: Invocations of this method should be thread-safe
def _start_flush_thread(self, flush_interval):
if self._disable_buffering or self._flush_interval <= MIN_FLUSH_INTERVAL:
log.info("Statsd periodic buffer flush is disabled")
return

def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
self.flush()

self._flush_thread = threading.Thread(
name="{}_flush_thread".format(self.__class__.__name__),
target=_flush_thread_loop,
args=(self, sleep_duration,),
args=(self, flush_interval,),
)
self._flush_thread.daemon = True
self._flush_thread.start()

log.debug(
"Statsd flush thread registered with period of %s",
self._flush_interval,
)

# Note: Invocations of this method should be thread-safe
def _stop_flush_thread(self):
if not self._flush_thread:
log.warning("No statsd flush thread to stop")
return

try:
self.flush()
finally:
pass

self._flush_thread_stop.set()

self._flush_thread.join()
Contributor (@remeh, Nov 16, 2021):

Not really a big deal I guess, but if two threads call _stop_flush_thread simultaneously, there is a small chance that join() could be called on one of the threads while the _flush_thread has already been terminated by the other.

Contributor Author:

True that this method isn't thread-safe in isolation, but the comment on the method mentions this as a warning and the only caller is lock-protected. I would have preferred to use the lock directly in this method like you imply, but then the toggle-to-stop path would have needed 2 lock acquires (1 for the toggle method and 1 for the stop method; possibly a reentrant one), which seemed like overkill and over-complication for a helper only called from a single spot.
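
To illustrate the trade-off being discussed, here is a standalone sketch (not the datadogpy implementation; class and method names are made up) of why taking the lock inside the helper as well would require a reentrant lock:

```python
import threading

class TogglerSketch:
    def __init__(self):
        # RLock lets the same thread acquire the lock more than once, which the
        # nested acquire in _stop_helper() relies on.
        self._lock = threading.RLock()
        self._disabled = False

    def set_disabled(self, value):
        with self._lock:                # first acquire (toggle path)
            if self._disabled == value:
                return
            self._disabled = value
            if value:
                self._stop_helper()     # re-enters the lock on the same thread

    def _stop_helper(self):
        with self._lock:                # second acquire by the same thread
            # With a plain threading.Lock this nested acquire would deadlock.
            pass

if __name__ == "__main__":
    TogglerSketch().set_disabled(True)  # acquires the RLock twice, no deadlock
```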

self._flush_thread = None

self._flush_thread_stop.clear()

def _dedicated_telemetry_destination(self):
return bool(self.telemetry_socket_path or self.telemetry_host)

# Context manager helper
def __enter__(self):
self.open_buffer()
return self

# Context manager helper
def __exit__(self, exc_type, value, traceback):
self.close_buffer()

@property
def disable_buffering(self):
with self._buffering_toggle_lock:
return self._disable_buffering

@disable_buffering.setter
def disable_buffering(self, is_disabled):
with self._buffering_toggle_lock:
# If the toggle didn't change anything, this method is a noop
if self._disable_buffering == is_disabled:
return

self._disable_buffering = is_disabled

# If buffering has been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable the buffering.
if is_disabled:
self._send = self._send_to_server
self._stop_flush_thread()
log.info("Statsd buffering is disabled")
else:
self._send = self._send_to_buffer
self._start_flush_thread(self._flush_interval)

@staticmethod
def resolve_host(host, use_default_route):
"""
@@ -448,7 +498,7 @@ def open_buffer(self, max_buffer_size=None):
Note: This method must be called before close_buffer() matching invocation.
"""

self._manual_buffer_lock.acquire()
self._buffering_toggle_lock.acquire()

# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_buffer
@@ -471,7 +521,7 @@ def close_buffer(self):
# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_server

self._manual_buffer_lock.release()
self._buffering_toggle_lock.release()

def _reset_buffer(self):
with self._buffer_lock:
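
A short usage sketch of the runtime toggle added in this file; it relies on the `disable_buffering` property and `flush()` from this diff plus the standard `gauge()`/`increment()` metric methods, while the host/port and metric names are placeholders:

```python
from datadog import initialize, statsd

initialize(statsd_host="localhost", statsd_port=8125)  # buffering disabled by default

# Flip to buffered mode at runtime: the setter swaps _send to the buffered
# implementation and starts the background flush thread.
statsd.disable_buffering = False
statsd.gauge("example.buffered.gauge", 1)
statsd.increment("example.buffered.counter")
statsd.flush()  # push the buffered payload out without waiting for the thread

# Flip back to unbuffered mode: the setter flushes whatever is left, stops the
# flush thread, and subsequent metrics go straight to the server.
statsd.disable_buffering = True
statsd.gauge("example.unbuffered.gauge", 1)
```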
124 changes: 124 additions & 0 deletions tests/unit/dogstatsd/test_statsd.py
@@ -113,6 +113,11 @@ def telemetry_metrics(metrics=1, events=0, service_checks=0, bytes_sent=0, bytes


class TestDogStatsd(unittest.TestCase):
METRIC_TYPE_MAP = {
'gauge': { 'id': 'g' },
'timing': { 'id': 'ms' },
}

def setUp(self):
"""
Set up a default Dogstatsd instance and mock the proc filesystem.
@@ -144,6 +149,63 @@ def assert_equal_telemetry(self, expected_payload, actual_payload, telemetry=Non

return self.assertEqual(expected_payload, actual_payload)

def send_and_assert(
self,
dogstatsd,
expected_metrics,
last_telemetry_size=0,
buffered=False,
):
"""
Send a chain of metrics and then assert that they arrive in the right order
and with the expected telemetry values.
"""

expected_messages = []
for metric_type, metric_name, metric_value in expected_metrics:
# Construct the expected message data
metric_type_id = TestDogStatsd.METRIC_TYPE_MAP[metric_type]['id']
expected_messages.append(
"{}:{}|{}\n".format(metric_name, metric_value, metric_type_id)
)

# Send the value
getattr(dogstatsd, metric_type)(metric_name, metric_value)

# Sanity check
if buffered:
# Ensure that packets didn't arrive immediately if we are expecting
# buffering behavior
self.assertIsNone(dogstatsd.socket.recv(2, no_wait=True))

metrics = 1
if buffered:
metrics = len(expected_messages)

if buffered:
expected_messages = [ ''.join(expected_messages) ]

for message in expected_messages:
packets_sent = 1
# For all non-initial packets, our current telemetry stats will
# contain the metadata for the last telemetry packet as well.
if last_telemetry_size > 0:
packets_sent += 1

expected_metrics=telemetry_metrics(
metrics=metrics,
packets_sent=packets_sent,
bytes_sent=len(message) + last_telemetry_size
)
self.assert_equal_telemetry(
message,
dogstatsd.socket.recv(2, no_wait=not buffered, reset_wait=True),
telemetry=expected_metrics,
)
last_telemetry_size = len(expected_metrics)

return last_telemetry_size

def assert_almost_equal(self, val1, val2, delta):
"""
Calculates a delta between first and second value and ensures
@@ -1001,6 +1063,68 @@ def test_batching_sequential(self):
)
)

def test_batching_runtime_changes(self):
dogstatsd = DogStatsd(
disable_buffering=True,
telemetry_min_flush_interval=0
)
dogstatsd.socket = FakeSocket()

# Send some unbuffered metrics and verify we got it immediately
last_telemetry_size = self.send_and_assert(
dogstatsd,
[
('gauge', 'rt.gauge', 123),
('timing', 'rt.timer', 123),
],
)

# Disable buffering (noop expected) and validate
dogstatsd.disable_buffering = True
last_telemetry_size = self.send_and_assert(
dogstatsd,
[
('gauge', 'rt.gauge2', 321),
('timing', 'rt.timer2', 321),
],
last_telemetry_size = last_telemetry_size,
)

# Enable buffering and validate
dogstatsd.disable_buffering = False
last_telemetry_size = self.send_and_assert(
dogstatsd,
[
('gauge', 'buffered.gauge', 12345),
('timing', 'buffered.timer', 12345),
],
last_telemetry_size = last_telemetry_size,
buffered=True,
)

# Enable buffering again (another noop change expected)
dogstatsd.disable_buffering = False
last_telemetry_size = self.send_and_assert(
dogstatsd,
[
('gauge', 'buffered.gauge2', 321),
('timing', 'buffered.timer2', 321),
],
last_telemetry_size = last_telemetry_size,
buffered=True,
)

# Flip the toggle to unbuffered functionality one more time and verify
dogstatsd.disable_buffering = True
last_telemetry_size = self.send_and_assert(
dogstatsd,
[
('gauge', 'rt.gauge3', 333),
('timing', 'rt.timer3', 333),
],
last_telemetry_size = last_telemetry_size,
)

def test_threaded_batching(self):
num_threads = 4
threads = []