Skip to content

Commit

Permalink
Merge branch 'master' into dynamicResources.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Mar 2, 2018
2 parents f2620c2 + 78cbd19 commit e7967af
Show file tree
Hide file tree
Showing 12 changed files with 801 additions and 79 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ pig_property_file

packages.tar

# Ignore the data files
data
test/data
examples/data

Vagrantfile

*.pickle
*.rej
*.orig


# Created by https://www.gitignore.io

### Python ###
Expand Down
38 changes: 0 additions & 38 deletions doc/command_line.rst

This file was deleted.

2 changes: 1 addition & 1 deletion doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Table of Contents
workflows.rst
tasks.rst
parameters.rst
command_line.rst
running_luigi.rst
central_scheduler.rst
execution_model.rst
luigi_patterns.rst
Expand Down
109 changes: 109 additions & 0 deletions doc/running_luigi.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
.. _RunningLuigi:

Running from the Command Line
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The prefered way to run Luigi tasks is through the ``luigi`` command line tool
that will be installed with the pip package.

.. code-block:: python
# my_module.py, available in your sys.path
import luigi
class MyTask(luigi.Task):
x = luigi.IntParameter()
y = luigi.IntParameter(default=45)
def run(self):
print self.x + self.y
Should be run like this

.. code-block:: console
$ luigi --module my_module MyTask --x 123 --y 456 --local-scheduler
Or alternatively like this:

.. code-block:: console
$ python -m luigi --module my_module MyTask --x 100 --local-scheduler
Note that if a parameter name contains '_', it should be replaced by '-'.
For example, if MyTask had a parameter called 'my_parameter':

.. code-block:: console
$ luigi --module my_module MyTask --my-parameter 100 --local-scheduler
Running from Python code
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Another way to start tasks from Python code is using ``luigi.build(tasks, worker_scheduler_factory=None, **env_params)``
from ``luigi.interface`` module.

This way of running luigi tasks is useful if you want to get some dynamic parameters from another
source, such as database, or provide additional logic before you start tasks.

One notable difference is that ``build`` defaults to not using the identical process lock.
If you want to change this behaviour, just pass ``no_lock=False``.


.. code-block:: python
class MyTask1(luigi.Task):
x = luigi.IntParameter()
y = luigi.IntParameter(default=0)
def run(self):
print self.x + self.y
class MyTask2(luigi.Task):
x = luigi.IntParameter()
y = luigi.IntParameter(default=1)
z = luigi.IntParameter(default=2)
def run(self):
print self.x * self.y * self.z
if __name__ == '__main__':
luigi.build([MyTask1(x=10), MyTask2(x=15, z=3)])
Also, it is possible to pass additional parameters to ``build`` such as host, port, workers and local_scheduler:

.. code-block:: python
if __name__ == '__main__':
luigi.build([MyTask1(x=1)], worker=5)
To achieve some special requirements you can pass to ``build`` your ``worker_scheduler_factory``
which will return your worker and\or scheduler implementations:

