From 51b242f4d5640f4e8d68d8c52b6038014b9e541f Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 24 Sep 2024 11:16:50 +0200 Subject: [PATCH] feat: add RunNumber as a possible parametric sequence --- release.notes | 2 +- src/DIRAC/Interfaces/API/Job.py | 5 +++++ src/DIRAC/Interfaces/API/test/Test_JobAPI.py | 7 +++++++ src/DIRAC/Interfaces/API/test/testWF.jdl | 9 ++++++++- src/DIRAC/Interfaces/API/test/testWF.xml | 2 ++ .../TransformationSystem/Client/WorkflowTasks.py | 15 +++++++++++++-- 6 files changed, 36 insertions(+), 4 deletions(-) diff --git a/release.notes b/release.notes index 99db46f9323..84885982619 100644 --- a/release.notes +++ b/release.notes @@ -8,7 +8,7 @@ FIX: (#7787) added a 30s gfal2 timeout for downloading the SRR FIX: (#7741) RequestTaskAgent only considers requests in final states, and consider files in intermediate state as problematic (https://github.com/DIRACGrid/DIRAC/issues/7116) NEW: (#7741) RequestTaskAgent uses getBulkRequestStatus instead of getRequestStatus -RMS: (#7741) +RMS: (#7741) NEW: (#7741) implement getRequestStatus [v8.0.52] diff --git a/src/DIRAC/Interfaces/API/Job.py b/src/DIRAC/Interfaces/API/Job.py index 870c49b62e6..afc204724c5 100755 --- a/src/DIRAC/Interfaces/API/Job.py +++ b/src/DIRAC/Interfaces/API/Job.py @@ -904,6 +904,7 @@ def __setJobDefaults(self): self._addParameter(self.workflow, "StdOutput", "JDL", self.stdout, "Standard output file") self._addParameter(self.workflow, "StdError", "JDL", self.stderr, "Standard error file") self._addParameter(self.workflow, "InputData", "JDL", "", "Default null input data value") + self._addParameter(self.workflow, "RunNumber", "JDL", "", "Default null input run value") self._addParameter(self.workflow, "LogLevel", "JDL", self.logLevel, "Job Logging Level") self._addParameter(self.workflow, "arguments", "string", "", "Arguments to executable Step") # Those 2 below are need for on-site resolution @@ -913,6 +914,9 @@ def __setJobDefaults(self): self._addParameter( self.workflow, "ParametricInputSandbox", "string", "", "Default null parametric input sandbox value" ) + self._addParameter( + self.workflow, "ParametricRunNumber", "string", "", "Default null parametric run number value" + ) ############################################################################# @@ -1021,6 +1025,7 @@ def _handleParameterSequences(self, paramsDict, arguments): # If a parameter with the same name as the sequence name already exists # and is a list, then extend it by the sequence value. If it is not a # list, then replace it by the sequence value + if isinstance(paramsDict[pName]["value"], list): currentParams = paramsDict[pName]["value"] tmpList = [] diff --git a/src/DIRAC/Interfaces/API/test/Test_JobAPI.py b/src/DIRAC/Interfaces/API/test/Test_JobAPI.py index ff6c3e612cb..47bf40b0cd8 100644 --- a/src/DIRAC/Interfaces/API/test/Test_JobAPI.py +++ b/src/DIRAC/Interfaces/API/test/Test_JobAPI.py @@ -47,6 +47,10 @@ def test_SimpleParametricJob(): ] job.setParameterSequence("InputData", inputDataList, addToWorkflow=True) + runNumberList = [123, 456, 789] + res = job.setParameterSequence("RunNumber", runNumberList, addToWorkflow=True) + assert res["OK"] + jdl = job._toJDL() with open(join(dirname(__file__), "testWF.jdl")) as fd: @@ -59,13 +63,16 @@ def test_SimpleParametricJob(): arguments = clad.getAttributeString("Arguments") job_id = clad.getAttributeString("JOB_ID") inputData = clad.getAttributeString("InputData") + runNumber = clad.getAttributeString("RunNumber") assert job_id == "%(JOB_ID)s" assert inputData == "%(InputData)s" + assert runNumber == "%(RunNumber)s" assert "jobDescription.xml" in arguments assert "-o LogLevel=DEBUG" in arguments assert "-p JOB_ID=%(JOB_ID)s" in arguments assert "-p InputData=%(InputData)s" in arguments + assert "-p RunNumber=%(RunNumber)s" in arguments @pytest.mark.parametrize( diff --git a/src/DIRAC/Interfaces/API/test/testWF.jdl b/src/DIRAC/Interfaces/API/test/testWF.jdl index 2d6a4b6446e..9e5445629ac 100644 --- a/src/DIRAC/Interfaces/API/test/testWF.jdl +++ b/src/DIRAC/Interfaces/API/test/testWF.jdl @@ -1,5 +1,5 @@ - Arguments = "jobDescription.xml -o LogLevel=DEBUG -p JOB_ID=%(JOB_ID)s -p InputData=%(InputData)s"; + Arguments = "jobDescription.xml -o LogLevel=DEBUG -p JOB_ID=%(JOB_ID)s -p InputData=%(InputData)s -p RunNumber=%(RunNumber)s"; Executable = "dirac-jobexec"; InputData = %(InputData)s; InputSandbox = jobDescription.xml; @@ -29,6 +29,13 @@ 2, 3 }; + Parameters.RunNumber = + { + 123, + 456, + 789 + }; Priority = 1; + RunNumber = %(RunNumber)s; StdError = std.err; StdOutput = std.out; \ No newline at end of file diff --git a/src/DIRAC/Interfaces/API/test/testWF.xml b/src/DIRAC/Interfaces/API/test/testWF.xml index 25845b52eea..c426f4ea46a 100644 --- a/src/DIRAC/Interfaces/API/test/testWF.xml +++ b/src/DIRAC/Interfaces/API/test/testWF.xml @@ -12,10 +12,12 @@ + + diff --git a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py index 313a204010c..eda9c0f6cae 100644 --- a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py +++ b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py @@ -211,8 +211,19 @@ def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): self._logError("Invalid mixture of jobs with and without input data") return S_ERROR(ETSDATA, "Invalid mixture of jobs with and without input data") + # Handle Run Number + runNumber = paramsDict.get("RunNumber") + if runNumber: + if isinstance(runNumber, str): + runNumber = runNumber.replace(" ", "").split(";") + self._logVerbose(f"Setting run number to {runNumber}", transID=transID, method=method) + seqDict["RunNumber"] = runNumber + elif paramSeqDict.get("RunNumber") is not None: + self._logError("Invalid mixture of jobs with and without run number") + return S_ERROR(ETSDATA, "Invalid mixture of jobs with and without run number") + for paramName, paramValue in paramsDict.items(): - if paramName not in ("InputData", "Site", "TargetSE"): + if paramName not in ("InputData", "RunNumber", "Site", "TargetSE"): if paramValue: self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method) seqDict[paramName] = paramValue @@ -244,7 +255,7 @@ def __prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): paramSeqDict.setdefault(pName, []).append(seq) for paramName, paramSeq in paramSeqDict.items(): - if paramName in ["JOB_ID", "PRODUCTION_ID", "InputData"] + outputParameterList: + if paramName in ["JOB_ID", "PRODUCTION_ID", "InputData", "RunNumber"] + outputParameterList: res = oJob.setParameterSequence(paramName, paramSeq, addToWorkflow=paramName) else: res = oJob.setParameterSequence(paramName, paramSeq)