Skip to content

Commit

Permalink
feat: pilot log download enhancement: Add remote logs download possib…
Browse files Browse the repository at this point in the history
…ility
  • Loading branch information
martynia committed Sep 12, 2023
1 parent df6a89f commit 5109e00
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 9 deletions.
7 changes: 4 additions & 3 deletions src/DIRAC/Interfaces/API/DiracAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def getJobPilotOutput(self, jobID, directory=""):
:param job: JobID
:type job: integer or string
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not directory:
Expand Down Expand Up @@ -468,13 +469,13 @@ def getJobPilotOutput(self, jobID, directory=""):

#############################################################################
def getPilotOutput(self, gridReference, directory=""):
"""Retrieve the pilot output (std.out and std.err) for an existing job in the WMS.
"""Retrieve the pilot output (std.out and std.err) for an existing pilot reference.
>>> gLogger.notice(dirac.getJobPilotOutput(12345))
{'OK': True, 'Value': {}}
:param job: JobID
:type job: integer or string
:param str gridReference: pilot reference
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not isinstance(gridReference, str):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Pilot logging plugin abstract class.
"""
from abc import ABC, abstractmethod
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class DownloadPlugin(ABC):
"""
Remote pilot log retriever base abstract class. It defines abstract methods used to download log files from a remote
storage to the server.
Any pilot logger download plugin should inherit from this class and implement a (sub)set of methods required by
:class:`PilotManagerHandler`.
"""

@abstractmethod
def getRemotePilotLogs(self, pilotStamp, vo):
"""
Pilot log getter method, carrying the unique pilot identity and a VO name.
:param str pilotStamp: pilot stamp.
:param str vo: VO name of a pilot which generated the logs.
:return: S_OK or S_ERROR
:rtype: dict
"""

pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
File cache pilot log downloader.
"""
import os
import tempfile
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin

sLog = gLogger.getSubLogger(__name__)


class FileCacheDownloadPlugin(DownloadPlugin):
"""
Class to handle log file download from an SE
"""

def __init__(self):
"""
Sets the pilot log files location for a WebServer.
"""
pass

def getRemotePilotLogs(self, pilotStamp, vo=None):
"""
Pilot log getter method, carrying the unique pilot identity and a VO name.
:param str pilotStamp: pilot stamp.
:param str vo: VO name of a user/pilot which generated the logs.
:return: S_OK or S_ERROR
:rtype: dict
"""

opsHelper = Operations(vo=vo)
uploadPath = opsHelper.getValue("Pilot/UploadPath", "")
lfn = os.path.join(uploadPath, pilotStamp + ".log")
sLog.info("LFN to download: ", lfn)
filepath = tempfile.TemporaryDirectory().name
os.makedirs(filepath, exist_ok=True)
# get pilot credentials which uploaded logs to an external storage:
res = opsHelper.getOptionsDict("Shifter/DataManager")
if not res["OK"]:
message = f"No shifter defined for VO: {vo} - needed to retrieve the logs !"
sLog.error(message)
return S_ERROR(message)

proxyUser = res["Value"].get("User")
proxyGroup = res["Value"].get("Group")

sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")

res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg
lfn, filepath, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
)
sLog.debug("getFile result:", res)
if not res["OK"]:
sLog.error(f"Failed to contact storage")
return res
if lfn in res["Value"]["Failed"]:
sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"])
return S_ERROR(f"Failed to retrieve a log file: {res['Value']['Failed']}")
try:
filename = os.path.join(filepath, pilotStamp + ".log")
with open(filename) as f:
stdout = f.read()
except FileNotFoundError as err:
sLog.error(f"Error opening a log file:{filename}", err)
return S_ERROR(repr(err))

resultDict = {}
resultDict["StdOut"] = stdout
return S_OK(resultDict)

@executeWithUserProxy
def _downloadLogs(self, lfn, filepath):
return DataManager().getFile(lfn, destinationDir=filepath)
73 changes: 68 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from DIRAC import S_OK, S_ERROR
import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities

from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Core.DISET.RequestHandler import getServiceOption
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import (
getPilotCE,
Expand All @@ -35,6 +35,10 @@ def initializeHandler(cls, serviceInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

# prepare remote pilot plugin initialization
defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin"
cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass)
cls.loggingPlugin = None
return S_OK()

##############################################################################
Expand Down Expand Up @@ -92,9 +96,16 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC
types_getPilotOutput = [str]

def export_getPilotOutput(self, pilotReference):
"""Get the pilot job standard output and standard error files for the Grid
job reference
"""
Get the pilot job standard output and standard error files for a pilot reference.
Handles both classic, CE-based logs and remote logs. The type og logs returned is determined
by the server.
:param str pilotReference:
:return: S_OK or S_ERROR Dirac object
:rtype: dict
"""

result = self.pilotAgentsDB.getPilotInfo(pilotReference)
if not result["OK"]:
self.log.error("Failed to get info for pilot", result["Message"])
Expand All @@ -104,6 +115,25 @@ def export_getPilotOutput(self, pilotReference):
return S_ERROR("Pilot info is empty")

pilotDict = result["Value"][pilotReference]
vo = getVOForGroup(pilotDict["OwnerGroup"])
opsHelper = Operations(vo=vo)
remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False)
funcs = [self._getRemotePilotOutput, self._getPilotOutput]
if remote:
funcs.reverse()

result = funcs[0](pilotReference, pilotDict)
if not result["OK"]:
self.log.warn("Pilot log retrieval failed (first attempt), remote ?", remote)
result = funcs[1](pilotReference, pilotDict)
return result
else:
return result

def _getPilotOutput(self, pilotReference, pilotDict):
"""Get the pilot job standard output and standard error files for the Grid
job reference
"""

group = pilotDict["OwnerGroup"]

Expand Down Expand Up @@ -158,6 +188,39 @@ def export_getPilotOutput(self, pilotReference):
shutil.rmtree(ce.ceParameters["WorkingDirectory"])
return S_OK(resultDict)

def _getRemotePilotOutput(self, pilotReference, pilotDict):
"""
Get remote pilot log files.
:param str pilotReference:
:return: S_OK Dirac object
:rtype: dict
"""

pilotStamp = pilotDict["PilotStamp"]
group = pilotDict["OwnerGroup"]
vo = getVOForGroup(group)

if self.loggingPlugin is None:
result = ObjectLoader().loadObject(
f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{self.configValue}", self.configValue
)
if not result["OK"]:
self.log.error("Failed to load LoggingPlugin", f"{self.configValue}: {result['Message']}")
return result

componentClass = result["Value"]
self.loggingPlugin = componentClass()
self.log.info("Loaded: PilotLoggingPlugin class", self.configValue)

res = self.loggingPlugin.getRemotePilotLogs(pilotStamp, vo)

if res["OK"]:
res["Value"]["OwnerGroup"] = group
res["Value"]["FileList"] = []
# return res, correct or not
return res

##############################################################################
types_getPilotInfo = [[list, str]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def export_getJobPilotOutput(self, jobID):
job reference
:param str jobID: job ID
:return: S_OK(dict)/S_ERROR()
"""
pilotReference = ""
Expand Down

0 comments on commit 5109e00

Please sign in to comment.