Skip to content

Commit

Permalink
Merge pull request #7679 from fstagni/90_JobParameters_improvements
Browse files Browse the repository at this point in the history
[9.0] jobParametersDB improvements
  • Loading branch information
fstagni authored Aug 30, 2024
2 parents 89ae7ff + b1db301 commit b627e05
Show file tree
Hide file tree
Showing 19 changed files with 151 additions and 327 deletions.
2 changes: 1 addition & 1 deletion dirac.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ Systems
{
Databases
{
ElasticJobParametersDB
JobParametersDB
{
# Host of OpenSearch instance
Host=host.some.where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ It is based on layered architecture and is based on DIRAC architecture:
* SandboxMetadataDB
SandboxMetadataDB class is a front-end to the metadata for sandboxes.

* ElasticJobParametersDB
ElasticJobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters.
* JobParametersDB
JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters.
It is used in most of the WMS components and is based on Elastic/OpenSearch.
4 changes: 2 additions & 2 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,14 @@ def getDoc(self, index: str, docID: str) -> dict:
return S_ERROR(re)

@ifConnected
def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]:
def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
"""Efficiently retrieve many documents from an index.
:param index: name of the index
:param docIDs: document IDs
"""
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs]
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
try:
response = self.client.mget({"docs": docs})
except RequestError as re:
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,7 @@ def getAvailableESDatabases(self, extensions):
Result should be something like::
{'MonitoringDB': {'Type': 'ES', 'System': 'Monitoring', 'Extension': ''},
'ElasticJobParametersDB': {'Type': 'ES', 'System': 'WorkloadManagement', 'Extension': ''}}
'JobParametersDB': {'Type': 'ES', 'System': 'WorkloadManagement', 'Extension': ''}}
:param list extensions: list of DIRAC extensions
:return: dict of ES DBs
Expand Down
21 changes: 10 additions & 11 deletions src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
""" This object is a wrapper for setting and getting jobs states
"""
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, singleValueDefFields, multiValueDefFields
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, RIGHT_RESCHEDULE
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_RESET, RIGHT_CHANGE_STATUS
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_CHANGE_STATUS,
RIGHT_GET_INFO,
RIGHT_RESCHEDULE,
RIGHT_RESET,
)
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility


Expand Down Expand Up @@ -96,7 +100,7 @@ def commitCache(self, initialState, cache, jobLog):
return S_OK(False)
gLogger.verbose(f"Job {self.__jid}: About to execute trace. Current state {initialState}")

data = {"att": [], "jobp": [], "optp": []}
data = {"att": [], "optp": []}
for key in cache:
for dk in data:
if key.find(f"{dk}.") == 0:
Expand All @@ -111,11 +115,6 @@ def commitCache(self, initialState, cache, jobLog):
if not result["OK"]:
return result

if data["jobp"]:
result = self.__retryFunction(5, JobState.__db.jobDB.setJobParameters, (self.__jid, data["jobp"]))
if not result["OK"]:
return result

for k, v in data["optp"]:
result = self.__retryFunction(5, JobState.__db.jobDB.setJobOptParameter, (self.__jid, k, v))
if not result["OK"]:
Expand Down
65 changes: 2 additions & 63 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
Expand Down Expand Up @@ -264,6 +265,7 @@ def getJobAttribute(self, jobID, attribute):
return S_OK(result["Value"].get(attribute))

#############################################################################
@deprecated("Use JobParametersDB instead")
def getJobParameter(self, jobID, parameter):
"""Get the given parameter of a job specified by its jobID"""

Expand Down Expand Up @@ -668,50 +670,6 @@ def setStartExecTime(self, jobID, startDate=None):
req = f"UPDATE Jobs SET StartExecTime={startDate} WHERE JobID={jobID} AND StartExecTime IS NULL"
return self._update(req)

#############################################################################
def setJobParameter(self, jobID, key, value):
"""Set a parameter specified by name,value pair for the job JobID"""

ret = self._escapeString(key)
if not ret["OK"]:
return ret
e_key = ret["Value"]
ret = self._escapeString(value)
if not ret["OK"]:
return ret
e_value = ret["Value"]

