Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration] Modify dirac-admin-get-pilot-output to get remote pilot log #7105

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/DIRAC/Interfaces/API/DiracAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def getJobPilotOutput(self, jobID, directory=""):

:param job: JobID
:type job: integer or string
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not directory:
Expand Down Expand Up @@ -468,13 +469,13 @@ def getJobPilotOutput(self, jobID, directory=""):

#############################################################################
def getPilotOutput(self, gridReference, directory=""):
"""Retrieve the pilot output (std.out and std.err) for an existing job in the WMS.
"""Retrieve the pilot output (std.out and std.err) for an existing pilot reference.

>>> gLogger.notice(dirac.getJobPilotOutput(12345))
{'OK': True, 'Value': {}}

:param job: JobID
:type job: integer or string
:param str gridReference: pilot reference
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not isinstance(gridReference, str):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Pilot logging plugin abstract class.
"""
from abc import ABC, abstractmethod
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class DownloadPlugin(ABC):
    """
    Abstract base class of the remote pilot log retrievers.

    It declares the methods used to download log files from a remote storage to the server. Every pilot
    logger download plugin should derive from this class and implement the (sub)set of methods required by
    :class:`PilotManagerHandler`.
    """

    @abstractmethod
    def getRemotePilotLogs(self, pilotStamp, vo):
        """
        Get the logs of a pilot, given its unique identity (stamp) and a VO name.

        :param str pilotStamp: pilot stamp.
        :param str vo: VO name of a pilot which generated the logs.
        :return: S_OK or S_ERROR
        :rtype: dict
        """
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
File cache pilot log downloader.
"""
import os
import tempfile
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient

sLog = gLogger.getSubLogger(__name__)


class FileCacheDownloadPlugin(DownloadPlugin):
"""
Class to handle log file download from an SE
"""

def __init__(self):
    """
    Create the Tornado client which is used to download incomplete log files from the server cache.
    """
    self.tornadoClient = TornadoPilotLoggingClient()

def getRemotePilotLogs(self, pilotStamp, vo=None):
"""
Pilot log getter method, carrying the unique pilot identity and a VO name.

:param str pilotStamp: pilot stamp.
:param str vo: VO name of a user/pilot which generated the logs.
:return: S_OK or S_ERROR
:rtype: dict
"""

opsHelper = Operations(vo=vo)
uploadPath = opsHelper.getValue("Pilot/UploadPath", "")
lfn = os.path.join(uploadPath, pilotStamp + ".log")
sLog.info("LFN to download: ", lfn)
Comment on lines +41 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really an LFN? Looks like just a path to a file to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is. It is passed to getFile later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid we have been using this pattern in other places, but os.path is not the most precise way to manipulate/create LFNs, as this is OS-dependent. We might argue that we only run on Linux so it does not really matter, but it would be more precise to at least use posixpath instead (@chaen and @atsareg might comment)


# get pilot credentials which uploaded logs to an external storage:
res = opsHelper.getOptionsDict("Shifter/DataManager")
if not res["OK"]:
message = f"No shifter defined for VO: {vo} - needed to retrieve the logs !"
sLog.error(message)
return S_ERROR(message)

proxyUser = res["Value"].get("User")
proxyGroup = res["Value"].get("Group")

sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")

# attempt to get logs from server first:
res = self._getLogsFromServer(pilotStamp, vo)
if not res["OK"]:
# from SE:
res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg
lfn, pilotStamp, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
)

return res

