Skip to content

Commit

Permalink
Merge pull request #1175 from toidi/more-flake8-compliant
Browse files Browse the repository at this point in the history
Make luigi package more flake8 compliant
  • Loading branch information
Tarrasch committed Aug 31, 2015
2 parents 8fa712f + 35d1b10 commit 7b0d98a
Show file tree
Hide file tree
Showing 35 changed files with 130 additions and 70 deletions.
5 changes: 4 additions & 1 deletion luigi/date_interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
class MyTask(luigi.Task):
date_interval = luigi.DateIntervalParameter()
Now, you can launch this from the command line using ``--date-interval 2014-05-10`` or ``--date-interval 2014-W26`` (using week notation) or ``--date-interval 2014`` (for a year) and some other notations.
Now, you can launch this from the command line using
``--date-interval 2014-05-10`` or
``--date-interval 2014-W26`` (using week notation) or
``--date-interval 2014`` (for a year) and some other notations.
"""

from __future__ import division
Expand Down
24 changes: 17 additions & 7 deletions luigi/execution_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ def _partition_tasks(worker):
pending_tasks = {task for(task, status, ext) in task_history if status == 'PENDING'}
set_tasks = {}
set_tasks["completed"] = {task for (task, status, ext) in task_history if status == 'DONE' and task in pending_tasks}
set_tasks["already_done"] = {task for (task, status, ext) in task_history if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]}
set_tasks["already_done"] = {task for (task, status, ext) in task_history
if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]}
set_tasks["failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'}
set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and not ext}
set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and ext}
set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history
if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and not ext}
set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history
if status == 'PENDING' and task not in set_tasks["failed"] and task not in set_tasks["completed"] and ext}
set_tasks["run_by_other_worker"] = set()
set_tasks["upstream_failure"] = set()
set_tasks["upstream_missing_dependency"] = set()
Expand Down Expand Up @@ -75,7 +78,8 @@ def _depth_first_search(set_tasks, current_task, visited):
if task in set_tasks["run_by_other_worker"] or task in set_tasks["upstream_run_by_other_worker"]:
set_tasks["upstream_run_by_other_worker"].add(current_task)
upstream_run_by_other_worker = True
if not upstream_failure and not upstream_missing_dependency and not upstream_run_by_other_worker and current_task not in set_tasks["run_by_other_worker"]:
if not upstream_failure and not upstream_missing_dependency and \
not upstream_run_by_other_worker and current_task not in set_tasks["run_by_other_worker"]:
set_tasks["unknown_reason"].add(current_task)


Expand All @@ -97,7 +101,8 @@ def _get_str(task_dict, extra_indent):
break
if len(tasks[0].get_params()) == 0:
row += '- {0} {1}()'.format(len(tasks), str(task_family))
elif _get_len_of_params(tasks[0]) > 60 or (len(tasks) == 2 and len(tasks[0].get_params()) > 1 and (_get_len_of_params(tasks[0]) > 40 or len(str(tasks[0])) > 100)) or len(str(tasks[0])) > 200:
elif _get_len_of_params(tasks[0]) > 60 or len(str(tasks[0])) > 200 or \
(len(tasks) == 2 and len(tasks[0].get_params()) > 1 and (_get_len_of_params(tasks[0]) > 40 or len(str(tasks[0])) > 100)):
"""
This is to make sure that there is no really long task in the output
"""
Expand Down Expand Up @@ -287,7 +292,9 @@ def _get_external_workers(worker):

def _group_tasks_by_name_and_status(task_dict):
"""
Takes a dictionary with sets of tasks grouped by their status and returns a dictionary with dictionaries with an array of tasks grouped by their status and task name
Takes a dictionary with sets of tasks grouped by their status and
returns a dictionary with dictionaries with an array of tasks grouped by
their status and task name
"""
group_status = {}
for task in task_dict:
Expand All @@ -309,7 +316,10 @@ def _summary_format(set_tasks, worker):
for status, task_dict in set_tasks.items():
group_tasks[status] = _group_tasks_by_name_and_status(task_dict)
comments = _get_comments(group_tasks)
num_all_tasks = len(set_tasks["already_done"]) + len(set_tasks["completed"]) + len(set_tasks["failed"]) + len(set_tasks["still_pending_ext"]) + len(set_tasks["still_pending_not_ext"])
num_all_tasks = sum([len(set_tasks["already_done"]),
len(set_tasks["completed"]), len(set_tasks["failed"]),
len(set_tasks["still_pending_ext"]),
len(set_tasks["still_pending_not_ext"])])
str_output = ''
str_output += 'Scheduled {0} tasks of which:\n'.format(num_all_tasks)
for status in _ORDERED_STATUSES:
Expand Down
4 changes: 3 additions & 1 deletion luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ def on_failure(self, exception):
Override for custom error handling.
This method gets called if an exception is raised in :py:meth:`run`.
Return value of this method is json encoded and sent to the scheduler as the `expl` argument. Its string representation will be used as the body of the error email sent out if any.
Return value of this method is json encoded and sent to the scheduler
as the `expl` argument. Its string representation will be used as the
body of the error email sent out if any.
Default behavior is to return a string representation of the stack trace.
"""
Expand Down
28 changes: 23 additions & 5 deletions luigi/tools/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,13 @@ class RangeDailyBase(RangeBase):
description="ending date, exclusive. Default: None - work forward forever")
days_back = luigi.IntParameter(
default=100, # slightly more than three months
description="extent to which contiguousness is to be assured into past, in days from current time. Prevents infinite loop when start is none. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter to that, too, to prevent the oldest outputs flapping. Increase freely if you intend to process old dates - worker's memory is the limit")
description=("extent to which contiguousness is to be assured into "
"past, in days from current time. Prevents infinite loop "
"when start is none. If the dataset has limited retention"
" (i.e. old outputs get removed), this should be set "
"shorter to that, too, to prevent the oldest outputs "
"flapping. Increase freely if you intend to process old "
"dates - worker's memory is the limit"))
days_forward = luigi.IntParameter(
default=0,
description="extent to which contiguousness is to be assured into future, in days from current time. Prevents infinite loop when stop is none")
Expand Down Expand Up @@ -280,7 +286,13 @@ class RangeHourlyBase(RangeBase):
description="ending datehour, exclusive. Default: None - work forward forever")
hours_back = luigi.IntParameter(
default=100 * 24, # slightly more than three months
description="extent to which contiguousness is to be assured into past, in hours from current time. Prevents infinite loop when start is none. If the dataset has limited retention (i.e. old outputs get removed), this should be set shorter to that, too, to prevent the oldest outputs flapping. Increase freely if you intend to process old dates - worker's memory is the limit")
description=("extent to which contiguousness is to be assured into "
"past, in hours from current time. Prevents infinite "
"loop when start is none. If the dataset has limited "
"retention (i.e. old outputs get removed), this should "
"be set shorter to that, too, to prevent the oldest "
"outputs flapping. Increase freely if you intend to "
"process old dates - worker's memory is the limit"))
# TODO always entire interval for reprocessings (fixed start and stop)?
hours_forward = luigi.IntParameter(
default=0,
Expand Down Expand Up @@ -376,19 +388,25 @@ def _get_per_location_glob(tasks, outputs, regexes):
don't even have to retrofit existing tasks anyhow.
"""
paths = [o.path for o in outputs]
matches = [r.search(p) for r, p in zip(regexes, paths)] # naive, because some matches could be confused by numbers earlier in path, e.g. /foo/fifa2000k/bar/2000-12-31/00
# naive, because some matches could be confused by numbers earlier
# in path, e.g. /foo/fifa2000k/bar/2000-12-31/00
matches = [r.search(p) for r, p in zip(regexes, paths)]

for m, p, t in zip(matches, paths, tasks):
if m is None:
raise NotImplementedError("Couldn't deduce datehour representation in output path %r of task %s" % (p, t))

n_groups = len(matches[0].groups())
positions = [most_common((m.start(i), m.end(i)) for m in matches)[0] for i in range(1, n_groups + 1)] # the most common position of every group is likely to be conclusive hit or miss
# the most common position of every group is likely
# to be conclusive hit or miss
positions = [most_common((m.start(i), m.end(i)) for m in matches)[0] for i in range(1, n_groups + 1)]

glob = list(paths[0]) # FIXME sanity check that it's the same for all paths
for start, end in positions:
glob = glob[:start] + ['[0-9]'] * (end - start) + glob[end:]
return ''.join(glob).rsplit('/', 1)[0] # chop off the last path item (wouldn't need to if `hadoop fs -ls -d` equivalent were available)
# chop off the last path item
# (wouldn't need to if `hadoop fs -ls -d` equivalent were available)
return ''.join(glob).rsplit('/', 1)[0]


def _get_filesystems_and_globs(datetime_to_task, datetime_to_re):
Expand Down
4 changes: 3 additions & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,9 @@ def _add(self, task, is_complete):
runnable = worker().retry_external_tasks

task.trigger_event(Event.DEPENDENCY_MISSING, task)
logger.warning('Data for %s does not exist (yet?). The task is an external data depedency, so it can not be run from this luigi process.', task.task_id)
logger.warning('Data for %s does not exist (yet?). The task is an '
'external data depedency, so it can not be run from'
' this luigi process.', task.task_id)

else:
deps = task.deps()
Expand Down
1 change: 0 additions & 1 deletion test/_test_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import datetime
import ftplib
import os
import time
from helpers import unittest
try:
from cStringIO import StringIO
Expand Down
1 change: 0 additions & 1 deletion test/central_planner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

import time
import datetime
from helpers import unittest

from nose.plugins.attrib import attr
Expand Down
2 changes: 0 additions & 2 deletions test/cmdline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import ConfigParser
except ImportError:
import configparser as ConfigParser
import logging
import mock
import os
import subprocess
from helpers import unittest
import warnings

from luigi import six

Expand Down
3 changes: 0 additions & 3 deletions test/contrib/_webhdfs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
# limitations under the License.
#

import datetime
import os
import posixpath
import time
from helpers import unittest

from luigi.contrib import webhdfs
Expand Down
1 change: 0 additions & 1 deletion test/contrib/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import luigi
from luigi.contrib import bigquery
from luigi.contrib import gcs

from contrib import gcs_test
from nose.plugins.attrib import attr
Expand Down
2 changes: 1 addition & 1 deletion test/contrib/hadoop_jar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import shlex
from helpers import unittest
from luigi.contrib.hadoop_jar import HadoopJarJobError, HadoopJarJobTask
from mock import patch, MagicMock
from mock import patch


class TestHadoopJarJob(HadoopJarJobTask):
Expand Down
2 changes: 1 addition & 1 deletion test/contrib/hadoop_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def extra_files(self):
return [(fn, 'my_dir/my_file')]

def init_remote(self):
f = open('my_dir/my_file') # make sure it exists
open('my_dir/my_file') # make sure it exists


class MapOnlyJob(HadoopJobTask):
Expand Down
1 change: 0 additions & 1 deletion test/contrib/hdfs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import functools
import re
from helpers import unittest
from datetime import datetime
import random

import helpers
Expand Down
1 change: 0 additions & 1 deletion test/contrib/hive_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from collections import OrderedDict
import os
import sys
import tempfile
from helpers import unittest

Expand Down
12 changes: 9 additions & 3 deletions test/contrib/pig_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import tempfile

import luigi
from helpers import with_config, unittest
from helpers import unittest
from luigi.contrib.pig import PigJobError, PigJobTask
from mock import patch

Expand Down Expand Up @@ -104,7 +104,10 @@ def test_run__success(self, mock):
try:
job = ComplexTestJob()
job.run()
self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p', 'YOUR_PARAM_NAME=Your param value', '-propertyFile', 'pig_property_file', '-f', 'my_complex_pig_script.pig']], arglist_result)
self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p',
'YOUR_PARAM_NAME=Your param value',
'-propertyFile', 'pig_property_file', '-f',
'my_complex_pig_script.pig']], arglist_result)

# Check property file
with open('pig_property_file') as pprops_file:
Expand All @@ -125,7 +128,10 @@ def test_run__fail(self, mock):
except PigJobError as e:
p = e
self.assertEqual('stderr', p.err)
self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p', 'YOUR_PARAM_NAME=Your param value', '-propertyFile', 'pig_property_file', '-f', 'my_complex_pig_script.pig']], arglist_result)
self.assertEqual([['/usr/share/pig/bin/pig', '-x', 'local', '-p',
'YOUR_PARAM_NAME=Your param value',
'-propertyFile', 'pig_property_file', '-f',
'my_complex_pig_script.pig']], arglist_result)

# Check property file
with open('pig_property_file') as pprops_file:
Expand Down
2 changes: 1 addition & 1 deletion test/create_packages_archive_root/package/submodule.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

import os
import os # NOQA
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

from __future__ import absolute_import

import os
import os # NOQA
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

import os # NOQA
2 changes: 1 addition & 1 deletion test/db_task_history_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from helpers import with_config
import luigi
from luigi.db_task_history import DbTaskHistory, Base
from luigi.db_task_history import DbTaskHistory
from luigi.task_status import DONE, PENDING, RUNNING


Expand Down
2 changes: 1 addition & 1 deletion test/dynamic_import_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

from helpers import unittest, LuigiTestCase
from helpers import LuigiTestCase

import luigi
import luigi.interface
Expand Down
16 changes: 13 additions & 3 deletions test/event_callbacks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#

import random
from helpers import unittest

import luigi
Expand Down Expand Up @@ -195,7 +194,9 @@ def tearDown(self):
def _run_test(self, task, expected_events):
actual_events = {}

# yucky to create separate callbacks; would be nicer if the callback received an instance of a subclass of Event, so one callback could accumulate all types
# yucky to create separate callbacks; would be nicer if the callback
# received an instance of a subclass of Event, so one callback could
# accumulate all types
@luigi.Task.event_handler(Event.DEPENDENCY_DISCOVERED)
def callback_dependency_discovered(*args):
actual_events.setdefault(Event.DEPENDENCY_DISCOVERED, set()).add(tuple(map(lambda t: t.task_id, args)))
Expand Down Expand Up @@ -255,4 +256,13 @@ def test_complete_dag(self):
('D(param=3)',),
]),
})
self.assertEqual(eval_contents(A().output()), ['A(param=1)', ['B(param=1)', ['C(param=1)', ['D(param=1)'], ['D(param=2)']]], ['B(param=2)', ['C(param=2)', ['D(param=2)'], ['D(param=3)']]]])
self.assertEqual(eval_contents(A().output()),
['A(param=1)',
['B(param=1)',
['C(param=1)',
['D(param=1)'],
['D(param=2)']]],
['B(param=2)',
['C(param=2)',
['D(param=2)'],
['D(param=3)']]]])
30 changes: 23 additions & 7 deletions test/execution_summary_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from helpers import unittest, LuigiTestCase

from luigi import six
from helpers import LuigiTestCase

import luigi
import luigi.worker
Expand Down Expand Up @@ -76,7 +74,12 @@ def requires(self):
s = self.summary()
self.assertIn('\n* 3 ran successfully:\n - 3 Bar(num=', s)
self.assertIn('\n* 1 present dependencies were encountered:\n - 1 Bar(num=1)\n', s)
self.assertIn('\n* 1 failed:\n - 1 Bar(num=0)\n* 1 were left pending, among these:\n * 1 had failed dependencies:\n - 1 Foo()\n\nThis progress looks :( because there were failed tasks', s)
self.assertIn('\n* 1 failed:\n'
' - 1 Bar(num=0)\n'
'* 1 were left pending, among these:\n'
' * 1 had failed dependencies:\n'
' - 1 Foo()\n\n'
'This progress looks :( because there were failed tasks', s)
self.assertNotIn('\n\n\n', s)

def test_upstream_not_running(self):
Expand Down Expand Up @@ -118,7 +121,11 @@ def requires(self):
self.assertIn('\n* 4 ran successfully:\n - 4 Bar(num=1...4)\n', s)
self.assertIn('\n* 1 failed:\n - 1 Bar(num=0)\n', s)
self.assertIn('\n* 5 were left pending, among these:\n * 4 were missing external dependencies:\n - 4 ExternalBar(num=', s)
self.assertIn('\n * 1 had failed dependencies:\n - 1 Foo()\n * 1 had missing external dependencies:\n - 1 Foo()\n\nThis progress looks :( because there were failed tasks\n', s)
self.assertIn('\n * 1 had failed dependencies:\n'
' - 1 Foo()\n'
' * 1 had missing external dependencies:\n'
' - 1 Foo()\n\n'
'This progress looks :( because there were failed tasks\n', s)
self.assertNotIn('\n\n\n', s)

def test_already_running(self):
Expand Down Expand Up @@ -166,8 +173,17 @@ def run(self):
self.assertEqual({LockTask()}, d['run_by_other_worker'])
self.assertEqual({ParentTask()}, d['upstream_run_by_other_worker'])
s = self.summary()
self.assertIn('\nScheduled 2 tasks of which:\n* 2 were left pending, among these:\n * 1 were being run by another worker:\n - 1 LockTask()\n * 1 had dependencies that were being run by other worker:\n - 1 ParentTask()\n', s)
self.assertIn('\n\nThe other workers were:\n - other_worker ran 1 tasks\n\nDid not run any tasks\nThis progress looks :) because there were no failed tasks or missing external dependencies\n', s)
self.assertIn('\nScheduled 2 tasks of which:\n'
'* 2 were left pending, among these:\n'
' * 1 were being run by another worker:\n'
' - 1 LockTask()\n'
' * 1 had dependencies that were being run by other worker:\n'
' - 1 ParentTask()\n', s)
self.assertIn('\n\nThe other workers were:\n'
' - other_worker ran 1 tasks\n\n'
'Did not run any tasks\n'
'This progress looks :) because there were no failed '
'tasks or missing external dependencies\n', s)
self.assertNotIn('\n\n\n', s)

def test_larger_tree(self):
Expand Down
Loading

0 comments on commit 7b0d98a

Please sign in to comment.