diff --git a/luigi/date_interval.py b/luigi/date_interval.py
index 9362cf6968..24ca0c9a5b 100644
--- a/luigi/date_interval.py
+++ b/luigi/date_interval.py
@@ -25,7 +25,10 @@
     class MyTask(luigi.Task):
         date_interval = luigi.DateIntervalParameter()
 
-Now, you can launch this from the command line using ``--date-interval 2014-05-10`` or ``--date-interval 2014-W26`` (using week notation) or ``--date-interval 2014`` (for a year) and some other notations.
+Now, you can launch this from the command line using
+``--date-interval 2014-05-10`` or
+``--date-interval 2014-W26`` (using week notation) or
+``--date-interval 2014`` (for a year) and some other notations.
 """
 
 from __future__ import division
diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py
index 5b1544bc5c..b51265c835 100644
--- a/luigi/execution_summary.py
+++ b/luigi/execution_summary.py
@@ -33,10 +33,13 @@ def _partition_tasks(worker):
     pending_tasks = {task for(task, status, ext) in task_history if status == 'PENDING'}
     set_tasks = {}
     set_tasks["completed"] = {task for (task, status, ext) in task_history if status == 'DONE' and task in pending_tasks}
-    set_tasks["already_done"] = {task for (task, status, ext) in task_history if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]}
+    set_tasks["already_done"] = {task for (task, status, ext) in task_history
+                                 if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]}
     set_tasks["failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'}
-    set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and not ext}
-    set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and ext}
+    set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history
+                                      if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and not ext}
+    set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history
+                                          if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and ext}
     set_tasks["run_by_other_worker"] = set()
     set_tasks["upstream_failure"] = set()
     set_tasks["upstream_missing_dependency"] = set()
@@ -75,7 +78,8 @@ def _depth_first_search(set_tasks, current_task, visited):
             if task in set_tasks["run_by_other_worker"] or task in set_tasks["upstream_run_by_other_worker"]:
                 set_tasks["upstream_run_by_other_worker"].add(current_task)
                 upstream_run_by_other_worker = True
-        if not upstream_failure and not upstream_missing_dependency and not upstream_run_by_other_worker and current_task not in set_tasks["run_by_other_worker"]:
+        if not upstream_failure and not upstream_missing_dependency and \
+           not upstream_run_by_other_worker and current_task not in set_tasks["run_by_other_worker"]:
             set_tasks["unknown_reason"].add(current_task)
 
 
@@ -97,7 +101,8 @@ def _get_str(task_dict, extra_indent):
                 break
         if len(tasks[0].get_params()) == 0:
             row += '- {0} {1}()'.format(len(tasks), str(task_family))
-        elif _get_len_of_params(tasks[0]) > 60 or (len(tasks) == 2 and len(tasks[0].get_params()) > 1 and (_get_len_of_params(tasks[0]) > 40 or len(str(tasks[0])) > 100)) or len(str(tasks[0])) > 200:
+        elif _get_len_of_params(tasks[0]) > 60 or len(str(tasks[0])) > 200 or \
+                (len(tasks) == 2 and len(tasks[0].get_params()) > 1 and (_get_len_of_params(tasks[0]) > 40 or len(str(tasks[0])) > 100)):
             """
             This is to make sure that there is no really long task in the output
             """
@@ -287,7 +292,9 @@ def _get_external_workers(worker):
 
 def _group_tasks_by_name_and_status(task_dict):
     """
