From 5109e00e9e7a01c877a652b9a9d9a5e9a742f99b Mon Sep 17 00:00:00 2001 From: martynia Date: Tue, 27 Jun 2023 12:50:26 +0200 Subject: [PATCH 1/4] feat: pilot log download enhancement: Add remote logs download possibility --- src/DIRAC/Interfaces/API/DiracAdmin.py | 7 +- .../PilotLoggingPlugins/DownloadPlugin.py | 29 +++++++ .../FileCacheDownloadPlugin.py | 80 +++++++++++++++++++ .../Service/PilotManagerHandler.py | 73 +++++++++++++++-- .../Service/WMSAdministratorHandler.py | 1 - 5 files changed, 181 insertions(+), 9 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py create mode 100644 src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py diff --git a/src/DIRAC/Interfaces/API/DiracAdmin.py b/src/DIRAC/Interfaces/API/DiracAdmin.py index 54205ac7641..2d9905f5e66 100755 --- a/src/DIRAC/Interfaces/API/DiracAdmin.py +++ b/src/DIRAC/Interfaces/API/DiracAdmin.py @@ -425,6 +425,7 @@ def getJobPilotOutput(self, jobID, directory=""): :param job: JobID :type job: integer or string + :param str directory: a directory to download logs to. :return: S_OK,S_ERROR """ if not directory: @@ -468,13 +469,13 @@ def getJobPilotOutput(self, jobID, directory=""): ############################################################################# def getPilotOutput(self, gridReference, directory=""): - """Retrieve the pilot output (std.out and std.err) for an existing job in the WMS. + """Retrieve the pilot output (std.out and std.err) for an existing pilot reference. >>> gLogger.notice(dirac.getJobPilotOutput(12345)) {'OK': True, 'Value': {}} - :param job: JobID - :type job: integer or string + :param str gridReference: pilot reference + :param str directory: a directory to download logs to. 
:return: S_OK,S_ERROR """ if not isinstance(gridReference, str): diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py new file mode 100644 index 00000000000..9c607850123 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py @@ -0,0 +1,29 @@ +""" +Pilot logging plugin abstract class. +""" +from abc import ABC, abstractmethod +from DIRAC import S_OK, S_ERROR, gLogger + +sLog = gLogger.getSubLogger(__name__) + + +class DownloadPlugin(ABC): + """ + Remote pilot log retriever base abstract class. It defines abstract methods used to download log files from a remote + storage to the server. + Any pilot logger download plugin should inherit from this class and implement a (sub)set of methods required by + :class:`PilotManagerHandler`. + """ + + @abstractmethod + def getRemotePilotLogs(self, pilotStamp, vo): + """ + Pilot log getter method, carrying the unique pilot identity and a VO name. + + :param str pilotStamp: pilot stamp. + :param str vo: VO name of a pilot which generated the logs. + :return: S_OK or S_ERROR + :rtype: dict + """ + + pass diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py new file mode 100644 index 00000000000..c74582829ff --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py @@ -0,0 +1,80 @@ +""" +File cache pilot log downloader. 
+""" +import os +import tempfile +from DIRAC import S_OK, S_ERROR, gLogger, gConfig +from DIRAC.DataManagementSystem.Client.DataManager import DataManager +from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.Core.Utilities.Proxy import executeWithUserProxy +from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin + +sLog = gLogger.getSubLogger(__name__) + + +class FileCacheDownloadPlugin(DownloadPlugin): + """ + Class to handle log file download from an SE + """ + + def __init__(self): + """ + Sets the pilot log files location for a WebServer. + + """ + pass + + def getRemotePilotLogs(self, pilotStamp, vo=None): + """ + Pilot log getter method, carrying the unique pilot identity and a VO name. + + :param str pilotStamp: pilot stamp. + :param str vo: VO name of a user/pilot which generated the logs. + :return: S_OK or S_ERROR + :rtype: dict + """ + + opsHelper = Operations(vo=vo) + uploadPath = opsHelper.getValue("Pilot/UploadPath", "") + lfn = os.path.join(uploadPath, pilotStamp + ".log") + sLog.info("LFN to download: ", lfn) + filepath = tempfile.TemporaryDirectory().name + os.makedirs(filepath, exist_ok=True) + # get pilot credentials which uploaded logs to an external storage: + res = opsHelper.getOptionsDict("Shifter/DataManager") + if not res["OK"]: + message = f"No shifter defined for VO: {vo} - needed to retrieve the logs !" 
+ sLog.error(message) + return S_ERROR(message) + + proxyUser = res["Value"].get("User") + proxyGroup = res["Value"].get("Group") + + sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") + + res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg + lfn, filepath, proxyUserName=proxyUser, proxyUserGroup=proxyGroup + ) + sLog.debug("getFile result:", res) + if not res["OK"]: + sLog.error(f"Failed to contact storage") + return res + if lfn in res["Value"]["Failed"]: + sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"]) + return S_ERROR(f"Failed to retrieve a log file: {res['Value']['Failed']}") + try: + filename = os.path.join(filepath, pilotStamp + ".log") + with open(filename) as f: + stdout = f.read() + except FileNotFoundError as err: + sLog.error(f"Error opening a log file:{filename}", err) + return S_ERROR(repr(err)) + + resultDict = {} + resultDict["StdOut"] = stdout + return S_OK(resultDict) + + @executeWithUserProxy + def _downloadLogs(self, lfn, filepath): + return DataManager().getFile(lfn, destinationDir=filepath) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index a8676d0fa6b..6e935b9b16b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -6,12 +6,12 @@ from DIRAC import S_OK, S_ERROR import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities - -from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.Core.Utilities.Decorators import deprecated +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup +from DIRAC.Core.DISET.RequestHandler import getServiceOption from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.ObjectLoader import 
ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import ( getPilotCE, @@ -35,6 +35,10 @@ def initializeHandler(cls, serviceInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") + # prepare remote pilot plugin initialization + defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin" + cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass) + cls.loggingPlugin = None return S_OK() ############################################################################## @@ -92,9 +96,16 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC types_getPilotOutput = [str] def export_getPilotOutput(self, pilotReference): - """Get the pilot job standard output and standard error files for the Grid - job reference """ + Get the pilot job standard output and standard error files for a pilot reference. + Handles both classic, CE-based logs and remote logs. The type og logs returned is determined + by the server. 
+ + :param str pilotReference: + :return: S_OK or S_ERROR Dirac object + :rtype: dict + """ + result = self.pilotAgentsDB.getPilotInfo(pilotReference) if not result["OK"]: self.log.error("Failed to get info for pilot", result["Message"]) @@ -104,6 +115,25 @@ def export_getPilotOutput(self, pilotReference): return S_ERROR("Pilot info is empty") pilotDict = result["Value"][pilotReference] + vo = getVOForGroup(pilotDict["OwnerGroup"]) + opsHelper = Operations(vo=vo) + remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False) + funcs = [self._getRemotePilotOutput, self._getPilotOutput] + if remote: + funcs.reverse() + + result = funcs[0](pilotReference, pilotDict) + if not result["OK"]: + self.log.warn("Pilot log retrieval failed (first attempt), remote ?", remote) + result = funcs[1](pilotReference, pilotDict) + return result + else: + return result + + def _getPilotOutput(self, pilotReference, pilotDict): + """Get the pilot job standard output and standard error files for the Grid + job reference + """ group = pilotDict["OwnerGroup"] @@ -158,6 +188,39 @@ def export_getPilotOutput(self, pilotReference): shutil.rmtree(ce.ceParameters["WorkingDirectory"]) return S_OK(resultDict) + def _getRemotePilotOutput(self, pilotReference, pilotDict): + """ + Get remote pilot log files. 
+ + :param str pilotReference: + :return: S_OK or S_ERROR Dirac object + :rtype: dict + """ + + pilotStamp = pilotDict["PilotStamp"] + group = pilotDict["OwnerGroup"] + vo = getVOForGroup(group) + + if self.loggingPlugin is None: + result = ObjectLoader().loadObject( + f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{self.configValue}", self.configValue + ) + if not result["OK"]: + self.log.error("Failed to load LoggingPlugin", f"{self.configValue}: {result['Message']}") + return result + + componentClass = result["Value"] + self.loggingPlugin = componentClass() + self.log.info("Loaded: PilotLoggingPlugin class", self.configValue) + + res = self.loggingPlugin.getRemotePilotLogs(pilotStamp, vo) + + if res["OK"]: + res["Value"]["OwnerGroup"] = group + res["Value"]["FileList"] = [] + # return res, correct or not + return res + ############################################################################## types_getPilotInfo = [[list, str]] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index a7aa4c615dc..eb43dda6910 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -178,7 +178,6 @@ def export_getJobPilotOutput(self, jobID): job reference :param str jobID: job ID - :return: S_OK(dict)/S_ERROR() """ pilotReference = "" From 21f3471c58b9538dc6f72bd5c542bbc08929afbd Mon Sep 17 00:00:00 2001 From: martynia Date: Mon, 11 Sep 2023 17:35:18 +0200 Subject: [PATCH 2/4] feat: add a remote non-finalised log file getter from Tornado --- .../FileCacheLoggingPlugin.py | 20 +++++++++++++++++++ .../Service/TornadoPilotLoggingHandler.py | 10 ++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py 
index 32ca1eb65cb..91ea6d89a3a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py @@ -104,6 +104,26 @@ def getMeta(self): return S_OK(self.meta) return S_ERROR("No Pilot logging directory defined") + def getLogs(self, logfile, vo): + """ + Get the "instant" logs from Tornado log storage area. These are not finalised (incomplete) logs. + + :return: Dirac S_OK containing the logs + :rtype: dict + """ + + filename = os.path.join(self.meta["LogPath"], vo, logfile) + resultDict = {} + try: + with open(filename) as f: + stdout = f.read() + resultDict["StdOut"] = stdout + except FileNotFoundError as err: + sLog.error(f"Error opening a log file:{filename}", err) + return S_ERROR(repr(err)) + + return S_OK(resultDict) + def _verifyUUIDPattern(self, logfile): """ Verify if the name of the log file matches the required pattern. diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py index cd98938096f..512a136ca7c 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -80,6 +80,16 @@ def export_getMetadata(self): """ return self.loggingPlugin.getMeta() + def export_getLogs(self, logfile, vo): + """ + Get (not yet finalised) logs from the server. + + :return: S_OK containing the logs + :rtype: dict + """ + + return self.loggingPlugin.getLogs(logfile, vo) + def export_finaliseLogs(self, payload, pilotUUID): """ Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used. 
From f5613fa118b522308aa32b25c48227d01d812a4a Mon Sep 17 00:00:00 2001 From: martynia Date: Wed, 13 Sep 2023 13:12:43 +0200 Subject: [PATCH 3/4] feat: implement log file download from the server --- .../FileCacheDownloadPlugin.py | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py index c74582829ff..58b34bbd2c2 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py @@ -9,6 +9,7 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.Utilities.Proxy import executeWithUserProxy from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin +from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient sLog = gLogger.getSubLogger(__name__) @@ -20,10 +21,10 @@ class FileCacheDownloadPlugin(DownloadPlugin): def __init__(self): """ - Sets the pilot log files location for a WebServer. + Sets the client for downloading incomplete log files from the server cache. 
""" - pass + self.tornadoClient = TornadoPilotLoggingClient() def getRemotePilotLogs(self, pilotStamp, vo=None): """ @@ -39,8 +40,7 @@ def getRemotePilotLogs(self, pilotStamp, vo=None): uploadPath = opsHelper.getValue("Pilot/UploadPath", "") lfn = os.path.join(uploadPath, pilotStamp + ".log") sLog.info("LFN to download: ", lfn) - filepath = tempfile.TemporaryDirectory().name - os.makedirs(filepath, exist_ok=True) + # get pilot credentials which uploaded logs to an external storage: res = opsHelper.getOptionsDict("Shifter/DataManager") if not res["OK"]: @@ -53,9 +53,22 @@ def getRemotePilotLogs(self, pilotStamp, vo=None): sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") - res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg - lfn, filepath, proxyUserName=proxyUser, proxyUserGroup=proxyGroup - ) + # attempt to get logs from server first: + res = self._getLogsFromServer(pilotStamp, vo) + if not res["OK"]: + # from SE: + res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg + lfn, pilotStamp, proxyUserName=proxyUser, proxyUserGroup=proxyGroup + ) + + return res + + @executeWithUserProxy + def _downloadLogs(self, lfn, pilotStamp): + filepath = tempfile.TemporaryDirectory().name + os.makedirs(filepath, exist_ok=True) + + res = DataManager().getFile(lfn, destinationDir=filepath) sLog.debug("getFile result:", res) if not res["OK"]: sLog.error(f"Failed to contact storage") @@ -76,5 +89,19 @@ def getRemotePilotLogs(self, pilotStamp, vo=None): return S_OK(resultDict) @executeWithUserProxy - def _downloadLogs(self, lfn, filepath): - return DataManager().getFile(lfn, destinationDir=filepath) + def _getLogsFromServer(self, logfile, vo): + """ + Get a file from the server cache area. The file is most likely not finalised, since finalised files + are copied to an SE and deleted. Both logfile.log and logfile are tried should the finalised file still + be on the server. 
+ + :param str logfile: pilot log filename + :param str vo: VO name + :return: S_OK or S_ERROR + :rtype: dict + """ + + res = self.tornadoClient.getLogs(logfile, vo) + if not res["OK"]: + res = self.tornadoClient.getLogs(logfile + ".log", vo) + return res From bad7e9edbb92134b7f6848eb4bfe9323f91fe8a7 Mon Sep 17 00:00:00 2001 From: martynia Date: Thu, 14 Sep 2023 09:50:08 +0200 Subject: [PATCH 4/4] fix: get classic logs first by default --- .../WorkloadManagementSystem/Service/PilotManagerHandler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index 6e935b9b16b..dcf4c053eed 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -98,7 +98,7 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC def export_getPilotOutput(self, pilotReference): """ Get the pilot job standard output and standard error files for a pilot reference. - Handles both classic, CE-based logs and remote logs. The type og logs returned is determined + Handles both classic, CE-based logs and remote logs. The type of logs returned is determined by the server. :param str pilotReference: @@ -118,7 +118,8 @@ def export_getPilotOutput(self, pilotReference): vo = getVOForGroup(pilotDict["OwnerGroup"]) opsHelper = Operations(vo=vo) remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False) - funcs = [self._getRemotePilotOutput, self._getPilotOutput] + # classic logs first, by default + funcs = [self._getPilotOutput, self._getRemotePilotOutput] if remote: funcs.reverse()