@executeWithUserProxy
def _downloadLogs(self, lfn, pilotStamp):
    """
    Download a pilot log file from an SE and return its content.

    The caller in this class additionally passes ``proxyUserName`` and ``proxyUserGroup`` keyword arguments;
    they are not part of this signature and are expected to be consumed by the ``executeWithUserProxy``
    decorator.

    :param str lfn: LFN of the log file to download.
    :param str pilotStamp: pilot stamp; the downloaded file is read back as ``<pilotStamp>.log``.
    :return: S_OK with a dictionary holding the file content under the "StdOut" key, or S_ERROR
    :rtype: dict
    """
    # Keep the TemporaryDirectory object alive while the file is downloaded and read, and let the context
    # manager remove the directory afterwards. Taking only its ``.name`` drops the object at once, so the
    # directory had to be re-created by hand and neither it nor the downloaded file was ever cleaned up.
    with tempfile.TemporaryDirectory() as filepath:
        res = DataManager().getFile(lfn, destinationDir=filepath)
        sLog.debug("getFile result:", res)
        if not res["OK"]:
            sLog.error("Failed to contact storage")
            return res
        if lfn in res["Value"]["Failed"]:
            sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"])
            return S_ERROR(f"Failed to retrieve a log file: {res['Value']['Failed']}")

        filename = os.path.join(filepath, pilotStamp + ".log")
        try:
            with open(filename) as f:
                stdout = f.read()
        except FileNotFoundError as err:
            sLog.error(f"Error opening a log file:{filename}", err)
            return S_ERROR(repr(err))

    # the content is already in memory, the temporary directory is gone at this point
    return S_OK({"StdOut": stdout})
fstagni marked this conversation as resolved.
Show resolved Hide resolved

@executeWithUserProxy
def _getLogsFromServer(self, logfile, vo):
    """
    Fetch a log file from the server cache area.

    A file found there is most likely not finalised, since finalised files are copied to an SE and deleted.
    The plain name is tried first and then, if that fails, the name with a ".log" suffix, should the
    finalised file still be on the server.

    :param str logfile: pilot log filename
    :param str vo: VO name
    :return: S_OK or S_ERROR
    :rtype: dict
    """

    firstAttempt = self.tornadoClient.getLogs(logfile, vo)
    if firstAttempt["OK"]:
        return firstAttempt
    # whatever the second attempt gives is returned, successful or not
    return self.tornadoClient.getLogs(logfile + ".log", vo)
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ def getMeta(self):
return S_OK(self.meta)
return S_ERROR("No Pilot logging directory defined")

def getLogs(self, logfile, vo):
    """
    Get the "instant" logs from Tornado log storage area. These are not finalised (incomplete) logs.

    The file is looked up as ``<LogPath>/<vo>/<logfile>``. Both name components are supplied by the caller,
    so a name which points outside of the log storage area (by means of ".." or an absolute path) is refused.

    :param str logfile: log file name
    :param str vo: VO name
    :return: Dirac S_OK containing the logs under the "StdOut" key, or S_ERROR
    :rtype: dict
    """

    logPath = self.meta["LogPath"]
    filename = os.path.join(logPath, vo, logfile)

    # Lexical check only (symbolic links are not resolved): the requested file has to stay below LogPath.
    baseDir = os.path.abspath(logPath)
    if os.path.commonpath([baseDir, os.path.abspath(filename)]) != baseDir:
        sLog.error("Log file requested outside of the log storage area:", filename)
        return S_ERROR(f"Invalid log file name: {logfile}")

    try:
        with open(filename) as f:
            stdout = f.read()
    except FileNotFoundError as err:
        sLog.error(f"Error opening a log file:{filename}", err)
        return S_ERROR(repr(err))

    return S_OK({"StdOut": stdout})

def _verifyUUIDPattern(self, logfile):
"""
Verify if the name of the log file matches the required pattern.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from DIRAC import S_OK, S_ERROR
import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities

from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Core.DISET.RequestHandler import getServiceOption
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import (
getPilotCE,
Expand All @@ -35,6 +35,10 @@ def initializeHandler(cls, serviceInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

# prepare remote pilot plugin initialization
defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin"
cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass)
cls.loggingPlugin = None
return S_OK()

##############################################################################
Expand Down Expand Up @@ -92,9 +96,16 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC
types_getPilotOutput = [str]

def export_getPilotOutput(self, pilotReference):
"""Get the pilot job standard output and standard error files for the Grid
job reference
"""
Get the pilot job standard output and standard error files for a pilot reference.
Handles both classic, CE-based logs and remote logs. The type of logs returned is determined
by the server.

:param str pilotReference:
:return: S_OK or S_ERROR Dirac object
:rtype: dict
"""

