Skip to content

Commit

Permalink
Merge pull request #7292 from fstagni/jobParameters_v9
Browse files Browse the repository at this point in the history
[9.0] Job parameters: use only OpenSearch
  • Loading branch information
fstagni authored Apr 17, 2024
2 parents bcc9a80 + d54a3d4 commit 09da32c
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 401 deletions.
14 changes: 14 additions & 0 deletions dirac.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,20 @@ Systems
##END
}
}
WorkloadManagementSystem
{
Databases
{
ElasticJobParametersDB
{
# Host of OpenSearch instance
Host=host.some.where

# index name (default is "job_parameters")
index_name=a_different_name
}
}
}
}
Resources
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ SandboxMetadataDB
TaskQueueDB
The TaskQueueDB is used to organize jobs requirements into task queues, for easier matching.

All the DBs above are MySQL DBs, and should be installed using the :ref:`system administrator console <system-admin-console>`.

The JobDB MySQL table *JobParameters* can be replaced by an JobParameters backend built in Elastic/OpenSearch.
To enable it, set the following flag::

/Operations/[Defaults | Setup]/Services/JobMonitoring/useESForJobParametersFlag=True

If you decide to make use of this Elastic/OpenSearch backend for storing job parameters, you would be in charge of setting
the index policies, as Job Parameters stored in Elastic/OpenSearch are not deleted together with the jobs.
All the DBs above are MySQL DBs with the only exception of the Elastic/OpenSearch backend for storing job parameters.


Services
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Base/ElasticDB.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
""" ElasticDB is a base class used to connect an Elasticsearch database and manages queries.
"""
from DIRAC.ConfigurationSystem.Client.Utilities import getElasticDBParameters
from DIRAC.Core.Base.DIRACDB import DIRACDB
from DIRAC.Core.Utilities.ElasticSearchDB import ElasticSearchDB
from DIRAC.ConfigurationSystem.Client.Utilities import getElasticDBParameters


class ElasticDB(DIRACDB, ElasticSearchDB):
Expand Down
29 changes: 10 additions & 19 deletions src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
optimizer instances and associated actions are performed there.
"""

from DIRAC import S_OK, S_ERROR, exit as dExit

from DIRAC import S_ERROR, S_OK
from DIRAC import exit as dExit
from DIRAC.AccountingSystem.Client.Types.Job import Job as AccountingJob
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB

Expand Down Expand Up @@ -48,17 +46,12 @@ def initialize(self, jobDB=None, logDB=None):
if not self.jobDB.isValid():
dExit(1)

useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
self.elasticJobParametersDB = result["Value"]()

self.logDB = JobLoggingDB() if logDB is None else logDB

Expand Down Expand Up @@ -244,9 +237,7 @@ def setJobParam(self, job, reportName, value):
return S_OK()

self.log.debug(f"setJobParameter({job},'{reportName}','{value}')")
if self.elasticJobParametersDB:
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)
return self.jobDB.setJobParameter(job, reportName, value)
return self.elasticJobParametersDB.setJobParameter(int(job), reportName, value)

#############################################################################
def setFailedJob(self, job, msg, classAdJob=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ def getAttribute(self, name):
def getAttributes(self, nameList=None):
return self.__cacheDict("att", self.__jobState.getAttributes, nameList)

# JobParameters --- REMOVED

# Optimizer params

def setOptParameter(self, name, value):
Expand Down
20 changes: 6 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB.
This is a drop-in replacement for MySQL-based table JobDB.JobParameters.
The reason for switching to a ES-based JobParameters lies in the extended searching
capabilities of ES.
This results in higher traceability for DIRAC jobs.
The following class methods are provided for public usage
- getJobParameters()
- setJobParameter()
- deleteJobParameters()
"""
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities


mapping = {
"properties": {
"JobID": {"type": "long"},
Expand All @@ -38,24 +32,22 @@ class ElasticJobParametersDB(ElasticDB):
def __init__(self, parentLogger=None):
"""Standard Constructor"""

try:
indexPrefix = CSGlobals.getSetup().lower()
self.fullname = "WorkloadManagement/ElasticJobParametersDB"
self.index_name = self.getCSOption("index_name", "job_parameters")

try:
# Connecting to the ES cluster
super().__init__("WorkloadManagement/ElasticJobParametersDB", indexPrefix, parentLogger=parentLogger)
super().__init__(self.fullname, self.index_name, parentLogger=parentLogger)
except Exception as ex:
self.log.error("Can't connect to ElasticJobParametersDB", repr(ex))
raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex

self.indexName_base = f"{self.getIndexPrefix()}_elasticjobparameters_index"

def _indexName(self, jobID: int) -> str:
"""construct the index name
:param jobID: Job ID
"""
indexSplit = int(jobID) // 1e6
return f"{self.indexName_base}_{indexSplit}m"
indexSplit = int(jobID // 1e6)
return f"{self.index_name}_{indexSplit}m"

def _createIndex(self, indexName: str) -> None:
"""Create a new index if needed
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,11 +1062,10 @@ def removeJobFromDB(self, jobIDs):
if not result["OK"]:
failedTablesList.append(table)

result = S_OK()
if failedTablesList:
result = S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")

return result
return S_OK()

#############################################################################
def rescheduleJob(self, jobID):
Expand Down
76 changes: 30 additions & 46 deletions src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False)
if useESForJobParametersFlag:
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log)

cls.pilotManager = PilotManagerClient()
return S_OK()
Expand Down Expand Up @@ -446,14 +440,7 @@ def export_getJobParameter(cls, jobID, parName):
:param str/int jobID: one single Job ID
:param str parName: one single parameter name
"""
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
if res["Value"].get(int(jobID)):
return S_OK(res["Value"][int(jobID)])

res = cls.jobDB.getJobParameters(jobID, [parName])
res = cls.elasticJobParametersDB.getJobParameters(jobID, [parName])
if not res["OK"]:
return res
return S_OK(res["Value"].get(int(jobID), {}))
Expand All @@ -475,34 +462,31 @@ def export_getJobParameters(cls, jobIDs, parName=None):
:param str/int/list jobIDs: one single job ID or a list of them
:param str parName: one single parameter name, a list or None (meaning all of them)
"""
if cls.elasticJobParametersDB:
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

return cls.jobDB.getJobParameters(jobIDs, parName)
parameters.update(res["Value"])

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parametersM = res["Value"]

# and now combine
final = dict(parametersM)
# if job in JobDB, update with parameters from ES if any
for jobID in final:
final[jobID].update(parameters.get(jobID, {}))
# if job in ES and not in JobDB, take ES
for jobID in parameters:
if jobID not in final:
final[jobID] = parameters[jobID]
return S_OK(final)

##############################################################################
types_getAtticJobParameters = [int]
Expand Down
69 changes: 20 additions & 49 deletions src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility

Expand All @@ -37,17 +36,12 @@ def initializeHandler(cls, svcInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

cls.elasticJobParametersDB = None
if Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False):
try:
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB"
)
if not result["OK"]:
return result
cls.elasticJobParametersDB = result["Value"]()

cls.jsu = JobStatusUtility(cls.jobDB, cls.jobLoggingDB, cls.elasticJobParametersDB)

Expand Down Expand Up @@ -165,10 +159,7 @@ def export_setJobParameter(cls, jobID, name, value):
for job specified by its JobId
"""

if cls.elasticJobParametersDB:
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

return cls.jobDB.setJobParameter(int(jobID), name, value)
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member

###########################################################################
types_setJobsParameter = [dict]
Expand All @@ -182,23 +173,13 @@ def export_setJobsParameter(cls, jobsParameterDict):
failed = False

for jobID in jobsParameterDict:
if cls.elasticJobParametersDB:
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

else:
res = cls.jobDB.setJobParameter(
jobID, str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to MySQL", res["Message"])
failed = True
message = res["Message"]
res = cls.elasticJobParametersDB.setJobParameter(
int(jobID), str(jobsParameterDict[jobID][0]), str(jobsParameterDict[jobID][1])
)
if not res["OK"]:
cls.log.error("Failed to add Job Parameter to elasticJobParametersDB", res["Message"])
failed = True
message = res["Message"]

if failed:
return S_ERROR(message)
Expand All @@ -213,14 +194,9 @@ def export_setJobParameters(cls, jobID, parameters):
"""Set arbitrary parameters specified by a list of name/value pairs
for job specified by its JobId
"""
if cls.elasticJobParametersDB:
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticJobParametersDB", result["Message"])

return result

Expand All @@ -235,15 +211,10 @@ def export_sendHeartBeat(cls, jobID, dynamicData, staticData):
if not result["OK"]:
cls.log.warn("Failed to set the heart beat data", f"for job {jobID} ")

if cls.elasticJobParametersDB:
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])
else:
result = cls.jobDB.setJobParameters(int(jobID), list(staticData.items()))
for key, value in staticData.items():
result = cls.elasticJobParametersDB.setJobParameter(int(jobID), key, value)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to MySQL", result["Message"])
cls.log.error("Failed to add Job Parameters to ElasticSearch", result["Message"])

# Restore the Running status if necessary
result = cls.jobDB.getJobAttributes(jobID, ["Status"])
Expand Down
Loading

0 comments on commit 09da32c

Please sign in to comment.