From 3ad759fffb8431a826121425ea0d3c22c2c3e748 Mon Sep 17 00:00:00 2001 From: Srdjan Grubor Date: Wed, 13 Oct 2021 13:33:41 -0500 Subject: [PATCH 1/3] [statsd] Disable buffering by default Due to impact on users of other clients where buffering was enabled by default, especially in environments and frameworks where `fork()` is used we will for the time being disable buffering by default until a decision is made on the path forward. Buffering can still be turned on with `disable_buffering = False` flag it's just that for now it isi defaulted to `True`. --- datadog/dogstatsd/base.py | 14 +++++----- tests/unit/dogstatsd/test_statsd.py | 40 ++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 1cf190188..a8cd63ffa 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -26,7 +26,6 @@ ) from datadog.dogstatsd.route import get_default_route from datadog.util.compat import is_p3k, text -from datadog.util.deprecation import deprecated from datadog.util.format import normalize_tags from datadog.version import __version__ @@ -89,7 +88,7 @@ def __init__( port=DEFAULT_PORT, # type: int max_buffer_size=None, # type: None flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float - disable_buffering=False, # type: bool + disable_buffering=True, # type: bool namespace=None, # type: Optional[Text] constant_tags=None, # type: Optional[List[str]] use_ms=False, # type: bool @@ -436,10 +435,8 @@ def _get_udp_socket(cls, host, port): return sock - @deprecated("Statsd module now uses buffering by default.") def open_buffer(self, max_buffer_size=None): """ - WARNING: Deprecated method - all operations are now buffered by default. Open a buffer to send a batch of metrics. To take advantage of automatic flushing, you should use the context manager instead @@ -453,16 +450,16 @@ def open_buffer(self, max_buffer_size=None): self._manual_buffer_lock.acquire() + # XXX Remove if `disable_buffering` default is changed to False + self._send = self._send_to_buffer + if max_buffer_size is not None: log.warning("The parameter max_buffer_size is now deprecated and is not used anymore") self._reset_buffer() - @deprecated("Statsd module now uses buffering by default.") def close_buffer(self): """ - WARNING: Deprecated method - all operations are now buffered by default. - Flush the buffer and switch back to single metric packets. Note: This method must be called after a matching open_buffer() @@ -471,6 +468,9 @@ def close_buffer(self): try: self.flush() finally: + # XXX Remove if `disable_buffering` default is changed to False + self._send = self._send_to_server + self._manual_buffer_lock.release() def _reset_buffer(self): diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index 76cb28a6a..5ade49110 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -905,13 +905,17 @@ def test_batching(self): ) def test_flush(self): - self.statsd.increment('page.views') - self.assertIsNone(self.recv(no_wait=True)) - self.statsd.flush() - self.assert_equal_telemetry('page.views:1|c\n', self.recv(2)) + dogstatsd = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0) + fake_socket = FakeSocket() + dogstatsd.socket = fake_socket + + dogstatsd.increment('page.views') + self.assertIsNone(fake_socket.recv(no_wait=True)) + dogstatsd.flush() + self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2)) def test_flush_interval(self): - dogstatsd = DogStatsd(flush_interval=1, telemetry_min_flush_interval=0) + dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0) fake_socket = FakeSocket() dogstatsd.socket = fake_socket @@ -940,6 +944,7 @@ def test_disable_buffering(self): def test_flush_disable(self): dogstatsd = DogStatsd( + disable_buffering=False, flush_interval=0, telemetry_min_flush_interval=0 ) @@ -955,6 +960,7 @@ def test_flush_disable(self): time.sleep(0.3) self.assertIsNone(fake_socket.recv(no_wait=True)) + @unittest.skip("Buffering has been disabled again so the deprecation is not valid") @patch("warnings.warn") def test_manual_buffer_ops_deprecation(self, mock_warn): self.assertFalse(mock_warn.called) @@ -1108,7 +1114,7 @@ def test_telemetry(self): self.assertEqual(0, self.statsd.packets_dropped) def test_telemetry_flush_interval(self): - dogstatsd = DogStatsd() + dogstatsd = DogStatsd(disable_buffering=False) fake_socket = FakeSocket() dogstatsd.socket = fake_socket @@ -1173,7 +1179,7 @@ def test_telemetry_flush_interval_alternate_destination(self): self.assertTrue(time1 < dogstatsd._last_flush_time) def test_telemetry_flush_interval_batch(self): - dogstatsd = DogStatsd() + dogstatsd = DogStatsd(disable_buffering=False) fake_socket = FakeSocket() dogstatsd.socket = fake_socket @@ -1256,7 +1262,7 @@ def wait_for_data(): def test_context_manager(self): fake_socket = FakeSocket() - with DogStatsd(telemetry_min_flush_interval=0) as dogstatsd: + with DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0) as dogstatsd: dogstatsd.socket = fake_socket dogstatsd.gauge('page.views', 123) dogstatsd.timing('timer', 123) @@ -1279,7 +1285,7 @@ def test_context_manager(self): def test_batched_buffer_autoflush(self): fake_socket = FakeSocket() bytes_sent = 0 - with DogStatsd(telemetry_min_flush_interval=0) as dogstatsd: + with DogStatsd(telemetry_min_flush_interval=0, disable_buffering=False) as dogstatsd: dogstatsd.socket = fake_socket self.assertEqual(dogstatsd._max_payload_size, UDP_OPTIMAL_PAYLOAD_LENGTH) @@ -1453,7 +1459,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self): ) def test_default_max_udp_packet_size(self): - dogstatsd = DogStatsd(flush_interval=10000, disable_telemetry=True) + dogstatsd = DogStatsd(disable_buffering=False, flush_interval=10000, disable_telemetry=True) dogstatsd.socket = FakeSocket() for _ in range(10000): @@ -1469,7 +1475,12 @@ def test_default_max_udp_packet_size(self): payload = dogstatsd.socket.recv() def test_default_max_uds_packet_size(self): - dogstatsd = DogStatsd(socket_path="fake", flush_interval=10000, disable_telemetry=True) + dogstatsd = DogStatsd( + disable_buffering=False, + socket_path="fake", + flush_interval=10000, + disable_telemetry=True, + ) dogstatsd.socket = FakeSocket() for _ in range(10000): @@ -1485,7 +1496,12 @@ def test_default_max_uds_packet_size(self): payload = dogstatsd.socket.recv() def test_custom_max_packet_size(self): - dogstatsd = DogStatsd(max_buffer_len=4000, flush_interval=10000, disable_telemetry=True) + dogstatsd = DogStatsd( + disable_buffering=False, + max_buffer_len=4000, + flush_interval=10000, + disable_telemetry=True, + ) dogstatsd.socket = FakeSocket() for _ in range(10000): From a8a12cd17136157c83b83f95602eaae069b663b8 Mon Sep 17 00:00:00 2001 From: Srdjan Grubor Date: Wed, 13 Oct 2021 13:42:51 -0500 Subject: [PATCH 2/3] [statsd] Update notes about buffering in statsd Since we are at least for the time being disabling buffering by default, the docs here are being updated to match the changes applied. --- doc/source/index.rst | 34 ++-------------------------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/doc/source/index.rst b/doc/source/index.rst index 8b6897f79..a251f0946 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -159,28 +159,8 @@ an instance of :class:`datadog.threadstats.ThreadStats`:: datadog.dogstatsd ================= -:mod:`datadog.dogstatsd` is a Python client for DogStatsd that automatically -buffers submitted metrics and submits them to the server asynchronously or at -maximum sizes for a packet that would prevent fragmentation. - -.. note:: - - To ensure that all the metrics are sent to the server, either use the - context-managed instance of :class:`~datadog.dogstatsd.base.DogStatsd` - or when you are finished with the client perform a manual :code:`flush()`. - Otherwise the buffered data may get de-allocated before being sent. - - -.. warning:: - - :class:`~datadog.dogstatsd.base.DogStatsd` instances are not :code:`fork()`-safe - because the automatic buffer flushing occurs in a thread only on the - process that created the :class:`~datadog.dogstatsd.base.DogStatsd` - instance. Because of this, instances of those clients must not be copied - from a parent process to a child process. Instead, the parent process and - each child process must create their own instances of the client or the - buffering must be globally disabled by using the :code:`disable_buffering` - initialization flag. +:mod:`datadog.dogstatsd` is a Python client for DogStatsd that submits metrics +to the Agent. Usage @@ -192,7 +172,6 @@ Usage client = DogStatsd() client.increment("home.page.hits") - client.flush() .. autoclass:: datadog.dogstatsd.base.DogStatsd @@ -209,15 +188,6 @@ Usage >>> initialize(statsd_host="localhost", statsd_port=8125) >>> statsd.increment("home.page.hits") -.. warning:: - - Global :class:`~datadog.dogstatsd.base.DogStatsd` is not :code:`fork()`-safe - because the automatic buffer flushing occurs in a thread only on the - process that created the :class:`~datadog.dogstatsd.base.DogStatsd` - instance. Because of this, the parent process and each child process must - create their own instances of the client or the buffering must be globally - disabled by using the :code:`disable_buffering` initialization flag. - Get in Touch ============ From bbca063d33197f8e598e6a2480ae44a55c3f6f60 Mon Sep 17 00:00:00 2001 From: Srdjan Grubor Date: Wed, 13 Oct 2021 14:26:24 -0500 Subject: [PATCH 3/3] [tests] Ensure that context maanger works in non-buffered envs With a non-buffered environment, our context manager may not work properly. This change ensures that we propely test this behavior. --- datadog/dogstatsd/base.py | 4 ++-- tests/unit/dogstatsd/test_statsd.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index a8cd63ffa..d06560905 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -351,11 +351,11 @@ def _dedicated_telemetry_destination(self): return bool(self.telemetry_socket_path or self.telemetry_host) def __enter__(self): - self._reset_buffer() + self.open_buffer() return self def __exit__(self, exc_type, value, traceback): - self.flush() + self.close_buffer() @staticmethod def resolve_host(host, use_default_route): diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index 5ade49110..830b9bcbc 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -1262,7 +1262,7 @@ def wait_for_data(): def test_context_manager(self): fake_socket = FakeSocket() - with DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0) as dogstatsd: + with DogStatsd(telemetry_min_flush_interval=0) as dogstatsd: dogstatsd.socket = fake_socket dogstatsd.gauge('page.views', 123) dogstatsd.timing('timer', 123)