result = self.pilotAgentsDB.getPilotInfo(pilotReference)
if not result["OK"]:
self.log.error("Failed to get info for pilot", result["Message"])
Expand All @@ -104,6 +115,26 @@ def export_getPilotOutput(self, pilotReference):
return S_ERROR("Pilot info is empty")

pilotDict = result["Value"][pilotReference]
vo = getVOForGroup(pilotDict["OwnerGroup"])
opsHelper = Operations(vo=vo)
remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False)
# classic logs first, by default
funcs = [self._getPilotOutput, self._getRemotePilotOutput]
if remote:
funcs.reverse()

result = funcs[0](pilotReference, pilotDict)
if not result["OK"]:
self.log.warn("Pilot log retrieval failed (first attempt), remote ?", remote)
result = funcs[1](pilotReference, pilotDict)
return result
else:
return result

def _getPilotOutput(self, pilotReference, pilotDict):
"""Get the pilot job standard output and standard error files for the Grid
job reference
"""

group = pilotDict["OwnerGroup"]

Expand Down Expand Up @@ -158,6 +189,39 @@ def export_getPilotOutput(self, pilotReference):
shutil.rmtree(ce.ceParameters["WorkingDirectory"])
return S_OK(resultDict)

def _getRemotePilotOutput(self, pilotReference, pilotDict):
    """
    Get remote pilot log files by means of the configured download plugin.

    The plugin class named by ``self.configValue`` is loaded and instantiated when ``self.loggingPlugin``
    is still None, and stored in that attribute.

    :param str pilotReference: pilot reference (not used by this method)
    :param dict pilotDict: pilot info, the "PilotStamp" and "OwnerGroup" entries are read
    :return: the plugin result, with "OwnerGroup" and an empty "FileList" added on success, or S_ERROR
    :rtype: dict
    """

    pilotStamp = pilotDict["PilotStamp"]
    ownerGroup = pilotDict["OwnerGroup"]
    vo = getVOForGroup(ownerGroup)

    if self.loggingPlugin is None:
        pluginName = self.configValue
        loaded = ObjectLoader().loadObject(
            f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{pluginName}", pluginName
        )
        if not loaded["OK"]:
            self.log.error("Failed to load LoggingPlugin", f"{pluginName}: {loaded['Message']}")
            return loaded

        pluginClass = loaded["Value"]
        self.loggingPlugin = pluginClass()
        self.log.info("Loaded: PilotLoggingPlugin class", pluginName)

    logs = self.loggingPlugin.getRemotePilotLogs(pilotStamp, vo)
    if not logs["OK"]:
        # a failed retrieval is handed back to the caller unchanged
        return logs

    logs["Value"]["OwnerGroup"] = ownerGroup
    logs["Value"]["FileList"] = []
    return logs

##############################################################################
types_getPilotInfo = [[list, str]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ def export_getMetadata(self):
"""
return self.loggingPlugin.getMeta()

def export_getLogs(self, logfile, vo):
    """
    Get (not yet finalised) logs from the server.

    The call is delegated to the ``getLogs`` method of the logging plugin held in ``self.loggingPlugin``.

    :param str logfile: log file name
    :param str vo: VO name
    :return: the result of the plugin ``getLogs`` call
    :rtype: dict
    """

    plugin = self.loggingPlugin
    return plugin.getLogs(logfile, vo)

def export_finaliseLogs(self, payload, pilotUUID):
"""
Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def export_getJobPilotOutput(self, jobID):
job reference

:param str jobID: job ID

:return: S_OK(dict)/S_ERROR()
"""
pilotReference = ""
Expand Down
Loading