Skip to content

Commit

Permalink
Silence redundant interface logs when sandboxed.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Nov 9, 2023
1 parent a9dc218 commit 5561678
Showing 1 changed file with 52 additions and 5 deletions.
57 changes: 52 additions & 5 deletions law/patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ def before_run(func, force=False):
if func not in _before_run_funcs or force:
_before_run_funcs.append(func)
return True
else:
return False

return False


def patch_all():
Expand All @@ -48,6 +48,7 @@ def patch_all():
_patched = True

patch_schedule_and_run()
patch_task_process_run()
patch_default_retcodes()
patch_worker_add_task()
patch_worker_add()
Expand Down Expand Up @@ -96,6 +97,38 @@ def run(self):
logger.debug("patched luigi.interface._schedule_and_run")


def patch_task_process_run():
"""
Patches ``luigi.worker.TaskProcess.run`` to increase the severity of luigi's interface logger
when running local workflows that already yielded their branch tasks as dynamic dependencies.
"""
run_orig = luigi.worker.TaskProcess.run

interface_logger = logging.getLogger("luigi-interface")

@functools.wraps(run_orig)
def run(self):
previous_level = interface_logger.level

# update logging for local workflows that already yielded their branch tasks
if (
isinstance(self.task, law.LocalWorkflow) and
self.task.is_workflow() and
not self.task.local_workflow_require_branches and
self.task.workflow_proxy._local_workflow_has_yielded
):
interface_logger.setLevel(logging.WARNING)

try:
return run_orig(self)
finally:
interface_logger.setLevel(previous_level)

luigi.worker.TaskProcess.run = run

logger.debug("patched luigi.worker.TaskProcess.run")


def patch_default_retcodes():
"""
Sets the default luigi return codes in ``luigi.retcodes.retcode`` to:
Expand Down Expand Up @@ -125,14 +158,28 @@ def patch_worker_add_task():
"""
Patches the ``luigi.worker.Worker._add_task`` method to skip dependencies of the triggered task
when running in a sandbox, as dependencies are already controlled from outside the sandbox.
Also, the severity of luigi's interface logging is increased when running in a sandbox.
"""
_add_task_orig = luigi.worker.Worker._add_task

interface_logger = logging.getLogger("luigi-interface")

@functools.wraps(_add_task_orig)
def _add_task(self, *args, **kwargs):
if law.sandbox.base._sandbox_switched and "deps" in kwargs:
kwargs["deps"] = None
return _add_task_orig(self, *args, **kwargs)
previous_level = interface_logger.level

if law.sandbox.base._sandbox_switched:
# increase the log level
interface_logger.setLevel(logging.WARNING)

# reset deps
if "deps" in kwargs:
kwargs["deps"] = None

try:
return _add_task_orig(self, *args, **kwargs)
finally:
interface_logger.setLevel(previous_level)

luigi.worker.Worker._add_task = _add_task

Expand Down

0 comments on commit 5561678

Please sign in to comment.