Add support for distributions to threadstats and the API #312

Merged: 11 commits, merged Nov 12, 2018
1 change: 1 addition & 0 deletions datadog/api/__init__.py
@@ -19,6 +19,7 @@
# Resources
from datadog.api.comments import Comment
from datadog.api.dashboard_lists import DashboardList
from datadog.api.distributions import Distribution
from datadog.api.downtimes import Downtime
from datadog.api.timeboards import Timeboard
from datadog.api.events import Event
36 changes: 36 additions & 0 deletions datadog/api/distributions.py
@@ -0,0 +1,36 @@
# datadog
from datadog.api.format import format_points
from datadog.api.resources import SendableAPIResource


class Distribution(SendableAPIResource):
    """A wrapper around Distribution HTTP API"""
    _resource_name = 'distribution_points'

    @classmethod
    def send(cls, distributions=None, **distribution):
        """
        Submit a distribution metric or a list of distribution metrics to the
        distribution metric API.

        :param metric: the name of the time series
        :type metric: string
        :param points: a (timestamp, [list of values]) pair or
            a list of (timestamp, [list of values]) pairs
        :type points: list
        :param host: host name that produced the metric
        :type host: string
        :param tags: list of tags associated with the metric.
        :type tags: string list
        :returns: Dictionary representing the API's JSON response
        """
        if distributions:
            # Multiple distributions are sent
            for d in distributions:
                if isinstance(d, dict):
                    d['points'] = format_points(d['points'])
            series_dict = {"series": distributions}
        else:
            # One distribution is sent
            distribution['points'] = format_points(distribution['points'])
            series_dict = {"series": [distribution]}
        return super(Distribution, cls).send(attach_host_name=True, **series_dict)
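As context for reviewers, a minimal usage sketch of the new resource (assuming `initialize` has been called with a valid API key; the metric name, timestamp, and values below are illustrative):

from datadog import initialize, api

initialize(api_key='<YOUR_API_KEY>')  # placeholder key

# One distribution point is a (timestamp, [list of raw values]) pair.
api.Distribution.send(
    metric='request.latency.dist',
    points=[(1541000000, [0.12, 0.45, 0.98])],
    tags=['env:staging'],
)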
36 changes: 36 additions & 0 deletions datadog/api/format.py
@@ -0,0 +1,36 @@
try:
    from collections.abc import Iterable  # Python 3.3+
except ImportError:
    from collections import Iterable  # Python 2 fallback
from numbers import Number
import time


def format_points(points):
    """
    Format the `points` parameter.

    Input:
        a value or a (timestamp, value) pair, or a list of values or (timestamp, value) pairs

    Returns:
        list of (timestamp, float value) pairs
    """
    now = time.time()
    if not isinstance(points, list):
        points = [points]

    formatted_points = []
    for point in points:
        if isinstance(point, Number):
            timestamp = now
            value = float(point)
        else:
            timestamp = point[0]
            # Distributions contain a list of points
            if isinstance(point[1], Iterable):
                value = [float(p) for p in point[1]]
            else:
                value = float(point[1])

        formatted_points.append((timestamp, value))

    return formatted_points
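For illustration, a sketch of what `format_points` returns for the accepted shapes (timestamps shown are made up; a bare number is stamped with the current time):

from datadog.api.format import format_points

format_points(3)
# -> [(<time.time() at call>, 3.0)]

format_points([(1541000000, 42), (1541000010, [1, 2.5])])
# -> [(1541000000, 42.0), (1541000010, [1.0, 2.5])]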
83 changes: 17 additions & 66 deletions datadog/api/metrics.py
@@ -1,9 +1,6 @@
# stdlib
import time
from numbers import Number

# datadog
from datadog.api.exceptions import ApiError
from datadog.api.format import format_points
from datadog.api.resources import SearchableAPIResource, SendableAPIResource, ListableAPIResource


@@ -17,53 +14,6 @@ class Metric(SearchableAPIResource, SendableAPIResource, ListableAPIResource):
    _METRIC_SUBMIT_ENDPOINT = 'series'
    _METRIC_LIST_ENDPOINT = 'metrics'

    @classmethod
    def _process_points(cls, points):
        """
        Format `points` parameter.

        Input:
            a value or (timestamp, value) pair or a list of value or (timestamp, value) pairs

        Returns:
            list of (timestamp, float value) pairs

        """
        now = time.time()
        points_lst = points if isinstance(points, list) else [points]

        def rec_parse(points_lst):
            """
            Recursively parse a list of values or a list of (timestamp, value) pairs to a list of
            (timestamp, `float` value) pairs.
            """
            try:
                if not points_lst:
                    return []

                point = points_lst.pop()
                timestamp = now if isinstance(point, Number) else point[0]
                value = float(point) if isinstance(point, Number) else float(point[1])

                point = [(timestamp, value)]

                return point + rec_parse(points_lst)

            except TypeError as e:
                raise TypeError(
                    u"{0}: "
                    "`points` parameter must use real numerical values.".format(e)
                )

            except IndexError as e:
                raise IndexError(
                    u"{0}: "
                    u"`points` must be a list of values or "
                    u"a list of (timestamp, value) pairs".format(e)
                )

        return rec_parse(points_lst)

    @classmethod
    def list(cls, from_epoch):
        """
@@ -84,6 +34,18 @@ def list(cls, from_epoch):

        return super(Metric, cls).get_all(**params)

    @staticmethod
    def _rename_metric_type(metric):
        """
        FIXME DROPME in 1.0:

        API documentation was illegitimately promoting usage of the `metric_type` parameter
        instead of `type`.
        To be consistent and avoid backward incompatibilities, properly rename this parameter.
        """
        if 'metric_type' in metric:
            metric['type'] = metric.pop('metric_type')
    @classmethod
    def send(cls, metrics=None, **single_metric):
        """
@@ -106,17 +68,6 @@ def send(cls, metrics=None, **single_metric):

        :returns: Dictionary representing the API's JSON response
        """
        def rename_metric_type(metric):
            """
            FIXME DROPME in 1.0:

            API documentation was illegitimately promoting usage of `metric_type` parameter
            instead of `type`.
            To be consistent and avoid 'backward incompatibilities', properly rename this parameter.
            """
            if 'metric_type' in metric:
                metric['type'] = metric.pop('metric_type')

        # Set the right endpoint
        cls._resource_name = cls._METRIC_SUBMIT_ENDPOINT

@@ -125,12 +76,12 @@ def rename_metric_type(metric):
        if metrics:
            for metric in metrics:
                if isinstance(metric, dict):
                    rename_metric_type(metric)
                    metric['points'] = cls._process_points(metric['points'])
                    cls._rename_metric_type(metric)
                    metric['points'] = format_points(metric['points'])
            metrics_dict = {"series": metrics}
        else:
            rename_metric_type(single_metric)
            single_metric['points'] = cls._process_points(single_metric['points'])
            cls._rename_metric_type(single_metric)
            single_metric['points'] = format_points(single_metric['points'])
            metrics = [single_metric]
            metrics_dict = {"series": metrics}

37 changes: 32 additions & 5 deletions datadog/threadstats/base.py
@@ -14,8 +14,10 @@

# datadog
from datadog.api.exceptions import ApiNotInitialized
from datadog.threadstats.constants import MetricType
from datadog.threadstats.events import EventsAggregator
from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing
from datadog.threadstats.metrics import MetricsAggregator, Counter, Gauge, Histogram, Timing, \
    Distribution
from datadog.threadstats.reporters import HttpReporter

# Loggers
@@ -206,6 +208,19 @@ def histogram(self, metric_name, value, timestamp=None, tags=None, sample_rate=1
            self._metric_aggregator.add_point(metric_name, tags, timestamp or time(), value,
                                              Histogram, sample_rate=sample_rate, host=host)

    def distribution(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None):
        """
        Sample a distribution value. Distributions will produce metrics that
        describe the distribution of the recorded values, namely the maximum,
        median, average, count and the 50/75/90/95/99 percentiles. Optionally,
        specify a list of ``tags`` to associate with the metric.

        >>> stats.distribution('uploaded_file.size', uploaded_file.size())
        """
        if not self._disabled:
            self._metric_aggregator.add_point(metric_name, tags, timestamp or time(), value,
                                              Distribution, sample_rate=sample_rate, host=host)

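Putting the ThreadStats side together, a rough end-to-end sketch (assumes the usual `initialize`/`start` setup; names and values are illustrative):

from datadog import initialize, ThreadStats

initialize(api_key='<YOUR_API_KEY>')  # placeholder key
stats = ThreadStats()
stats.start(flush_interval=10)

# Each call records one raw value; on flush, the values recorded for the
# same metric/tags/host are posted to the distribution endpoint.
stats.distribution('uploaded_file.size', 2048, tags=['service:uploads'])
stats.distribution('uploaded_file.size', 512, tags=['service:uploads'])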
    def timing(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None):
        """
        Record a timing, optionally setting tags and a sample rate.
@@ -289,7 +304,7 @@ def flush(self, timestamp=None):
            self._is_flush_in_progress = True

            # Process metrics
            metrics = self._get_aggregate_metrics(timestamp or time())
            metrics, dists = self._get_aggregate_metrics_and_dists(timestamp or time())
            count_metrics = len(metrics)
            if count_metrics:
                self.flush_count += 1
@@ -298,6 +313,14 @@
            else:
                log.debug("No metrics to flush. Continuing.")

            count_dists = len(dists)
            if count_dists:
                self.flush_count += 1
                log.debug("Flush #%s sending %s distributions" % (self.flush_count, count_dists))
                self.reporter.flush_distributions(dists)
            else:
                log.debug("No distributions to flush. Continuing.")

            # Process events
            events = self._get_aggregate_events()
            count_events = len(events)
@@ -317,7 +340,7 @@
        finally:
            self._is_flush_in_progress = False

    def _get_aggregate_metrics(self, flush_time=None):
    def _get_aggregate_metrics_and_dists(self, flush_time=None):
        """
        Get, format and return the rolled up metrics from the aggregator.
        """
@@ -326,6 +349,7 @@ def _get_aggregate_metrics(self, flush_time=None):

        # FIXME: emit a dictionary from the aggregator
        metrics = []
        dists = []
        for timestamp, value, name, tags, host, metric_type, interval in rolled_up_metrics:
            metric_tags = tags
            metric_name = name
@@ -350,8 +374,11 @@ def _get_aggregate_metrics(self, flush_time=None):
                'tags': metric_tags,
                'interval': interval
            }
            metrics.append(metric)
        return metrics
            if metric_type == MetricType.Distribution:
                dists.append(metric)
            else:
                metrics.append(metric)
        return (metrics, dists)

    def _get_aggregate_events(self):
        # Get events
1 change: 1 addition & 0 deletions datadog/threadstats/constants.py
@@ -3,6 +3,7 @@ class MetricType(object):
    Counter = "counter"
    Histogram = "histogram"
    Rate = "rate"
    Distribution = "distribution"


class MonitorType(object):
19 changes: 19 additions & 0 deletions datadog/threadstats/metrics.py
@@ -63,6 +63,25 @@ def flush(self, timestamp, interval):
                 self.tags, self.host, MetricType.Rate, interval)]


class Distribution(Metric):
    """ A distribution metric. """

    stats_tag = 'd'

    def __init__(self, name, tags, host):
        self.name = name
        self.tags = tags
        self.host = host
        self.value = []

    def add_point(self, value):
        self.value.append(value)

    def flush(self, timestamp, interval):
        return [(timestamp, self.value, self.name, self.tags,
                 self.host, MetricType.Distribution, interval)]
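A small sketch of how this aggregator-side class behaves when exercised directly (names and values are made up):

d = Distribution('uploaded_file.size', tags=['env:test'], host='web-1')
d.add_point(120.0)
d.add_point(348.5)

# flush() emits a single tuple carrying the full list of raw values,
# tagged with MetricType.Distribution so ThreadStats can route it to
# flush_distributions() instead of flush_metrics().
d.flush(timestamp=1541000000, interval=10)
# -> [(1541000000, [120.0, 348.5], 'uploaded_file.size', ['env:test'],
#      'web-1', 'distribution', 10)]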


class Histogram(Metric):
""" A histogram metric. """

3 changes: 3 additions & 0 deletions datadog/threadstats/reporters.py
@@ -14,6 +14,9 @@ def flush(self, metrics):

class HttpReporter(Reporter):

    def flush_distributions(self, distributions):
        api.Distribution.send(distributions)

    def flush_metrics(self, metrics):
        api.Metric.send(metrics)
