Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error message in case of unfulfilled dependencies with single output #3281

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions luigi/worker.py
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
from luigi.target import Target
from luigi.task import Task, Config, DynamicRequirements
from luigi.task import Task, Config, DynamicRequirements, flatten
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter
@@ -185,7 +185,7 @@
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]

Check warning on line 188 in luigi/worker.py

Codecov / codecov/patch

luigi/worker.py#L188

Added line #L188 was not covered by tests
if nonexistent_outputs:
missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')
else:
37 changes: 37 additions & 0 deletions test/worker_task_test.py
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import luigi
import luigi.date_interval
import luigi.notifications
from luigi.mock import MockTarget
from luigi.worker import TaskException, TaskProcess
from luigi.scheduler import DONE, FAILED

@@ -106,6 +107,42 @@ def complete(self):
None
))

def test_fail_on_unfulfilled_dependencies(self):
class NeverCompleteTask(luigi.Task):
def complete(self):
return False

class A(NeverCompleteTask):
def output(self):
return []

class B(NeverCompleteTask):
def output(self):
return MockTarget("foo-B")

class C(NeverCompleteTask):
def output(self):
return [MockTarget("foo-C1"), MockTarget("foo-C2")]

class Main(NeverCompleteTask):
def requires(self):
return [A(), B(), C()]

task = Main()
result_queue = multiprocessing.Queue()
task_process = TaskProcess(task, 1, result_queue, mock.Mock())

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
expected_missing = [A().task_id, f"{B().task_id} (foo-B)", f"{C().task_id} (foo-C1, foo-C2)"]
mock_put.assert_called_once_with((
task.task_id,
FAILED,
StringContaining(f"Unfulfilled dependencies at run time: {', '.join(expected_missing)}"),
expected_missing,
[],
))

def test_cleanup_children_on_terminate(self):
"""
Subprocesses spawned by tasks should be terminated on terminate
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ deps =
pytest<7.0
pytest-cov>=2.0,<3.0
mock<2.0
moto>=1.3.10
moto>=1.3.10,<5.0
HTTPretty==0.8.10
docker>=2.1.0
boto>=2.42,<3.0