-    Takes a dictionary with sets of tasks grouped by their status and returns a dictionary with dictionaries with an array of tasks grouped by their status and task name
+    Takes a dictionary with sets of tasks grouped by their status and
+    returns a dictionary with dictionaries with an array of tasks grouped by
+    their status and task name
     """
     group_status = {}
     for task in task_dict:
@@ -309,7 +316,10 @@ def _summary_format(set_tasks, worker):
     for status, task_dict in set_tasks.items():
         group_tasks[status] = _group_tasks_by_name_and_status(task_dict)
     comments = _get_comments(group_tasks)
-    num_all_tasks = len(set_tasks["already_done"]) + len(set_tasks["completed"]) + len(set_tasks["failed"]) + len(set_tasks["still_pending_ext"]) + len(set_tasks["still_pending_not_ext"])
+    num_all_tasks = sum([len(set_tasks["already_done"]),
+                         len(set_tasks["completed"]), len(set_tasks["failed"]),
+                         len(set_tasks["still_pending_ext"]),
+                         len(set_tasks["still_pending_not_ext"])])
     str_output = ''
     str_output += 'Scheduled {0} tasks of which:\n'.format(num_all_tasks)
     for status in _ORDERED_STATUSES:
diff --git a/luigi/task.py b/luigi/task.py
index b092a6be44..cc1fbdb0f0 100644
--- a/luigi/task.py
+++ b/luigi/task.py
@@ -467,7 +467,9 @@ def on_failure(self, exception):
         Override for custom error handling.
 
         This method gets called if an exception is raised in :py:meth:`run`.
-        Return value of this method is json encoded and sent to the scheduler as the `expl` argument. Its string representation will be used as the body of the error email sent out if any.
+        Return value of this method is json encoded and sent to the scheduler
+        as the `expl` argument. Its string representation will be used as the
+        body of the error email sent out if any.
 
         Default behavior is to return a string representation of the stack trace.
         """
diff --git a/luigi/tools/range.py b/luigi/tools/range.py
index 9836bccd73..6b49aa6315 100644
--- a/luigi/tools/range.py
+++ b/luigi/tools/range.py
@@ -237,7 +237,13 @@ class RangeDailyBase(RangeBase):
         description="ending date, exclusive. Default: None - work forward forever")
     days_back = luigi.IntParameter(
         default=100,  # slightly more than three months
-        description="extent to which contiguousness is to be assured into past, in days from current time. Prevents infinite loop when start is none. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter to that, too, to prevent the oldest outputs flapping. Increase freely if you intend to process old dates - worker's memory is the limit")
+        description=("extent to which contiguousness is to be assured into "
+                     "past, in days from current time. Prevents infinite loop "
+                     "when start is none. If the dataset has limited retention"
+                     " (i.e. old outputs get removed), this should be set "
+                     "shorter to that, too, to prevent the oldest outputs "
+                     "flapping. Increase freely if you intend to process old "
+                     "dates - worker's memory is the limit"))
     days_forward = luigi.IntParameter(
         default=0,
         description="extent to which contiguousness is to be assured into future, in days from current time. Prevents infinite loop when stop is none")
