Skip to content

Commit

Permalink
Adding minimum flush interval and removing namespace for telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Feb 11, 2020
1 parent a8e557b commit 65f757e
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 60 deletions.
61 changes: 37 additions & 24 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import os
import socket
import errno
import time
from threading import Lock

# datadog
from datadog.dogstatsd.context import TimedContextManagerDecorator
from datadog.dogstatsd.route import get_default_route
from datadog.util.compat import text
from datadog.util.config import get_version
from datadog.util.config import get_pkg_version

# Logging
log = logging.getLogger('datadog.dogstatsd')
Expand All @@ -30,13 +31,17 @@
# Tag name of entity_id
ENTITY_ID_TAG_NAME = "dd.internal.entity_id"

# Telemetry minimum flush interval in seconds
DEFAULT_TELEMETRY_MIN_FLUSH_INTERVAL = 10


class DogStatsd(object):
OK, WARNING, CRITICAL, UNKNOWN = (0, 1, 2, 3)

def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, namespace=None,
constant_tags=None, use_ms=False, use_default_route=False,
socket_path=None, default_sample_rate=1, disable_telemetry=False):
socket_path=None, default_sample_rate=1, disable_telemetry=False,
telemetry_min_flush_interval=DEFAULT_TELEMETRY_MIN_FLUSH_INTERVAL):
"""
Initialize a DogStatsd object.
Expand Down Expand Up @@ -135,13 +140,18 @@ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, max_buffer_size=50, nam
self.use_ms = use_ms
self.default_sample_rate = default_sample_rate

# init telemetry
# init telemetry version
try:
client_version = get_pkg_version()
except:
client_version = u"unknown"
self._client_tags = [
"client:py",
"client_version:{}".format(get_version()),
"client_version:{}".format(client_version),
"client_transport:{}".format(transport),
]
self._reset_telementry()
self._telemetry_flush_interval = telemetry_min_flush_interval
self._telemetry = not disable_telemetry

def disable_telemetry(self):
Expand Down Expand Up @@ -365,34 +375,37 @@ def _reset_telementry(self):
self.bytes_dropped = 0
self.packets_sent = 0
self.packets_dropped = 0
self._last_flush_time = time.time()

def _flush_telemetry(self):
telemetry_tags = self._add_constant_tags(self._client_tags)
return "\n%s\n%s\n%s\n%s\n%s\n%s\n%s" % (
self._serialize_metric("datadog.dogstatsd.client.metrics",
"c", self.metrics_count, telemetry_tags),
self._serialize_metric("datadog.dogstatsd.client.events",
"c", self.events_count, telemetry_tags),
self._serialize_metric("datadog.dogstatsd.client.service_checks",
"c", self.service_checks_count, telemetry_tags),
self._serialize_metric("datadog.dogstatsd.client.bytes_sent",
"c", self.bytes_sent, telemetry_tags),
self._serialize_metric("datadog.dogstatsd.client.bytes_dropped",
"c", self.bytes_dropped, telemetry_tags),
self._serialize_metric("datadog.dogstatsd.client.packets_sent",
"c", self.packets_sent, telemetry_tags),
self._serialize_metric("datadog.dogstatsd.client.packets_dropped",
"c", self.packets_dropped, telemetry_tags),
)
telemetry_tags = ",".join(self._add_constant_tags(self._client_tags))
return "\n".join((
"datadog.dogstatsd.client.metrics:%s|c|#%s" % (self.metrics_count, telemetry_tags),
"datadog.dogstatsd.client.events:%s|c|#%s" % (self.events_count, telemetry_tags),
"datadog.dogstatsd.client.service_checks:%s|c|#%s" % (self.service_checks_count, telemetry_tags),
"datadog.dogstatsd.client.bytes_sent:%s|c|#%s" % (self.bytes_sent, telemetry_tags),
"datadog.dogstatsd.client.bytes_dropped:%s|c|#%s" % (self.bytes_dropped, telemetry_tags),
"datadog.dogstatsd.client.packets_sent:%s|c|#%s" % (self.packets_sent, telemetry_tags),
"datadog.dogstatsd.client.packets_dropped:%s|c|#%s" % (self.packets_dropped, telemetry_tags),
))

def _is_telemetry_flush_time(self):
if self._telemetry \
and self._last_flush_time + self._telemetry_flush_interval < time.time():
return True
return False

def _send_to_server(self, packet):
if self._telemetry:
packet += self._flush_telemetry()
flush_telemetry = self._is_telemetry_flush_time()
if flush_telemetry:
packet += "\n"+self._flush_telemetry()

try:
# If set, use socket directly
(self.socket or self.get_socket()).send(packet.encode(self.encoding))
if self._telemetry:
self._reset_telementry()
if flush_telemetry:
self._reset_telementry()
self.packets_sent += 1
self.bytes_sent += len(packet)
return
Expand Down
34 changes: 18 additions & 16 deletions datadog/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,28 @@ def get_config(cfg_path=None, options=None):

return agentConfig


def get_version():
def get_pkg_version():
"""
Resolve `datadog` package version.
"""
version = u"unknown"

if not pkg:
return version
return u"unknown"

dist = pkg.get_distribution("datadog")
# Normalize case for Windows systems
dist_loc = os.path.normcase(dist.location)
here = os.path.normcase(__file__)
if not here.startswith(dist_loc):
# not installed, but there is another version that *is*
raise pkg.DistributionNotFound

return dist.version

def get_version():
"""
Resolve `datadog` package version.
"""
try:
dist = pkg.get_distribution("datadog")
# Normalize case for Windows systems
dist_loc = os.path.normcase(dist.location)
here = os.path.normcase(__file__)
if not here.startswith(dist_loc):
# not installed, but there is another version that *is*
raise pkg.DistributionNotFound
version = dist.version
return get_pkg_version()
except pkg.DistributionNotFound:
version = u"Please install `datadog` with setup.py"

return version
return u"Please install `datadog` with setup.py"
79 changes: 59 additions & 20 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,22 @@ def send(self, payload):
class OverflownSocket(FakeSocket):

def send(self, payload):
error = socker.error("Socker error")
error = socket.error("Socket error")
error.errno = errno.EAGAIN
raise error


def telemetry_metrics(metrics=1, events=0, service_checks=0, bytes_sent=0, bytes_dropped=0, packets_sent=0, packets_dropped=0, transport="udp", tags="", namespace=""):
def telemetry_metrics(metrics=1, events=0, service_checks=0, bytes_sent=0, bytes_dropped=0, packets_sent=0, packets_dropped=0, transport="udp", tags=""):
version = get_version()
if tags:
tags = "," + tags
tags = "," + tags if tags else ""

return "\n{}datadog.dogstatsd.client.metrics:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(namespace, metrics, version, transport, tags) \
+ "{}datadog.dogstatsd.client.events:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(namespace, events, version, transport, tags) \
+ "{}datadog.dogstatsd.client.service_checks:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(namespace, service_checks, version, transport, tags) \
+ "{}datadog.dogstatsd.client.bytes_sent:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(namespace, bytes_sent, version, transport, tags) \
+ "{}datadog.dogstatsd.client.bytes_dropped:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(namespace, bytes_dropped, version, transport, tags) \
+ "{}datadog.dogstatsd.client.packets_sent:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(namespace, packets_sent, version, transport, tags) \
+ "{}datadog.dogstatsd.client.packets_dropped:{}|c|#client:py,client_version:{},client_transport:{}{}".format(namespace, packets_dropped, version, transport, tags)
return "\ndatadog.dogstatsd.client.metrics:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(metrics, version, transport, tags) \
+ "datadog.dogstatsd.client.events:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(events, version, transport, tags) \
+ "datadog.dogstatsd.client.service_checks:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(service_checks, version, transport, tags) \
+ "datadog.dogstatsd.client.bytes_sent:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(bytes_sent, version, transport, tags) \
+ "datadog.dogstatsd.client.bytes_dropped:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(bytes_dropped, version, transport, tags) \
+ "datadog.dogstatsd.client.packets_sent:{}|c|#client:py,client_version:{},client_transport:{}{}\n".format(packets_sent, version, transport, tags) \
+ "datadog.dogstatsd.client.packets_dropped:{}|c|#client:py,client_version:{},client_transport:{}{}".format(packets_dropped, version, transport, tags)

def assert_equal_telemetry(expected_payload, actual_payload, telemetry=None):
if telemetry is None:
Expand All @@ -101,7 +100,7 @@ def setUp(self):
Set up a default Dogstatsd instance and mock the proc filesystem.
"""
#
self.statsd = DogStatsd()
self.statsd = DogStatsd(telemetry_min_flush_interval=0)
self.statsd.socket = FakeSocket()

