Skip to content

Commit

Permalink
Add Datadog contrib for monitoring purpose
Browse files Browse the repository at this point in the history
Datadog is a tool that allows you to send metrics, create dashboards,
and add alerting on specific behaviors.

Adding this contrib will allow users of this tool to log their pipeline
information to Datadog.

Based on the status change of a task, we log that information to Datadog
with the parameters that were used to run that specific task.

This allows us to easily create dashboards to visualize pipeline health. For
example, we can be notified via Datadog if a task has failed, or we can
graph the execution time of a specific task over a period of time.

The implementation idea was strongly based on the stale PR
#2044.
  • Loading branch information
thisiscab committed May 14, 2018
1 parent 1b2f0fd commit 5ed031c
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
117 changes: 117 additions & 0 deletions luigi/contrib/datadog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config

from datadog import initialize, api, statsd


class datadog(Config):
    """Configuration section for the Datadog metrics collector.

    Values are read from the ``[datadog]`` section of the luigi
    configuration file.
    """
    api_key = parameter.Parameter(default='dummy_api_key', description='API key provided by Datadog')
    app_key = parameter.Parameter(default='dummy_app_key', description='APP key provided by Datadog')
    default_tags = parameter.Parameter(default='application:luigi', description='Default tags for every events and metrics sent to Datadog')
    # Fixed: description previously had an unbalanced parenthesis.
    environment = parameter.Parameter(default='development', description="Environment of which the pipeline is ran from (eg: 'production', 'staging', ...)")
    metric_namespace = parameter.Parameter(default='luigi', description="Default namespace for events and metrics (eg: 'luigi' for 'luigi.task.started')")
    statsd_host = parameter.Parameter(default='localhost', description='StatsD host implementing the Datadog service')
    statsd_port = parameter.IntParameter(default=8125, description='StatsD port implementing the Datadog service')


class DatadogMetricsCollector(MetricsCollector):
    """Report luigi task status changes to Datadog.

    Each task state transition (started, failed, disabled, done) emits a
    StatsD increment metric and a Datadog event, both tagged with the task
    family name and the parameters the task was run with, so dashboards and
    monitors can filter on them.
    """

    def __init__(self, *args, **kwargs):
        super(DatadogMetricsCollector, self).__init__(*args, **kwargs)
        self._config = datadog(**kwargs)

        # Configure the datadog client once, for both the HTTP API (events)
        # and the StatsD transport (metrics).
        initialize(api_key=self._config.api_key,
                   app_key=self._config.app_key,
                   statsd_host=self._config.statsd_host,
                   statsd_port=self._config.statsd_port)

    def handle_task_started(self, task):
        """Emit a 'task.started' increment and a low-priority info event."""
        title = "Luigi: A task has been started!"
        text = "A task has been started in the pipeline named: {name}".format(name=task.family)
        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

        self.send_increment('task.started', tags=tags)

        event_tags = tags + ["task_state:STARTED"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

    def handle_task_failed(self, task):
        """Emit a 'task.failed' increment and a normal-priority error event."""
        title = "Luigi: A task has failed!"
        text = "A task has failed in the pipeline named: {name}".format(name=task.family)
        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

        self.send_increment('task.failed', tags=tags)

        event_tags = tags + ["task_state:FAILED"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

    def handle_task_disabled(self, task, config):
        """Emit a 'task.disabled' increment and an error event.

        *config* is the scheduler config; its disable_persist/disable_window
        values are included in the event text.
        """
        title = "Luigi: A task has been disabled!"
        text = """A task has been disabled in the pipeline named: {name}.
The task has failed {failures} times in the last {window}
seconds, so it is being disabled for {persist} seconds.""".format(
            name=task.family,
            persist=config.disable_persist,
            failures=task.retry_policy.retry_count,
            window=config.disable_window
        )
        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

        self.send_increment('task.disabled', tags=tags)

        event_tags = tags + ["task_state:DISABLED"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

    def handle_task_done(self, task):
        """Emit a 'task.done' increment, an execution-time gauge and an info event."""
        # The task is already done -- Let's not re-create an event
        if task.time_running is None:
            return

        title = "Luigi: A task has been completed!"
        text = "A task has completed in the pipeline named: {name}".format(name=task.family)
        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

        time_elapse = task.updated - task.time_running

        self.send_increment('task.done', tags=tags)
        self.send_gauge('task.execution_time', time_elapse, tags=tags)

        event_tags = tags + ["task_state:DONE"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

    def send_event(self, title=None, text=None, tags=None, alert_type='info', priority='normal'):
        """Create a Datadog event, merging *tags* with the default tags.

        ``tags`` defaults to None rather than ``[]`` to avoid the shared
        mutable-default-argument pitfall.
        """
        all_tags = (tags or []) + self.default_tags()

        api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority)

    def send_gauge(self, metric_name, value, tags=None):
        """Send a namespaced StatsD gauge, merging *tags* with the default tags."""
        all_tags = (tags or []) + self.default_tags()

        namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace,
                                                               metric_name=metric_name)
        statsd.gauge(namespaced_metric, value, tags=all_tags)

    def send_increment(self, metric_name, value=1, tags=None):
        """Send a namespaced StatsD counter increment, merging *tags* with the default tags."""
        all_tags = (tags or []) + self.default_tags()

        namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace,
                                                               metric_name=metric_name)
        statsd.increment(namespaced_metric, value, tags=all_tags)

    def _format_task_params_to_tags(self, task):
        """Return the task's parameters as a list of 'key:value' tag strings."""
        params = []
        for key, value in task.params.items():
            params.append("{key}:{value}".format(key=key, value=value))

        return params

    def default_tags(self):
        """Return tags attached to every event/metric: the environment tag plus
        any comma-separated tags from the ``default_tags`` setting."""
        default_tags = []

        env_tag = "environment:{environment}".format(environment=self._config.environment)
        default_tags.append(env_tag)

        if self._config.default_tags:
            # Idiomatic instance-method call instead of str.split(obj, ',').
            default_tags = default_tags + self._config.default_tags.split(',')

        return default_tags
18 changes: 18 additions & 0 deletions luigi/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class MetricsCollector(object):
    """No-op MetricsCollector base class that can be replaced by a
    tool-specific implementation (e.g. luigi.contrib.datadog).
    """
    def __init__(self, *args, **kwargs):
        # Accept (and ignore) arbitrary arguments: the scheduler instantiates
        # collectors as MetricsCollector(self), and subclasses forward their
        # arguments via super().__init__ -- a bare __init__(self) would raise
        # a TypeError in both cases.
        pass

    def handle_task_started(self, task):
        """Called when a task transitions to RUNNING."""
        pass

    def handle_task_failed(self, task):
        """Called when a task transitions to FAILED."""
        pass

    def handle_task_disabled(self, task, config):
        """Called when a task is disabled after excessive failures."""
        pass

    def handle_task_done(self, task):
        """Called when a task transitions to DONE."""
        pass
44 changes: 44 additions & 0 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class scheduler(Config):

pause_enabled = parameter.BoolParameter(default=True)

metrics_collection = parameter.Parameter(default='')

def _get_retry_policy(self):
return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)

