From 46536048575b8eeb7bab362633d01f970a711f20 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Mon, 20 May 2024 12:49:42 +0200 Subject: [PATCH 1/5] re-add runMonitoring --- ganga/GangaCore/Core/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ganga/GangaCore/Core/__init__.py b/ganga/GangaCore/Core/__init__.py index 1099a0b71b..0d3a6b952e 100644 --- a/ganga/GangaCore/Core/__init__.py +++ b/ganga/GangaCore/Core/__init__.py @@ -55,4 +55,4 @@ def bootstrap(reg_slice, interactive_session, my_interface=None): import GangaCore.GPI my_interface = GangaCore.GPI - exportToInterface(my_interface, 'runMonitoring', monitoring_component.enable, 'Functions') + exportToInterface(my_interface, 'runMonitoring', monitoring_component.runMonitoring, 'Functions') From efa1dc0f3a663d107ead73e357d8d28ce37672f6 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Mon, 20 May 2024 17:29:09 +0200 Subject: [PATCH 2/5] runMonitoring async --- .../MonitoringComponent/MonitoringService.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py index 83387e81ce..89ac3d25c6 100644 --- a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py +++ b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py @@ -226,3 +226,28 @@ def stop(self): log.error(err) self._cleanup_scheduled_tasks() self.loop.call_soon_threadsafe(self.loop.stop) + + def runMonitoring(self, jobs=None): + """ + Enable/Run the monitoring loop and wait for the monitoring steps completion. + Parameters: + steps: number of monitoring steps to run + timeout: how long to wait for monitor steps termination (seconds) + jobs: a registry slice to be monitored (None -> all jobs), it may be passed by the user so ._impl is stripped if needed + Return: + False, if the loop cannot be started or the timeout occured while waiting for monitoring termination + True, if the monitoring steps were successfully executed + Note: + This method is meant to be used in Ganga scripts to request monitoring on demand. + """ + + log.debug("runMonitoring") + if not self.alive: + log.error("Cannot run the monitoring loop. It has already been stopped") + return False + + self.enabled = True + self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE) + self._cleanup_dirty_jobs() + self.loop.call_soon_threadsafe(functools.partial(self._check_active_backends, jobs)) + return True From db544dd0c5903c9e84e9f4513362c7ed8002f6a2 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Mon, 20 May 2024 17:39:58 +0200 Subject: [PATCH 3/5] tidy --- ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py index 89ac3d25c6..2cdc9dd01e 100644 --- a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py +++ b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py @@ -233,7 +233,7 @@ def runMonitoring(self, jobs=None): Parameters: steps: number of monitoring steps to run timeout: how long to wait for monitor steps termination (seconds) - jobs: a registry slice to be monitored (None -> all jobs), it may be passed by the user so ._impl is stripped if needed + jobs: a registry slice to be monitored (None -> all jobs) Return: False, if the loop cannot be started or the timeout occured while waiting for monitoring termination True, if the monitoring steps were successfully executed From 55ca399245d0864eaf98115b01b4ccf2025005c2 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 4 Jun 2024 12:23:01 +0200 Subject: [PATCH 4/5] fix run monitoring --- .../Core/MonitoringComponent/MonitoringService.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py index 2cdc9dd01e..8a6b63eab0 100644 --- a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py +++ b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py @@ -71,6 +71,10 @@ def _check_active_backends(self, job_slice=None): log.debug("RegistryLockError: The job was most likely removed") log.debug("Reg LockError%s" % str(err)) + if job_slice and len(found_active_backends)==0: + log.debug("No active backends found with a job slice. Turning off the monitoring loop") + self.enabled = False + # If a backend is newly found as active, trigger its monitoring previously_active_backends = self.active_backends self.active_backends = found_active_backends @@ -84,7 +88,7 @@ def _check_active_backends(self, job_slice=None): if previously_active_backends: self._cleanup_finished_backends(previously_active_backends, found_active_backends) - self.loop.call_later(POLL_RATE, self._check_active_backends) + self.loop.call_later(POLL_RATE, self._check_active_backends, job_slice) def _log_backend_summary(self, active_backends): summary = "{" @@ -240,7 +244,6 @@ def runMonitoring(self, jobs=None): Note: This method is meant to be used in Ganga scripts to request monitoring on demand. """ - log.debug("runMonitoring") if not self.alive: log.error("Cannot run the monitoring loop. It has already been stopped") From 9ee763f0af377a7d7b7934bfe6d713809525ae57 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Tue, 4 Jun 2024 12:24:15 +0200 Subject: [PATCH 5/5] fix run monitoring --- ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py index 8a6b63eab0..e29276fc2b 100644 --- a/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py +++ b/ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py @@ -74,6 +74,7 @@ def _check_active_backends(self, job_slice=None): if job_slice and len(found_active_backends)==0: log.debug("No active backends found with a job slice. Turning off the monitoring loop") self.enabled = False + return # If a backend is newly found as active, trigger its monitoring previously_active_backends = self.active_backends