cmd = "REPLACE JobParameters (JobID,Name,Value) VALUES (%d,%s,%s)" % (int(jobID), e_key, e_value)
return self._update(cmd)

#############################################################################
def setJobParameters(self, jobID, parameters):
"""Set parameters specified by a list of name/value pairs for the job JobID
:param int jobID: Job ID
:param list parameters: list of tuples (name, value) pairs
:return: S_OK/S_ERROR
"""

if not parameters:
return S_OK()

insertValueList = []
for name, value in parameters:
ret = self._escapeString(name)
if not ret["OK"]:
return ret
e_name = ret["Value"]
ret = self._escapeString(value)
if not ret["OK"]:
return ret
e_value = ret["Value"]
insertValueList.append(f"({jobID},{e_name},{e_value})")

cmd = f"REPLACE JobParameters (JobID,Name,Value) VALUES {', '.join(insertValueList)}"
return self._update(cmd)

#############################################################################
def setJobOptParameter(self, jobID, name, value):
"""Set an optimzer parameter specified by name,value pair for the job JobID"""
Expand Down Expand Up @@ -780,16 +738,6 @@ def setAtticJobParameter(self, jobID, key, value, rescheduleCounter):
)
return self._update(cmd)

#############################################################################
def __setInitialJobParameters(self, classadJob, jobID):
"""Set initial job parameters as was defined in the Classad"""

# Extract initital job parameters
parameters = {}
if classadJob.lookupAttribute("Parameters"):
parameters = classadJob.getDictionaryFromSubJDL("Parameters")
return self.setJobParameters(jobID, list(parameters.items()))

#############################################################################
def setJobJDL(self, jobID, jdl=None, originalJDL=None):
"""Insert JDL's for job specified by jobID"""
Expand Down Expand Up @@ -969,11 +917,6 @@ def insertNewJobIntoDB(
if not result["OK"]:
return result

# Setting the Job parameters
result = self.__setInitialJobParameters(classAdJob, jobID)
if not result["OK"]:
return result

# Looking for the Input Data
inputData = []
if classAdJob.lookupAttribute("InputData"):
Expand Down Expand Up @@ -1205,10 +1148,6 @@ def rescheduleJob(self, jobID):
if not result["OK"]:
return result

result = self.__setInitialJobParameters(classAdJob, jobID)
if not result["OK"]:
return result

result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True)
if not result["OK"]:
return result
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB.
""" Module containing a front-end to the ElasticSearch-based JobParametersDB.
This is a drop-in replacement for MySQL-based table JobDB.JobParameters.
The following class methods are provided for public usage
- getJobParameters()
- setJobParameter()
- deleteJobParameters()
"""
from typing import Union

from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.ElasticDB import ElasticDB
Expand Down Expand Up @@ -34,7 +33,7 @@
}


class ElasticJobParametersDB(ElasticDB):
class JobParametersDB(ElasticDB):
def __init__(self, parentLogger=None):
"""Standard Constructor"""

Expand All @@ -44,17 +43,17 @@ def __init__(self, parentLogger=None):
try:
# Connecting to the ES cluster
super().__init__(self.fullname, self.index_name, parentLogger=parentLogger)
except Exception as ex:
raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex
except Exception:
RuntimeError("Can't connect to JobParameters index")
self.addIndexTemplate("elasticjobparametersdb", index_patterns=[f"{self.index_name}_*"], mapping=mapping)

