diff --git a/law/patches.py b/law/patches.py index e2e00678..6dc8f586 100644 --- a/law/patches.py +++ b/law/patches.py @@ -34,8 +34,8 @@ def before_run(func, force=False): if func not in _before_run_funcs or force: _before_run_funcs.append(func) return True - else: - return False + + return False def patch_all(): @@ -48,6 +48,7 @@ def patch_all(): _patched = True patch_schedule_and_run() + patch_task_process_run() patch_default_retcodes() patch_worker_add_task() patch_worker_add() @@ -96,6 +97,38 @@ def run(self): logger.debug("patched luigi.interface._schedule_and_run") +def patch_task_process_run(): + """ + Patches ``luigi.worker.TaskProcess.run`` to increase the severity of luigi's interface logger + when running local workflows that already yielded their branch tasks as dynamic dependencies. + """ + run_orig = luigi.worker.TaskProcess.run + + interface_logger = logging.getLogger("luigi-interface") + + @functools.wraps(run_orig) + def run(self): + previous_level = interface_logger.level + + # update logging for local workflows that already yielded their branch tasks + if ( + isinstance(self.task, law.LocalWorkflow) and + self.task.is_workflow() and + not self.task.local_workflow_require_branches and + self.task.workflow_proxy._local_workflow_has_yielded + ): + interface_logger.setLevel(logging.WARNING) + + try: + return run_orig(self) + finally: + interface_logger.setLevel(previous_level) + + luigi.worker.TaskProcess.run = run + + logger.debug("patched luigi.worker.TaskProcess.run") + + def patch_default_retcodes(): """ Sets the default luigi return codes in ``luigi.retcodes.retcode`` to: @@ -125,14 +158,28 @@ def patch_worker_add_task(): """ Patches the ``luigi.worker.Worker._add_task`` method to skip dependencies of the triggered task when running in a sandbox, as dependencies are already controlled from outside the sandbox. + Also, the severity of luigi's interface logging is increased when running in a sandbox. """ _add_task_orig = luigi.worker.Worker._add_task + interface_logger = logging.getLogger("luigi-interface") + @functools.wraps(_add_task_orig) def _add_task(self, *args, **kwargs): - if law.sandbox.base._sandbox_switched and "deps" in kwargs: - kwargs["deps"] = None - return _add_task_orig(self, *args, **kwargs) + previous_level = interface_logger.level + + if law.sandbox.base._sandbox_switched: + # increase the log level + interface_logger.setLevel(logging.WARNING) + + # reset deps + if "deps" in kwargs: + kwargs["deps"] = None + + try: + return _add_task_orig(self, *args, **kwargs) + finally: + interface_logger.setLevel(previous_level) luigi.worker.Worker._add_task = _add_task