Skip to content

Commit

Permalink
Merge pull request #7609 from fstagni/cherry-pick-2-716726bfe-integra…
Browse files Browse the repository at this point in the history
…tion

[sweep:integration] Perform bulk lookup of job parameters from elasticsearch
  • Loading branch information
fstagni authored May 16, 2024
2 parents 3b78e80 + 062d6a2 commit df16ef4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/DIRAC/Core/LCG/GOCDBClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ def getStatus(self, granularity, name=None, startDate=None, startingInHours=None
.. code-block:: python
{'OK': True,
'Value': {'92569G0 lhcbsrm-kit.gridka.de': {
'Value': {'92569G0 lhcbdcache-kit-tape.gridka.de': {
'DESCRIPTION': 'Annual site downtime for various major tasks i...',
'FORMATED_END_DATE': '2014-05-27 15:21',
'FORMATED_START_DATE': '2014-05-26 04:00',
'GOCDB_PORTAL_URL': 'https://goc.egi.eu/portal/index.php?Page_Type=Downtime&id=14051',
'HOSTED_BY': 'FZK-LCG2',
'HOSTNAME': 'lhcbsrm-kit.gridka.de',
'SERVICE_TYPE': 'SRM.nearline',
'HOSTNAME': 'lhcbdcache-kit-tape.gridka.de',
'SERVICE_TYPE': 'wlcg.webdav.tape',
'SEVERITY': 'OUTAGE'},
'99873G0 srm.pic.esSRM': {
'HOSTED_BY': 'pic',
Expand All @@ -97,7 +97,7 @@ def getStatus(self, granularity, name=None, startDate=None, startingInHours=None
'URL': 'srm.pic.es',
'GOCDB_PORTAL_URL': 'https://goc.egi.eu/portal/index.php?Page_Type=Downtime&id=21303',
'FORMATED_START_DATE': '2016-09-14 06:00',
'SERVICE_TYPE': 'SRM',
'SERVICE_TYPE': 'webdav',
'FORMATED_END_DATE': '2016-09-14 15:00',
'DESCRIPTION': 'Outage declared due to network and dCache upgrades'}
}
Expand Down
17 changes: 17 additions & 0 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ def getDoc(self, index: str, docID: str) -> dict:
except RequestError as re:
return S_ERROR(re)

@ifConnected
def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]:
"""Efficiently retrieve many documents from an index.
:param index: name of the index
:param docIDs: document IDs
"""
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs]
try:
response = self.client.mget({"docs": docs})
except RequestError as re:
return S_ERROR(re)
else:
results = {int(x["_id"]): x["_source"] if x.get("found") else {} for x in response["docs"]}
return S_OK(results)

@ifConnected
def updateDoc(self, index: str, docID: str, body) -> dict:
"""Update an existing document with a script or partial document
Expand Down
27 changes: 16 additions & 11 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- setJobParameter()
- deleteJobParameters()
"""
from typing import Union

from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities
Expand Down Expand Up @@ -46,7 +48,7 @@ def _indexName(self, jobID: int) -> str:
:param jobID: Job ID
"""
indexSplit = int(jobID // 1e6)
indexSplit = int(jobID) // 1e6
return f"{self.index_name}_{indexSplit}m"

def _createIndex(self, indexName: str) -> None:
Expand All @@ -63,7 +65,7 @@ def _createIndex(self, indexName: str) -> None:
raise RuntimeError(result["Message"])
self.log.always("Index created:", indexName)

def getJobParameters(self, jobID: int, paramList=None) -> dict:
def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict:
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters are returned.
Expand All @@ -73,20 +75,23 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict:
:param paramList: list of parameters to be returned (also a string is treated)
:return: dict with all Job Parameter values
"""
if isinstance(jobIDs, int):
jobIDs = [jobIDs]
if isinstance(paramList, str):
paramList = paramList.replace(" ", "").split(",")
self.log.debug(f"JobDB.getParameters: Getting Parameters for job {jobID}")
self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}")

res = self.getDoc(self._indexName(jobID), str(jobID))
res = self.getDocs(self._indexName, jobIDs)
if not res["OK"]:
return res
resultDict = res["Value"]
if paramList:
for k in list(resultDict):
if k not in paramList:
resultDict.pop(k)

return S_OK({jobID: resultDict})
result = {}
for job_id, doc in res["Value"].items():
if paramList:
result[job_id] = {k: v for k, v in doc.items() if k in paramList}
else:
result[job_id] = doc

return S_OK(result)

def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,10 @@ def export_getJobParameters(cls, jobIDs, parName=None):
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
jobIDs = [int(jobID) for jobID in jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])
res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parameters = res["Value"]

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
Expand Down

0 comments on commit df16ef4

Please sign in to comment.