Skip to content

Commit

Permalink
feat: pilot log download enhancement: Add remote logs download to dir…
Browse files Browse the repository at this point in the history
…ac-admin-get-pilot-output and dirac-admin-get-job-pilot-outpu
  • Loading branch information
martynia committed Jul 21, 2023
1 parent e653528 commit 52086da
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 12 deletions.
21 changes: 16 additions & 5 deletions src/DIRAC/Interfaces/API/DiracAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def resetJob(self, jobID):
return result

#############################################################################
def getJobPilotOutput(self, jobID, directory=""):
def getJobPilotOutput(self, jobID, directory="", remote=False):
"""Retrieve the pilot output for an existing job in the WMS.
The output will be retrieved in a local directory unless
otherwise specified.
Expand All @@ -441,6 +441,8 @@ def getJobPilotOutput(self, jobID, directory=""):
:param job: JobID
:type job: integer or string
:param str directory: a directory to download logs to.
:param bool remote: if True a remote storage is used, otherwise classic CE logs
:return: S_OK,S_ERROR
"""
if not directory:
Expand Down Expand Up @@ -483,14 +485,17 @@ def getJobPilotOutput(self, jobID, directory=""):
return result

#############################################################################
def getPilotOutput(self, gridReference, directory=""):
def getPilotOutput(self, gridReference, directory="", remote=False):
"""Retrieve the pilot output (std.out and std.err) for an existing job in the WMS.
When remote flag is true, logs are retrieved from a remote storage.
>>> gLogger.notice(dirac.getJobPilotOutput(12345))
{'OK': True, 'Value': {}}
:param job: JobID
:type job: integer or string
:param str gridReference: pilot reference
:param str directory: a directory to download logs to.
:param bool remote: if True a remote storage is used, otherwise classic CE logs
are downloaded
:return: S_OK,S_ERROR
"""
if not isinstance(gridReference, str):
Expand All @@ -502,7 +507,13 @@ def getPilotOutput(self, gridReference, directory=""):
if not os.path.exists(directory):
return self._errorReport(f"Directory {directory} does not exist")

result = PilotManagerClient().getPilotOutput(gridReference)
if remote:
# get remote pilot logs:
result = PilotManagerClient().getRemotePilotOutput(gridReference)
else:
# get classic pilot log files from CE
result = PilotManagerClient().getPilotOutput(gridReference)

if not result["OK"]:
return result

Expand Down
9 changes: 7 additions & 2 deletions src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
def main():
# Registering arguments will automatically add their description to the help menu
Script.registerArgument(["PilotID: Grid ID of the pilot"])
_, args = Script.parseCommandLine(ignoreErrors=True)
Script.registerSwitch("r", "", "get remote pilot output")
switches, args = Script.parseCommandLine(ignoreErrors=True)

from DIRAC import exit as DIRACExit
from DIRAC.Interfaces.API.DiracAdmin import DiracAdmin

remote = False
if ("r", "") in switches:
remote = True

diracAdmin = DiracAdmin()
exitCode = 0
errorList = []

for gridID in args:
result = diracAdmin.getPilotOutput(gridID)
result = diracAdmin.getPilotOutput(gridID, remote=remote)
if not result["OK"]:
errorList.append((gridID, result["Message"]))
exitCode = 2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Pilot logging plugin abstract class.
"""
from abc import ABC, abstractmethod
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class DownloadPlugin(ABC):
"""
Remote pilot log retriever base abstract class. It defines abstract methods used to download log files from a remote
storage to the server.
Any pilot logger download plugin should inherit from this class and implement a (sub)set of methods required by
:class:`PilotManagerHandler`.
"""

@abstractmethod
def getRemotePilotLogs(self, pilotStamp, vo):
"""
Pilot log getter method, carrying the unique pilot identity and a VO name.
:param str pilotStamp: pilot stamp.
:param str vo: VO name of a pilot which generated the logs.
:return: S_OK or S_ERROR
:rtype: dict
"""

pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
File cache pilot log downloader.
"""
import os
import tempfile
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin

sLog = gLogger.getSubLogger(__name__)


class FileCacheDownloadPlugin(DownloadPlugin):
"""
Class to handle log file download from an SE
"""

def __init__(self):
"""
Sets the pilot log files location for a WebServer.
"""
self.setup = gConfig.getValue("/DIRAC/Setup", None)

@executeWithUserProxy
def getRemotePilotLogs(self, pilotStamp, vo=None):
"""
Pilot log getter method, carrying the unique pilot identity and a VO name.
:param str pilotStamp: pilot stamp.
:param str vo: VO name of a user/pilot which generated the logs.
:return: S_OK or S_ERROR
:rtype: dict
"""

