diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py
new file mode 100644
index 0000000000..8d5ca8d5c9
--- /dev/null
+++ b/luigi/contrib/datadog.py
@@ -0,0 +1,117 @@
+from luigi import parameter
+from luigi.metrics import MetricsCollector
+from luigi.task import Config
+
+from datadog import initialize, api, statsd
+
+
+class datadog(Config):
+    api_key = parameter.Parameter(default='dummy_api_key', description='API key provided by Datadog')
+    app_key = parameter.Parameter(default='dummy_app_key', description='Application key provided by Datadog')
+    default_tags = parameter.Parameter(default='application:luigi', description='Default tags for every event and metric sent to Datadog')
+    environment = parameter.Parameter(default='development', description="Environment the pipeline runs in (e.g. 'production', 'staging', ...)")
+    metric_namespace = parameter.Parameter(default='luigi', description="Default namespace for events and metrics (e.g. 'luigi' for 'luigi.task.started')")
+    statsd_host = parameter.Parameter(default='localhost', description='Hostname of the StatsD agent that forwards to Datadog')
+    statsd_port = parameter.IntParameter(default=8125, description='Port of the StatsD agent that forwards to Datadog')
+
+
+class DatadogMetricsCollector(MetricsCollector):
+    def __init__(self, *args, **kwargs):
+        super(DatadogMetricsCollector, self).__init__(*args, **kwargs)
+        self._config = datadog(**kwargs)
+
+        initialize(api_key=self._config.api_key,
+                   app_key=self._config.app_key,
+                   statsd_host=self._config.statsd_host,
+                   statsd_port=self._config.statsd_port)
+
+    def handle_task_started(self, task):
+        title = "Luigi: A task has been started!"
+        text = "A task has been started in the pipeline named: {name}".format(name=task.family)
+        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)
+
+        self.send_increment('task.started', tags=tags)
+
+        event_tags = tags + ["task_state:STARTED"]
+        self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')
+
+    def handle_task_failed(self, task):
+        title = "Luigi: A task has failed!"
+        text = "A task has failed in the pipeline named: {name}".format(name=task.family)
+        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)
+
+        self.send_increment('task.failed', tags=tags)
+
+        event_tags = tags + ["task_state:FAILED"]
+        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')
+
+    def handle_task_disabled(self, task, config):
+        title = "Luigi: A task has been disabled!"
+        text = """A task has been disabled in the pipeline named: {name}.
+                  The task has failed {failures} times in the last {window}
+                  seconds, so it is being disabled for {persist} seconds.""".format(
+                      name=task.family,
+                      persist=config.disable_persist,
+                      failures=task.retry_policy.retry_count,
+                      window=config.disable_window
+                  )
+        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)
+
+        self.send_increment('task.disabled', tags=tags)
+
+        event_tags = tags + ["task_state:DISABLED"]
+        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')
+
+    def handle_task_done(self, task):
+        # The task was already done before this run -- don't re-create an event
+        if task.time_running is None:
+            return
+
+        title = "Luigi: A task has been completed!"
+ text = "A task has completed in the pipeline named: {name}".format(name=task.family) + tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) + + time_elapse = task.updated - task.time_running + + self.send_increment('task.done', tags=tags) + self.send_gauge('task.execution_time', time_elapse, tags=tags) + + event_tags = tags + ["task_state:DONE"] + self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') + + def send_event(self, title=None, text=None, tags=[], alert_type='info', priority='normal'): + all_tags = tags + self.default_tags() + + api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority) + + def send_gauge(self, metric_name, value, tags=[]): + all_tags = tags + self.default_tags() + + namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, + metric_name=metric_name) + statsd.gauge(namespaced_metric, value, tags=all_tags) + + def send_increment(self, metric_name, value=1, tags=[]): + all_tags = tags + self.default_tags() + + namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, + metric_name=metric_name) + statsd.increment(namespaced_metric, value, tags=all_tags) + + def _format_task_params_to_tags(self, task): + params = [] + for key, value in task.params.items(): + params.append("{key}:{value}".format(key=key, value=value)) + + return params + + def default_tags(self): + default_tags = [] + + env_tag = "environment:{environment}".format(environment=self._config.environment) + default_tags.append(env_tag) + + if self._config.default_tags: + default_tags = default_tags + str.split(self._config.default_tags, ',') + + return default_tags diff --git a/luigi/metrics.py b/luigi/metrics.py new file mode 100644 index 0000000000..9a251ab92a --- /dev/null +++ b/luigi/metrics.py @@ -0,0 +1,18 @@ +class MetricsCollector(object): + """Dummy MetricsCollecter base class that can be replace by tool specific + implementation. 
+ """ + def __init__(self): + pass + + def handle_task_started(self, task): + pass + + def handle_task_failed(self, task): + pass + + def handle_task_disabled(self, task, config): + pass + + def handle_task_done(self, task): + pass diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 405507028b..e216bc07ed 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -149,6 +149,8 @@ class scheduler(Config): pause_enabled = parameter.BoolParameter(default=True) + metrics_collection = parameter.Parameter(default='') + def _get_retry_policy(self): return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window) @@ -434,6 +436,7 @@ def __init__(self, state_path): self._status_tasks = collections.defaultdict(dict) self._active_workers = {} # map from id to a Worker object self._task_batchers = {} + self._metrics_collector = None def get_state(self): return self._tasks, self._active_workers, self._task_batchers @@ -553,8 +556,10 @@ def set_status(self, task, new_status, config=None): if new_status == FAILED and task.status != DISABLED: task.add_failure() if task.has_excessive_failures(): + self.update_metrics_task_failed(task) task.scheduler_disable_time = time.time() new_status = DISABLED + self.update_metrics_task_disabled(task, config) if not config.batch_emails: notifications.send_error_email( 'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id), @@ -574,6 +579,9 @@ def set_status(self, task, new_status, config=None): task.status = new_status task.updated = time.time() + if new_status == DONE: + self.update_metrics_task_done(task) + if new_status == FAILED: task.retry = time.time() + config.retry_delay if remove_on_failure: @@ -656,6 +664,18 @@ def disable_workers(self, worker_ids): worker.disabled = True worker.tasks.clear() + def update_metrics_task_started(self, task): + self._metrics_collector.handle_task_started(task) + + def update_metrics_task_disabled(self, task, config): + self._metrics_collector.handle_task_disabled(task, config) + + def update_metrics_task_failed(self, task): + self._metrics_collector.handle_task_failed(task) + + def update_metrics_task_done(self, task): + self._metrics_collector.handle_task_done(task) + class Scheduler(object): """ @@ -689,6 +709,13 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs if self._config.batch_emails: self._email_batcher = BatchNotifier() + if self._config.metrics_collection == 'datadog': + import luigi.contrib.datadog as datadog + self._state._metrics_collector = datadog.DataDogMetricsCollector(self) + else: + from luigi.metrics import MetricsCollector + self._state._metrics_collector = MetricsCollector(self) + def load(self): self._state.load() @@ -1186,6 +1213,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None, reply['batch_task_ids'] = [task.id for task in batched_tasks] elif best_task: + self.update_metrics_task_started(best_task) self._state.set_status(best_task, RUNNING, self._config) best_task.worker_running = worker_id best_task.resources_running = best_task.resources.copy() @@ -1576,3 +1604,19 @@ def _update_task_history(self, task, status, host=None): def task_history(self): # Used by server.py to expose the calls return self._task_history + + @rpc_method() + def update_metrics_task_started(self, task): + self._state.update_metrics_task_started(task) + + @rpc_method() + def update_metrics_task_disabled(self, task): + self._state.update_metrics_task_disabled(task) + + @rpc_method() + def 
+    def update_metrics_task_failed(self, task):
+        self._state.update_metrics_task_failed(task)
+
+    @rpc_method()
+    def update_metrics_task_done(self, task):
+        self._state.update_metrics_task_done(task)
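
To exercise the collector end to end, a minimal luigi.cfg sketch follows. The section and option names come straight from the `scheduler(Config)` and `datadog(Config)` classes in this diff; the key values and the extra `team:data` tag are placeholders, not real credentials or recommended settings:

    [scheduler]
    metrics_collection=datadog

    [datadog]
    api_key=<your-datadog-api-key>
    app_key=<your-datadog-app-key>
    default_tags=application:luigi,team:data
    environment=staging
    metric_namespace=luigi
    statsd_host=localhost
    statsd_port=8125

When `metrics_collection` is left at its default empty string, `Scheduler.__init__` falls back to the no-op `MetricsCollector` base class, so existing deployments see no behavior change.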