From ba026b308216c9808ea9b034e06a20ddb77f756c Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 09:08:04 -0400 Subject: [PATCH 01/11] Add support for distributions to threadstats and the API --- datadog/api/metrics.py | 52 +++++++++++++++++++++++++++++--- datadog/threadstats/base.py | 35 ++++++++++++++++++--- datadog/threadstats/metrics.py | 17 +++++++++++ datadog/threadstats/reporters.py | 3 ++ 4 files changed, 97 insertions(+), 10 deletions(-) diff --git a/datadog/api/metrics.py b/datadog/api/metrics.py index 89c3b0a51..1b954eeae 100644 --- a/datadog/api/metrics.py +++ b/datadog/api/metrics.py @@ -16,6 +16,7 @@ class Metric(SearchableAPIResource, SendableAPIResource, ListableAPIResource): _METRIC_QUERY_ENDPOINT = 'query' _METRIC_SUBMIT_ENDPOINT = 'series' _METRIC_LIST_ENDPOINT = 'metrics' + _METRIC_DIST_ENDPOINT = 'distribution_points' @classmethod def _process_points(cls, points): @@ -42,8 +43,19 @@ def rec_parse(points_lst): return [] point = points_lst.pop() - timestamp = now if isinstance(point, Number) else point[0] - value = float(point) if isinstance(point, Number) else float(point[1]) + if isinstance(point, Number): + timestamp = now + value = float(point) + # Distributions contain a list of points + else: + timestamp = point[0] + if isinstance(point[1], list): + float_points = [] + for p in point[1]: + float_points.append(float(p)) + value = float_points + else: + value = float(point[1]) point = [(timestamp, value)] @@ -64,6 +76,7 @@ def rec_parse(points_lst): return rec_parse(points_lst) + @classmethod def list(cls, from_epoch): """ @@ -106,6 +119,38 @@ def send(cls, metrics=None, **single_metric): :returns: Dictionary representing the API's JSON response """ + # Set the right endpoint + cls._resource_name = cls._METRIC_SUBMIT_ENDPOINT + cls._send_common(metrics, **single_metric) + + @classmethod + def send_dist(cls, metrics=None, **single_metric): + """ + Submit a distribution metric or a list of distribution metrics to the distribution metric API + + :param metric: the name of the time series + :type metric: string + + :param points: a (timestamp, value) pair or list of (timestamp, value) pairs + :type points: list + + :param host: host name that produced the metric + :type host: string + + :param tags: list of tags associated with the metric. + :type tags: string list + + :param type: type of the metric + :type type: 'gauge' or 'count' or 'rate' string + + :returns: Dictionary representing the API's JSON response + """ + # Set the right endpoint + cls._resource_name = cls._METRIC_DIST_ENDPOINT + cls._send_common(metrics, **single_metric) + + @classmethod + def _send_common(cls, metrics=None, **single_metric): def rename_metric_type(metric): """ FIXME DROPME in 1.0: @@ -117,9 +162,6 @@ def rename_metric_type(metric): if 'metric_type' in metric: metric['type'] = metric.pop('metric_type') - # Set the right endpoint - cls._resource_name = cls._METRIC_SUBMIT_ENDPOINT - # Format the payload try: if metrics: diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py index bcc313061..f801c3929 100644 --- a/datadog/threadstats/base.py +++ b/datadog/threadstats/base.py @@ -15,7 +15,7 @@ # datadog from datadog.api.exceptions import ApiNotInitialized from datadog.threadstats.events import EventsAggregator -from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing +from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing, Distribution from datadog.threadstats.reporters import HttpReporter # Loggers @@ -206,6 +206,19 @@ def histogram(self, metric_name, value, timestamp=None, tags=None, sample_rate=1 self._metric_aggregator.add_point(metric_name, tags, timestamp or time(), value, Histogram, sample_rate=sample_rate, host=host) + def distribution(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None): + """ + Sample a distirbution value. Distributions will produce metrics that + describe the distribution of the recorded values, namely the maximum, + median, average, count and the 50/75/90/95/99 percentiles. Optionally, + specify a list of ``tags`` to associate with the metric. + + >>> stats.distribution('uploaded_file.size', uploaded_file.size()) + """ + if not self._disabled: + self._metric_aggregator.add_point(metric_name, tags, timestamp or time(), value, + Distribution, sample_rate=sample_rate, host=host) + def timing(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None): """ Record a timing, optionally setting tags and a sample rate. @@ -289,7 +302,7 @@ def flush(self, timestamp=None): self._is_flush_in_progress = True # Process metrics - metrics = self._get_aggregate_metrics(timestamp or time()) + (metrics, dists) = self._get_aggregate_metrics_and_dists(timestamp or time()) count_metrics = len(metrics) if count_metrics: self.flush_count += 1 @@ -298,6 +311,14 @@ def flush(self, timestamp=None): else: log.debug("No metrics to flush. Continuing.") + count_dists = len(dists) + if count_dists: + self.flush_count += 1 + log.debug("Flush #%s sending %s distributions" % (self.flush_count, count_dists)) + self.reporter.flush_distributions(dists) + else: + log.debug("No distributions to flush. Continuing.") + # Process events events = self._get_aggregate_events() count_events = len(events) @@ -317,7 +338,7 @@ def flush(self, timestamp=None): finally: self._is_flush_in_progress = False - def _get_aggregate_metrics(self, flush_time=None): + def _get_aggregate_metrics_and_dists(self, flush_time=None): """ Get, format and return the rolled up metrics from the aggregator. """ @@ -326,6 +347,7 @@ def _get_aggregate_metrics(self, flush_time=None): # FIXME: emit a dictionary from the aggregator metrics = [] + dists = [] for timestamp, value, name, tags, host, metric_type, interval in rolled_up_metrics: metric_tags = tags metric_name = name @@ -350,8 +372,11 @@ def _get_aggregate_metrics(self, flush_time=None): 'tags': metric_tags, 'interval': interval } - metrics.append(metric) - return metrics + if metric_type != MetricType.Distribution: + metrics.append(metric) + else: + dists.append(metric) + return (metrics, dists) def _get_aggregate_events(self): # Get events diff --git a/datadog/threadstats/metrics.py b/datadog/threadstats/metrics.py index b14da50e5..a97c05dfd 100644 --- a/datadog/threadstats/metrics.py +++ b/datadog/threadstats/metrics.py @@ -62,6 +62,23 @@ def flush(self, timestamp, interval): return [(timestamp, count/float(interval), self.name, self.tags, self.host, MetricType.Rate, interval)] +class Distribution(Metric): + """ A distribution metric. """ + + stats_tag = 'd' + + def __init__(self, name, tags, host): + self.name = name + self.tags = tags + self.host = host + self.value = [] + + def add_point(self, value): + self.value.append(value) + + def flush(self, timestamp, interval): + return [(timestamp, self.value, self.name, self.tags, + self.host, MetricType.Distribution, interval)] class Histogram(Metric): """ A histogram metric. """ diff --git a/datadog/threadstats/reporters.py b/datadog/threadstats/reporters.py index f79189edd..985ddbfa9 100644 --- a/datadog/threadstats/reporters.py +++ b/datadog/threadstats/reporters.py @@ -14,6 +14,9 @@ def flush(self, metrics): class HttpReporter(Reporter): + def flush_distributions(self, distributions): + api.Metric.send_dist(distributions) + def flush_metrics(self, metrics): api.Metric.send(metrics) From 4921da8d81ca3b60dd9bf42db5748356c1d9d00e Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 10:07:40 -0400 Subject: [PATCH 02/11] Fix missing import and constant --- datadog/threadstats/base.py | 1 + datadog/threadstats/constants.py | 1 + 2 files changed, 2 insertions(+) diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py index f801c3929..66e1fe8aa 100644 --- a/datadog/threadstats/base.py +++ b/datadog/threadstats/base.py @@ -14,6 +14,7 @@ # datadog from datadog.api.exceptions import ApiNotInitialized +from datadog.threadstats.constants import MetricType from datadog.threadstats.events import EventsAggregator from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing, Distribution from datadog.threadstats.reporters import HttpReporter diff --git a/datadog/threadstats/constants.py b/datadog/threadstats/constants.py index c03d01b38..837bd4b0a 100644 --- a/datadog/threadstats/constants.py +++ b/datadog/threadstats/constants.py @@ -3,6 +3,7 @@ class MetricType(object): Counter = "counter" Histogram = "histogram" Rate = "rate" + Distribution = "distribution" class MonitorType(object): From cb897575a1c49e51546947dde2ef78af87e1917b Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 10:36:43 -0400 Subject: [PATCH 03/11] Fix flake errors --- datadog/api/metrics.py | 4 ++-- datadog/threadstats/base.py | 3 ++- datadog/threadstats/metrics.py | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/datadog/api/metrics.py b/datadog/api/metrics.py index 1b954eeae..7a68c6948 100644 --- a/datadog/api/metrics.py +++ b/datadog/api/metrics.py @@ -76,7 +76,6 @@ def rec_parse(points_lst): return rec_parse(points_lst) - @classmethod def list(cls, from_epoch): """ @@ -126,7 +125,8 @@ def send(cls, metrics=None, **single_metric): @classmethod def send_dist(cls, metrics=None, **single_metric): """ - Submit a distribution metric or a list of distribution metrics to the distribution metric API + Submit a distribution metric or a list of distribution metrics to the distribution metric + API :param metric: the name of the time series :type metric: string diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py index 66e1fe8aa..da63f4ab4 100644 --- a/datadog/threadstats/base.py +++ b/datadog/threadstats/base.py @@ -16,7 +16,8 @@ from datadog.api.exceptions import ApiNotInitialized from datadog.threadstats.constants import MetricType from datadog.threadstats.events import EventsAggregator -from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing, Distribution +from datadog.threadstats.metrics import (MetricsAggregator, Counter, Gauge, Histogram, Timing, + Distribution) from datadog.threadstats.reporters import HttpReporter # Loggers diff --git a/datadog/threadstats/metrics.py b/datadog/threadstats/metrics.py index a97c05dfd..71984824b 100644 --- a/datadog/threadstats/metrics.py +++ b/datadog/threadstats/metrics.py @@ -62,6 +62,7 @@ def flush(self, timestamp, interval): return [(timestamp, count/float(interval), self.name, self.tags, self.host, MetricType.Rate, interval)] + class Distribution(Metric): """ A distribution metric. """ @@ -80,6 +81,7 @@ def flush(self, timestamp, interval): return [(timestamp, self.value, self.name, self.tags, self.host, MetricType.Distribution, interval)] + class Histogram(Metric): """ A histogram metric. """ From 8cc513765c24315da5d6e7481de4b3c6ca5f0ce4 Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 11:14:47 -0400 Subject: [PATCH 04/11] Fix multiline import --- datadog/threadstats/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py index da63f4ab4..df58d4042 100644 --- a/datadog/threadstats/base.py +++ b/datadog/threadstats/base.py @@ -16,8 +16,8 @@ from datadog.api.exceptions import ApiNotInitialized from datadog.threadstats.constants import MetricType from datadog.threadstats.events import EventsAggregator -from datadog.threadstats.metrics import (MetricsAggregator, Counter, Gauge, Histogram, Timing, - Distribution) +from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing,\ + Distribution from datadog.threadstats.reporters import HttpReporter # Loggers From 32341db01cc708826844231c89dc85e1515938fe Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 12:55:35 -0400 Subject: [PATCH 05/11] Add simple distribution test --- datadog/api/metrics.py | 5 +---- tests/unit/api/test_api.py | 44 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/datadog/api/metrics.py b/datadog/api/metrics.py index 7a68c6948..8f60c1ce5 100644 --- a/datadog/api/metrics.py +++ b/datadog/api/metrics.py @@ -50,10 +50,7 @@ def rec_parse(points_lst): else: timestamp = point[0] if isinstance(point[1], list): - float_points = [] - for p in point[1]: - float_points.append(float(p)) - value = float_points + value = [float(p) for p in point[1]] else: value = float(point[1]) diff --git a/tests/unit/api/test_api.py b/tests/unit/api/test_api.py index 7fb739f75..2eeca5b0f 100644 --- a/tests/unit/api/test_api.py +++ b/tests/unit/api/test_api.py @@ -385,6 +385,36 @@ def submit_and_assess_metric_payload(self, serie): # it's time not so far from current time assert now - 1 < metric['points'][0][0] < now + 1 + def submit_and_assess_dist_payload(self, serie): + """ + Helper to assess the metric payload format. + """ + now = time() + + if isinstance(serie, dict): + Metric.send_dist(**deepcopy(serie)) + serie = [serie] + else: + Metric.send_dist(deepcopy(serie)) + + payload = self.get_request_data() + + for i, metric in enumerate(payload['series']): + self.assertEquals(set(metric.keys()), set(['metric', 'points', 'host'])) + + self.assertEquals(metric['metric'], serie[i]['metric']) + self.assertEquals(metric['host'], api._host_name) + + # points is a list of 1 point + self.assertTrue(isinstance(metric['points'], list)) + self.assertEquals(len(metric['points']), 1) + # it consists of a [time, value] pair + self.assertEquals(len(metric['points'][0]), 2) + # its value == value we sent + self.assertEquals(metric['points'][0][1], serie[i]['points'][0][1]) + # it's time not so far from current time + assert now - 1 < metric['points'][0][0] < now + 1 + def test_metric_submit_query_switch(self): """ Endpoints are different for submission and queries. @@ -410,6 +440,20 @@ def test_points_submission(self): dict(metric='metric.2', points=19)] self.submit_and_assess_metric_payload(serie) + def test_dist_points_submission(self): + """ + Assess the distribution data payload format, when submitting a single or multiple points. + """ + # Single point + serie = dict(metric='metric.1', points=[[time(), [13]]]) + self.submit_and_assess_dist_payload(serie) + + # Multiple point + serie = [dict(metric='metric.1', points=[[time(), [13]]]), + dict(metric='metric.2', points=[[time(), [19]]])] + self.submit_and_assess_dist_payload(serie) + + def test_data_type_support(self): """ `Metric` API supports `real` numerical data types. From 09c41566b30acbf682f802a905992e03f0c5133f Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 13:04:56 -0400 Subject: [PATCH 06/11] Add threadstats distribution test --- tests/unit/threadstats/test_threadstats.py | 49 +++++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/tests/unit/threadstats/test_threadstats.py b/tests/unit/threadstats/test_threadstats.py index 6affd8f88..5e14d99fc 100644 --- a/tests/unit/threadstats/test_threadstats.py +++ b/tests/unit/threadstats/test_threadstats.py @@ -29,9 +29,13 @@ class MemoryReporter(object): """ def __init__(self): + self.distributions = [] self.metrics = [] self.events = [] + def flush_distributions(self, distributions): + self.distributions += distributions + def flush_metrics(self, metrics): self.metrics += metrics @@ -374,6 +378,39 @@ def test_counter(self): dog.flush(1050.0) nt.assert_equal(len(reporter.metrics), 0) + def test_distribution(self): + # Create some fake metrics. + dog = ThreadStats() + dog.start(roll_up_interval=10, flush_in_thread=False) + reporter = dog.reporter = MemoryReporter() + + dog.distribution('test.dist.1', 20, 100.0) + dog.distribution('test.dist.1', 22, 105.0) + dog.distribution('test.dist.2', 30, 115.0) + dog.distribution('test.dist.3', 30, 125.0) + dog.flush(120.0) + + # Assert they've been properly flushed. + dists = self.sort_metrics(reporter.distributions) + nt.assert_equal(len(dists), 2) + + (first, second) = dists + nt.assert_equal(first['metric'], 'test.dist.1') + nt.assert_equal(first['points'][0][0], 100.0) + nt.assert_equal(first['points'][0][1], [20, 22]) + nt.assert_equal(second['metric'], 'test.dist.2') + + # Flush again and make sure we're progressing. + reporter.distributions = [] + dog.flush(130.0) + nt.assert_equal(len(reporter.distributions), 1) + + # Finally, make sure we've flushed all metrics. + reporter.distributions = [] + dog.flush(150.0) + nt.assert_equal(len(reporter.distributions), 0) + + def test_default_host_and_device(self): dog = ThreadStats() dog.start(roll_up_interval=1, flush_in_thread=False) @@ -454,13 +491,13 @@ def test_constant_tags(self): # Assertions on gauges self.assertMetric(name='gauge', value=10, tags=["type:constant"], count=1) - self.assertMetric(name="gauge", value=15, + self.assertMetric(name="gauge", value=15, tags=["env:production", "db", "type:constant"], count=1) # noqa self.assertMetric(name="gauge", value=20, tags=["env:staging", "type:constant"], count=1) # Assertions on counters self.assertMetric(name="counter", value=1, tags=["type:constant"], count=1) - self.assertMetric(name="counter", value=1, + self.assertMetric(name="counter", value=1, tags=["env:production", "db", "type:constant"], count=1) # noqa self.assertMetric(name="counter", value=1, tags=["env:staging", "type:constant"], count=1) @@ -688,10 +725,10 @@ def test_metric_type(self): dog.increment("counter", timestamp=100.0) dog.histogram('histogram.1', 20, 100.0) dog.flush(200.0) - - (first, second, p75, p85, p95, p99, avg, cnt, + + (first, second, p75, p85, p95, p99, avg, cnt, max_, min_) = self.sort_metrics(reporter.metrics) - + # Assert Metric type nt.assert_equal(first['type'], 'rate') nt.assert_equal(second['type'], 'gauge') @@ -702,4 +739,4 @@ def test_metric_type(self): nt.assert_equal(avg['type'], 'gauge') nt.assert_equal(cnt['type'], 'rate') nt.assert_equal(max_['type'], 'gauge') - nt.assert_equal(min_['type'], 'gauge') \ No newline at end of file + nt.assert_equal(min_['type'], 'gauge') From 0221fe354457982bb8facf5ab881024fafcbec15 Mon Sep 17 00:00:00 2001 From: Joel Barciauskas Date: Thu, 1 Nov 2018 13:11:22 -0400 Subject: [PATCH 07/11] Code review feedback --- datadog/api/metrics.py | 3 ++- datadog/threadstats/base.py | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/datadog/api/metrics.py b/datadog/api/metrics.py index 8f60c1ce5..20e2f172d 100644 --- a/datadog/api/metrics.py +++ b/datadog/api/metrics.py @@ -1,5 +1,6 @@ # stdlib import time +from collections import Iterable from numbers import Number # datadog @@ -49,7 +50,7 @@ def rec_parse(points_lst): # Distributions contain a list of points else: timestamp = point[0] - if isinstance(point[1], list): + if isinstance(point[1], Iterable): value = [float(p) for p in point[1]] else: value = float(point[1]) diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py index df58d4042..28548ee05 100644 --- a/datadog/threadstats/base.py +++ b/datadog/threadstats/base.py @@ -210,7 +210,7 @@ def histogram(self, metric_name, value, timestamp=None, tags=None, sample_rate=1 def distribution(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None): """ - Sample a distirbution value. Distributions will produce metrics that + Sample a distribution value. Distributions will produce metrics that describe the distribution of the recorded values, namely the maximum, median, average, count and the 50/75/90/95/99 percentiles. Optionally, specify a list of ``tags`` to associate with the metric. @@ -374,10 +374,10 @@ def _get_aggregate_metrics_and_dists(self, flush_time=None): 'tags': metric_tags, 'interval': interval } - if metric_type != MetricType.Distribution: - metrics.append(metric) - else: + if metric_type == MetricType.Distribution: dists.append(metric) + else: + metrics.append(metric) return (metrics, dists) def _get_aggregate_events(self): From cf04a2f1386d5f91bea20d60af8e3106ac0046a4 Mon Sep 17 00:00:00 2001 From: Marek Stepniowski Date: Thu, 1 Nov 2018 13:20:47 -0400 Subject: [PATCH 08/11] Remove unneeded parens Co-Authored-By: jbarciauskas --- datadog/threadstats/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py index 28548ee05..0539cc161 100644 --- a/datadog/threadstats/base.py +++ b/datadog/threadstats/base.py @@ -304,7 +304,7 @@ def flush(self, timestamp=None): self._is_flush_in_progress = True # Process metrics - (metrics, dists) = self._get_aggregate_metrics_and_dists(timestamp or time()) + metrics, dists = self._get_aggregate_metrics_and_dists(timestamp or time()) count_metrics = len(metrics) if count_metrics: self.flush_count += 1 From 5b06abd28088bd5a0aac514c4d8ce6f8d48adfa3 Mon Sep 17 00:00:00 2001 From: Yann MAHE Date: Wed, 7 Nov 2018 17:55:13 -0500 Subject: [PATCH 09/11] [api] make distribution its own resource --- datadog/api/__init__.py | 1 + datadog/api/distributions.py | 37 ++++++++++ datadog/api/format.py | 58 +++++++++++++++ datadog/api/metrics.py | 121 ++++--------------------------- datadog/threadstats/reporters.py | 2 +- tests/unit/api/test_api.py | 14 ++-- 6 files changed, 122 insertions(+), 111 deletions(-) create mode 100644 datadog/api/distributions.py create mode 100644 datadog/api/format.py diff --git a/datadog/api/__init__.py b/datadog/api/__init__.py index bdd2c3487..f88a371f6 100644 --- a/datadog/api/__init__.py +++ b/datadog/api/__init__.py @@ -19,6 +19,7 @@ # Resources from datadog.api.comments import Comment from datadog.api.dashboard_lists import DashboardList +from datadog.api.distributions import Distribution from datadog.api.downtimes import Downtime from datadog.api.timeboards import Timeboard from datadog.api.events import Event diff --git a/datadog/api/distributions.py b/datadog/api/distributions.py new file mode 100644 index 000000000..0a50229e3 --- /dev/null +++ b/datadog/api/distributions.py @@ -0,0 +1,37 @@ +# datadog +from datadog.api.format import format_points +from datadog.api.resources import SendableAPIResource + + +class Distribution(SendableAPIResource): + """A wrapper around Distribution HTTP API""" + _resource_name = 'distribution_points' + + @classmethod + def send(cls, distributions=None, **distribution): + """ + Submit a distribution metric or a list of distribution metrics to the distribution metric + API + :param metric: the name of the time series + :type metric: string + :param points: a (timestamp, [list of values]) pair or list of (timestamp, [list of values]) pairs + :type points: list + :param host: host name that produced the metric + :type host: string + :param tags: list of tags associated with the metric. + :type tags: string list + :param type: type of the metric + :type type: 'gauge' or 'count' or 'rate' string + :returns: Dictionary representing the API's JSON response + """ + if distributions: + # Multiple distributions are sent + for d in distributions: + if isinstance(d, dict): + d['points'] = format_points(d['points']) + series_dict = {"series": distributions} + else: + # One distribution is sent + distribution['points'] = format_points(distribution['points']) + series_dict = {"series": [distribution]} + return super(Distribution, cls).send(attach_host_name=True, **series_dict) diff --git a/datadog/api/format.py b/datadog/api/format.py new file mode 100644 index 000000000..fe7a33e4b --- /dev/null +++ b/datadog/api/format.py @@ -0,0 +1,58 @@ +from collections import Iterable +from numbers import Number +import time + + +def format_points(points): + """ + Format `points` parameter. + + Input: + a value or (timestamp, value) pair or a list of value or (timestamp, value) pairs + + Returns: + list of (timestamp, float value) pairs + + """ + now = time.time() + points_lst = points if isinstance(points, list) else [points] + + def rec_parse(points_lst): + """ + Recursively parse a list of values or a list of (timestamp, value) pairs to a list of + (timestamp, `float` value) pairs. + """ + try: + if not points_lst: + return [] + + point = points_lst.pop() + if isinstance(point, Number): + timestamp = now + value = float(point) + # Distributions contain a list of points + else: + timestamp = point[0] + if isinstance(point[1], Iterable): + value = [float(p) for p in point[1]] + else: + value = float(point[1]) + + point = [(timestamp, value)] + + return point + rec_parse(points_lst) + + except TypeError as e: + raise TypeError( + u"{0}: " + "`points` parameter must use real numerical values.".format(e) + ) + + except IndexError as e: + raise IndexError( + u"{0}: " + u"`points` must be a list of values or " + u"a list of (timestamp, value) pairs".format(e) + ) + + return rec_parse(points_lst) diff --git a/datadog/api/metrics.py b/datadog/api/metrics.py index 20e2f172d..4f5de2e5d 100644 --- a/datadog/api/metrics.py +++ b/datadog/api/metrics.py @@ -1,10 +1,6 @@ -# stdlib -import time -from collections import Iterable -from numbers import Number - # datadog from datadog.api.exceptions import ApiError +from datadog.api.format import format_points from datadog.api.resources import SearchableAPIResource, SendableAPIResource, ListableAPIResource @@ -17,62 +13,6 @@ class Metric(SearchableAPIResource, SendableAPIResource, ListableAPIResource): _METRIC_QUERY_ENDPOINT = 'query' _METRIC_SUBMIT_ENDPOINT = 'series' _METRIC_LIST_ENDPOINT = 'metrics' - _METRIC_DIST_ENDPOINT = 'distribution_points' - - @classmethod - def _process_points(cls, points): - """ - Format `points` parameter. - - Input: - a value or (timestamp, value) pair or a list of value or (timestamp, value) pairs - - Returns: - list of (timestamp, float value) pairs - - """ - now = time.time() - points_lst = points if isinstance(points, list) else [points] - - def rec_parse(points_lst): - """ - Recursively parse a list of values or a list of (timestamp, value) pairs to a list of - (timestamp, `float` value) pairs. - """ - try: - if not points_lst: - return [] - - point = points_lst.pop() - if isinstance(point, Number): - timestamp = now - value = float(point) - # Distributions contain a list of points - else: - timestamp = point[0] - if isinstance(point[1], Iterable): - value = [float(p) for p in point[1]] - else: - value = float(point[1]) - - point = [(timestamp, value)] - - return point + rec_parse(points_lst) - - except TypeError as e: - raise TypeError( - u"{0}: " - "`points` parameter must use real numerical values.".format(e) - ) - - except IndexError as e: - raise IndexError( - u"{0}: " - u"`points` must be a list of values or " - u"a list of (timestamp, value) pairs".format(e) - ) - - return rec_parse(points_lst) @classmethod def list(cls, from_epoch): @@ -94,37 +34,22 @@ def list(cls, from_epoch): return super(Metric, cls).get_all(**params) - @classmethod - def send(cls, metrics=None, **single_metric): + @staticmethod + def _rename_metric_type(metric): """ - Submit a metric or a list of metrics to the metric API - - :param metric: the name of the time series - :type metric: string - - :param points: a (timestamp, value) pair or list of (timestamp, value) pairs - :type points: list + FIXME DROPME in 1.0: - :param host: host name that produced the metric - :type host: string - - :param tags: list of tags associated with the metric. - :type tags: string list - - :param type: type of the metric - :type type: 'gauge' or 'count' or 'rate' string - - :returns: Dictionary representing the API's JSON response + API documentation was illegitimately promoting usage of `metric_type` parameter + instead of `type`. + To be consistent and avoid 'backward incompatibilities', properly rename this parameter. """ - # Set the right endpoint - cls._resource_name = cls._METRIC_SUBMIT_ENDPOINT - cls._send_common(metrics, **single_metric) + if 'metric_type' in metric: + metric['type'] = metric.pop('metric_type') @classmethod - def send_dist(cls, metrics=None, **single_metric): + def send(cls, metrics=None, **single_metric): """ - Submit a distribution metric or a list of distribution metrics to the distribution metric - API + Submit a metric or a list of metrics to the metric API :param metric: the name of the time series :type metric: string @@ -144,33 +69,19 @@ def send_dist(cls, metrics=None, **single_metric): :returns: Dictionary representing the API's JSON response """ # Set the right endpoint - cls._resource_name = cls._METRIC_DIST_ENDPOINT - cls._send_common(metrics, **single_metric) - - @classmethod - def _send_common(cls, metrics=None, **single_metric): - def rename_metric_type(metric): - """ - FIXME DROPME in 1.0: - - API documentation was illegitimately promoting usage of `metric_type` parameter - instead of `type`. - To be consistent and avoid 'backward incompatibilities', properly rename this parameter. - """ - if 'metric_type' in metric: - metric['type'] = metric.pop('metric_type') + cls._resource_name = cls._METRIC_SUBMIT_ENDPOINT # Format the payload try: if metrics: for metric in metrics: if isinstance(metric, dict): - rename_metric_type(metric) - metric['points'] = cls._process_points(metric['points']) + cls._rename_metric_type(metric) + metric['points'] = format_points(metric['points']) metrics_dict = {"series": metrics} else: - rename_metric_type(single_metric) - single_metric['points'] = cls._process_points(single_metric['points']) + cls._rename_metric_type(single_metric) + single_metric['points'] = format_points(single_metric['points']) metrics = [single_metric] metrics_dict = {"series": metrics} diff --git a/datadog/threadstats/reporters.py b/datadog/threadstats/reporters.py index 985ddbfa9..cffd06359 100644 --- a/datadog/threadstats/reporters.py +++ b/datadog/threadstats/reporters.py @@ -15,7 +15,7 @@ def flush(self, metrics): class HttpReporter(Reporter): def flush_distributions(self, distributions): - api.Metric.send_dist(distributions) + api.Distribution.send(distributions) def flush_metrics(self, metrics): api.Metric.send(metrics) diff --git a/tests/unit/api/test_api.py b/tests/unit/api/test_api.py index 2eeca5b0f..c70e63d32 100644 --- a/tests/unit/api/test_api.py +++ b/tests/unit/api/test_api.py @@ -10,7 +10,11 @@ # datadog from datadog import initialize, api -from datadog.api import Metric, ServiceCheck +from datadog.api import ( + Distribution, + Metric, + ServiceCheck +) from datadog.api.exceptions import ApiError, ApiNotInitialized from datadog.util.compat import is_p3k from tests.unit.api.helper import ( @@ -30,7 +34,8 @@ APP_KEY, API_HOST, HOST_NAME, - FAKE_PROXY) + FAKE_PROXY +) def preserve_environ_datadog(func): @@ -392,10 +397,10 @@ def submit_and_assess_dist_payload(self, serie): now = time() if isinstance(serie, dict): - Metric.send_dist(**deepcopy(serie)) + Distribution.send(**deepcopy(serie)) serie = [serie] else: - Metric.send_dist(deepcopy(serie)) + Distribution.send(deepcopy(serie)) payload = self.get_request_data() @@ -453,7 +458,6 @@ def test_dist_points_submission(self): dict(metric='metric.2', points=[[time(), [19]]])] self.submit_and_assess_dist_payload(serie) - def test_data_type_support(self): """ `Metric` API supports `real` numerical data types. From 4fe1384de3219d03075091e7e4da0265a4c3b812 Mon Sep 17 00:00:00 2001 From: Yann MAHE Date: Wed, 7 Nov 2018 18:02:07 -0500 Subject: [PATCH 10/11] flake8 max line --- datadog/api/distributions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog/api/distributions.py b/datadog/api/distributions.py index 0a50229e3..edad7145e 100644 --- a/datadog/api/distributions.py +++ b/datadog/api/distributions.py @@ -14,7 +14,8 @@ def send(cls, distributions=None, **distribution): API :param metric: the name of the time series :type metric: string - :param points: a (timestamp, [list of values]) pair or list of (timestamp, [list of values]) pairs + :param points: a (timestamp, [list of values]) pair or + list of (timestamp, [list of values]) pairs :type points: list :param host: host name that produced the metric :type host: string From c5d38cd33e79352db1e8466353514ab5358a0494 Mon Sep 17 00:00:00 2001 From: Yann MAHE Date: Wed, 7 Nov 2018 18:21:08 -0500 Subject: [PATCH 11/11] [api] rewrite format method --- datadog/api/distributions.py | 2 -- datadog/api/format.py | 58 +++++++++++------------------------- 2 files changed, 18 insertions(+), 42 deletions(-) diff --git a/datadog/api/distributions.py b/datadog/api/distributions.py index edad7145e..ecffa4c50 100644 --- a/datadog/api/distributions.py +++ b/datadog/api/distributions.py @@ -21,8 +21,6 @@ def send(cls, distributions=None, **distribution): :type host: string :param tags: list of tags associated with the metric. :type tags: string list - :param type: type of the metric - :type type: 'gauge' or 'count' or 'rate' string :returns: Dictionary representing the API's JSON response """ if distributions: diff --git a/datadog/api/format.py b/datadog/api/format.py index fe7a33e4b..dd3067ad1 100644 --- a/datadog/api/format.py +++ b/datadog/api/format.py @@ -15,44 +15,22 @@ def format_points(points): """ now = time.time() - points_lst = points if isinstance(points, list) else [points] - - def rec_parse(points_lst): - """ - Recursively parse a list of values or a list of (timestamp, value) pairs to a list of - (timestamp, `float` value) pairs. - """ - try: - if not points_lst: - return [] - - point = points_lst.pop() - if isinstance(point, Number): - timestamp = now - value = float(point) - # Distributions contain a list of points + if not isinstance(points, list): + points = [points] + + formatted_points = [] + for point in points: + if isinstance(point, Number): + timestamp = now + value = float(point) + # Distributions contain a list of points + else: + timestamp = point[0] + if isinstance(point[1], Iterable): + value = [float(p) for p in point[1]] else: - timestamp = point[0] - if isinstance(point[1], Iterable): - value = [float(p) for p in point[1]] - else: - value = float(point[1]) - - point = [(timestamp, value)] - - return point + rec_parse(points_lst) - - except TypeError as e: - raise TypeError( - u"{0}: " - "`points` parameter must use real numerical values.".format(e) - ) - - except IndexError as e: - raise IndexError( - u"{0}: " - u"`points` must be a list of values or " - u"a list of (timestamp, value) pairs".format(e) - ) - - return rec_parse(points_lst) + value = float(point[1]) + + formatted_points.append((timestamp, value)) + + return formatted_points