diff --git a/.gitignore b/.gitignore
index 15752f34b4..0d47938e13 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,18 +1,162 @@
-*.pyc
-*~
+.nicesetup
+
 client.cfg
-build
-dist
-luigi.egg-info
+
+hadoop_test.py
+minicluster.py
+mrrunner.py
+
 packages.tar
+
 test/data
-.nicesetup
-.tox
+
+Vagrantfile
+
 *.pickle
 *.rej
 *.orig
-.DS_Store
-.idea/
+
+
+# Created by https://www.gitignore.io
+
+### Python ###
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.cache
+nosetests.xml
+coverage.xml
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+
+### Vim ###
+[._]*.s[a-w][a-z]
+[._]s[a-w][a-z]
+*.un~
+Session.vim
+.netrwhist
+*~
+
+
+### PyCharm ###
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
+
 *.iml
-Vagrantfile
+
+## Directory-based project format:
+.idea/
+# if you remove the above rule, at least ignore the following:
+
+# User-specific stuff:
+# .idea/workspace.xml
+# .idea/tasks.xml
+# .idea/dictionaries
+
+# Sensitive or high-churn files:
+# .idea/dataSources.ids
+# .idea/dataSources.xml
+# .idea/sqlDataSources.xml
+# .idea/dynamic.xml
+# .idea/uiDesigner.xml
+
+# Gradle:
+# .idea/gradle.xml
+# .idea/libraries
+
+# Mongo Explorer plugin:
+# .idea/mongoSettings.xml
+
+## File-based project format:
+*.ipr
+*.iws
+
+## Plugin-specific files:
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+
+
+### Vagrant ###
 .vagrant/
+
+
+### OSX ###
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+
+# Thumbnails
+._*
+
+# Files that might appear on external disk
+.Spotlight-V100
+.Trashes
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
diff --git a/.travis.yml b/.travis.yml
index 020fb33605..e0724e2dde 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,6 +14,7 @@ env:
   - TOX_ENV=cdh
   - TOX_ENV=hdp
   - TOX_ENV=nonhdfs
+  - TOX_ENV=docs
 
 sudo: false
diff --git a/README.rst b/README.rst
index 838966f896..c21bcb785a 100644
--- a/README.rst
+++ b/README.rst
@@ -68,7 +68,7 @@ build up data files.
 Background
 ----------
 
-We use Luigi internally at `Spotify <http://www.spotify.com/>`_ to run
+We use Luigi internally at `Spotify <https://www.spotify.com/us/>`_ to run
 thousands of tasks every day, organized in complex dependency graphs. Most
 of these tasks are Hadoop jobs. Luigi provides an infrastructure that
 powers all kinds of stuff including recommendations, toplists, A/B
@@ -80,7 +80,7 @@ can help programmers focus on the most important bits and leave the rest
 Conceptually, Luigi is similar to `GNU Make
 <http://www.gnu.org/software/make/>`_ where you have certain tasks and
 these tasks in turn may have dependencies on other tasks. There are
-also some similarities to `Oozie <http://incubator.apache.org/oozie/>`_
+also some similarities to `Oozie <http://oozie.apache.org/>`_
 and `Azkaban <http://data.linkedin.com/opensource/azkaban>`_. One major
 difference is that Luigi is not just built specifically for Hadoop, and
 it's easy to extend it with other kinds of tasks.
@@ -115,7 +115,7 @@ Who uses Luigi?
 Several companies have written blog posts or presentation about Luigi:
 
-* `Spotify <http://www.slideshare.net/erikbern/luigi-presentation-nyc-data-science>`_
+* `Spotify : NYC Data Science <http://www.slideshare.net/erikbern/luigi-presentation-nyc-data-science>`_
 * `Foursquare <http://www.slideshare.net/OpenAnayticsMeetup/luigi-presentation-17-23199897>`_
 * `Mortar Data <https://help.mortardata.com/technologies/luigi>`_
 * `Stripe <http://www.slideshare.net/PyData/python-as-part-of-a-production-machine-learning-stack-by-michael-manapat-pydata-sv-2014>`_
 
@@ -136,14 +136,14 @@
 External links
 --------------
 
 * `Documentation <http://luigi.readthedocs.org/>`_, including the `Luigi package documentation <http://luigi.readthedocs.org/en/latest/api/luigi.html>`_ (Read the Docs)
-* `Mailing List <https://groups.google.com/forum/#!forum/luigi-user>`_ (Google Groups)
+* `Mailing List <https://groups.google.com/d/forum/luigi-user/>`_ (Google Groups)
 * `Releases <https://pypi.python.org/pypi/luigi>`_ (PyPi)
 * `Source code <https://github.com/spotify/luigi>`_ (Github)
 
 Authors
 -------
 
-Luigi was built at `Spotify <http://www.spotify.com/>`_, mainly by
+Luigi was built at `Spotify <https://www.spotify.com/us/>`_, mainly by
 `Erik Bernhardsson <https://github.com/erikbern>`_ and `Elias Freider
 <https://github.com/freider>`_, but many other people have contributed.
diff --git a/doc/api_overview.rst b/doc/api_overview.rst
index 9096d17160..5273cd40af 100644
--- a/doc/api_overview.rst
+++ b/doc/api_overview.rst
@@ -130,7 +130,7 @@ support command line interaction and make sure to convert the input
 to the corresponding type (i.e. datetime.date instead of a string).
 
 Setting parameter value for other classes
------------------------------------------
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 All parameters are also exposed on a class level on the command line interface.
 For instance, say you have classes TaskA and TaskB:
diff --git a/doc/central_scheduler.rst b/doc/central_scheduler.rst
index 793af9c5f3..a745497f9a 100644
--- a/doc/central_scheduler.rst
+++ b/doc/central_scheduler.rst
@@ -47,11 +47,11 @@ To enable the task history, specify ``record_task_history = True`` in the
 ``[scheduler]`` section of ``client.cfg`` and specify ``db_connection``
 under ``[task_history]``.
 
-The ``db_connection`` string is to used to configure the
-`SQLAlchemy engine<http://docs.sqlalchemy.org/en/rel_0_9/core/engines.html>`_.
+The ``db_connection`` string is used to configure the `SQLAlchemy engine
+<http://docs.sqlalchemy.org/en/rel_0_9/core/engines.html>`_.
 When starting up,
-``luigid`` will create all the necessary tables using
-`create_all<http://docs.sqlalchemy.org/en/rel_0_9/core/metadata.html#sqlalchemy.schema.MetaData.create_all>`_.
+``luigid`` will create all the necessary tables using `create_all
+<http://docs.sqlalchemy.org/en/rel_0_9/core/metadata.html#sqlalchemy.schema.MetaData.create_all>`_.
 
 Example configuration::
diff --git a/doc/conf.py b/doc/conf.py
index 1f30c58a08..9100ab455e 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -74,7 +74,7 @@
 
 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
-exclude_patterns = ['_build']
+exclude_patterns = ['_build', 'README.rst']
 
 # The reST default role (used for this markup: `text`) to use for all
 # documents.
@@ -149,7 +149,7 @@
 # Add any paths that contain custom static files (such as style sheets) here,
 # relative to this directory. They are copied after the builtin static files,
 # so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
+#html_static_path = ['_static']
 
 # Add any extra paths that contain custom files (such as robots.txt or
 # .htaccess) here, relative to this directory. These files are copied
diff --git a/doc/configuration.rst b/doc/configuration.rst
index 6f330852bb..37999ac2d9 100644
--- a/doc/configuration.rst
+++ b/doc/configuration.rst
@@ -98,6 +98,7 @@ max-reschedules
 
 max-shown-tasks
   .. versionadded:: 1.0.20
+
   The maximum number of tasks returned in a task_list api call. This
   will restrict the number of tasks shown in any section in the
   visualiser. Small values can alleviate frozen browsers when there are
@@ -160,6 +161,7 @@ worker-ping-interval
 
 worker-timeout
   .. versionadded:: 1.0.20
+
   Number of seconds after which to kill a task which has been running
   for too long. This provides a default value for all tasks, which can
   be overridden by setting the worker-timeout property in any task. This
diff --git a/doc/example_top_artists.rst b/doc/example_top_artists.rst
index b0437bee50..446ba39058 100644
--- a/doc/example_top_artists.rst
+++ b/doc/example_top_artists.rst
@@ -253,14 +253,14 @@ Launching *http://localhost:8082* should show something like this:
 
 .. figure:: web_server.png
    :alt: Web server screenshot
 
-   Web server screenshot
+Web server screenshot
 
 Looking at the dependency graph for any of the tasks yields something like this:
 
 .. figure:: aggregate_artists.png
    :alt: Aggregate artists screenshot
 
-   Aggregate artists screenshot
+Aggregate artists screenshot
 
 In case your job crashes remotely due to any Python exception, Luigi will
 try to fetch the traceback and print it on standard output. You need
 `Mechanize <http://wwwsearch.sourceforge.net/mechanize/>`__ for it
diff --git a/doc/index.rst b/doc/index.rst
index a540eb5d31..aa4aa95ecc 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -3,7 +3,7 @@
    You can adapt this file completely to your liking, but it should at least
    contain the root `toctree` directive.
 
-.. include:: ../README.rst
+.. include:: README.rst
 
 Table of Contents
 -----------------
@@ -30,6 +30,7 @@ API Reference
 
     luigi
     luigi.contrib
+    luigi.tools
 
 
 Indices and tables
diff --git a/luigi/contrib/mysqldb.py b/luigi/contrib/mysqldb.py
index 2a04d857f2..83573871fa 100644
--- a/luigi/contrib/mysqldb.py
+++ b/luigi/contrib/mysqldb.py
@@ -21,15 +21,17 @@ class MySqlTarget(luigi.Target):
 
     def __init__(self, host, database, user, password, table, update_id):
         """
+        Initializes a MySqlTarget instance.
+
         :param host: MySql server address. Possibly a host:port string.
         :type host: str
-        :param database: Database name
+        :param database: database name.
         :type database: str
-        :param user: Database user
+        :param user: database user.
         :type user: str
-        :param password : Password for specified user
+        :param password: password for specified user.
         :type password: str
-        :param update_id: An identifier for this data set
+        :param update_id: an identifier for this data set.
         :type update_id: str
         """
         if ':' in host:
@@ -48,7 +50,7 @@ def touch(self, connection=None):
         """
         Mark this update as complete.
 
-        Important: If the marker table doesn't exist,
+        IMPORTANT: If the marker table doesn't exist,
         the connection transaction will be aborted and the connection reset.
         Then the marker table will be created.
         """
diff --git a/luigi/contrib/redshift.py b/luigi/contrib/redshift.py
index c0b73e5063..a015090f83 100644
--- a/luigi/contrib/redshift.py
+++ b/luigi/contrib/redshift.py
@@ -39,17 +39,17 @@ class S3CopyToTable(rdbms.CopyToTable):
     Template task for inserting a data set into Redshift from s3.
 
     Usage:
-    Subclass and override the required attributes:
-
-    * `host`,
-    * `database`,
-    * `user`,
-    * `password`,
-    * `table`,
-    * `columns`,
-    * `aws_access_key_id`,
-    * `aws_secret_access_key`,
-    * `s3_load_path`.
+
+    * Subclass and override the required attributes:
+
+      * `host`,
+      * `database`,
+      * `user`,
+      * `password`,
+      * `table`,
+      * `columns`,
+      * `aws_access_key_id`,
+      * `aws_secret_access_key`,
+      * `s3_load_path`.
     """
 
     @abc.abstractproperty
@@ -217,19 +217,19 @@ class S3CopyJSONToTable(S3CopyToTable):
 
     Usage:
 
-    Subclass and override the required attributes:
-
-    * `host`,
-    * `database`,
-    * `user`,
-    * `password`,
-    * `table`,
-    * `columns`,
-    * `aws_access_key_id`,
-    * `aws_secret_access_key`,
-    * `s3_load_path`,
-    * `jsonpath`,
-    * `copy_json_options`.
+    * Subclass and override the required attributes:
+
+      * `host`,
+      * `database`,
+      * `user`,
+      * `password`,
+      * `table`,
+      * `columns`,
+      * `aws_access_key_id`,
+      * `aws_secret_access_key`,
+      * `s3_load_path`,
+      * `jsonpath`,
+      * `copy_json_options`.
     """
 
     @abc.abstractproperty
@@ -270,20 +270,20 @@ class RedshiftManifestTask(S3PathTask):
     in S3CopyToTable in order to copy multiple files
     from your s3 folder into a redshift table at once.
 
-    For full description on how to use the manifest file see:
+    For a full description on how to use the manifest file see
     http://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html
 
     Usage:
 
-        Requires parameters
-            path - s3 path to the generated manifest file, including the
-                   name of the generated file
-                   to be copied into a redshift table
-            folder_paths - s3 paths to the folders containing files
-                   you wish to be copied
+    * requires parameters
+        * path - s3 path to the generated manifest file, including the
+          name of the generated file
+          to be copied into a redshift table
+        * folder_paths - s3 paths to the folders containing files you wish to be copied
 
     Output:
-        generated manifest file
+
+    * generated manifest file
     """
 
     # should be over ridden to point to a variety
diff --git a/luigi/db_task_history.py b/luigi/db_task_history.py
index cb998aac67..1c2fed0a90 100644
--- a/luigi/db_task_history.py
+++ b/luigi/db_task_history.py
@@ -12,20 +12,23 @@
 # License for the specific language governing permissions and limitations under
 # the License.
 
-import task_history
 import configuration
 import datetime
 import logging
+import sqlalchemy
+import sqlalchemy.ext.declarative
+import sqlalchemy.orm
+import sqlalchemy.orm.collections
+
+import task_history
+
+
 from contextlib import contextmanager
 from task_status import PENDING, FAILED, DONE, RUNNING
-from sqlalchemy.orm.collections import attribute_mapped_collection
-from sqlalchemy import Column, Integer, String, ForeignKey, TIMESTAMP, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker, relationship
 
-Base = declarative_base()
+Base = sqlalchemy.ext.declarative.declarative_base()
 
 logger = logging.getLogger('luigi-interface')
 
@@ -52,8 +55,8 @@ def _session(self, session=None):
 
     def __init__(self):
         config = configuration.get_config()
         connection_string = config.get('task_history', 'db_connection')
-        self.engine = create_engine(connection_string)
-        self.session_factory = sessionmaker(bind=self.engine, expire_on_commit=False)
+        self.engine = sqlalchemy.create_engine(connection_string)
+        self.session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine, expire_on_commit=False)
         Base.metadata.create_all(self.engine)
         self.tasks = {}  # task_id -> TaskRecord
 
@@ -144,9 +147,9 @@ class TaskParameter(Base):
     Table to track luigi.Parameter()s of a Task.
     """
     __tablename__ = 'task_parameters'
-    task_id = Column(Integer, ForeignKey('tasks.id'), primary_key=True)
-    name = Column(String(128), primary_key=True)
-    value = Column(String(256))
+    task_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey('tasks.id'), primary_key=True)
+    name = sqlalchemy.Column(sqlalchemy.String(128), primary_key=True)
+    value = sqlalchemy.Column(sqlalchemy.String(256))
 
     def __repr__(self):
         return "TaskParameter(task_id=%d, name=%s, value=%s)" % (self.task_id, self.name, self.value)
 
@@ -157,10 +160,10 @@ class TaskEvent(Base):
     Table to track when a task is scheduled, starts, finishes, and fails.
     """
     __tablename__ = 'task_events'
-    id = Column(Integer, primary_key=True)
-    task_id = Column(Integer, ForeignKey('tasks.id'))
-    event_name = Column(String(20))
-    ts = Column(TIMESTAMP, index=True)
+    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
+    task_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey('tasks.id'))
+    event_name = sqlalchemy.Column(sqlalchemy.String(20))
+    ts = sqlalchemy.Column(sqlalchemy.TIMESTAMP, index=True)
 
     def __repr__(self):
         return "TaskEvent(task_id=%s, event_name=%s, ts=%s" % (self.task_id, self.event_name, self.ts)
 
@@ -173,12 +176,17 @@ class TaskRecord(Base):
     References to other tables are available through task.events, task.parameters, etc.
     """
     __tablename__ = 'tasks'
-    id = Column(Integer, primary_key=True)
-    name = Column(String(128), index=True)
-    host = Column(String(128))
-    parameters = relationship('TaskParameter', collection_class=attribute_mapped_collection('name'),
-                              cascade="all, delete-orphan")
-    events = relationship("TaskEvent", order_by=lambda: TaskEvent.ts.desc(), backref="task")
+    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
+    name = sqlalchemy.Column(sqlalchemy.String(128), index=True)
+    host = sqlalchemy.Column(sqlalchemy.String(128))
+    parameters = sqlalchemy.orm.relationship(
+        'TaskParameter',
+        collection_class=sqlalchemy.orm.collections.attribute_mapped_collection('name'),
+        cascade="all, delete-orphan")
+    events = sqlalchemy.orm.relationship(
+        'TaskEvent',
+        order_by=lambda: TaskEvent.ts.desc(),
+        backref='task')
 
     def __repr__(self):
         return "TaskRecord(name=%s, host=%s)" % (self.name, self.host)
diff --git a/luigi/hadoop.py b/luigi/hadoop.py
index 3edc32e91a..4f97d75b27 100644
--- a/luigi/hadoop.py
+++ b/luigi/hadoop.py
@@ -308,7 +308,7 @@ def fetch_task_failures(tracking_url):
     If it does not, it's not the end of the world.
     TODO: Yarn has a REST API that we should probably use instead:
-    http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/MapredAppMasterRest.html
+    http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html
     """
     import mechanize
     timeout = 3.0
diff --git a/luigi/hdfs.py b/luigi/hdfs.py
index 2f42ceb256..398899824d 100644
--- a/luigi/hdfs.py
+++ b/luigi/hdfs.py
@@ -166,7 +166,9 @@ def rename_dont_move(self, path, dest):
     We keep the interface simple by just aliasing this to
     normal rename and let individual implementations redefine the method.
 
-    rename2: https://github.com/apache/hadoop/blob/ae91b13/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java#L483-L523
+    rename2 -
+    https://github.com/apache/hadoop/blob/ae91b13/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+    (lines 483-523)
     """
     warnings.warn("Configured HDFS client doesn't support rename_dont_move, using normal mv operation instead.")
     if self.exists(dest):
diff --git a/luigi/interface.py b/luigi/interface.py
index b6f1d17f9d..9834486571 100644
--- a/luigi/interface.py
+++ b/luigi/interface.py
@@ -119,8 +119,11 @@ def parse(self):
     @staticmethod
     def run(tasks, worker_scheduler_factory=None, override_defaults=None):
         """
-        :return: True if all tasks and their dependencies were successfully run (or already completed)
-                 False if any error occurred
+        :param tasks:
+        :param worker_scheduler_factory:
+        :param override_defaults:
+        :return: True if all tasks and their dependencies were successfully run (or already completed);
+                 False if any error occurred.
         """
         if worker_scheduler_factory is None:
@@ -307,8 +310,7 @@ class PassThroughOptionParser(optparse.OptionParser):
     """
     An unknown option pass-through implementation of OptionParser.
 
-    When unknown arguments are encountered, bundle with largs and try again,
-    until rargs is depleted.
+    When unknown arguments are encountered, bundle with largs and try again, until rargs is depleted.
 
     sys.exit(status) will still be called if a known
     argument is passed incorrectly (e.g. missing arguments or bad argument types, etc.)
@@ -378,13 +380,21 @@ def load_task(module, task_name, params_str):
     return task_cls.from_str_params(params_str)
 
 
-def run(cmdline_args=None, existing_optparse=None, use_optparse=False, main_task_cls=None, worker_scheduler_factory=None, use_dynamic_argparse=False, local_scheduler=False):
+def run(cmdline_args=None, existing_optparse=None, use_optparse=False, main_task_cls=None,
+        worker_scheduler_factory=None, use_dynamic_argparse=False, local_scheduler=False):
     """
     Run from cmdline.
 
-    The default parser uses argparse.
-    However for legacy reasons we support optparse that optionally allows for
-    overriding an existing option parser with new args.
+    The default parser uses argparse; however, for legacy reasons,
+    we support optparse that optionally allows for overriding an existing option parser with new args.
+
+    :param cmdline_args:
+    :param existing_optparse:
+    :param use_optparse:
+    :param main_task_cls:
+    :param worker_scheduler_factory:
+    :param use_dynamic_argparse:
+    :param local_scheduler:
     """
     if use_optparse:
         interface = OptParseInterface(existing_optparse)
@@ -404,8 +414,11 @@ def build(tasks, worker_scheduler_factory=None, **env_params):
     Run internally, bypassing the cmdline parsing.
     Useful if you have some luigi code that you want to run internally.
 
-    Example
-    luigi.build([MyTask1(), MyTask2()], local_scheduler=True)
+    Example:
+
+    .. code-block:: python
+
+        luigi.build([MyTask1(), MyTask2()], local_scheduler=True)
 
     One notable difference is that `build` defaults to not using
     the identical process lock. Otherwise, `build` would only be
diff --git a/luigi/lock.py b/luigi/lock.py
index 429ce03925..afaec9abdf 100644
--- a/luigi/lock.py
+++ b/luigi/lock.py
@@ -21,7 +21,6 @@ def getpcmd(pid):
     Returns command of process.
     :param pid:
-    :return
     """
     cmd = 'ps -p %s -o command=' % (pid,)
     p = os.popen(cmd, 'r')
diff --git a/tox.ini b/tox.ini
index e7d919b223..115705b05f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -30,13 +30,34 @@ commands = pep8 --ignore E501 luigi test examples bin
 deps = autopep8
 commands = autopep8 --ignore E309,E501 -a -i -r luigi test examples bin
 
-[testenv:docsapi]
-deps = sphinx
-commands = sphinx-apidoc -o doc/api -T luigi
-
 [testenv:docs]
+# Build documentation using sphinx.
+# Call this using `tox -e docs`.
 deps =
     sqlalchemy
     Sphinx
     sphinx_rtd_theme
-commands = sphinx-build -b html -d {envtmpdir}/doctrees doc doc/_build/html
+commands =
+    # build API docs
+    sphinx-apidoc -o doc/api -T luigi
+
+    # sync README.rst file
+    cp README.rst doc/README.rst
+    # github images are local
+    sed -i.orig 's/https\:\/\/raw\.githubusercontent\.com\/spotify\/luigi\/master\/doc\///g' doc/README.rst
+    # remove badges
+    sed -i.orig '/.. image::/d' doc/README.rst
+    sed -i.orig '/:target:/d' doc/README.rst
+    rm doc/README.rst.orig
+
+    # check URLs in docs
+    sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees doc doc/_build/linkcheck
+
+    # build HTML docs
+    sphinx-build -W -b html -d {envtmpdir}/doctrees doc doc/_build/html
+
+whitelist_externals =
+    cp
+    sed
+    rm
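The README-syncing commands in the new ``[testenv:docs]`` section amount to three text transformations: rewrite the absolute ``raw.githubusercontent.com`` image URLs to doc-local paths, then delete the badge lines (``.. image::`` directives and their ``:target:`` options). A rough Python equivalent of those ``sed`` calls, for illustration only — the ``prepare_readme`` helper and the sample ``readme`` text below are hypothetical, not part of the patch:

```python
def prepare_readme(text):
    # Roughly what the sed commands in [testenv:docs] do:
    # 1. make GitHub-hosted images local to doc/
    text = text.replace(
        'https://raw.githubusercontent.com/spotify/luigi/master/doc/', '')
    # 2. drop badge lines, like sed's /.. image::/d and /:target:/d
    kept = [line for line in text.splitlines()
            if '.. image::' not in line and ':target:' not in line]
    return '\n'.join(kept)


# Hypothetical README snippet: a badge block plus a GitHub-hosted figure.
readme = """\
.. image:: https://travis-ci.org/spotify/luigi.png?branch=master
    :target: https://travis-ci.org/spotify/luigi

.. figure:: https://raw.githubusercontent.com/spotify/luigi/master/doc/luigi.png

Luigi is a Python package that helps you build complex pipelines.
"""

print(prepare_readme(readme))
```

The actual tox commands operate on the copied ``doc/README.rst`` in place (hence the ``-i.orig`` backup that is removed afterwards); the sketch just shows the net effect on the text.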