def _indexName(self, jobID: int) -> str:
def _indexName(self, jobID: int, vo: str) -> str:
"""construct the index name
:param jobID: Job ID
"""
indexSplit = int(jobID) // 1e6
return f"{self.index_name}_{indexSplit}m"
indexSplit = int(int(jobID) // 1e6)
return f"{self.index_name}_{vo}_{indexSplit}m"

def _createIndex(self, indexName: str) -> None:
"""Create a new index if needed
Expand All @@ -70,7 +69,7 @@ def _createIndex(self, indexName: str) -> None:
raise RuntimeError(result["Message"])
self.log.always("Index created:", indexName)

def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict:
def getJobParameters(self, jobIDs: int | list[int], vo: str, paramList=None) -> dict:
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters are returned.
Expand All @@ -86,7 +85,7 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic
paramList = paramList.replace(" ", "").split(",")
self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}")

res = self.getDocs(self._indexName, jobIDs)
res = self.getDocs(self._indexName, jobIDs, vo)
if not res["OK"]:
return res
result = {}
Expand All @@ -98,9 +97,9 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic

return S_OK(result)

def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
def setJobParameter(self, jobID: int, key: str, value: str, vo: str) -> dict:
"""
Inserts data into ElasticJobParametersDB index
Inserts data into JobParametersDB index
:param self: self reference
:param jobID: Job ID
Expand All @@ -115,44 +114,44 @@ def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
# The _id in ES can't exceed 512 bytes, this is a ES hard-coded limitation.

# If a record with this jobID update and add parameter, otherwise create a new record
if self.existsDoc(self._indexName(jobID), docID=str(jobID)):
if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)):
self.log.debug("A document for this job already exists, it will now be updated")
result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": data})
result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": data})
else:
self.log.debug("No document has this job id, creating a new document for this job")
self._createIndex(self._indexName(jobID))
result = self.index(indexName=self._indexName(jobID), body=data, docID=str(jobID))
self._createIndex(self._indexName(jobID, vo))
result = self.index(indexName=self._indexName(jobID, vo), body=data, docID=str(jobID))
if not result["OK"]:
self.log.error("Couldn't insert or update data", result["Message"])
return result

def setJobParameters(self, jobID: int, parameters: list) -> dict:
def setJobParameters(self, jobID: int, parameters: list, vo: str) -> dict:
"""
Inserts data into ElasticJobParametersDB index using bulk indexing
Inserts data into JobParametersDB index using bulk indexing
:param self: self reference
:param jobID: Job ID
:param parameters: list of tuples (name, value) pairs
:returns: S_OK/S_ERROR as result of indexing
"""
self.log.debug("Inserting parameters", f"in {self._indexName(jobID)}: for job {jobID}: {parameters}")
self.log.debug("Inserting parameters", f"in {self._indexName(jobID, vo)}: for job {jobID}: {parameters}")

parametersDict = dict(parameters)
parametersDict["JobID"] = jobID
parametersDict["timestamp"] = int(TimeUtilities.toEpochMilliSeconds())

if self.existsDoc(self._indexName(jobID), docID=str(jobID)):
if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)):
self.log.debug("A document for this job already exists, it will now be updated")
result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": parametersDict})
result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": parametersDict})
else:
self.log.debug("Creating a new document for this job")
self._createIndex(self._indexName(jobID))
result = self.index(self._indexName(jobID), body=parametersDict, docID=str(jobID))
self._createIndex(self._indexName(jobID, vo))
result = self.index(self._indexName(jobID, vo), body=parametersDict, docID=str(jobID))
if not result["OK"]:
self.log.error("Couldn't insert or update data", result["Message"])
return result

def deleteJobParameters(self, jobID: int, paramList=None) -> dict:
def deleteJobParameters(self, jobID: int, paramList=None, vo: str = "") -> dict:
"""Deletes Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters for the job are removed
Expand All @@ -169,13 +168,13 @@ def deleteJobParameters(self, jobID: int, paramList=None) -> dict:
if not paramList:
# Deleting the whole record
self.log.debug("Deleting record of job {jobID}")
result = self.deleteDoc(self._indexName(jobID), docID=str(jobID))
result = self.deleteDoc(self._indexName(jobID, vo), docID=str(jobID))
else:
# Deleting the specific parameters
self.log.debug(f"JobDB.getParameters: Deleting Parameters {paramList} for job {jobID}")
for paramName in paramList:
result = self.updateDoc(
index=self._indexName(jobID),
index=self._indexName(jobID, vo),
docID=str(jobID),
body={"script": "ctx._source.remove('" + paramName + "')"},
)
Expand Down
Loading

0 comments on commit b627e05

Please sign in to comment.