Skip to content

Commit

Permalink
fix: get batch system info from the pilot
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Dec 11, 2023
1 parent d7da5b0 commit bc604ec
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 28 deletions.
18 changes: 14 additions & 4 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self, pilotParams):
try:
from Pilot.pilotTools import (
CommandBase,
getFlavour,
getSubmitterInfo,
retrieveUrlTimeout,
safe_listdir,
sendMessage,
Expand All @@ -56,7 +56,7 @@ def __init__(self, pilotParams):
except ImportError:
from pilotTools import (
CommandBase,
getFlavour,
getSubmitterInfo,
retrieveUrlTimeout,
safe_listdir,
sendMessage,
Expand Down Expand Up @@ -550,8 +550,7 @@ def execute(self):
VOs may want to replace/extend the _getBasicsCFG and _getSecurityCFG functions
"""

self.pp.flavour, self.pp.pilotReference = getFlavour(self.pp.ceName)
self.pp.flavour, self.pp.pilotReference, self.pp.batchSystemInfo = getSubmitterInfo(self.pp.ceName)

self._getBasicsCFG()
self._getSecurityCFG()
Expand Down Expand Up @@ -846,6 +845,17 @@ def execute(self):
"""Setup configuration parameters"""
self.cfg.append("-o /LocalSite/GridMiddleware=%s" % self.pp.flavour)

# Add batch system details to the configuration
# Can be used by the pilot/job later on, to interact with the batch system
self.cfg.append("-o /LocalSite/BatchSystem/Type=%s" % self.pp.batchSystemInfo.get("Type", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/JobID=%s" % self.pp.batchSystemInfo.get("JobID", "Unknown"))

batchSystemParams = self.pp.batchSystemInfo.get("Parameters", {})
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Queue=%s" % batchSystemParams.get("Queue", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/BinaryPath=%s" % batchSystemParams.get("BinaryPath", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Host=%s" % batchSystemParams.get("Host", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/InfoPath=%s" % batchSystemParams.get("InfoPath", "Unknown"))

self.cfg.append('-n "%s"' % self.pp.site)
self.cfg.append('-S "%s"' % self.pp.setup)

Expand Down
111 changes: 87 additions & 24 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from datetime import datetime
from functools import partial, wraps
from threading import RLock
import warnings

############################
# python 2 -> 3 "hacks"
Expand Down Expand Up @@ -214,53 +215,103 @@ def listdir(directory):
return contents


def getFlavour(ceName):
def getSubmitterInfo(ceName):
"""Get information about the submitter of the pilot.
Check the environment variables to determine the type of batch system and CE used
to submit the pilot being used and return this information in a tuple.
"""

pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "")
# Batch system taking care of the pilot
# Might be useful to extract the info to interact with it later on
batchSystemType = "Unknown"
batchSystemJobID = "Unknown"
batchSystemParameters = {
"BinaryPath": "Unknown",
"Host": "Unknown",
"InfoPath": "Unknown",
"Queue": "Unknown",
}
# Flavour of the pilot
# Inform whether the pilot was sent through SSH+batch system or a CE
flavour = "DIRAC"

# # Batch systems

# Take the reference from the Torque batch system
# Torque
if "PBS_JOBID" in os.environ:
flavour = "SSHTorque"
pilotReference = "sshtorque://" + ceName + "/" + os.environ["PBS_JOBID"].split(".")[0]
batchSystemType = "PBS"
batchSystemJobID = os.environ["PBS_JOBID"]
batchSystemParameters["BinaryPath"] = os.environ.get("PBS_O_PATH", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("PBS_O_QUEUE", "Unknown")

# Take the reference from the OAR batch system
flavour = "SSH%s" % batchSystemType
pilotReference = "sshpbs://" + ceName + "/" + batchSystemJobID.split(".")[0]

# OAR
if "OAR_JOBID" in os.environ:
flavour = "SSHOAR"
pilotReference = "sshoar://" + ceName + "/" + os.environ["OAR_JOBID"]
batchSystemType = "OAR"
batchSystemJobID = os.environ["OAR_JOBID"]

flavour = "SSH%s" % batchSystemType
pilotReference = "sshoar://" + ceName + "/" + batchSystemJobID

# Grid Engine
if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ:
flavour = "SSHGE"
pilotReference = "sshge://" + ceName + "/" + os.environ["JOB_ID"]
# Generic JOB_ID
elif "JOB_ID" in os.environ:
flavour = "Generic"
pilotReference = "generic://" + ceName + "/" + os.environ["JOB_ID"]
if "SGE_TASK_ID" in os.environ:
batchSystemType = "SGE"
batchSystemJobID = os.environ["JOB_ID"]
batchSystemParameters["BinaryPath"] = os.environ.get("SGE_BINARY_PATH", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("QUEUE", "Unknown")

flavour = "SSH%s" % batchSystemType
pilotReference = "sshge://" + ceName + "/" + batchSystemJobID

# LSF
if "LSB_BATCH_JID" in os.environ:
flavour = "SSHLSF"
pilotReference = "sshlsf://" + ceName + "/" + os.environ["LSB_BATCH_JID"]
batchSystemType = "LSF"
batchSystemJobID = os.environ["LSB_BATCH_JID"]
batchSystemParameters["BinaryPath"] = os.environ.get("LSF_BINDIR", "Unknown")
batchSystemParameters["Host"] = os.environ.get("LSB_HOSTS", "Unknown")
batchSystemParameters["InfoPath"] = os.environ.get("LSF_ENVDIR", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("LSB_QUEUE", "Unknown")

# SLURM batch system
flavour = "SSH%s" % batchSystemType
pilotReference = "sshlsf://" + ceName + "/" + batchSystemJobID

# SLURM
if "SLURM_JOBID" in os.environ:
flavour = "SSHSLURM"
pilotReference = "sshslurm://" + ceName + "/" + os.environ["SLURM_JOBID"]
batchSystemType = "SLURM"
batchSystemJobID = os.environ["SLURM_JOBID"]

flavour = "SSH%s" % batchSystemType
pilotReference = "sshslurm://" + ceName + "/" + batchSystemJobID

# Condor
if "CONDOR_JOBID" in os.environ:
flavour = "SSHCondor"
pilotReference = "sshcondor://" + ceName + "/" + os.environ["CONDOR_JOBID"]
batchSystemType = "HTCondor"
batchSystemJobID = os.environ["CONDOR_JOBID"]
batchSystemParameters["InfoPath"] = os.environ.get("_CONDOR_JOB_AD", "Unknown")

# # CEs
flavour = "SSH%s" % batchSystemType
pilotReference = "sshcondor://" + ceName + "/" + batchSystemJobID

# # CEs/Batch Systems

# HTCondor
if "HTCONDOR_JOBID" in os.environ:
batchSystemType = "HTCondor"
batchSystemJobID = os.environ["HTCONDOR_JOBID"]

flavour = "HTCondorCE"
pilotReference = "htcondorce://" + ceName + "/" + os.environ["HTCONDOR_JOBID"]
pilotReference = "htcondorce://" + ceName + "/" + batchSystemJobID

# # Local/SSH

# Local submission to the host
if "LOCAL_JOBID" in os.environ:
flavour = "Local"
pilotReference = "local://" + ceName + "/" + os.environ["LOCAL_JOBID"]

# Direct SSH tunnel submission
if "SSHCE_JOBID" in os.environ:
Expand All @@ -274,6 +325,8 @@ def getFlavour(ceName):
"sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"]
)

# # CEs

# ARC
if "GRID_GLOBAL_JOBURL" in os.environ:
flavour = "ARC"
Expand All @@ -284,9 +337,18 @@ def getFlavour(ceName):
flavour = "VMDIRAC"
pilotReference = "vm://" + ceName + "/" + os.environ["JOB_ID"]

return flavour, pilotReference
return flavour, pilotReference, {"Type": batchSystemType, "JobID": batchSystemJobID, "Parameters": batchSystemParameters}


def getFlavour(ceName):
"""Old method to get the flavour of the pilot. Deprecated.
Please use getSubmitterInfo instead.
"""
warnings.warn("getFlavour() is deprecated. Please use getSubmitterInfo() instead.", category=DeprecationWarning, stacklevel=2)
flavour, pilotReference, _ = getSubmitterInfo(ceName)
return flavour, pilotReference

class ObjectLoader(object):
"""Simplified class for loading objects from a DIRAC installation.
Expand Down Expand Up @@ -834,6 +896,7 @@ def __init__(self):
self.stopOnApplicationFailure = True
self.stopAfterFailedMatches = 10
self.flavour = "DIRAC"
self.batchSystemInfo = {}
self.pilotReference = ""
self.releaseVersion = ""
self.releaseProject = ""
Expand Down

0 comments on commit bc604ec

Please sign in to comment.