From 7d1b133c4c4e3035b30edb73731cdfc2efe1f017 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 30 Jul 2018 02:46:43 +0300 Subject: [PATCH] Replace luigi.task by RunOnceTask where possible --- test/scheduler_visualisation_test.py | 50 +++++++++++++--------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/test/scheduler_visualisation_test.py b/test/scheduler_visualisation_test.py index 1d35f69ffc..4edb668e51 100644 --- a/test/scheduler_visualisation_test.py +++ b/test/scheduler_visualisation_test.py @@ -19,7 +19,7 @@ import os import tempfile import time -from helpers import unittest +from helpers import unittest, RunOnceTask import luigi import luigi.notifications @@ -33,7 +33,7 @@ class DummyTask(luigi.Task): - task_id = luigi.Parameter() + task_id = luigi.IntParameter() def run(self): f = self.output().open('w') @@ -44,7 +44,7 @@ def output(self): class FactorTask(luigi.Task): - product = luigi.Parameter() + product = luigi.IntParameter() def requires(self): for factor in range(2, self.product): @@ -77,7 +77,10 @@ def complete(self): class FailingTask(luigi.Task): task_namespace = __name__ - task_id = luigi.Parameter() + task_id = luigi.IntParameter() + + def complete(self): + return False def run(self): raise Exception("Error Message") @@ -100,7 +103,6 @@ def run(self): class SchedulerVisualisationTest(unittest.TestCase): - def setUp(self): self.scheduler = luigi.scheduler.Scheduler() @@ -190,7 +192,7 @@ def complete(self): six.assertCountEqual(self, expected_nodes, graph) def test_truncate_graph_with_full_levels(self): - class BinaryTreeTask(luigi.Task): + class BinaryTreeTask(RunOnceTask): idx = luigi.IntParameter() def requires(self): @@ -226,7 +228,7 @@ def complete(self): graph = self.scheduler.dep_graph(root_task.task_id) self.assertEqual(10, len(graph)) - expected_nodes = [LinearTask(i).task_id for i in range(100, 91, -1)] +\ + expected_nodes = [LinearTask(i).task_id for i in range(100, 91, -1)] + \ [LinearTask(0).task_id] self.maxDiff = None six.assertCountEqual(self, expected_nodes, graph) @@ -387,30 +389,29 @@ def test_task_list_failed(self): def test_task_list_upstream_status(self): class A(luigi.ExternalTask): - pass + def complete(self): + return False class B(luigi.ExternalTask): - def complete(self): return True - class C(luigi.Task): - + class C(RunOnceTask): def requires(self): return [A(), B()] class F(luigi.Task): + def complete(self): + return False def run(self): raise Exception() - class D(luigi.Task): - + class D(RunOnceTask): def requires(self): return [F()] - class E(luigi.Task): - + class E(RunOnceTask): def requires(self): return [C(), D()] @@ -478,22 +479,20 @@ def test_fetch_error(self): self.assertTrue("Traceback" in error["error"]) def test_inverse_deps(self): - class X(luigi.Task): + class X(RunOnceTask): pass - class Y(luigi.Task): - + class Y(RunOnceTask): def requires(self): return [X()] - class Z(luigi.Task): - id = luigi.Parameter() + class Z(RunOnceTask): + id = luigi.IntParameter() def requires(self): return [Y()] - class ZZ(luigi.Task): - + class ZZ(RunOnceTask): def requires(self): return [Z(1), Z(2)] @@ -513,7 +512,6 @@ def assert_has_deps(task_id, deps): def test_simple_worker_list(self): class X(luigi.Task): - def run(self): self._complete = True @@ -536,12 +534,10 @@ def complete(self): def test_worker_list_pending_uniques(self): class X(luigi.Task): - def complete(self): return False class Y(X): - def requires(self): return X() @@ -562,7 +558,7 @@ class Z(Y): self.assertEqual(0, worker['num_running']) def test_worker_list_running(self): - class X(luigi.Task): + class X(RunOnceTask): n = luigi.IntParameter() w = luigi.worker.Worker(worker_id='w', scheduler=self.scheduler, worker_processes=3) @@ -584,7 +580,7 @@ class X(luigi.Task): self.assertEqual(1, worker['num_uniques']) def test_worker_list_disabled_worker(self): - class X(luigi.Task): + class X(RunOnceTask): pass with luigi.worker.Worker(worker_id='w', scheduler=self.scheduler) as w: