Skip to content

Commit

Permalink
Merge pull request #222 from aldbr/devel_FIX_collect-batch-system-info
Browse files Browse the repository at this point in the history
[devel] Collect batch system info
  • Loading branch information
fstagni authored Jan 17, 2024
2 parents d7a8ce3 + bc604ec commit 8f2617f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 28 deletions.
18 changes: 14 additions & 4 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self, pilotParams):
try:
from Pilot.pilotTools import (
CommandBase,
getFlavour,
getSubmitterInfo,
retrieveUrlTimeout,
safe_listdir,
sendMessage,
Expand All @@ -56,7 +56,7 @@ def __init__(self, pilotParams):
except ImportError:
from pilotTools import (
CommandBase,
getFlavour,
getSubmitterInfo,
retrieveUrlTimeout,
safe_listdir,
sendMessage,
Expand Down Expand Up @@ -559,8 +559,7 @@ def execute(self):
VOs may want to replace/extend the _getBasicsCFG and _getSecurityCFG functions
"""

self.pp.flavour, self.pp.pilotReference = getFlavour(self.pp.ceName)
self.pp.flavour, self.pp.pilotReference, self.pp.batchSystemInfo = getSubmitterInfo(self.pp.ceName)

self._getBasicsCFG()
self._getSecurityCFG()
Expand Down Expand Up @@ -855,6 +854,17 @@ def execute(self):
"""Setup configuration parameters"""
self.cfg.append("-o /LocalSite/GridMiddleware=%s" % self.pp.flavour)

# Add batch system details to the configuration
# Can be used by the pilot/job later on, to interact with the batch system
self.cfg.append("-o /LocalSite/BatchSystem/Type=%s" % self.pp.batchSystemInfo.get("Type", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/JobID=%s" % self.pp.batchSystemInfo.get("JobID", "Unknown"))

batchSystemParams = self.pp.batchSystemInfo.get("Parameters", {})
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Queue=%s" % batchSystemParams.get("Queue", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/BinaryPath=%s" % batchSystemParams.get("BinaryPath", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Host=%s" % batchSystemParams.get("Host", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/InfoPath=%s" % batchSystemParams.get("InfoPath", "Unknown"))

self.cfg.append('-n "%s"' % self.pp.site)
self.cfg.append('-S "%s"' % self.pp.setup)

Expand Down
111 changes: 87 additions & 24 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from datetime import datetime
from functools import partial, wraps
from threading import RLock
import warnings

############################
# python 2 -> 3 "hacks"
Expand Down Expand Up @@ -214,53 +215,103 @@ def listdir(directory):
return contents


def getFlavour(ceName):
def getSubmitterInfo(ceName):
"""Get information about the submitter of the pilot.
Check the environment variables to determine the type of batch system and CE used
to submit the pilot being used and return this information in a tuple.
"""

pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "")
# Batch system taking care of the pilot
# Might be useful to extract the info to interact with it later on
batchSystemType = "Unknown"
batchSystemJobID = "Unknown"
batchSystemParameters = {
"BinaryPath": "Unknown",
"Host": "Unknown",
"InfoPath": "Unknown",
"Queue": "Unknown",
}
# Flavour of the pilot
# Inform whether the pilot was sent through SSH+batch system or a CE
flavour = "DIRAC"

# # Batch systems

# Take the reference from the Torque batch system
# Torque
if "PBS_JOBID" in os.environ:
flavour = "SSHTorque"
pilotReference = "sshtorque://" + ceName + "/" + os.environ["PBS_JOBID"].split(".")[0]
batchSystemType = "PBS"
batchSystemJobID = os.environ["PBS_JOBID"]
batchSystemParameters["BinaryPath"] = os.environ.get("PBS_O_PATH", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("PBS_O_QUEUE", "Unknown")

# Take the reference from the OAR batch system
flavour = "SSH%s" % batchSystemType
pilotReference = "sshpbs://" + ceName + "/" + batchSystemJobID.split(".")[0]

# OAR
if "OAR_JOBID" in os.environ:
flavour = "SSHOAR"
pilotReference = "sshoar://" + ceName + "/" + os.environ["OAR_JOBID"]
batchSystemType = "OAR"
batchSystemJobID = os.environ["OAR_JOBID"]

flavour = "SSH%s" % batchSystemType
pilotReference = "sshoar://" + ceName + "/" + batchSystemJobID

# Grid Engine
if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ:
flavour = "SSHGE"
pilotReference = "sshge://" + ceName + "/" + os.environ["JOB_ID"]
# Generic JOB_ID
elif "JOB_ID" in os.environ:
flavour = "Generic"
pilotReference = "generic://" + ceName + "/" + os.environ["JOB_ID"]
if "SGE_TASK_ID" in os.environ:
batchSystemType = "SGE"
batchSystemJobID = os.environ["JOB_ID"]
batchSystemParameters["BinaryPath"] = os.environ.get("SGE_BINARY_PATH", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("QUEUE", "Unknown")

flavour = "SSH%s" % batchSystemType
pilotReference = "sshge://" + ceName + "/" + batchSystemJobID

# LSF
if "LSB_BATCH_JID" in os.environ:
flavour = "SSHLSF"
pilotReference = "sshlsf://" + ceName + "/" + os.environ["LSB_BATCH_JID"]
batchSystemType = "LSF"
batchSystemJobID = os.environ["LSB_BATCH_JID"]
batchSystemParameters["BinaryPath"] = os.environ.get("LSF_BINDIR", "Unknown")
batchSystemParameters["Host"] = os.environ.get("LSB_HOSTS", "Unknown")
batchSystemParameters["InfoPath"] = os.environ.get("LSF_ENVDIR", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("LSB_QUEUE", "Unknown")

# SLURM batch system
flavour = "SSH%s" % batchSystemType
pilotReference = "sshlsf://" + ceName + "/" + batchSystemJobID

# SLURM
if "SLURM_JOBID" in os.environ:
flavour = "SSHSLURM"
pilotReference = "sshslurm://" + ceName + "/" + os.environ["SLURM_JOBID"]
batchSystemType = "SLURM"
batchSystemJobID = os.environ["SLURM_JOBID"]

flavour = "SSH%s" % batchSystemType
pilotReference = "sshslurm://" + ceName + "/" + batchSystemJobID

# Condor
if "CONDOR_JOBID" in os.environ:
flavour = "SSHCondor"
pilotReference = "sshcondor://" + ceName + "/" + os.environ["CONDOR_JOBID"]
batchSystemType = "HTCondor"
batchSystemJobID = os.environ["CONDOR_JOBID"]
batchSystemParameters["InfoPath"] = os.environ.get("_CONDOR_JOB_AD", "Unknown")

# # CEs
flavour = "SSH%s" % batchSystemType
pilotReference = "sshcondor://" + ceName + "/" + batchSystemJobID

# # CEs/Batch Systems

# HTCondor
if "HTCONDOR_JOBID" in os.environ:
batchSystemType = "HTCondor"
batchSystemJobID = os.environ["HTCONDOR_JOBID"]

flavour = "HTCondorCE"
pilotReference = "htcondorce://" + ceName + "/" + os.environ["HTCONDOR_JOBID"]
pilotReference = "htcondorce://" + ceName + "/" + batchSystemJobID

# # Local/SSH

# Local submission to the host
if "LOCAL_JOBID" in os.environ:
flavour = "Local"
pilotReference = "local://" + ceName + "/" + os.environ["LOCAL_JOBID"]

# Direct SSH tunnel submission
if "SSHCE_JOBID" in os.environ:
Expand All @@ -274,6 +325,8 @@ def getFlavour(ceName):
"sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"]
)

# # CEs

# ARC
if "GRID_GLOBAL_JOBURL" in os.environ:
flavour = "ARC"
Expand All @@ -284,9 +337,18 @@ def getFlavour(ceName):
flavour = "VMDIRAC"
pilotReference = "vm://" + ceName + "/" + os.environ["JOB_ID"]

return flavour, pilotReference
return flavour, pilotReference, {"Type": batchSystemType, "JobID": batchSystemJobID, "Parameters": batchSystemParameters}


def getFlavour(ceName):
"""Old method to get the flavour of the pilot. Deprecated.
Please use getSubmitterInfo instead.
"""
warnings.warn("getFlavour() is deprecated. Please use getSubmitterInfo() instead.", category=DeprecationWarning, stacklevel=2)
flavour, pilotReference, _ = getSubmitterInfo(ceName)
return flavour, pilotReference

class ObjectLoader(object):
"""Simplified class for loading objects from a DIRAC installation.
Expand Down Expand Up @@ -834,6 +896,7 @@ def __init__(self):
self.stopOnApplicationFailure = True
self.stopAfterFailedMatches = 10
self.flavour = "DIRAC"
self.batchSystemInfo = {}
self.pilotReference = ""
self.releaseVersion = ""
self.releaseProject = ""
Expand Down

0 comments on commit 8f2617f

Please sign in to comment.