Expand Down Expand Up @@ -434,6 +436,7 @@ def __init__(self, state_path):
self._status_tasks = collections.defaultdict(dict)
self._active_workers = {} # map from id to a Worker object
self._task_batchers = {}
self._metrics_collector = None

def get_state(self):
return self._tasks, self._active_workers, self._task_batchers
Expand Down Expand Up @@ -553,8 +556,10 @@ def set_status(self, task, new_status, config=None):
if new_status == FAILED and task.status != DISABLED:
task.add_failure()
if task.has_excessive_failures():
self.update_metrics_task_failed(task)
task.scheduler_disable_time = time.time()
new_status = DISABLED
self.update_metrics_task_disabled(task, config)
if not config.batch_emails:
notifications.send_error_email(
'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id),
Expand All @@ -574,6 +579,9 @@ def set_status(self, task, new_status, config=None):
task.status = new_status
task.updated = time.time()

if new_status == DONE:
self.update_metrics_task_done(task)

if new_status == FAILED:
task.retry = time.time() + config.retry_delay
if remove_on_failure:
Expand Down Expand Up @@ -656,6 +664,18 @@ def disable_workers(self, worker_ids):
worker.disabled = True
worker.tasks.clear()

def update_metrics_task_started(self, task):
    """Forward a task-started notification to the configured metrics collector."""
    self._metrics_collector.handle_task_started(task)

def update_metrics_task_disabled(self, task, config):
    """Forward a task-disabled notification (with the scheduler config) to the
    configured metrics collector."""
    self._metrics_collector.handle_task_disabled(task, config)

def update_metrics_task_failed(self, task):
    """Forward a task-failed notification to the configured metrics collector."""
    self._metrics_collector.handle_task_failed(task)

def update_metrics_task_done(self, task):
    """Forward a task-done notification to the configured metrics collector."""
    self._metrics_collector.handle_task_done(task)


class Scheduler(object):
"""
Expand Down Expand Up @@ -689,6 +709,13 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs
if self._config.batch_emails:
self._email_batcher = BatchNotifier()

if self._config.metrics_collection == 'datadog':
import luigi.contrib.datadog as datadog
self._state._metrics_collector = datadog.DataDogMetricsCollector(self)
else:
from luigi.metrics import MetricsCollector
self._state._metrics_collector = MetricsCollector(self)

def load(self):
self._state.load()

Expand Down Expand Up @@ -1186,6 +1213,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
reply['batch_task_ids'] = [task.id for task in batched_tasks]

elif best_task:
self.update_metrics_task_started(best_task)
self._state.set_status(best_task, RUNNING, self._config)
best_task.worker_running = worker_id
best_task.resources_running = best_task.resources.copy()
Expand Down Expand Up @@ -1576,3 +1604,19 @@ def _update_task_history(self, task, status, host=None):
def task_history(self):
    """Return the task history backend."""
    # Used by server.py to expose the calls
    return self._task_history

@rpc_method()
def update_metrics_task_started(self, task):
    """RPC endpoint: record a task-started status change in the metrics collector."""
    self._state.update_metrics_task_started(task)

@rpc_method()
def update_metrics_task_disabled(self, task):
    """RPC endpoint: record a task-disabled status change in the metrics collector.

    The state-level handler requires the scheduler config as a second
    argument (the collector reads disable_persist/disable_window from it),
    so pass self._config along -- calling it with only ``task`` would raise
    a TypeError.
    """
    self._state.update_metrics_task_disabled(task, self._config)

@rpc_method()
def update_metrics_task_failed(self, task):
    """RPC endpoint: record a task-failed status change in the metrics collector."""
    self._state.update_metrics_task_failed(task)

@rpc_method()
def update_metrics_task_done(self, task):
    """RPC endpoint: record a task-done status change in the metrics collector."""
    self._state.update_metrics_task_done(task)

0 comments on commit 5ed031c

Please sign in to comment.