# Mock the proc filesystem
Expand Down Expand Up @@ -313,7 +312,7 @@ def test_metric_namespace(self):
"""
self.statsd.namespace = "foo"
self.statsd.gauge('gauge', 123.4)
assert_equal_telemetry('foo.gauge:123.4|g', self.recv(), telemetry=telemetry_metrics(namespace="foo."))
assert_equal_telemetry('foo.gauge:123.4|g', self.recv())

# Test Client level constant tags
def test_gauge_constant_tags(self):
Expand Down Expand Up @@ -628,9 +627,49 @@ def test_telemetry(self):
assert_equal(1, self.statsd.packets_sent)
assert_equal(0, self.statsd.packets_dropped)

def test_telemetry_flush_interval(self):
    """
    Telemetry is only appended to a packet once the minimum flush interval
    has elapsed, and a flush refreshes `_last_flush_time`.
    """
    # Default client: uses the default telemetry_min_flush_interval.
    statsd = DogStatsd()
    fake_socket = FakeSocket()
    statsd.socket = fake_socket

    # set the last flush time in the future to be sure we won't flush
    statsd._last_flush_time = time.time() + statsd._telemetry_flush_interval
    statsd.gauge('gauge', 123.4)

    # No telemetry expected: the payload is the bare metric.
    assert_equal('gauge:123.4|g', fake_socket.recv())

    t1 = time.time()
    # setting the last flush time in the past to trigger a telemetry flush
    statsd._last_flush_time = t1 - statsd._telemetry_flush_interval - 1
    statsd.gauge('gauge', 123.4)

    # Counters were not reset by the first (non-flushing) send, so the
    # telemetry reports both gauges plus the first packet (13 bytes).
    assert_equal_telemetry('gauge:123.4|g', fake_socket.recv(), telemetry=telemetry_metrics(metrics=2, bytes_sent=13, packets_sent=1))
    # assert that _last_flush_time has been updated. `<=` rather than `<`:
    # on a coarse clock two nearby time.time() calls can return the same
    # value, and the stale value was t1 - interval - 1, so reaching t1 is
    # already proof that the flush reset it.
    assert t1 <= statsd._last_flush_time

def test_telemetry_flush_interval_batch(self):
    """
    A buffered batch gets telemetry appended when the flush interval has
    elapsed at the time the buffer is closed.
    """
    statsd = DogStatsd()

    fake_socket = FakeSocket()
    statsd.socket = fake_socket

    # Queue two metrics in the buffer; nothing is expected on the socket
    # until close_buffer() (presumably the point where the batch is sent).
    statsd.open_buffer()
    statsd.gauge('gauge1', 1)
    statsd.gauge('gauge2', 2)

    t1 = time.time()
    # setting the last flush time in the past to trigger a telemetry flush
    statsd._last_flush_time = t1 - statsd._telemetry_flush_interval - 1

    statsd.close_buffer()

    assert_equal_telemetry('gauge1:1|g\ngauge2:2|g', fake_socket.recv(), telemetry=telemetry_metrics(metrics=2))
    # assert that _last_flush_time has been updated. `<=` rather than `<`:
    # on a coarse clock two nearby time.time() calls can return the same
    # value, and the stale value was t1 - interval - 1, so reaching t1 is
    # already proof that the flush reset it.
    assert t1 <= statsd._last_flush_time

def test_context_manager(self):
fake_socket = FakeSocket()
with DogStatsd() as statsd:
with DogStatsd(telemetry_min_flush_interval=0) as statsd:
statsd.socket = fake_socket
statsd.gauge('page.views', 123)
statsd.timing('timer', 123)
Expand All @@ -640,7 +679,7 @@ def test_context_manager(self):
def test_batched_buffer_autoflush(self):
fake_socket = FakeSocket()
bytes_sent = 0
with DogStatsd() as statsd:
with DogStatsd(telemetry_min_flush_interval=0) as statsd:
statsd.socket = fake_socket
for i in range(51):
statsd.increment('mycounter')
Expand Down Expand Up @@ -677,7 +716,7 @@ def test_accessing_socket_multiple_times_returns_same_socket(self):
def test_tags_from_environment(self):
with preserve_environment_variable('DATADOG_TAGS'):
os.environ['DATADOG_TAGS'] = 'country:china,age:45,blue'
statsd = DogStatsd()
statsd = DogStatsd(telemetry_min_flush_interval=0)
statsd.socket = FakeSocket()
statsd.gauge('gt', 123.4)
assert_equal_telemetry('gt:123.4|g|#country:china,age:45,blue',
Expand All @@ -687,7 +726,7 @@ def test_tags_from_environment(self):
def test_tags_from_environment_and_constant(self):
with preserve_environment_variable('DATADOG_TAGS'):
os.environ['DATADOG_TAGS'] = 'country:china,age:45,blue'
statsd = DogStatsd(constant_tags=['country:canada', 'red'])
statsd = DogStatsd(constant_tags=['country:canada', 'red'], telemetry_min_flush_interval=0)
statsd.socket = FakeSocket()
statsd.gauge('gt', 123.4)
tags="country:canada,red,country:china,age:45,blue"
Expand All @@ -696,7 +735,7 @@ def test_tags_from_environment_and_constant(self):
def test_entity_tag_from_environment(self):
with preserve_environment_variable('DD_ENTITY_ID'):
os.environ['DD_ENTITY_ID'] = '04652bb7-19b7-11e9-9cc6-42010a9c016d'
statsd = DogStatsd()
statsd = DogStatsd(telemetry_min_flush_interval=0)
statsd.socket = FakeSocket()
statsd.gauge('gt', 123.4)
assert_equal_telemetry('gt:123.4|g|#dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d',
Expand All @@ -706,7 +745,7 @@ def test_entity_tag_from_environment(self):
def test_entity_tag_from_environment_and_constant(self):
with preserve_environment_variable('DD_ENTITY_ID'):
os.environ['DD_ENTITY_ID'] = '04652bb7-19b7-11e9-9cc6-42010a9c016d'
statsd = DogStatsd(constant_tags=['country:canada', 'red'])
statsd = DogStatsd(constant_tags=['country:canada', 'red'], telemetry_min_flush_interval=0)
statsd.socket = FakeSocket()
statsd.gauge('gt', 123.4)
assert_equal_telemetry('gt:123.4|g|#country:canada,red,dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d',
Expand All @@ -718,7 +757,7 @@ def test_entity_tag_and_tags_from_environment_and_constant(self):
os.environ['DATADOG_TAGS'] = 'country:china,age:45,blue'
with preserve_environment_variable('DD_ENTITY_ID'):
os.environ['DD_ENTITY_ID'] = '04652bb7-19b7-11e9-9cc6-42010a9c016d'
statsd = DogStatsd(constant_tags=['country:canada', 'red'])
statsd = DogStatsd(constant_tags=['country:canada', 'red'], telemetry_min_flush_interval=0)
statsd.socket = FakeSocket()
statsd.gauge('gt', 123.4)
tags = "country:canada,red,country:china,age:45,blue,dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
Expand Down

0 comments on commit 65f757e

Please sign in to comment.