Skip to content

Commit

Permalink
fix: removed deprecated methods
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Jun 28, 2024
1 parent de39667 commit 95bbc20
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 81 deletions.
19 changes: 13 additions & 6 deletions src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
""" This object is a wrapper for setting and getting jobs states
"""
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, singleValueDefFields, multiValueDefFields
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, RIGHT_RESCHEDULE
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_RESET, RIGHT_CHANGE_STATUS
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_CHANGE_STATUS,
RIGHT_GET_INFO,
RIGHT_RESCHEDULE,
RIGHT_RESET,
)
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility


Expand All @@ -21,6 +26,7 @@ def reset(self):
self.jobDB = None
self.logDB = None
self.tqDB = None
self.jobParametersDB = None

__db = DBHold()

Expand All @@ -32,6 +38,7 @@ def checkDBAccess(cls):
JobState.__db.jobDB = JobDB()
JobState.__db.logDB = JobLoggingDB()
JobState.__db.tqDB = TaskQueueDB()
JobState.__db.jpDB = JobParametersDB()

def __init__(self, jid):
self.__jid = jid
Expand Down Expand Up @@ -112,7 +119,7 @@ def commitCache(self, initialState, cache, jobLog):
return result

if data["jobp"]:
result = self.__retryFunction(5, JobState.__db.jobDB.setJobParameters, (self.__jid, data["jobp"]))
result = self.__retryFunction(5, JobState.__db.jpDB.setJobParameters, (self.__jid, data["jobp"]))
if not result["OK"]:
return result

Expand Down
65 changes: 2 additions & 63 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
Expand Down Expand Up @@ -264,6 +265,7 @@ def getJobAttribute(self, jobID, attribute):
return S_OK(result["Value"].get(attribute))

#############################################################################
@deprecated("Use JobParametersDB instead")
def getJobParameter(self, jobID, parameter):
"""Get the given parameter of a job specified by its jobID"""

Expand Down Expand Up @@ -668,50 +670,6 @@ def setStartExecTime(self, jobID, startDate=None):
req = f"UPDATE Jobs SET StartExecTime={startDate} WHERE JobID={jobID} AND StartExecTime IS NULL"
return self._update(req)

#############################################################################
def setJobParameter(self, jobID, key, value):
"""Set a parameter specified by name,value pair for the job JobID"""

ret = self._escapeString(key)
if not ret["OK"]:
return ret
e_key = ret["Value"]
ret = self._escapeString(value)
if not ret["OK"]:
return ret
e_value = ret["Value"]

cmd = "REPLACE JobParameters (JobID,Name,Value) VALUES (%d,%s,%s)" % (int(jobID), e_key, e_value)
return self._update(cmd)

#############################################################################
def setJobParameters(self, jobID, parameters):
"""Set parameters specified by a list of name/value pairs for the job JobID
:param int jobID: Job ID
:param list parameters: list of tuples (name, value) pairs
:return: S_OK/S_ERROR
"""

if not parameters:
return S_OK()

insertValueList = []
for name, value in parameters:
ret = self._escapeString(name)
if not ret["OK"]:
return ret
e_name = ret["Value"]
ret = self._escapeString(value)
if not ret["OK"]:
return ret
e_value = ret["Value"]
insertValueList.append(f"({jobID},{e_name},{e_value})")

cmd = f"REPLACE JobParameters (JobID,Name,Value) VALUES {', '.join(insertValueList)}"
return self._update(cmd)

#############################################################################
def setJobOptParameter(self, jobID, name, value):
"""Set an optimzer parameter specified by name,value pair for the job JobID"""
Expand Down Expand Up @@ -780,16 +738,6 @@ def setAtticJobParameter(self, jobID, key, value, rescheduleCounter):
)
return self._update(cmd)

#############################################################################
def __setInitialJobParameters(self, classadJob, jobID):
"""Set initial job parameters as was defined in the Classad"""

