Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace luigi.Task by RunOnceTask in scheduler_visualisation_test #2476

Merged
merged 1 commit into from
Jul 30, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 23 additions & 27 deletions test/scheduler_visualisation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import os
import tempfile
import time
from helpers import unittest
from helpers import unittest, RunOnceTask

import luigi
import luigi.notifications
Expand All @@ -33,7 +33,7 @@


class DummyTask(luigi.Task):
task_id = luigi.Parameter()
task_id = luigi.IntParameter()

def run(self):
f = self.output().open('w')
Expand All @@ -44,7 +44,7 @@ def output(self):


class FactorTask(luigi.Task):
product = luigi.Parameter()
product = luigi.IntParameter()

def requires(self):
for factor in range(2, self.product):
Expand Down Expand Up @@ -77,7 +77,10 @@ def complete(self):

class FailingTask(luigi.Task):
task_namespace = __name__
task_id = luigi.Parameter()
task_id = luigi.IntParameter()

def complete(self):
return False

def run(self):
raise Exception("Error Message")
Expand All @@ -100,7 +103,6 @@ def run(self):


class SchedulerVisualisationTest(unittest.TestCase):

def setUp(self):
self.scheduler = luigi.scheduler.Scheduler()

Expand Down Expand Up @@ -190,7 +192,7 @@ def complete(self):
six.assertCountEqual(self, expected_nodes, graph)

def test_truncate_graph_with_full_levels(self):
class BinaryTreeTask(luigi.Task):
class BinaryTreeTask(RunOnceTask):
idx = luigi.IntParameter()

def requires(self):
Expand Down Expand Up @@ -226,7 +228,7 @@ def complete(self):

graph = self.scheduler.dep_graph(root_task.task_id)
self.assertEqual(10, len(graph))
expected_nodes = [LinearTask(i).task_id for i in range(100, 91, -1)] +\
expected_nodes = [LinearTask(i).task_id for i in range(100, 91, -1)] + \
[LinearTask(0).task_id]
self.maxDiff = None
six.assertCountEqual(self, expected_nodes, graph)
Expand Down Expand Up @@ -387,30 +389,29 @@ def test_task_list_failed(self):

def test_task_list_upstream_status(self):
class A(luigi.ExternalTask):
pass
def complete(self):
return False

class B(luigi.ExternalTask):

def complete(self):
return True

class C(luigi.Task):

class C(RunOnceTask):
def requires(self):
return [A(), B()]

class F(luigi.Task):
def complete(self):
return False

def run(self):
raise Exception()

class D(luigi.Task):

class D(RunOnceTask):
def requires(self):
return [F()]

class E(luigi.Task):

class E(RunOnceTask):
def requires(self):
return [C(), D()]

Expand Down Expand Up @@ -478,22 +479,20 @@ def test_fetch_error(self):
self.assertTrue("Traceback" in error["error"])

def test_inverse_deps(self):
class X(luigi.Task):
class X(RunOnceTask):
pass

class Y(luigi.Task):

class Y(RunOnceTask):
def requires(self):
return [X()]

class Z(luigi.Task):
id = luigi.Parameter()
class Z(RunOnceTask):
id = luigi.IntParameter()

def requires(self):
return [Y()]

class ZZ(luigi.Task):

class ZZ(RunOnceTask):
def requires(self):
return [Z(1), Z(2)]

Expand All @@ -513,7 +512,6 @@ def assert_has_deps(task_id, deps):

def test_simple_worker_list(self):
class X(luigi.Task):

def run(self):
self._complete = True

Expand All @@ -536,12 +534,10 @@ def complete(self):

def test_worker_list_pending_uniques(self):
class X(luigi.Task):

def complete(self):
return False

class Y(X):

def requires(self):
return X()

Expand All @@ -562,7 +558,7 @@ class Z(Y):
self.assertEqual(0, worker['num_running'])

def test_worker_list_running(self):
class X(luigi.Task):
class X(RunOnceTask):
n = luigi.IntParameter()

w = luigi.worker.Worker(worker_id='w', scheduler=self.scheduler, worker_processes=3)
Expand All @@ -584,7 +580,7 @@ class X(luigi.Task):
self.assertEqual(1, worker['num_uniques'])

def test_worker_list_disabled_worker(self):
class X(luigi.Task):
class X(RunOnceTask):
pass

with luigi.worker.Worker(worker_id='w', scheduler=self.scheduler) as w:
Expand Down