Skip to content

Commit

Permalink
[statsd] Disable statsd buffering by default (#692)
Browse files Browse the repository at this point in the history
* [statsd] Disable buffering by default

Due to the impact on users of other clients where buffering was enabled by
default, especially in environments and frameworks where `fork()` is
used, we will for the time being disable buffering by default until a
decision is made on the path forward. Buffering can still be turned on
with the `disable_buffering = False` flag; it's just that for now it is
defaulted to `True`.

* [statsd] Update notes about buffering in statsd

Since we are at least for the time being disabling buffering by default, the
docs here are being updated to match the changes applied.

* [tests] Ensure that context manager works in non-buffered envs

With a non-buffered environment, our context manager may not work
properly. This change ensures that we properly test this behavior.
  • Loading branch information
sgnn7 authored Oct 14, 2021
1 parent 123034e commit 2cba9f6
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 52 deletions.
18 changes: 9 additions & 9 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
)
from datadog.dogstatsd.route import get_default_route
from datadog.util.compat import is_p3k, text
from datadog.util.deprecation import deprecated
from datadog.util.format import normalize_tags
from datadog.version import __version__

Expand Down Expand Up @@ -89,7 +88,7 @@ def __init__(
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
disable_buffering=False, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
constant_tags=None, # type: Optional[List[str]]
use_ms=False, # type: bool
Expand Down Expand Up @@ -352,11 +351,11 @@ def _dedicated_telemetry_destination(self):
return bool(self.telemetry_socket_path or self.telemetry_host)

def __enter__(self):
self._reset_buffer()
self.open_buffer()
return self

def __exit__(self, exc_type, value, traceback):
self.flush()
self.close_buffer()

@staticmethod
def resolve_host(host, use_default_route):
Expand Down Expand Up @@ -436,10 +435,8 @@ def _get_udp_socket(cls, host, port):

return sock

@deprecated("Statsd module now uses buffering by default.")
def open_buffer(self, max_buffer_size=None):
"""
WARNING: Deprecated method - all operations are now buffered by default.
Open a buffer to send a batch of metrics.
To take advantage of automatic flushing, you should use the context manager instead
Expand All @@ -453,16 +450,16 @@ def open_buffer(self, max_buffer_size=None):

self._manual_buffer_lock.acquire()

# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_buffer

if max_buffer_size is not None:
log.warning("The parameter max_buffer_size is now deprecated and is not used anymore")

self._reset_buffer()

@deprecated("Statsd module now uses buffering by default.")
def close_buffer(self):
"""
WARNING: Deprecated method - all operations are now buffered by default.
Flush the buffer and switch back to single metric packets.
Note: This method must be called after a matching open_buffer()
Expand All @@ -471,6 +468,9 @@ def close_buffer(self):
try:
self.flush()
finally:
# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_server

self._manual_buffer_lock.release()

def _reset_buffer(self):
Expand Down
34 changes: 2 additions & 32 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,28 +159,8 @@ an instance of :class:`datadog.threadstats.ThreadStats`::

datadog.dogstatsd
=================
:mod:`datadog.dogstatsd` is a Python client for DogStatsd that automatically
buffers submitted metrics and submits them to the server asynchronously or at
maximum sizes for a packet that would prevent fragmentation.

.. note::

To ensure that all the metrics are sent to the server, either use the
context-managed instance of :class:`~datadog.dogstatsd.base.DogStatsd`
or when you are finished with the client perform a manual :code:`flush()`.
Otherwise the buffered data may get de-allocated before being sent.


.. warning::

:class:`~datadog.dogstatsd.base.DogStatsd` instances are not :code:`fork()`-safe
because the automatic buffer flushing occurs in a thread only on the
process that created the :class:`~datadog.dogstatsd.base.DogStatsd`
instance. Because of this, instances of those clients must not be copied
from a parent process to a child process. Instead, the parent process and
each child process must create their own instances of the client or the
buffering must be globally disabled by using the :code:`disable_buffering`
initialization flag.
:mod:`datadog.dogstatsd` is a Python client for DogStatsd that submits metrics
to the Agent.


Usage
Expand All @@ -192,7 +172,6 @@ Usage
client = DogStatsd()
client.increment("home.page.hits")
client.flush()


.. autoclass:: datadog.dogstatsd.base.DogStatsd
Expand All @@ -209,15 +188,6 @@ Usage
>>> initialize(statsd_host="localhost", statsd_port=8125)
>>> statsd.increment("home.page.hits")

.. warning::

Global :class:`~datadog.dogstatsd.base.DogStatsd` is not :code:`fork()`-safe
because the automatic buffer flushing occurs in a thread only on the
process that created the :class:`~datadog.dogstatsd.base.DogStatsd`
instance. Because of this, the parent process and each child process must
create their own instances of the client or the buffering must be globally
disabled by using the :code:`disable_buffering` initialization flag.


Get in Touch
============
Expand Down
38 changes: 27 additions & 11 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,17 @@ def test_batching(self):
)

def test_flush(self):
self.statsd.increment('page.views')
self.assertIsNone(self.recv(no_wait=True))
self.statsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', self.recv(2))
dogstatsd = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket

dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))

def test_flush_interval(self):
dogstatsd = DogStatsd(flush_interval=1, telemetry_min_flush_interval=0)
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket

Expand Down Expand Up @@ -940,6 +944,7 @@ def test_disable_buffering(self):

def test_flush_disable(self):
dogstatsd = DogStatsd(
disable_buffering=False,
flush_interval=0,
telemetry_min_flush_interval=0
)
Expand All @@ -955,6 +960,7 @@ def test_flush_disable(self):
time.sleep(0.3)
self.assertIsNone(fake_socket.recv(no_wait=True))

@unittest.skip("Buffering has been disabled again so the deprecation is not valid")
@patch("warnings.warn")
def test_manual_buffer_ops_deprecation(self, mock_warn):
self.assertFalse(mock_warn.called)
Expand Down Expand Up @@ -1108,7 +1114,7 @@ def test_telemetry(self):
self.assertEqual(0, self.statsd.packets_dropped)

def test_telemetry_flush_interval(self):
dogstatsd = DogStatsd()
dogstatsd = DogStatsd(disable_buffering=False)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket

Expand Down Expand Up @@ -1173,7 +1179,7 @@ def test_telemetry_flush_interval_alternate_destination(self):
self.assertTrue(time1 < dogstatsd._last_flush_time)

def test_telemetry_flush_interval_batch(self):
dogstatsd = DogStatsd()
dogstatsd = DogStatsd(disable_buffering=False)

fake_socket = FakeSocket()
dogstatsd.socket = fake_socket
Expand Down Expand Up @@ -1279,7 +1285,7 @@ def test_context_manager(self):
def test_batched_buffer_autoflush(self):
fake_socket = FakeSocket()
bytes_sent = 0
with DogStatsd(telemetry_min_flush_interval=0) as dogstatsd:
with DogStatsd(telemetry_min_flush_interval=0, disable_buffering=False) as dogstatsd:
dogstatsd.socket = fake_socket

self.assertEqual(dogstatsd._max_payload_size, UDP_OPTIMAL_PAYLOAD_LENGTH)
Expand Down Expand Up @@ -1453,7 +1459,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
)

def test_default_max_udp_packet_size(self):
dogstatsd = DogStatsd(flush_interval=10000, disable_telemetry=True)
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=10000, disable_telemetry=True)
dogstatsd.socket = FakeSocket()

for _ in range(10000):
Expand All @@ -1469,7 +1475,12 @@ def test_default_max_udp_packet_size(self):
payload = dogstatsd.socket.recv()

def test_default_max_uds_packet_size(self):
dogstatsd = DogStatsd(socket_path="fake", flush_interval=10000, disable_telemetry=True)
dogstatsd = DogStatsd(
disable_buffering=False,
socket_path="fake",
flush_interval=10000,
disable_telemetry=True,
)
dogstatsd.socket = FakeSocket()

for _ in range(10000):
Expand All @@ -1485,7 +1496,12 @@ def test_default_max_uds_packet_size(self):
payload = dogstatsd.socket.recv()

def test_custom_max_packet_size(self):
dogstatsd = DogStatsd(max_buffer_len=4000, flush_interval=10000, disable_telemetry=True)
dogstatsd = DogStatsd(
disable_buffering=False,
max_buffer_len=4000,
flush_interval=10000,
disable_telemetry=True,
)
dogstatsd.socket = FakeSocket()

for _ in range(10000):
Expand Down

0 comments on commit 2cba9f6

Please sign in to comment.