diff --git a/datadog/api/__init__.py b/datadog/api/__init__.py
index bdd2c3487..f88a371f6 100644
--- a/datadog/api/__init__.py
+++ b/datadog/api/__init__.py
@@ -19,6 +19,7 @@
 # Resources
 from datadog.api.comments import Comment
 from datadog.api.dashboard_lists import DashboardList
+from datadog.api.distributions import Distribution
 from datadog.api.downtimes import Downtime
 from datadog.api.timeboards import Timeboard
 from datadog.api.events import Event
diff --git a/datadog/api/distributions.py b/datadog/api/distributions.py
new file mode 100644
index 000000000..ecffa4c50
--- /dev/null
+++ b/datadog/api/distributions.py
@@ -0,0 +1,36 @@
+# datadog
+from datadog.api.format import format_points
+from datadog.api.resources import SendableAPIResource
+
+
+class Distribution(SendableAPIResource):
+    """A wrapper around Distribution HTTP API"""
+    _resource_name = 'distribution_points'
+
+    @classmethod
+    def send(cls, distributions=None, **distribution):
+        """
+        Submit a distribution metric or a list of distribution metrics to the distribution metric
+        API
+        :param metric: the name of the time series
+        :type metric: string
+        :param points: a (timestamp, [list of values]) pair or
+                       list of (timestamp, [list of values]) pairs
+        :type points: list
+        :param host: host name that produced the metric
+        :type host: string
+        :param tags: list of tags associated with the metric.
+        :type tags: string list
+        :returns: Dictionary representing the API's JSON response
+        """
+        if distributions:
+            # Multiple distributions are sent
+            for d in distributions:
+                if isinstance(d, dict):
+                    d['points'] = format_points(d['points'])
+            series_dict = {"series": distributions}
+        else:
+            # One distribution is sent
+            distribution['points'] = format_points(distribution['points'])
+            series_dict = {"series": [distribution]}
+        return super(Distribution, cls).send(attach_host_name=True, **series_dict)
diff --git a/datadog/api/format.py b/datadog/api/format.py
new file mode 100644
index 000000000..dd3067ad1
--- /dev/null
+++ b/datadog/api/format.py
@@ -0,0 +1,36 @@
+from collections import Iterable
+from numbers import Number
+import time
+
+
+def format_points(points):
+    """
+    Format `points` parameter.
+
+    Input:
+        a value or (timestamp, value) pair or a list of value or (timestamp, value) pairs
+
+    Returns:
+        list of (timestamp, float value) pairs
+
+    """
+    now = time.time()
+    if not isinstance(points, list):
+        points = [points]
+
+    formatted_points = []
+    for point in points:
+        if isinstance(point, Number):
+            timestamp = now
+            value = float(point)
+        # Distributions contain a list of points
+        else:
+            timestamp = point[0]
+            if isinstance(point[1], Iterable):
+                value = [float(p) for p in point[1]]
+            else:
+                value = float(point[1])
+
+        formatted_points.append((timestamp, value))
+
+    return formatted_points
diff --git a/datadog/api/metrics.py b/datadog/api/metrics.py
index 89c3b0a51..4f5de2e5d 100644
--- a/datadog/api/metrics.py
+++ b/datadog/api/metrics.py
@@ -1,9 +1,6 @@
-# stdlib
-import time
-from numbers import Number
-
 # datadog
 from datadog.api.exceptions import ApiError
+from datadog.api.format import format_points
 from datadog.api.resources import SearchableAPIResource, SendableAPIResource, ListableAPIResource
 
 
@@ -17,53 +14,6 @@ class Metric(SearchableAPIResource, SendableAPIResource, ListableAPIResource):
     _METRIC_SUBMIT_ENDPOINT = 'series'
     _METRIC_LIST_ENDPOINT = 'metrics'
 
-    @classmethod
-    def _process_points(cls, points):
-        """
-        Format `points` parameter.
-
-        Input:
-            a value or (timestamp, value) pair or a list of value or (timestamp, value) pairs
-
-        Returns:
-            list of (timestamp, float value) pairs
-
-        """
-        now = time.time()
-        points_lst = points if isinstance(points, list) else [points]
-
-        def rec_parse(points_lst):
-            """
-            Recursively parse a list of values or a list of (timestamp, value) pairs to a list of
-            (timestamp, `float` value) pairs.
-            """
-            try:
-                if not points_lst:
-                    return []
-
-                point = points_lst.pop()
-                timestamp = now if isinstance(point, Number) else point[0]
-                value = float(point) if isinstance(point, Number) else float(point[1])
-
-                point = [(timestamp, value)]
-
-                return point + rec_parse(points_lst)
-
-            except TypeError as e:
-                raise TypeError(
-                    u"{0}: "
-                    "`points` parameter must use real numerical values.".format(e)
-                )
-
-            except IndexError as e:
-                raise IndexError(
-                    u"{0}: "
-                    u"`points` must be a list of values or "
-                    u"a list of (timestamp, value) pairs".format(e)
-                )
-
-        return rec_parse(points_lst)
-
     @classmethod
     def list(cls, from_epoch):
         """
@@ -84,6 +34,18 @@ def list(cls, from_epoch):
 
         return super(Metric, cls).get_all(**params)
 
+    @staticmethod
+    def _rename_metric_type(metric):
+        """
+        FIXME DROPME in 1.0:
+
+        API documentation was illegitimately promoting usage of `metric_type` parameter
+        instead of `type`.
+        To be consistent and avoid 'backward incompatibilities', properly rename this parameter.
+        """
+        if 'metric_type' in metric:
+            metric['type'] = metric.pop('metric_type')
+
     @classmethod
     def send(cls, metrics=None, **single_metric):
         """
@@ -106,17 +68,6 @@ def send(cls, metrics=None, **single_metric):
 
         :returns: Dictionary representing the API's JSON response
         """
-        def rename_metric_type(metric):
-            """
-            FIXME DROPME in 1.0:
-
-            API documentation was illegitimately promoting usage of `metric_type` parameter
-            instead of `type`.
-            To be consistent and avoid 'backward incompatibilities', properly rename this parameter.
-            """
-            if 'metric_type' in metric:
-                metric['type'] = metric.pop('metric_type')
-
         # Set the right endpoint
         cls._resource_name = cls._METRIC_SUBMIT_ENDPOINT
 
@@ -125,12 +76,12 @@ def rename_metric_type(metric):
         if metrics:
             for metric in metrics:
                 if isinstance(metric, dict):
-                    rename_metric_type(metric)
-                    metric['points'] = cls._process_points(metric['points'])
+                    cls._rename_metric_type(metric)
+                    metric['points'] = format_points(metric['points'])
             metrics_dict = {"series": metrics}
         else:
-            rename_metric_type(single_metric)
-            single_metric['points'] = cls._process_points(single_metric['points'])
+            cls._rename_metric_type(single_metric)
+            single_metric['points'] = format_points(single_metric['points'])
             metrics = [single_metric]
             metrics_dict = {"series": metrics}
 
diff --git a/datadog/threadstats/base.py b/datadog/threadstats/base.py
index bcc313061..0539cc161 100644
--- a/datadog/threadstats/base.py
+++ b/datadog/threadstats/base.py
@@ -14,8 +14,10 @@
 
 # datadog
 from datadog.api.exceptions import ApiNotInitialized
+from datadog.threadstats.constants import MetricType
 from datadog.threadstats.events import EventsAggregator
-from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing
+from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing,\
+    Distribution
 from datadog.threadstats.reporters import HttpReporter
 
 # Loggers
@@ -206,6 +208,19 @@ def histogram(self, metric_name, value, timestamp=None, tags=None, sample_rate=1
         self._metric_aggregator.add_point(metric_name, tags, timestamp or time(), value,
                                           Histogram, sample_rate=sample_rate, host=host)
 
+    def distribution(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None):
+        """
+        Sample a distribution value. Distributions will produce metrics that
+        describe the distribution of the recorded values, namely the maximum,
+        median, average, count and the 50/75/90/95/99 percentiles. Optionally,
+        specify a list of ``tags`` to associate with the metric.
+
+        >>> stats.distribution('uploaded_file.size', uploaded_file.size())
+        """
+        if not self._disabled:
+            self._metric_aggregator.add_point(metric_name, tags, timestamp or time(), value,
+                                              Distribution, sample_rate=sample_rate, host=host)
+
     def timing(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None):
         """
         Record a timing, optionally setting tags and a sample rate.
@@ -289,7 +304,7 @@ def flush(self, timestamp=None):
             self._is_flush_in_progress = True
 
             # Process metrics
-            metrics = self._get_aggregate_metrics(timestamp or time())
+            metrics, dists = self._get_aggregate_metrics_and_dists(timestamp or time())
             count_metrics = len(metrics)
             if count_metrics:
                 self.flush_count += 1
@@ -298,6 +313,14 @@ def flush(self, timestamp=None):
             else:
                 log.debug("No metrics to flush. Continuing.")
 
+            count_dists = len(dists)
+            if count_dists:
+                self.flush_count += 1
+                log.debug("Flush #%s sending %s distributions" % (self.flush_count, count_dists))
+                self.reporter.flush_distributions(dists)
+            else:
+                log.debug("No distributions to flush. Continuing.")
+
             # Process events
             events = self._get_aggregate_events()
             count_events = len(events)
@@ -317,7 +340,7 @@ def flush(self, timestamp=None):
         finally:
             self._is_flush_in_progress = False
 
-    def _get_aggregate_metrics(self, flush_time=None):
+    def _get_aggregate_metrics_and_dists(self, flush_time=None):
         """
         Get, format and return the rolled up metrics from the aggregator.
         """
@@ -326,6 +349,7 @@ def _get_aggregate_metrics_and_dists(self, flush_time=None):
 
         # FIXME: emit a dictionary from the aggregator
         metrics = []
+        dists = []
         for timestamp, value, name, tags, host, metric_type, interval in rolled_up_metrics:
             metric_tags = tags
             metric_name = name
@@ -350,8 +374,11 @@ def _get_aggregate_metrics_and_dists(self, flush_time=None):
                 'tags': metric_tags,
                 'interval': interval
             }
-            metrics.append(metric)
-        return metrics
+            if metric_type == MetricType.Distribution:
+                dists.append(metric)
+            else:
+                metrics.append(metric)
+        return (metrics, dists)
 
     def _get_aggregate_events(self):
         # Get events
diff --git a/datadog/threadstats/constants.py b/datadog/threadstats/constants.py
index c03d01b38..837bd4b0a 100644
--- a/datadog/threadstats/constants.py
+++ b/datadog/threadstats/constants.py
@@ -3,6 +3,7 @@ class MetricType(object):
     Counter = "counter"
     Histogram = "histogram"
     Rate = "rate"
+    Distribution = "distribution"
 
 
 class MonitorType(object):
diff --git a/datadog/threadstats/metrics.py b/datadog/threadstats/metrics.py
index b14da50e5..71984824b 100644
--- a/datadog/threadstats/metrics.py
+++ b/datadog/threadstats/metrics.py
@@ -63,6 +63,25 @@ def flush(self, timestamp, interval):
                 self.tags, self.host, MetricType.Rate, interval)]
 
 
+class Distribution(Metric):
+    """ A distribution metric. """
+
+    stats_tag = 'd'
+
+    def __init__(self, name, tags, host):
+        self.name = name
+        self.tags = tags
+        self.host = host
+        self.value = []
+
+    def add_point(self, value):
+        self.value.append(value)
+
+    def flush(self, timestamp, interval):
+        return [(timestamp, self.value, self.name, self.tags,
+                 self.host, MetricType.Distribution, interval)]
+
+
 class Histogram(Metric):
     """ A histogram metric. """
 
diff --git a/datadog/threadstats/reporters.py b/datadog/threadstats/reporters.py
index f79189edd..cffd06359 100644
--- a/datadog/threadstats/reporters.py
+++ b/datadog/threadstats/reporters.py
@@ -14,6 +14,9 @@ def flush(self, metrics):
 
 class HttpReporter(Reporter):
 
+    def flush_distributions(self, distributions):
+        api.Distribution.send(distributions)
+
     def flush_metrics(self, metrics):
         api.Metric.send(metrics)
 
diff --git a/tests/unit/api/test_api.py b/tests/unit/api/test_api.py
index 7fb739f75..c70e63d32 100644
--- a/tests/unit/api/test_api.py
+++ b/tests/unit/api/test_api.py
@@ -10,7 +10,11 @@
 
 # datadog
 from datadog import initialize, api
-from datadog.api import Metric, ServiceCheck
+from datadog.api import (
+    Distribution,
+    Metric,
+    ServiceCheck
+)
 from datadog.api.exceptions import ApiError, ApiNotInitialized
 from datadog.util.compat import is_p3k
 from tests.unit.api.helper import (
@@ -30,7 +34,8 @@
     APP_KEY,
     API_HOST,
     HOST_NAME,
-    FAKE_PROXY)
+    FAKE_PROXY
+)
 
 
 def preserve_environ_datadog(func):
@@ -385,6 +390,36 @@ def submit_and_assess_metric_payload(self, serie):
             # it's time not so far from current time
             assert now - 1 < metric['points'][0][0] < now + 1
 
+    def submit_and_assess_dist_payload(self, serie):
+        """
+        Helper to assess the metric payload format.
+        """
+        now = time()
+
+        if isinstance(serie, dict):
+            Distribution.send(**deepcopy(serie))
+            serie = [serie]
+        else:
+            Distribution.send(deepcopy(serie))
+
+        payload = self.get_request_data()
+
+        for i, metric in enumerate(payload['series']):
+            self.assertEquals(set(metric.keys()), set(['metric', 'points', 'host']))
+
+            self.assertEquals(metric['metric'], serie[i]['metric'])
+            self.assertEquals(metric['host'], api._host_name)
+
+            # points is a list of 1 point
+            self.assertTrue(isinstance(metric['points'], list))
+            self.assertEquals(len(metric['points']), 1)
+            # it consists of a [time, value] pair
+            self.assertEquals(len(metric['points'][0]), 2)
+            # its value == value we sent
+            self.assertEquals(metric['points'][0][1], serie[i]['points'][0][1])
+            # it's time not so far from current time
+            assert now - 1 < metric['points'][0][0] < now + 1
+
     def test_metric_submit_query_switch(self):
         """
         Endpoints are different for submission and queries.
@@ -410,6 +445,19 @@ def test_points_submission(self):
                  dict(metric='metric.2', points=19)]
         self.submit_and_assess_metric_payload(serie)
 
+    def test_dist_points_submission(self):
+        """
+        Assess the distribution data payload format, when submitting a single or multiple points.
+        """
+        # Single point
+        serie = dict(metric='metric.1', points=[[time(), [13]]])
+        self.submit_and_assess_dist_payload(serie)
+
+        # Multiple point
+        serie = [dict(metric='metric.1', points=[[time(), [13]]]),
+                 dict(metric='metric.2', points=[[time(), [19]]])]
+        self.submit_and_assess_dist_payload(serie)
+
     def test_data_type_support(self):
         """
         `Metric` API supports `real` numerical data types.
diff --git a/tests/unit/threadstats/test_threadstats.py b/tests/unit/threadstats/test_threadstats.py
index 6affd8f88..5e14d99fc 100644
--- a/tests/unit/threadstats/test_threadstats.py
+++ b/tests/unit/threadstats/test_threadstats.py
@@ -29,9 +29,13 @@ class MemoryReporter(object):
     """
 
     def __init__(self):
+        self.distributions = []
         self.metrics = []
         self.events = []
 
+    def flush_distributions(self, distributions):
+        self.distributions += distributions
+
     def flush_metrics(self, metrics):
         self.metrics += metrics
 
@@ -374,6 +378,39 @@ def test_counter(self):
         dog.flush(1050.0)
         nt.assert_equal(len(reporter.metrics), 0)
 
+    def test_distribution(self):
+        # Create some fake metrics.
+        dog = ThreadStats()
+        dog.start(roll_up_interval=10, flush_in_thread=False)
+        reporter = dog.reporter = MemoryReporter()
+
+        dog.distribution('test.dist.1', 20, 100.0)
+        dog.distribution('test.dist.1', 22, 105.0)
+        dog.distribution('test.dist.2', 30, 115.0)
+        dog.distribution('test.dist.3', 30, 125.0)
+        dog.flush(120.0)
+
+        # Assert they've been properly flushed.
+        dists = self.sort_metrics(reporter.distributions)
+        nt.assert_equal(len(dists), 2)
+
+        (first, second) = dists
+        nt.assert_equal(first['metric'], 'test.dist.1')
+        nt.assert_equal(first['points'][0][0], 100.0)
+        nt.assert_equal(first['points'][0][1], [20, 22])
+        nt.assert_equal(second['metric'], 'test.dist.2')
+
+        # Flush again and make sure we're progressing.
+        reporter.distributions = []
+        dog.flush(130.0)
+        nt.assert_equal(len(reporter.distributions), 1)
+
+        # Finally, make sure we've flushed all metrics.
+        reporter.distributions = []
+        dog.flush(150.0)
+        nt.assert_equal(len(reporter.distributions), 0)
+
+
     def test_default_host_and_device(self):
         dog = ThreadStats()
         dog.start(roll_up_interval=1, flush_in_thread=False)
@@ -454,13 +491,13 @@ def test_constant_tags(self):
 
         # Assertions on gauges
         self.assertMetric(name='gauge', value=10, tags=["type:constant"], count=1)
-        self.assertMetric(name="gauge", value=15, 
+        self.assertMetric(name="gauge", value=15,
                           tags=["env:production", "db", "type:constant"], count=1)  # noqa
         self.assertMetric(name="gauge", value=20, tags=["env:staging", "type:constant"], count=1)
 
         # Assertions on counters
        self.assertMetric(name="counter", value=1, tags=["type:constant"], count=1)
-        self.assertMetric(name="counter", value=1, 
+        self.assertMetric(name="counter", value=1,
                           tags=["env:production", "db", "type:constant"], count=1)  # noqa
         self.assertMetric(name="counter", value=1, tags=["env:staging", "type:constant"], count=1)
 
@@ -688,10 +725,10 @@ def test_metric_type(self):
         dog.increment("counter", timestamp=100.0)
         dog.histogram('histogram.1', 20, 100.0)
         dog.flush(200.0)
-        
-        (first, second, p75, p85, p95, p99, avg, cnt, 
+
+        (first, second, p75, p85, p95, p99, avg, cnt,
          max_, min_) = self.sort_metrics(reporter.metrics)
-        
+
         # Assert Metric type
         nt.assert_equal(first['type'], 'rate')
         nt.assert_equal(second['type'], 'gauge')
@@ -702,4 +739,4 @@ def test_metric_type(self):
         nt.assert_equal(avg['type'], 'gauge')
         nt.assert_equal(cnt['type'], 'rate')
         nt.assert_equal(max_['type'], 'gauge')
-        nt.assert_equal(min_['type'], 'gauge')
\ No newline at end of file
+        nt.assert_equal(min_['type'], 'gauge')
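
For reference, a minimal sketch of how the distribution support introduced by this patch might be exercised from client code, based on the docstrings above. It assumes `datadog.initialize()` has been configured with a valid API key (placeholder shown); the metric names, tags, and values are illustrative only, not part of the patch.

    # Sketch: submitting distribution points via the new endpoints (assumed setup).
    import time
    from datadog import initialize, api, ThreadStats

    initialize(api_key='<YOUR_API_KEY>')  # placeholder key, assumption

    # Low-level API: each point is a (timestamp, [list of values]) pair,
    # as described in the Distribution.send() docstring.
    api.Distribution.send(
        metric='uploaded_file.size',
        points=[(time.time(), [13.0, 19.0])],
        tags=['env:staging'],
    )

    # ThreadStats: sampled values are rolled up per interval and flushed
    # through HttpReporter.flush_distributions().
    stats = ThreadStats()
    stats.start(roll_up_interval=10, flush_in_thread=False)
    stats.distribution('uploaded_file.size', 42, tags=['env:staging'])
    stats.flush()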