diff --git a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py index 83387e81ce..e29276fc2b 100644 --- a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py +++ b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py @@ -71,6 +71,11 @@ def _check_active_backends(self, job_slice=None): log.debug("RegistryLockError: The job was most likely removed") log.debug("Reg LockError%s" % str(err)) + if job_slice and len(found_active_backends)==0: + log.debug("No active backends found with a job slice. Turning off the monitoring loop") + self.enabled = False + return + # If a backend is newly found as active, trigger its monitoring previously_active_backends = self.active_backends self.active_backends = found_active_backends @@ -84,7 +89,7 @@ def _check_active_backends(self, job_slice=None): if previously_active_backends: self._cleanup_finished_backends(previously_active_backends, found_active_backends) - self.loop.call_later(POLL_RATE, self._check_active_backends) + self.loop.call_later(POLL_RATE, self._check_active_backends, job_slice) def _log_backend_summary(self, active_backends): summary = "{" @@ -226,3 +231,27 @@ def stop(self): log.error(err) self._cleanup_scheduled_tasks() self.loop.call_soon_threadsafe(self.loop.stop) + + def runMonitoring(self, jobs=None): + """ + Enable/Run the monitoring loop and wait for the monitoring steps completion. + Parameters: + steps: number of monitoring steps to run + timeout: how long to wait for monitor steps termination (seconds) + jobs: a registry slice to be monitored (None -> all jobs) + Return: + False, if the loop cannot be started or the timeout occured while waiting for monitoring termination + True, if the monitoring steps were successfully executed + Note: + This method is meant to be used in Ganga scripts to request monitoring on demand. + """ + log.debug("runMonitoring") + if not self.alive: + log.error("Cannot run the monitoring loop. It has already been stopped") + return False + + self.enabled = True + self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE) + self._cleanup_dirty_jobs() + self.loop.call_soon_threadsafe(functools.partial(self._check_active_backends, jobs)) + return True diff --git a/ganga/GangaCore/Core/__init__.py b/ganga/GangaCore/Core/__init__.py index 1099a0b71b..0d3a6b952e 100644 --- a/ganga/GangaCore/Core/__init__.py +++ b/ganga/GangaCore/Core/__init__.py @@ -55,4 +55,4 @@ def bootstrap(reg_slice, interactive_session, my_interface=None): import GangaCore.GPI my_interface = GangaCore.GPI - exportToInterface(my_interface, 'runMonitoring', monitoring_component.enable, 'Functions') + exportToInterface(my_interface, 'runMonitoring', monitoring_component.runMonitoring, 'Functions')