From 00c2af7cce3cf33c38bacb17801eb5e5f3c34464 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 27 Jun 2024 15:01:02 +0200 Subject: [PATCH] fix: removed deprecated methods --- .../Client/JobState/JobState.py | 19 ++++-- .../WorkloadManagementSystem/DB/JobDB.py | 65 +------------------ .../Service/JobMonitoringHandler.py | 2 +- .../Service/JobStateUpdateHandler.py | 9 +-- .../Utilities/JobStatusUtility.py | 1 - .../Utilities/test/Test_JobStatusUtility.py | 6 +- .../Test_Client_WMS.py | 3 +- 7 files changed, 24 insertions(+), 81 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py index e26f646ceb6..7a80d9f17bb 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py @@ -1,13 +1,18 @@ """ This object is a wrapper for setting and getting jobs states """ -from DIRAC import gLogger, S_OK, S_ERROR -from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest +from DIRAC import S_ERROR, S_OK, gLogger from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB -from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, singleValueDefFields, multiValueDefFields -from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, RIGHT_RESCHEDULE -from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_RESET, RIGHT_CHANGE_STATUS +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields +from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( + RIGHT_CHANGE_STATUS, + RIGHT_GET_INFO, + RIGHT_RESCHEDULE, + RIGHT_RESET, +) from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility @@ -21,6 +26,7 @@ def reset(self): self.jobDB = None self.logDB = None self.tqDB = None + self.jobParametersDB = None __db = DBHold() @@ -32,6 +38,7 @@ def checkDBAccess(cls): JobState.__db.jobDB = JobDB() JobState.__db.logDB = JobLoggingDB() JobState.__db.tqDB = TaskQueueDB() + JobState.__db.jpDB = JobParametersDB() def __init__(self, jid): self.__jid = jid @@ -112,7 +119,7 @@ def commitCache(self, initialState, cache, jobLog): return result if data["jobp"]: - result = self.__retryFunction(5, JobState.__db.jobDB.setJobParameters, (self.__jid, data["jobp"])) + result = self.__retryFunction(5, JobState.__db.jpDB.setJobParameters, (self.__jid, data["jobp"])) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 191a776c5a6..d0de65e6652 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -19,6 +19,7 @@ from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd +from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus @@ -264,6 +265,7 @@ def getJobAttribute(self, jobID, attribute): return S_OK(result["Value"].get(attribute)) ############################################################################# + @deprecated("Use JobParametersDB instead") def getJobParameter(self, jobID, parameter): """Get the given parameter of a job specified by its jobID""" @@ -668,50 +670,6 @@ def setStartExecTime(self, jobID, startDate=None): req = f"UPDATE Jobs SET StartExecTime={startDate} WHERE JobID={jobID} AND StartExecTime IS NULL" return self._update(req) - ############################################################################# - def setJobParameter(self, jobID, key, value): - """Set a parameter specified by name,value pair for the job JobID""" - - ret = self._escapeString(key) - if not ret["OK"]: - return ret - e_key = ret["Value"] - ret = self._escapeString(value) - if not ret["OK"]: - return ret - e_value = ret["Value"] - - cmd = "REPLACE JobParameters (JobID,Name,Value) VALUES (%d,%s,%s)" % (int(jobID), e_key, e_value) - return self._update(cmd) - - ############################################################################# - def setJobParameters(self, jobID, parameters): - """Set parameters specified by a list of name/value pairs for the job JobID - - :param int jobID: Job ID - :param list parameters: list of tuples (name, value) pairs - - :return: S_OK/S_ERROR - """ - - if not parameters: - return S_OK() - - insertValueList = [] - for name, value in parameters: - ret = self._escapeString(name) - if not ret["OK"]: - return ret - e_name = ret["Value"] - ret = self._escapeString(value) - if not ret["OK"]: - return ret - e_value = ret["Value"] - insertValueList.append(f"({jobID},{e_name},{e_value})") - - cmd = f"REPLACE JobParameters (JobID,Name,Value) VALUES {', '.join(insertValueList)}" - return self._update(cmd) - ############################################################################# def setJobOptParameter(self, jobID, name, value): """Set an optimzer parameter specified by name,value pair for the job JobID""" @@ -780,16 +738,6 @@ def setAtticJobParameter(self, jobID, key, value, rescheduleCounter): ) return self._update(cmd) - ############################################################################# - def __setInitialJobParameters(self, classadJob, jobID): - """Set initial job parameters as was defined in the Classad""" - - # Extract initital job parameters - parameters = {} - if classadJob.lookupAttribute("Parameters"): - parameters = classadJob.getDictionaryFromSubJDL("Parameters") - return self.setJobParameters(jobID, list(parameters.items())) - ############################################################################# def setJobJDL(self, jobID, jdl=None, originalJDL=None): """Insert JDL's for job specified by jobID""" @@ -969,11 +917,6 @@ def insertNewJobIntoDB( if not result["OK"]: return result - # Setting the Job parameters - result = self.__setInitialJobParameters(classAdJob, jobID) - if not result["OK"]: - return result - # Looking for the Input Data inputData = [] if classAdJob.lookupAttribute("InputData"): @@ -1205,10 +1148,6 @@ def rescheduleJob(self, jobID): if not result["OK"]: return result - result = self.__setInitialJobParameters(classAdJob, jobID) - if not result["OK"]: - return result - result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 8c26f7b6ea7..f7ddf31052d 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -46,7 +46,7 @@ def initializeHandler(cls, svcInfoDict): def initializeRequest(self): credDict = self.getRemoteCredentials() self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) - + @classmethod def parseSelectors(cls, selectDict=None): """Parse selectors before DB query diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index 3e19bd46cb9..f78569e3759 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -43,11 +43,10 @@ def initializeHandler(cls, svcInfoDict): return result cls.elasticJobParametersDB = result["Value"]() - cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB) + cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB) return S_OK() - def initializeRequest(self): credDict = self.getRemoteCredentials() self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) @@ -162,7 +161,7 @@ def export_setJobParameter(self, jobID, name, value): """Set arbitrary parameter specified by name/value pair for job specified by its JobId """ - return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo) # pylint: disable=no-member + return self.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=self.vo) ########################################################################### types_setJobsParameter = [dict] @@ -227,7 +226,9 @@ def export_sendHeartBeat(self, jobID, dynamicData, staticData): status = result["Value"]["Status"] if status in (JobStatus.STALLED, JobStatus.MATCHED): - result = self.jobDB.setJobAttribute(jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True) + result = self.jobDB.setJobAttribute( + jobID=jobID, attrName="Status", attrValue=JobStatus.RUNNING, update=True + ) if not result["OK"]: self.log.warn("Failed to restore the job status to Running") diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index ebdd0e76546..7ff43246afe 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -50,7 +50,6 @@ def __init__( self.log.error("Can't connect to the JobLoggingDB") raise - def setJobStatus( self, jobID: int, status=None, minorStatus=None, appStatus=None, source=None, dateTime=None, force=False ): diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py index 1205beed32c..913ab1c038a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobStatusUtility.py @@ -216,11 +216,7 @@ def test__setJobStatusBulk( jobLoggingDB_mock = MagicMock() jobLoggingDB_mock.getWMSTimeStamps.return_value = jobLoggingDB_getWMSTimeStamps_rv - esJobParameters_mock = MagicMock() - esJobParameters_mock.getJobAttributes.return_value = jobDB_getJobAttributes_rv - esJobParameters_mock.setJobAttributes.return_value = jobDB_setJobAttributes_rv - - jsu = JobStatusUtility(jobDB_mock, jobLoggingDB_mock, esJobParameters_mock) + jsu = JobStatusUtility(jobDB_mock, jobLoggingDB_mock) # Act res = jsu.setJobStatusBulk(1, statusDict_in, force) diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index c3939f4c7f2..a392bd54756 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -202,7 +202,6 @@ def test_WMSClient_rescheduleJob() -> None: jobID = result["Value"] try: - res = jobMonitoringClient.getJobJDL(jobID, False) assert res["OK"], res["Message"] print(f"Job description: {res['Value']}") @@ -320,6 +319,8 @@ def test_JobStateUpdateAndJobMonitoring() -> None: assert res["OK"], res["Message"] assert res["Value"][jobID]["Status"] == JobStatus.RUNNING + time.sleep(3) + res = jobMonitoringClient.getJobParameters(jobID, ["par1", "par2"]) assert res["OK"], res["Message"] assert res["Value"] == {jobID: {"par1": "par1Value", "par2": "par2Value"}}