.. code-block:: python
class MyWorker(Worker):
# some custom logic
class MyFactory(object):
def create_local_scheduler(self):
return scheduler.Scheduler(prune_on_get_work=True, record_task_history=False)
def create_remote_scheduler(self, url):
return rpc.RemoteScheduler(url)
def create_worker(self, scheduler, worker_processes, assistant=False):
# return your worker instance
return MyWorker(
scheduler=scheduler, worker_processes=worker_processes, assistant=assistant)
if __name__ == '__main__':
luigi.build([MyTask1(x=1), worker_scheduler_factory=MyFactory())
In some cases (like task queue) it may be useful.
216 changes: 216 additions & 0 deletions luigi/contrib/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# -*- coding: utf-8 -*-
#
# Copyright 2018 Outlier Bio, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
AWS Batch wrapper for Luigi
From the AWS website:
AWS Batch enables you to run batch computing workloads on the AWS Cloud.
Batch computing is a common way for developers, scientists, and engineers
to access large amounts of compute resources, and AWS Batch removes the
undifferentiated heavy lifting of configuring and managing the required
infrastructure. AWS Batch is similar to traditional batch computing
software. This service can efficiently provision resources in response to
jobs submitted in order to eliminate capacity constraints, reduce compute
costs, and deliver results quickly.
See `AWS Batch User Guide`_ for more details.
To use AWS Batch, you create a jobDefinition JSON that defines a `docker run`_
command, and then submit this JSON to the API to queue up the task. Behind the
scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances,
monitors the load on these instances, and schedules the jobs.
This `boto3-powered`_ wrapper allows you to create Luigi Tasks to submit Batch
``jobDefinition``s. You can either pass a dict (mapping directly to the
``jobDefinition`` JSON) OR an Amazon Resource Name (arn) for a previously
registered ``jobDefinition``.
Requires:
- boto3 package
- Amazon AWS credentials discoverable by boto3 (e.g., by using ``aws configure``
from awscli_)
- An enabled AWS Batch job queue configured to run on a compute environment.
Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)
.. _`docker run`: https://docs.docker.com/reference/commandline/run
.. _jobDefinition: http://http://docs.aws.amazon.com/batch/latest/userguide/job_definitions.html
.. _`boto3-powered`: https://boto3.readthedocs.io
.. _awscli: https://aws.amazon.com/cli
.. _`AWS Batch User Guide`: http://docs.aws.amazon.com/AmazonECS/latest/developerguide/ECS_GetStarted.html
"""

import json
import logging
import random
import string
import time

import luigi
logger = logging.getLogger(__name__)

try:
import boto3
except ImportError:
logger.warning('boto3 is not installed. BatchTasks require boto3')


class BatchJobException(Exception):
pass


POLL_TIME = 10


def _random_id():
return 'batch-job-' + ''.join(random.sample(string.ascii_lowercase, 8))


class BatchClient(object):

def __init__(self, poll_time=POLL_TIME):
self.poll_time = poll_time
self._client = boto3.client('batch')
self._log_client = boto3.client('logs')
self._queue = self.get_active_queue()

def get_active_queue(self):
"""Get name of first active job queue"""

# Get dict of active queues keyed by name
queues = {q['jobQueueName']: q for q in self._client.describe_job_queues()['jobQueues']
if q['state'] == 'ENABLED' and q['status'] == 'VALID'}
if not queues:
raise Exception('No job queues with state=ENABLED and status=VALID')

# Pick the first queue as default
return list(queues.keys())[0]

def get_job_id_from_name(self, job_name):
"""Retrieve the first job ID matching the given name"""
jobs = self._client.list_jobs(jobQueue=self._queue, jobStatus='RUNNING')['jobSummaryList']
matching_jobs = [job for job in jobs if job['jobName'] == job_name]
if matching_jobs:
return matching_jobs[0]['jobId']

def get_job_status(self, job_id):
"""Retrieve task statuses from ECS API
:param job_id (str): AWS Batch job uuid
Returns one of {SUBMITTED|PENDING|RUNNABLE|STARTING|RUNNING|SUCCEEDED|FAILED}
"""
response = self._client.describe_jobs(jobs=[job_id])

# Error checking
status_code = response['ResponseMetadata']['HTTPStatusCode']
if status_code != 200:
msg = 'Job status request received status code {0}:\n{1}'
raise Exception(msg.format(status_code, response))

return response['jobs'][0]['status']

def get_logs(self, log_stream_name, get_last=50):
"""Retrieve log stream from CloudWatch"""
response = self._log_client.get_log_events(
logGroupName='/aws/batch/job',
logStreamName=log_stream_name,
startFromHead=False)
events = response['events']
return '\n'.join(e['message'] for e in events[-get_last:])

def submit_job(self, job_definition, parameters, job_name=None, queue=None):
"""Wrap submit_job with useful defaults"""
if job_name is None:
job_name = _random_id()
response = self._client.submit_job(
jobName=job_name,
jobQueue=queue or self.get_active_queue(),
jobDefinition=job_definition,
parameters=parameters
)
return response['jobId']

def wait_on_job(self, job_id):
"""Poll task status until STOPPED"""

while True:
status = self.get_job_status(job_id)
if status == 'SUCCEEDED':
logger.info('Batch job {} SUCCEEDED'.format(job_id))
return True
elif status == 'FAILED':
# Raise and notify if job failed
jobs = self._client.describe_jobs(jobs=[job_id])['jobs']
job_str = json.dumps(jobs, indent=4)
logger.debug('Job details:\n' + job_str)

log_stream_name = jobs[0]['attempts'][0]['container']['logStreamName']
logs = self.get_logs(log_stream_name)
raise BatchJobException('Job {} failed: {}'.format(
job_id, logs))

time.sleep(self.poll_time)
logger.debug('Batch job status for job {0}: {1}'.format(
job_id, status))

def register_job_definition(self, json_fpath):
"""Register a job definition with AWS Batch, using a JSON"""
with open(json_fpath) as f:
job_def = json.load(f)
response = self._client.register_job_definition(**job_def)
status_code = response['ResponseMetadata']['HTTPStatusCode']
if status_code != 200:
msg = 'Register job definition request received status code {0}:\n{1}'
raise Exception(msg.format(status_code, response))
return response


class BatchTask(luigi.Task):

"""
Base class for an Amazon Batch job
Amazon Batch requires you to register "job definitions", which are JSON
descriptions for how to issue the ``docker run`` command. This Luigi Task
requires a pre-registered Batch jobDefinition name passed as a Parameter
:param job_definition (str): name of pre-registered jobDefinition
:param job_name: name of specific job, for tracking in the queue and logs.
"""
job_definition = luigi.Parameter()
job_name = luigi.OptionalParameter(default=None)
poll_time = luigi.IntParameter(default=POLL_TIME)

def run(self):
bc = BatchClient(self.poll_time)
job_id = bc.submit_job(
self.job_definition,
self.parameters,
job_name=self.job_name)
bc.wait_on_job(job_id)

@property
def parameters(self):
"""Override to return a dict of parameters for the Batch Task"""
return {}
Loading

0 comments on commit e7967af

Please sign in to comment.