Skip to content

Commit

Permalink
Merge pull request #7101 from fstagni/81_fixes11
Browse files: browse the repository at this point in the history
[8.1] JobManagerHandler: get the OwnerDN only when needed.
  • Loading branch information
fstagni authored Aug 2, 2023
2 parents dfd248e + 7f9d0f6 commit d0b65b8
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -24,9 +24,6 @@
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength

from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
Expand All @@ -35,6 +32,7 @@
RIGHT_SUBMIT,
JobPolicy,
)
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength

MAX_PARAMETRIC_JOBS = 20
Expand Down Expand Up @@ -85,7 +83,6 @@ def initializeRequest(self):
self.maxParametricJobs = self.srv_getCSOption("MaxParametricJobs", MAX_PARAMETRIC_JOBS)
self.jobPolicy = JobPolicy(self.owner, self.ownerGroup, self.userProperties)
self.jobPolicy.jobDB = self.jobDB
self.ownerDN = getDNForUsername(self.owner)["Value"][0]
return S_OK()

def __sendJobsToOptimizationMind(self, jids):
Expand Down Expand Up @@ -128,6 +125,8 @@ def export_submitJob(self, jobDesc):
:return: S_OK/S_ERROR, a list of newly created job IDs in case of S_OK.
"""

ownerDN = getDNForUsername(self.owner)["Value"][0]

if self.peerUsesLimitedProxy:
return S_ERROR(EWMSSUBM, "Can't submit using a limited proxy")

Expand Down Expand Up @@ -160,9 +159,9 @@ def export_submitJob(self, jobDesc):
if nJobs > self.maxParametricJobs:
self.log.error(
"Maximum of parametric jobs exceeded:",
"limit %d smaller than number of jobs %d" % (self.maxParametricJobs, nJobs),
f"limit {self.maxParametricJobs} smaller than number of jobs {nJobs}",
)
return S_ERROR(EWMSJDL, "Number of parametric jobs exceeds the limit of %d" % self.maxParametricJobs)
return S_ERROR(EWMSJDL, f"Number of parametric jobs exceeds the limit of {self.maxParametricJobs}")
result = generateParametricJobs(jobClassAd)
if not result["OK"]:
return result
Expand Down Expand Up @@ -190,7 +189,7 @@ def export_submitJob(self, jobDesc):
JobDescriptionModel(
**baseJobDescritionModel.dict(exclude_none=True),
owner=self.owner,
ownerDN=self.ownerDN,
ownerDN=ownerDN,
ownerGroup=self.ownerGroup,
vo=getVOForGroup(self.ownerGroup),
)
Expand Down Expand Up @@ -218,9 +217,9 @@ def export_submitJob(self, jobDesc):
jobIDList.append(jobID)

# Set persistency flag
retVal = gProxyManager.getUserPersistence(self.ownerDN, self.ownerGroup)
retVal = gProxyManager.getUserPersistence(ownerDN, self.ownerGroup)
if "Value" not in retVal or not retVal["Value"]:
gProxyManager.setPersistency(self.ownerDN, self.ownerGroup, True)
gProxyManager.setPersistency(ownerDN, self.ownerGroup, True)

if parametricJob:
result = S_OK(jobIDList)
Expand Down Expand Up @@ -295,7 +294,8 @@ def __checkIfProxyUploadIsRequired(self):
:return: bool
"""
result = gProxyManager.userHasProxy(self.ownerDN, self.ownerGroup, validSeconds=18000)
ownerDN = getDNForUsername(self.owner)["Value"][0]
result = gProxyManager.userHasProxy(ownerDN, self.ownerGroup, validSeconds=18000)
if not result["OK"]:
self.log.error("Can't check if the user has proxy uploaded", result["Message"])
return True
Expand Down Expand Up @@ -407,18 +407,18 @@ def export_removeJob(self, jobIDs):
for jobID in validJobList:
resultTQ = self.taskQueueDB.deleteJob(jobID)
if not resultTQ["OK"]:
self.log.warn("Failed to remove job from TaskQueueDB", "(%d): %s" % (jobID, resultTQ["Message"]))
self.log.warn("Failed to remove job from TaskQueueDB", f"({jobID}): {resultTQ['Message']}")
error_count += 1
else:
count += 1

if not (result := self.jobLoggingDB.deleteJob(validJobList))["OK"]:
if not self.jobLoggingDB.deleteJob(validJobList)["OK"]:
self.log.error("Failed to remove jobs from JobLoggingDB", f"(n={len(validJobList)})")
else:
self.log.info("Removed jobs from JobLoggingDB", f"(n={len(validJobList)})")

if count > 0 or error_count > 0:
self.log.info("Removed jobs from DB", "(%d jobs with %d errors)" % (count, error_count))
self.log.info("Removed jobs from DB", f"({count} jobs with {error_count} errors)")

if invalidJobList or nonauthJobList:
self.log.error(
Expand Down Expand Up @@ -446,12 +446,10 @@ def __deleteJob(self, jobID):
:param int jobID: job ID
:return: S_OK()/S_ERROR()
"""
result = self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting")
if not result["OK"]:
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting"))["OK"]:
return result

result = self.taskQueueDB.deleteJob(jobID)
if not result["OK"]:
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
self.log.warn("Failed to delete job from the TaskQueue")

# if it was the last job for the pilot
Expand All @@ -469,8 +467,7 @@ def __deleteJob(self, jobID):
if not result["OK"]:
self.log.error("Failed to get pilot info", result["Message"])
return result
pilotRef = result[0]["PilotJobReference"]
ret = self.pilotAgentsDB.deletePilot(pilot)
ret = self.pilotAgentsDB.deletePilot(result["Value"]["PilotJobReference"])
if not ret["OK"]:
self.log.error("Failed to delete pilot from PilotAgentsDB", ret["Message"])
return ret
Expand Down Expand Up @@ -522,7 +519,7 @@ def __kill_delete_jobs(self, jobIDList, right):
deleteJobList = []
markKilledJobList = []
stagingJobList = []
for jobID, sDict in result["Value"].items(): # can be an iterator
for jobID, sDict in result["Value"].items():
if sDict["Status"] in (JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED):
killJobList.append(jobID)
elif sDict["Status"] in (
Expand Down

0 comments on commit d0b65b8

Please sign in to comment.