opsHelper = Operations(vo=vo, setup=self.setup)
uploadPath = opsHelper.getValue("Pilot/UploadPath", "")
lfn = os.path.join(uploadPath, pilotStamp + ".log")
sLog.info("LFN to download: ", lfn)
filepath = tempfile.TemporaryDirectory().name
os.makedirs(filepath, exist_ok=True)
res = DataManager().getFile(lfn, destinationDir=filepath)
sLog.debug("getFile result:", res)
if not res["OK"]:
sLog.error(f"Failed to contact storage")
return res
if lfn in res["Value"]["Failed"]:
sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"])
return res
try:
filename = os.path.join(filepath, pilotStamp + ".log")
with open(filename) as f:
stdout = f.read()
except FileNotFoundError as err:
sLog.error(f"Error opening a log file:{filename}", err)
return S_ERROR(repr(err))

resultDict = {}
resultDict["StdOut"] = stdout
return S_OK(resultDict)
65 changes: 62 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
"""
import shutil
import datetime
import os

from DIRAC import S_OK, S_ERROR
import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities

from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername, getVOForGroup
from DIRAC.Core.DISET.RequestHandler import getServiceOption
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import (
getPilotCE,
Expand All @@ -32,7 +33,10 @@ def initializeHandler(cls, serviceInfoDict):

except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

# prepare remote pilot plugin initialization
defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin"
cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass)
cls.loggingPlugin = None
return S_OK()

##############################################################################
Expand Down Expand Up @@ -152,6 +156,61 @@ def export_getPilotOutput(self, pilotReference):
shutil.rmtree(ce.ceParameters["WorkingDirectory"])
return S_OK(resultDict)

##############################################################################
types_getRemotePilotOutput = [str]

def export_getRemotePilotOutput(self, pilotReference):
"""
Get remote pilot log files.
:param str pilotReference:
:return: S_OK Dirac object
:rtype: dict
"""

# get pilot stamp by pilot reference:
result = self.pilotAgentsDB.getPilotInfo(pilotReference)
if not result["OK"]:
self.log.error("Failed to get info for pilot", result["Message"])
return S_ERROR("Failed to get info for pilot")
if not result["Value"]:
self.log.warn("The pilot info is empty", pilotReference)
return S_ERROR("Pilot info is empty")

pilotDict = result["Value"][pilotReference]
pilotStamp = pilotDict["PilotStamp"]
owner = pilotDict["OwnerDN"]
group = pilotDict["OwnerGroup"]

if self.loggingPlugin is None:
result = ObjectLoader().loadObject(
f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{self.configValue}", self.configValue
)
if not result["OK"]:
self.log.error("Failed to load LoggingPlugin", f"{self.configValue}: {result['Message']}")
return result

componentClass = result["Value"]
self.loggingPlugin = componentClass()
self.log.info("Loaded: PilotLoggingPlugin class", self.configValue)

remoteCredentials = self.getRemoteCredentials()
proxyUser = remoteCredentials["user"]
proxyGroup = remoteCredentials["group"]
vo = getVOForGroup(proxyGroup)
self.log.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")

res = self.loggingPlugin.getRemotePilotLogs(
pilotStamp, vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
) # pylint: disable=unexpected-keyword-arg

if res["OK"]:
res["Value"]["OwnerDN"] = owner
res["Value"]["OwnerGroup"] = group
res["Value"]["FileList"] = []
# return res, correct or not
return res

##############################################################################
types_getPilotInfo = [[list, str]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ def export_getSiteMaskSummary(cls):
##############################################################################
types_getJobPilotOutput = [[str, int]]

def export_getJobPilotOutput(self, jobID):
def export_getJobPilotOutput(self, jobID, remote=False):
"""Get the pilot job standard output and standard error files for the DIRAC
job reference
:param str jobID: job ID
:param bool remote: if True a remote storage is used, otherwise classic CE logs
:return: S_OK(dict)/S_ERROR()
"""
Expand Down Expand Up @@ -209,7 +210,7 @@ def export_getJobPilotOutput(self, jobID):
c = cycle

if pilotReference:
return self.pilotManager.getPilotOutput(pilotReference)
return self.pilotManager.getPilotOutput(pilotReference, remote=remote)
return S_ERROR("No pilot job reference found")

##############################################################################
Expand Down

0 comments on commit 52086da

Please sign in to comment.