# Extract initital job parameters
parameters = {}
if classadJob.lookupAttribute("Parameters"):
parameters = classadJob.getDictionaryFromSubJDL("Parameters")
return self.setJobParameters(jobID, list(parameters.items()))

#############################################################################
def setJobJDL(self, jobID, jdl=None, originalJDL=None):
"""Insert JDL's for job specified by jobID"""
Expand Down Expand Up @@ -969,11 +917,6 @@ def insertNewJobIntoDB(
if not result["OK"]:
return result

# Setting the Job parameters
result = self.__setInitialJobParameters(classAdJob, jobID)
if not result["OK"]:
return result

# Looking for the Input Data
inputData = []
if classAdJob.lookupAttribute("InputData"):
Expand Down Expand Up @@ -1205,10 +1148,6 @@ def rescheduleJob(self, jobID):
if not result["OK"]:
return result

result = self.__setInitialJobParameters(classAdJob, jobID)
if not result["OK"]:
return result

result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True)
if not result["OK"]:
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def initializeHandler(cls, svcInfoDict):
def initializeRequest(self):
credDict = self.getRemoteCredentials()
self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"]))

@classmethod
def parseSelectors(cls, selectDict=None):
"""Parse selectors before DB query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ def initializeHandler(cls, svcInfoDict):
return result
cls.elasticJobParametersDB = result["Value"]()

cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB)
cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB)

return S_OK()


def initializeRequest(self):
credDict = self.getRemoteCredentials()
self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"]))
Expand Down Expand Up @@ -162,7 +161,7 @@ def export_setJobParameter(self, jobID, name, value):
"""Set arbitrary parameter specified by name/value pair
for job specified by its JobId
"""
return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo) # pylint: disable=no-member
return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo)

###########################################################################
types_setJobsParameter = [dict]
Expand Down Expand Up @@ -227,7 +226,9 @@ def export_sendHeartBeat(self, jobID, dynamicData, staticData):

status = result["Value"]["Status"]
if status in (JobStatus.STALLED, JobStatus.MATCHED):
result = self.jobDB.setJobAttribute(jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True)
result = self.jobDB.setJobAttribute(
jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True
)
if not result["OK"]:
self.log.warn("Failed to restore the job status to Running")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def __init__(
self.log.error("Can't connect to the JobLoggingDB")
raise


def setJobStatus(
self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,7 @@ def test__setJobStatusBulk(
jobLoggingDB_mock = MagicMock()
jobLoggingDB_mock.getWMSTimeStamps.return_value = jobLoggingDB_getWMSTimeStamps_rv

esJobParameters_mock = MagicMock()
esJobParameters_mock.getJobAttributes.return_value = jobDB_getJobAttributes_rv
esJobParameters_mock.setJobAttributes.return_value = jobDB_setJobAttributes_rv

jsu = JobStatusUtility(jobDB_mock, jobLoggingDB_mock, esJobParameters_mock)
jsu = JobStatusUtility(jobDB_mock, jobLoggingDB_mock)

# Act
res = jsu.setJobStatusBulk(1, statusDict_in, force)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ def test_WMSClient_rescheduleJob() -> None:
jobID = result["Value"]

try:

res = jobMonitoringClient.getJobJDL(jobID, False)
assert res["OK"], res["Message"]
print(f"Job description: {res['Value']}")
Expand Down Expand Up @@ -320,6 +319,8 @@ def test_JobStateUpdateAndJobMonitoring() -> None:
assert res["OK"], res["Message"]
assert res["Value"][jobID]["Status"] == JobStatus.RUNNING

time.sleep(3)

res = jobMonitoringClient.getJobParameters(jobID, ["par1", "par2"])
assert res["OK"], res["Message"]
assert res["Value"] == {jobID: {"par1": "par1Value", "par2": "par2Value"}}
Expand Down

0 comments on commit 95bbc20

Please sign in to comment.