diff --git a/src/DIRAC/Interfaces/API/DiracAdmin.py b/src/DIRAC/Interfaces/API/DiracAdmin.py index 54205ac7641..2d9905f5e66 100755 --- a/src/DIRAC/Interfaces/API/DiracAdmin.py +++ b/src/DIRAC/Interfaces/API/DiracAdmin.py @@ -425,6 +425,7 @@ def getJobPilotOutput(self, jobID, directory=""): :param job: JobID :type job: integer or string + :param str directory: a directory to download logs to. :return: S_OK,S_ERROR """ if not directory: @@ -468,13 +469,13 @@ def getJobPilotOutput(self, jobID, directory=""): ############################################################################# def getPilotOutput(self, gridReference, directory=""): - """Retrieve the pilot output (std.out and std.err) for an existing job in the WMS. + """Retrieve the pilot output (std.out and std.err) for an existing pilot reference. >>> gLogger.notice(dirac.getJobPilotOutput(12345)) {'OK': True, 'Value': {}} - :param job: JobID - :type job: integer or string + :param str gridReference: pilot reference + :param str directory: a directory to download logs to. :return: S_OK,S_ERROR """ if not isinstance(gridReference, str): diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py new file mode 100644 index 00000000000..9c607850123 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/DownloadPlugin.py @@ -0,0 +1,29 @@ +""" +Pilot logging plugin abstract class. +""" +from abc import ABC, abstractmethod +from DIRAC import S_OK, S_ERROR, gLogger + +sLog = gLogger.getSubLogger(__name__) + + +class DownloadPlugin(ABC): + """ + Remote pilot log retriever base abstract class. It defines abstract methods used to download log files from a remote + storage to the server. + Any pilot logger download plugin should inherit from this class and implement a (sub)set of methods required by + :class:`PilotManagerHandler`. 
+ """ + + @abstractmethod + def getRemotePilotLogs(self, pilotStamp, vo): + """ + Pilot log getter method, carrying the unique pilot identity and a VO name. + + :param str pilotStamp: pilot stamp. + :param str vo: VO name of a pilot which generated the logs. + :return: S_OK or S_ERROR + :rtype: dict + """ + + pass diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py new file mode 100644 index 00000000000..c74582829ff --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheDownloadPlugin.py @@ -0,0 +1,80 @@ +""" +File cache pilot log downloader. +""" +import os +import tempfile +from DIRAC import S_OK, S_ERROR, gLogger, gConfig +from DIRAC.DataManagementSystem.Client.DataManager import DataManager +from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.Core.Utilities.Proxy import executeWithUserProxy +from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin + +sLog = gLogger.getSubLogger(__name__) + + +class FileCacheDownloadPlugin(DownloadPlugin): + """ + Class to handle log file download from an SE + """ + + def __init__(self): + """ + Nothing to set up: the plugin keeps no state between calls. + + """ + pass + + def getRemotePilotLogs(self, pilotStamp, vo=None): + """ + Pilot log getter method, carrying the unique pilot identity and a VO name. + + :param str pilotStamp: pilot stamp. + :param str vo: VO name of a user/pilot which generated the logs. 
+ :return: S_OK with a dict holding the log content under the "StdOut" key, or S_ERROR + :rtype: dict + """ + + opsHelper = Operations(vo=vo) + uploadPath = opsHelper.getValue("Pilot/UploadPath", "") + lfn = os.path.join(uploadPath, pilotStamp + ".log") + sLog.info("LFN to download: ", lfn) + tmpDir = tempfile.TemporaryDirectory() # directory (and the log in it) is removed once tmpDir is destroyed + filepath = tmpDir.name + # get pilot credentials which uploaded logs to an external storage: + res = opsHelper.getOptionsDict("Shifter/DataManager") + if not res["OK"]: + message = f"No shifter defined for VO: {vo} - needed to retrieve the logs !" + sLog.error(message) + return S_ERROR(message) + + proxyUser = res["Value"].get("User") + proxyGroup = res["Value"].get("Group") + + sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") + + res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg + lfn, filepath, proxyUserName=proxyUser, proxyUserGroup=proxyGroup + ) + sLog.debug("getFile result:", res) + if not res["OK"]: + sLog.error("Failed to contact storage") + return res + if lfn in res["Value"]["Failed"]: + sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"]) + return S_ERROR(f"Failed to retrieve a log file: {res['Value']['Failed']}") + try: + filename = os.path.join(filepath, pilotStamp + ".log") + with open(filename) as f: + stdout = f.read() + except FileNotFoundError as err: + sLog.error(f"Error opening a log file:{filename}", err) + return S_ERROR(repr(err)) + + resultDict = {} + resultDict["StdOut"] = stdout + return S_OK(resultDict) + + @executeWithUserProxy + def _downloadLogs(self, lfn, filepath): + return DataManager().getFile(lfn, destinationDir=filepath) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index a8676d0fa6b..6e935b9b16b 100644 --- 
a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -6,12 
+6,12 @@ from DIRAC import S_OK, S_ERROR import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities - -from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.Core.Utilities.Decorators import deprecated +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup +from DIRAC.Core.DISET.RequestHandler import getServiceOption from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import ( getPilotCE, @@ -35,6 +35,10 @@ def initializeHandler(cls, serviceInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") + # prepare remote pilot plugin initialization + defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin" + cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass) + cls.loggingPlugin = None return S_OK() ############################################################################## @@ -92,9 +96,16 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC types_getPilotOutput = [str] def export_getPilotOutput(self, pilotReference): - """Get the pilot job standard output and standard error files for the Grid - job reference """ + Get the pilot job standard output and standard error files for a pilot reference. + Handles both classic, CE-based logs and remote logs. The type of logs returned is determined + by the server. 
+ + :param str pilotReference: pilot reference + :return: S_OK or S_ERROR Dirac object + :rtype: dict + """ + result = self.pilotAgentsDB.getPilotInfo(pilotReference) if not result["OK"]: self.log.error("Failed to get info for pilot", result["Message"]) @@ -104,6 +115,25 @@ def export_getPilotOutput(self, pilotReference): return S_ERROR("Pilot info is empty") pilotDict = result["Value"][pilotReference] + vo = getVOForGroup(pilotDict["OwnerGroup"]) + opsHelper = Operations(vo=vo) + remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False) + funcs = [self._getPilotOutput, self._getRemotePilotOutput] + if remote: + funcs.reverse() + + result = funcs[0](pilotReference, pilotDict) + if not result["OK"]: + self.log.warn("Pilot log retrieval failed (first attempt), remote ?", remote) + result = funcs[1](pilotReference, pilotDict) + return result + else: + return result + + def _getPilotOutput(self, pilotReference, pilotDict): + """Get the pilot job standard output and standard error files for the Grid + job reference + """ group = pilotDict["OwnerGroup"] @@ -158,6 +188,39 @@ def export_getPilotOutput(self, pilotReference): shutil.rmtree(ce.ceParameters["WorkingDirectory"]) return S_OK(resultDict) + def _getRemotePilotOutput(self, pilotReference, pilotDict): + """ + Get remote pilot log files. 
+ + :param str pilotReference: pilot reference (unused, same signature as _getPilotOutput) + :return: S_OK or S_ERROR Dirac object + :rtype: dict + """ + + pilotStamp = pilotDict["PilotStamp"] + group = pilotDict["OwnerGroup"] + vo = getVOForGroup(group) + + if self.loggingPlugin is None: + result = ObjectLoader().loadObject( + f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{self.configValue}", self.configValue + ) + if not result["OK"]: + self.log.error("Failed to load LoggingPlugin", f"{self.configValue}: {result['Message']}") + return result + + componentClass = result["Value"] + self.loggingPlugin = componentClass() + self.log.info("Loaded: PilotLoggingPlugin class", self.configValue) + + res = self.loggingPlugin.getRemotePilotLogs(pilotStamp, vo) + + if res["OK"]: + res["Value"]["OwnerGroup"] = group + res["Value"]["FileList"] = [] + # return res, correct or not + return res + ############################################################################## types_getPilotInfo = [[list, str]] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index a7aa4c615dc..eb43dda6910 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -178,7 +178,6 @@ def export_getJobPilotOutput(self, jobID): job reference :param str jobID: job ID - :return: S_OK(dict)/S_ERROR() """ pilotReference = ""