@@ -280,7 +286,13 @@ class RangeHourlyBase(RangeBase):
         description="ending datehour, exclusive. Default: None - work forward forever")
     hours_back = luigi.IntParameter(
         default=100 * 24,  # slightly more than three months
-        description="extent to which contiguousness is to be assured into past, in hours from current time. Prevents infinite loop when start is none. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter to that, too, to prevent the oldest outputs flapping. Increase freely if you intend to process old dates - worker's memory is the limit")
+        description=("extent to which contiguousness is to be assured into "
+                     "past, in hours from current time. Prevents infinite "
+                     "loop when start is none. If the dataset has limited "
+                     "retention (i.e. old outputs get removed), this should "
+                     "be set shorter to that, too, to prevent the oldest "
+                     "outputs flapping. Increase freely if you intend to "
+                     "process old dates - worker's memory is the limit"))
     # TODO always entire interval for reprocessings (fixed start and stop)?
     hours_forward = luigi.IntParameter(
         default=0,
@@ -376,19 +388,25 @@ def _get_per_location_glob(tasks, outputs, regexes):
     don't even have to retrofit existing tasks anyhow.
     """
     paths = [o.path for o in outputs]
-    matches = [r.search(p) for r, p in zip(regexes, paths)]  # naive, because some matches could be confused by numbers earlier in path, e.g. /foo/fifa2000k/bar/2000-12-31/00
+    # naive, because some matches could be confused by numbers earlier
+    # in path, e.g. /foo/fifa2000k/bar/2000-12-31/00
+    matches = [r.search(p) for r, p in zip(regexes, paths)]
 
     for m, p, t in zip(matches, paths, tasks):
         if m is None:
             raise NotImplementedError("Couldn't deduce datehour representation in output path %r of task %s" % (p, t))
 
     n_groups = len(matches[0].groups())
-    positions = [most_common((m.start(i), m.end(i)) for m in matches)[0] for i in range(1, n_groups + 1)]  # the most common position of every group is likely to be conclusive hit or miss
+    # the most common position of every group is likely
+    # to be conclusive hit or miss
+    positions = [most_common((m.start(i), m.end(i)) for m in matches)[0] for i in range(1, n_groups + 1)]
 
     glob = list(paths[0])  # FIXME sanity check that it's the same for all paths
     for start, end in positions:
         glob = glob[:start] + ['[0-9]'] * (end - start) + glob[end:]
-    return ''.join(glob).rsplit('/', 1)[0]  # chop off the last path item (wouldn't need to if `hadoop fs -ls -d` equivalent were available)
+    # chop off the last path item
+    # (wouldn't need to if `hadoop fs -ls -d` equivalent were available)
+    return ''.join(glob).rsplit('/', 1)[0]
 
 
 def _get_filesystems_and_globs(datetime_to_task, datetime_to_re):
diff --git a/luigi/worker.py b/luigi/worker.py
index c12935fd74..aaa5523441 100644
--- a/luigi/worker.py
+++ b/luigi/worker.py
@@ -494,7 +494,9 @@ def _add(self, task, is_complete):
             runnable = worker().retry_external_tasks
 
             task.trigger_event(Event.DEPENDENCY_MISSING, task)
-            logger.warning('Data for %s does not exist (yet?). The task is an external data depedency, so it can not be run from this luigi process.', task.task_id)
+            logger.warning('Data for %s does not exist (yet?). The task is an '
+                           'external data depedency, so it can not be run from'
+                           ' this luigi process.', task.task_id)
 
         else:
             deps = task.deps()
diff --git a/test/_test_ftp.py b/test/_test_ftp.py
index 3509285126..702993c4b2 100644
--- a/test/_test_ftp.py
+++ b/test/_test_ftp.py
@@ -18,7 +18,6 @@
 import datetime
 import ftplib
 import os
-import time
 from helpers import unittest
 try:
     from cStringIO import StringIO
diff --git a/test/central_planner_test.py b/test/central_planner_test.py
index 9189355c67..bba8278aa8 100644
--- a/test/central_planner_test.py
+++ b/test/central_planner_test.py
@@ -16,7 +16,6 @@
 #
 
 import time
-import datetime
 from helpers import unittest
 
 from nose.plugins.attrib import attr
diff --git a/test/cmdline_test.py b/test/cmdline_test.py
index 842ffd939f..207546ec3f 100644
--- a/test/cmdline_test.py
+++ b/test/cmdline_test.py
@@ -20,12 +20,10 @@
     import ConfigParser
 except ImportError:
     import configparser as ConfigParser
-import logging
 import mock
 import os
 import subprocess
 from helpers import unittest
-import warnings
 
 from luigi import six
 
diff --git a/test/contrib/_webhdfs_test.py b/test/contrib/_webhdfs_test.py
index 0e61b8e887..f9a5c391b5 100644
--- a/test/contrib/_webhdfs_test.py
+++ b/test/contrib/_webhdfs_test.py
@@ -15,10 +15,7 @@
 # limitations under the License.
 #
 
-import datetime
 import os
-import posixpath
-import time
 from helpers import unittest
 
 from luigi.contrib import webhdfs
diff --git a/test/contrib/bigquery_test.py b/test/contrib/bigquery_test.py
index 81a82c179b..badce47e11 100644
--- a/test/contrib/bigquery_test.py
+++ b/test/contrib/bigquery_test.py
@@ -26,7 +26,6 @@
 
 import luigi
 from luigi.contrib import bigquery
-from luigi.contrib import gcs
 
 from contrib import gcs_test
 from nose.plugins.attrib import attr
diff --git a/test/contrib/hadoop_jar_test.py b/test/contrib/hadoop_jar_test.py
index f3bbb33a7b..b3294b8b7e 100644
--- a/test/contrib/hadoop_jar_test.py
+++ b/test/contrib/hadoop_jar_test.py
@@ -20,7 +20,7 @@
 import shlex
 from helpers import unittest
 from luigi.contrib.hadoop_jar import HadoopJarJobError, HadoopJarJobTask
-from mock import patch, MagicMock
+from mock import patch
 
 
 class TestHadoopJarJob(HadoopJarJobTask):
diff --git a/test/contrib/hadoop_test.py b/test/contrib/hadoop_test.py
index eb0ba6cdc1..5809127f09 100644
--- a/test/contrib/hadoop_test.py
+++ b/test/contrib/hadoop_test.py
@@ -116,7 +116,7 @@ def extra_files(self):
         return [(fn, 'my_dir/my_file')]
 
     def init_remote(self):
-        f = open('my_dir/my_file')  # make sure it exists
+        open('my_dir/my_file')  # make sure it exists
 
 
 class MapOnlyJob(HadoopJobTask):
diff --git a/test/contrib/hdfs_test.py b/test/contrib/hdfs_test.py
index 3c39c03f96..78b5f990a4 100644
--- a/test/contrib/hdfs_test.py
+++ b/test/contrib/hdfs_test.py
@@ -18,7 +18,6 @@
 import functools
 import re
 from helpers import unittest
-from datetime import datetime
 import random
 
 import helpers
diff --git a/test/contrib/hive_test.py b/test/contrib/hive_test.py
index 9d11532f54..75b84b3fb8 100644
--- a/test/contrib/hive_test.py
+++ b/test/contrib/hive_test.py
@@ -17,7 +17,6 @@
 
 from collections import OrderedDict
 import os
-import sys
 import tempfile
 from helpers import unittest
 
diff --git a/test/contrib/pig_test.py b/test/contrib/pig_test.py
index 0fb4d863a0..a2f946a2df 100644
--- a/test/contrib/pig_test.py
+++ b/test/contrib/pig_test.py
@@ -19,7 +19,7 @@
 import tempfile
 
 import luigi
-from helpers import with_config, unittest
+from helpers import unittest
 from luigi.contrib.pig import PigJobError, PigJobTask
 from mock import patch
 
@@ -104,7 +104,10 @@ def test_run__success(self, mock):
         try:
             job = ComplexTestJob()
             job.run()
-            self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p', 'YOUR_PARAM_NAME=Your param value', '-propertyFile', 'pig_property_file', '-f', 'my_complex_pig_script.pig']], arglist_result)
+            self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p',
+                               'YOUR_PARAM_NAME=Your param value',
+                               '-propertyFile', 'pig_property_file', '-f',
+                               'my_complex_pig_script.pig']], arglist_result)
 
             # Check property file
             with open('pig_property_file') as pprops_file:
@@ -125,7 +128,10 @@ def test_run__fail(self, mock):
         except PigJobError as e:
             p = e
             self.assertEqual('stderr', p.err)
-            self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p', 'YOUR_PARAM_NAME=Your param value', '-propertyFile', 'pig_property_file', '-f', 'my_complex_pig_script.pig']], arglist_result)
+            self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p',
+                               'YOUR_PARAM_NAME=Your param value',
+                               '-propertyFile', 'pig_property_file', '-f',
+                               'my_complex_pig_script.pig']], arglist_result)
 
             # Check property file
             with open('pig_property_file') as pprops_file:
diff --git a/test/create_packages_archive_root/package/submodule.py b/test/create_packages_archive_root/package/submodule.py
index 1dec826f80..9a30a12e01 100644
--- a/test/create_packages_archive_root/package/submodule.py
+++ b/test/create_packages_archive_root/package/submodule.py
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-import os
+import os  # NOQA
diff --git a/test/create_packages_archive_root/package/submodule_with_absolute_import.py b/test/create_packages_archive_root/package/submodule_with_absolute_import.py
index 8dffb090ec..c7eac60f5c 100644
--- a/test/create_packages_archive_root/package/submodule_with_absolute_import.py
+++ b/test/create_packages_archive_root/package/submodule_with_absolute_import.py
@@ -17,4 +17,4 @@
 
 from __future__ import absolute_import
 
-import os
+import os  # NOQA
diff --git a/test/create_packages_archive_root/package/subpackage/submodule.py b/test/create_packages_archive_root/package/subpackage/submodule.py
index ffc579c0a3..9a30a12e01 100644
--- a/test/create_packages_archive_root/package/subpackage/submodule.py
+++ b/test/create_packages_archive_root/package/subpackage/submodule.py
@@ -14,4 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import os
+
+import os  # NOQA
diff --git a/test/db_task_history_test.py b/test/db_task_history_test.py
index 7c3a4cfe0f..574ff9eddb 100644
--- a/test/db_task_history_test.py
+++ b/test/db_task_history_test.py
@@ -21,7 +21,7 @@
 from helpers import with_config
 
 import luigi
-from luigi.db_task_history import DbTaskHistory, Base
+from luigi.db_task_history import DbTaskHistory
 from luigi.task_status import DONE, PENDING, RUNNING
 
 
diff --git a/test/dynamic_import_test.py b/test/dynamic_import_test.py
index 9986e8e273..032584f3b5 100644
--- a/test/dynamic_import_test.py
+++ b/test/dynamic_import_test.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from helpers import unittest, LuigiTestCase
+from helpers import LuigiTestCase
 
 import luigi
 import luigi.interface
diff --git a/test/event_callbacks_test.py b/test/event_callbacks_test.py
index a7a66fd8e2..f63c697955 100644
--- a/test/event_callbacks_test.py
+++ b/test/event_callbacks_test.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-import random
 from helpers import unittest
 
 import luigi
@@ -195,7 +194,9 @@ def tearDown(self):
     def _run_test(self, task, expected_events):
         actual_events = {}
 
-        # yucky to create separate callbacks; would be nicer if the callback received an instance of a subclass of Event, so one callback could accumulate all types
+        # yucky to create separate callbacks; would be nicer if the callback
+        # received an instance of a subclass of Event, so one callback could
+        # accumulate all types
         @luigi.Task.event_handler(Event.DEPENDENCY_DISCOVERED)
         def callback_dependency_discovered(*args):
             actual_events.setdefault(Event.DEPENDENCY_DISCOVERED, set()).add(tuple(map(lambda t: t.task_id, args)))
@@ -255,4 +256,13 @@ def test_complete_dag(self):
             ('D(param=3)',),
         ]),
         })
-        self.assertEqual(eval_contents(A().output()), ['A(param=1)', ['B(param=1)', ['C(param=1)', ['D(param=1)'], ['D(param=2)']]], ['B(param=2)', ['C(param=2)', ['D(param=2)'], ['D(param=3)']]]])
+        self.assertEqual(eval_contents(A().output()),
+                         ['A(param=1)',
+                          ['B(param=1)',
+                           ['C(param=1)',
+                            ['D(param=1)'],
+                            ['D(param=2)']]],
+                          ['B(param=2)',
+                           ['C(param=2)',
+                            ['D(param=2)'],
+                            ['D(param=3)']]]])
diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py
index 7c0ab92545..322179e0ec 100644
--- a/test/execution_summary_test.py
+++ b/test/execution_summary_test.py
@@ -14,10 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import mock
 
-from helpers import unittest, LuigiTestCase
-from luigi import six
+from helpers import LuigiTestCase
 
 import luigi
 import luigi.worker
@@ -76,7 +74,12 @@ def requires(self):
         s = self.summary()
         self.assertIn('\n* 3 ran successfully:\n    - 3 Bar(num=', s)
         self.assertIn('\n* 1 present dependencies were encountered:\n    - 1 Bar(num=1)\n', s)
-        self.assertIn('\n* 1 failed:\n    - 1 Bar(num=0)\n* 1 were left pending, among these:\n    * 1 had failed dependencies:\n        - 1 Foo()\n\nThis progress looks :( because there were failed tasks', s)
+        self.assertIn('\n* 1 failed:\n'
+                      '    - 1 Bar(num=0)\n'
+                      '* 1 were left pending, among these:\n'
+                      '    * 1 had failed dependencies:\n'
+                      '        - 1 Foo()\n\n'
+                      'This progress looks :( because there were failed tasks', s)
         self.assertNotIn('\n\n\n', s)
 
     def test_upstream_not_running(self):
@@ -118,7 +121,11 @@ def requires(self):
         self.assertIn('\n* 4 ran successfully:\n    - 4 Bar(num=1...4)\n', s)
         self.assertIn('\n* 1 failed:\n    - 1 Bar(num=0)\n', s)
         self.assertIn('\n* 5 were left pending, among these:\n    * 4 were missing external dependencies:\n        - 4 ExternalBar(num=', s)
-        self.assertIn('\n    * 1 had failed dependencies:\n        - 1 Foo()\n    * 1 had missing external dependencies:\n        - 1 Foo()\n\nThis progress looks :( because there were failed tasks\n', s)
+        self.assertIn('\n    * 1 had failed dependencies:\n'
+                      '        - 1 Foo()\n'
+                      '    * 1 had missing external dependencies:\n'
+                      '        - 1 Foo()\n\n'
+                      'This progress looks :( because there were failed tasks\n', s)
         self.assertNotIn('\n\n\n', s)
 
     def test_already_running(self):
@@ -166,8 +173,17 @@ def run(self):
         self.assertEqual({LockTask()}, d['run_by_other_worker'])
         self.assertEqual({ParentTask()}, d['upstream_run_by_other_worker'])
         s = self.summary()
-        self.assertIn('\nScheduled 2 tasks of which:\n* 2 were left pending, among these:\n    * 1 were being run by another worker:\n        - 1 LockTask()\n    * 1 had dependencies that were being run by other worker:\n        - 1 ParentTask()\n', s)
-        self.assertIn('\n\nThe other workers were:\n    - other_worker ran 1 tasks\n\nDid not run any tasks\nThis progress looks :) because there were no failed tasks or missing external dependencies\n', s)
+        self.assertIn('\nScheduled 2 tasks of which:\n'
+                      '* 2 were left pending, among these:\n'
+                      '    * 1 were being run by another worker:\n'
+                      '        - 1 LockTask()\n'
+                      '    * 1 had dependencies that were being run by other worker:\n'
+                      '        - 1 ParentTask()\n', s)
+        self.assertIn('\n\nThe other workers were:\n'
+                      '    - other_worker ran 1 tasks\n\n'
+                      'Did not run any tasks\n'
+                      'This progress looks :) because there were no failed '
+                      'tasks or missing external dependencies\n', s)
         self.assertNotIn('\n\n\n', s)
 
     def test_larger_tree(self):
diff --git a/test/file_test.py b/test/file_test.py
index 71268758d8..bccc68aad8 100644
--- a/test/file_test.py
+++ b/test/file_test.py
@@ -231,7 +231,7 @@ def test_exists(self):
 
     def test_listdir(self):
         os.mkdir(self.path)
-        with open(self.path + '/file', 'w') as fp:
+        with open(self.path + '/file', 'w'):
             pass
 
         self.assertTrue([self.path + '/file'], list(self.fs.listdir(self.path + '/')))
diff --git a/test/helpers.py b/test/helpers.py
index 684a026dc8..58454fff8f 100644
--- a/test/helpers.py
+++ b/test/helpers.py
@@ -17,7 +17,6 @@
 
 import functools
 import itertools
-import sys
 
 import luigi
 import luigi.task_register
diff --git a/test/instance_test.py b/test/instance_test.py
index 0d9791a13e..e740a9b1f4 100644
--- a/test/instance_test.py
+++ b/test/instance_test.py
@@ -93,7 +93,7 @@ def test_unhashable_type(self):
         class DummyTask(luigi.Task):
             x = luigi.Parameter()
 
-        dummy = DummyTask(x={})
+        dummy = DummyTask(x={})  # NOQA
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/interface_test.py b/test/interface_test.py
index 21b4771e0c..ab3bbddb82 100644
--- a/test/interface_test.py
+++ b/test/interface_test.py
@@ -21,7 +21,7 @@
 import luigi.date_interval
 import luigi.notifications
 import sys
-from luigi.interface import core, _WorkerSchedulerFactory
+from luigi.interface import _WorkerSchedulerFactory
 from luigi.worker import Worker
 from mock import Mock, patch
 from helpers import LuigiTestCase
diff --git a/test/lock_test.py b/test/lock_test.py
index 3347486c5f..c560d8f601 100644
--- a/test/lock_test.py
+++ b/test/lock_test.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-import hashlib
 import os
 import subprocess
 import tempfile
diff --git a/test/parameter_test.py b/test/parameter_test.py
index b4ed4de3c3..583e6eadf0 100644
--- a/test/parameter_test.py
+++ b/test/parameter_test.py
@@ -23,7 +23,7 @@
 import luigi.date_interval
 import luigi.interface
 import luigi.notifications
-from luigi.mock import MockTarget, MockFileSystem
+from luigi.mock import MockTarget
 from luigi.parameter import ParameterException
 from worker_test import email_patch
 
@@ -262,7 +262,9 @@ def test_x_arg_y_arg_override(self):
         self.expect_keys(['banana-foo-bar', 'banana-dep-foo-bar'])
 
     def test_x_arg_y_arg_override_all(self):
-        luigi.run(['--local-scheduler', '--no-lock', 'Banana', '--x', 'foo', '--y', 'bar', '--style', 'x-arg-y-arg', '--BananaDep-y', 'xyz', '--BananaDep-x', 'blabla'])
+        luigi.run(['--local-scheduler', '--no-lock', 'Banana', '--x', 'foo',
+                   '--y', 'bar', '--style', 'x-arg-y-arg', '--BananaDep-y',
+                   'xyz', '--BananaDep-x', 'blabla'])
         self.expect_keys(['banana-foo-bar', 'banana-dep-foo-bar'])
 
     def test_y_arg_override(self):
@@ -270,7 +272,9 @@ def test_y_arg_override(self):
         self.expect_keys(['banana-foo-bar', 'banana-dep-xyz-bar'])
 
     def test_y_arg_override_both(self):
-        luigi.run(['--local-scheduler', '--no-lock', 'Banana', '--x', 'foo', '--y', 'bar', '--style', 'y-kwarg', '--BananaDep-x', 'xyz', '--BananaDep-y', 'blah'])
+        luigi.run(['--local-scheduler', '--no-lock', 'Banana', '--x', 'foo',
+                   '--y', 'bar', '--style', 'y-kwarg', '--BananaDep-x', 'xyz',
+                   '--BananaDep-y', 'blah'])
         self.expect_keys(['banana-foo-bar', 'banana-dep-xyz-bar'])
 
     def test_y_arg_override_banana(self):
diff --git a/test/range_test.py b/test/range_test.py
index f1a6c4f5db..68924ca424 100644
--- a/test/range_test.py
+++ b/test/range_test.py
@@ -179,7 +179,9 @@ class RangeDailyBaseTest(unittest.TestCase):
     maxDiff = None
 
     def setUp(self):
-        # yucky to create separate callbacks; would be nicer if the callback received an instance of a subclass of Event, so one callback could accumulate all types
+        # yucky to create separate callbacks; would be nicer if the callback
+        # received an instance of a subclass of Event, so one callback could
+        # accumulate all types
        @RangeDailyBase.event_handler(RangeEvent.DELAY)
         def callback_delay(*args):
             self.events.setdefault(RangeEvent.DELAY, []).append(args)
@@ -292,7 +294,9 @@ class RangeHourlyBaseTest(unittest.TestCase):
     maxDiff = None
 
     def setUp(self):
-        # yucky to create separate callbacks; would be nicer if the callback received an instance of a subclass of Event, so one callback could accumulate all types
+        # yucky to create separate callbacks; would be nicer if the callback
+        # received an instance of a subclass of Event, so one callback could
+        # accumulate all types
         @RangeHourlyBase.event_handler(RangeEvent.DELAY)
         def callback_delay(*args):
             self.events.setdefault(RangeEvent.DELAY, []).append(args)
@@ -589,17 +593,20 @@ def output(self):
 
 class RangeHourlyTest(unittest.TestCase):
 
-    @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents))  # fishy to mock the mock, but MockFileSystem doesn't support globs yet
+    # fishy to mock the mock, but MockFileSystem doesn't support globs yet
+    @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents))
     @mock.patch('luigi.mock.MockFileSystem.exists', new=mock_exists_always_true)
     def test_missing_tasks_correctly_required(self):
         for task_path in task_a_paths:
             MockTarget(task_path)
 
+        # this test takes a few seconds. Since stop is not defined,
+        # finite_datetimes constitute many years to consider
         task = RangeHourly(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)),
                            of=TaskA,
                            start=datetime.datetime(2014, 3, 20, 17),
                            task_limit=3,
-                           hours_back=3 * 365 * 24)  # this test takes a few seconds. Since stop is not defined, finite_datetimes constitute many years to consider
+                           hours_back=3 * 365 * 24)
         actual = [t.task_id for t in task.requires()]
         self.assertEqual(actual, expected_a)
 
diff --git a/test/recursion_test.py b/test/recursion_test.py
index 729021140e..6aca0d7e1c 100644
--- a/test/recursion_test.py
+++ b/test/recursion_test.py
@@ -17,7 +17,6 @@
 from __future__ import print_function
 
 import datetime
-import sys
 from helpers import unittest
 
 import luigi
diff --git a/test/server_test.py b/test/server_test.py
index c7dca44038..2dac62e6c7 100644
--- a/test/server_test.py
+++ b/test/server_test.py
@@ -23,7 +23,7 @@
 import time
 import tempfile
 
-from helpers import unittest, with_config, skipOnTravis
+from helpers import unittest, skipOnTravis
 import luigi.rpc
 import luigi.server
 from luigi.scheduler import CentralPlannerScheduler
diff --git a/test/snakebite_test.py b/test/snakebite_test.py
index e41b60e87e..f25d0c65c3 100644
--- a/test/snakebite_test.py
+++ b/test/snakebite_test.py
@@ -21,7 +21,6 @@
 import time
 import unittest
 
-import luigi.interface
 from luigi import six
 
 from nose.plugins.attrib import attr
@@ -29,10 +28,8 @@
     raise unittest.SkipTest("snakebite doesn't work on Python 3 yet.")
 
 try:
-    import luigi.contrib.hdfs
     from luigi.contrib.hdfs import SnakebiteHdfsClient
     from minicluster import MiniClusterTestCase
-    from snakebite.client import AutoConfigClient as SnakebiteAutoConfigClient
 except ImportError:
     raise unittest.SkipTest('Snakebite not installed')
 
diff --git a/test/subtask_test.py b/test/subtask_test.py
index 61de4954d7..69126ad428 100644
--- a/test/subtask_test.py
+++ b/test/subtask_test.py
@@ -16,9 +16,6 @@
 #
 
 import abc
-import os
-import random
-import tempfile
 from helpers import unittest
 
 import luigi
diff --git a/test/worker_test.py b/test/worker_test.py
index c321c0a0e5..a31db8398b 100644
--- a/test/worker_test.py
+++ b/test/worker_test.py
@@ -98,8 +98,8 @@ def output(self):
 
     def run(self):
         import other_module
-        other_target_foo = yield other_module.OtherModuleTask(os.path.join(self.p, 'foo'))
-        other_target_bar = yield other_module.OtherModuleTask(os.path.join(self.p, 'bar'))
+        other_target_foo = yield other_module.OtherModuleTask(os.path.join(self.p, 'foo'))  # NOQA
+        other_target_bar = yield other_module.OtherModuleTask(os.path.join(self.p, 'bar'))  # NOQA
 
         with self.output().open('w') as f:
             f.write('Done!')
@@ -937,7 +937,8 @@ def test_task_limit_exceeded(self):
         w.run()
         self.assertFalse(t.complete())
         leaf_tasks = [ForkBombTask(3, 2, branch) for branch in [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1)]]
-        self.assertEquals(3, sum(t.complete() for t in leaf_tasks), "should have gracefully completed as much as possible even though the single last leaf didn't get scheduled")
+        self.assertEquals(3, sum(t.complete() for t in leaf_tasks),
+                          "should have gracefully completed as much as possible even though the single last leaf didn't get scheduled")
 
     @with_config({'core': {'worker-task-limit': '7'}})
     def test_task_limit_not_exceeded(self):
diff --git a/tox.ini b/tox.ini
index 82e1469243..006b4e116d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -53,9 +53,8 @@ commands =
 
 [testenv:flake8]
 deps = flake8
-commands = flake8 --max-line-length=384 --exclude=doc,test,luigi/six.py
+commands = flake8 --max-line-length=160 --exclude=doc,luigi/six.py
     flake8 --max-line-length=100 --ignore=E265 doc
-    flake8 --max-line-length=252 --ignore=F401,F841 test
 
 [testenv:autopep8]
 deps = autopep8