From fa651f620e5d68b12420f5716badf6e2d62f8e9c Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Mon, 23 Sep 2024 14:12:53 +0200
Subject: [PATCH] sweep: #7741 Change the updating logic of the TS FileStatus

---
 .../scalingAndLimitations.rst                 |   9 ++
 .../DataManagementSystem/Agent/FTS3Agent.py   |   6 +-
 .../DataManagementSystem/Client/FTS3Job.py    |   5 +
 .../DataManagementSystem/ConfigTemplate.cfg   |   6 ++
 src/DIRAC/DataManagementSystem/DB/FTS3DB.py   |   1 +
 .../Utilities/ResolveSE.py                    |   2 +-
 .../Client/ReqClient.py                       |  20 +++-
 .../RequestManagementSystem/DB/RequestDB.py   |  13 +++
 .../Service/ReqManagerHandler.py              |  10 ++
 .../Agent/TransformationAgent.py              |   9 +-
 .../Client/RequestTasks.py                    | 102 ++++++++++++++----
 11 files changed, 153 insertions(+), 30 deletions(-)

diff --git a/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst b/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst
index 7c0a12fd2b4..258b2cfada0 100644
--- a/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst
+++ b/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst
@@ -62,6 +62,15 @@ Databases
 
 Every now and then, it is interesting to look at the fragmentation status of your database. This is done by using the ``analyze table`` statement (https://dev.mysql.com/doc/refman/8.4/en/analyze-table.html) possibly followed by the ``optimize table`` statement (https://dev.mysql.com/doc/refman/8.4/en/optimize-table.html).
 
+To check whether your tables are fragmented::
+
+    select table_schema, table_name, sys.format_bytes(data_length) table_size, sys.format_bytes(data_free) empty_space from information_schema.tables where data_length >= (1024*1024*1024) order by data_length desc;
+
+
+The fragmented space should be very small with respect to the overall table size.
+
+
+
 Duplications
 ============
 
diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
index cf21652750e..4531178b64f 100644
--- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
+++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -104,6 +104,8 @@ def __readConf(self):
         # lifetime of the proxy we download to delegate to FTS
         self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)
 
+        self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE)
+
         return S_OK()
 
     def initialize(self):
@@ -316,7 +318,7 @@ def monitorJobsLoop(self):
             log.info("Getting next batch of jobs to monitor", f"{loopId}/{nbOfLoops}")
             # get jobs from DB
             res = self.fts3db.getActiveJobs(
-                limit=JOB_MONITORING_BATCH_SIZE, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
+                limit=self.jobMonitoringBatchSize, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
             )
 
             if not res["OK"]:
@@ -351,7 +353,7 @@ def monitorJobsLoop(self):
 
             # If we got less to monitor than what we asked,
             # stop looping
-            if len(activeJobs) < JOB_MONITORING_BATCH_SIZE:
+            if len(activeJobs) < self.jobMonitoringBatchSize:
                 break
             # Commit records after each loop
             self.dataOpSender.concludeSending()
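The batching pattern that ``JobMonitoringBatchSize`` drives in ``monitorJobsLoop`` boils down to a few lines. A minimal sketch, not the agent code itself: ``fetch_active_jobs`` and ``monitor_job`` are hypothetical stand-ins for ``FTS3DB.getActiveJobs`` and the per-job monitoring::

    def monitor_in_batches(fetch_active_jobs, monitor_job, batch_size=20):
        """Poll active jobs in chunks of batch_size until the queue drains."""
        while True:
            # Each iteration asks the DB for at most batch_size jobs
            active_jobs = fetch_active_jobs(limit=batch_size)
            for job in active_jobs:
                monitor_job(job)
            # Getting fewer jobs than requested means nothing is left
            if len(active_jobs) < batch_size:
                break

Smaller batches hold DB rows assigned for a shorter time, which is why a tunable batch size matters when several agents compete for the same jobs.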
diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
index 708edfd9d30..83f2724027a 100644
--- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
@@ -207,6 +207,11 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
                 filesStatus[file_id]["ftsGUID"] = None
                 # TODO: update status to defunct if not recoverable here ?
 
+            # If the file failed, check whether it is recoverable
+            if file_state in FTS3File.FTS_FAILED_STATES:
+                if not fileDict.get("Recoverable", True):
+                    filesStatus[file_id]["status"] = "Defunct"
+
             # If the file is not in a final state, but the job is, we return an error
             # FTS can have inconsistencies where the FTS Job is in a final state
             # but not all the files.
diff --git a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
index ef20117d498..b36078cef1c 100644
--- a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
+++ b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
@@ -132,6 +132,12 @@ Agents
       OperationBulkSize = 20
       # How many Job we will monitor in one loop
       JobBulkSize = 20
+      # Split JobBulkSize into several smaller chunks
+      # Bigger numbers (like 100) are efficient when there is a single agent.
+      # When there are multiple agents, a big value may slow down the overall
+      # processing because of locks and race conditions.
+      # (This number should of course be smaller than or equal to JobBulkSize)
+      JobMonitoringBatchSize = 20
       # Max number of files to go in a single job
       MaxFilesPerJob = 100
       # Max number of attempt per file
diff --git a/src/DIRAC/DataManagementSystem/DB/FTS3DB.py b/src/DIRAC/DataManagementSystem/DB/FTS3DB.py
index 41e4796ac2c..de7b789ae55 100644
--- a/src/DIRAC/DataManagementSystem/DB/FTS3DB.py
+++ b/src/DIRAC/DataManagementSystem/DB/FTS3DB.py
@@ -311,6 +311,7 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
                 session.query(FTS3Job)
                 .join(FTS3Operation)
                 .filter(FTS3Job.status.in_(FTS3Job.NON_FINAL_STATES))
+                .filter(FTS3Operation.status == "Active")
                 .filter(FTS3Job.assignment.is_(None))
                 .filter(FTS3Operation.assignment.is_(None))
             )
diff --git a/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py b/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
index dd5d3149459..0eccbec8fdb 100644
--- a/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
+++ b/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
@@ -1,6 +1,7 @@
 """ This module allows to resolve output SEs
     for Job based on SE and site/country association
 """
+
 from random import shuffle
 
 from DIRAC import gLogger, gConfig
@@ -70,7 +71,6 @@ def getDestinationSEList(outputSE, site, outputmode="Any"):
             raise RuntimeError(localSEs["Message"])
         localSEs = localSEs["Value"]
         sLog.verbose("Local SE list is:", ", ".join(localSEs))
-
     # There is an alias defined for this Site
     associatedSEs = gConfig.getValue(f"/Resources/Sites/{prefix}/{site}/AssociatedSEs/{outputSE}", [])
     if associatedSEs:
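The ``Defunct`` logic added to ``FTS3Job.monitor`` above reduces to a small decision rule. A sketch, where ``FTS_FAILED_STATES`` stands in for the ``FTS3File`` constant referenced by the hunk (its values here are assumed, for illustration only)::

    FTS_FAILED_STATES = ("Canceled", "Failed")  # assumed values, for illustration

    def classify(file_state, file_dict):
        """Map one FTS file entry to the status DIRAC should record."""
        # FTS flags a failed transfer as non-recoverable when retrying is pointless
        if file_state in FTS_FAILED_STATES and not file_dict.get("Recoverable", True):
            return "Defunct"
        return file_state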
diff --git a/src/DIRAC/RequestManagementSystem/Client/ReqClient.py b/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
index 0a10d07f35c..9255b2d09c4 100755
--- a/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
+++ b/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
@@ -5,6 +5,7 @@
     :synopsis: implementation of client for RequestDB using DISET framework
 """
+
 import os
 import time
 import random
@@ -258,7 +259,7 @@ def getRequestStatus(self, requestID):
         self.log.debug("getRequestStatus: attempting to get status for '%d' request." % requestID)
         requestStatus = self._getRPC().getRequestStatus(requestID)
         if not requestStatus["OK"]:
-            self.log.error(
+            self.log.verbose(
                 "getRequestStatus: unable to get status for request",
                 ": '%d' %s" % (requestID, requestStatus["Message"]),
             )
@@ -470,6 +471,23 @@ def resetFailedRequest(self, requestID, allR=False):
             return self.putRequest(req)
         return S_OK("Not reset")
 
+    @ignoreEncodeWarning
+    def getBulkRequestStatus(self, requestIDs: list[int]):
+        """get the status for the supplied request IDs.
+
+        :param self: self reference
+        :param list requestIDs: list of request IDs (integers)
+        :return: S_ERROR or S_OK( { reqID1: status1, reqID2: status2, ... } )
+        """
+        res = self._getRPC().getBulkRequestStatus(requestIDs)
+        if not res["OK"]:
+            return res
+
+        # Cast the requestIDs back to int
+        statuses = strToIntDict(res["Value"])
+
+        return S_OK(statuses)
+
 
 # ============= Some useful functions to be shared ===========
 
diff --git a/src/DIRAC/RequestManagementSystem/DB/RequestDB.py b/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
index ed4fc4ac374..8d10c866a42 100644
--- a/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
+++ b/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
@@ -918,6 +918,19 @@ def getRequestStatus(self, requestID):
             session.close()
         return S_OK(status[0])
 
+    def getBulkRequestStatus(self, requestIDs):
+        """get requests statuses for given request IDs"""
+        session = self.DBSession()
+        try:
+            statuses = session.query(Request.RequestID, Request._Status).filter(Request.RequestID.in_(requestIDs)).all()
+            status_dict = {req_id: req_status for req_id, req_status in statuses}
+        except Exception as e:
+            # log as well?
+            return S_ERROR(f"Failed to getBulkRequestStatus {e!r}")
+        finally:
+            session.close()
+        return S_OK(status_dict)
+
     def getRequestFileStatus(self, requestID, lfnList):
         """get status for files in request given its id
 
diff --git a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
index 6b53f32bb2f..afdd072000f 100755
--- a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
+++ b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
@@ -337,6 +337,16 @@ def export_getRequestStatus(cls, requestID):
             gLogger.error("getRequestStatus", status["Message"])
         return status
 
+    types_getBulkRequestStatus = [list]
+
+    @classmethod
+    def export_getBulkRequestStatus(cls, requestIDs):
+        """get requests statuses given their ids"""
+        res = cls.__requestDB.getBulkRequestStatus(requestIDs)
+        if not res["OK"]:
+            gLogger.error("getBulkRequestStatus", res["Message"])
+        return res
+
     types_getRequestFileStatus = [int, [str, list]]
 
     @classmethod
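Together, the three RMS hunks above expose one bulk call end to end (DB query, service export, client wrapper). A hypothetical caller, with invented request IDs, would use it like this::

    from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient

    res = ReqClient().getBulkRequestStatus([1001, 1002, 1003])  # IDs are illustrative
    if res["OK"]:
        # One RPC round trip instead of one getRequestStatus call per request;
        # strToIntDict restores the integer keys lost in transport
        for request_id, status in res["Value"].items():
            print(request_id, status)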
diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
index cfe543adf7a..8b527b54945 100644
--- a/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
+++ b/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
@@ -9,6 +9,7 @@
   :caption: TransformationAgent options
 """
 from importlib import import_module
+
 import time
 import os
 import datetime
@@ -242,7 +243,7 @@ def processTransformation(self, transDict, clients):
         if transID not in self.replicaCache:
             self.__readCache(transID)
         transFiles = transFiles["Value"]
-        unusedLfns = [f["LFN"] for f in transFiles]
+        unusedLfns = {f["LFN"] for f in transFiles}
         unusedFiles = len(unusedLfns)
 
         plugin = transDict.get("Plugin", "Standard")
@@ -251,7 +252,7 @@ def processTransformation(self, transDict, clients):
         maxFiles = Operations().getValue(f"TransformationPlugins/{plugin}/MaxFilesToProcess", 0)
         # Get plugin-specific limit in number of files (0 means no limit)
         totLfns = len(unusedLfns)
-        lfnsToProcess = self.__applyReduction(unusedLfns, maxFiles=maxFiles)
+        lfnsToProcess = set(self.__applyReduction(unusedLfns, maxFiles=maxFiles))
         if len(lfnsToProcess) != totLfns:
             self._logInfo(
                 "Reduced number of files from %d to %d" % (totLfns, len(lfnsToProcess)),
@@ -534,8 +535,10 @@ def _getDataReplicasDM(self, transID, lfns, clients, forJobs=True, ignoreMissing
                 method=method,
                 transID=transID,
             )
+        successful_set = set(replicas["Successful"])
+        failed_set = set(replicas["Failed"])
         # If files are neither Successful nor Failed, they are set problematic in the FC
-        problematicLfns = [lfn for lfn in lfns if lfn not in replicas["Successful"] and lfn not in replicas["Failed"]]
+        problematicLfns = [lfn for lfn in lfns if lfn not in successful_set and lfn not in failed_set]
         if problematicLfns:
            self._logInfo(f"{len(problematicLfns)} files found problematic in the catalog, set ProbInFC")
            res = clients["TransformationClient"].setFileStatusForTransformation(
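The list-to-set changes in ``TransformationAgent`` are pure lookup optimizations: each ``lfn not in ...`` test drops from a linear scan to a hash lookup. A toy measurement (sizes and LFNs invented for illustration)::

    import timeit

    lfns = [f"/vo/data/file_{i}" for i in range(100_000)]
    as_list, as_set = list(lfns), set(lfns)

    # Worst case for the list: the probe is the last element (O(n) scan);
    # the set answers the same query with an O(1) hash lookup on average.
    print(timeit.timeit(lambda: "/vo/data/file_99999" in as_list, number=100))
    print(timeit.timeit(lambda: "/vo/data/file_99999" in as_set, number=100))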
diff --git a/src/DIRAC/TransformationSystem/Client/RequestTasks.py b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
index d487526ef95..4748ca583e7 100644
--- a/src/DIRAC/TransformationSystem/Client/RequestTasks.py
+++ b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
@@ -309,26 +309,48 @@ def getSubmittedTaskStatus(self, taskDicts):
         Check if tasks changed status, and return a list of tasks per new status
         """
         updateDict = {}
-        badRequestID = 0
+        externalIDs = [
+            int(taskDict["ExternalID"])
+            for taskDict in taskDicts
+            if taskDict["ExternalID"] and int(taskDict["ExternalID"])
+        ]
+        # Count how many tasks don't have a valid external ID
+        badRequestID = len(taskDicts) - len(externalIDs)
+
+        res = self.requestClient.getBulkRequestStatus(externalIDs)
+        if not res["OK"]:
+            # We need a transformationID for the log; we expect a single one,
+            # but handle the general case anyway
+            tids = list({taskDict["TransformationID"] for taskDict in taskDicts})
+            try:
+                tid = tids[0]
+            except IndexError:
+                tid = 0
+
+            self._logWarn(
+                "getSubmittedTaskStatus: Failed to get bulk request statuses",
+                res["Message"],
+                transID=tid,
+            )
+            return S_OK({})
+        new_statuses = res["Value"]
+
         for taskDict in taskDicts:
             oldStatus = taskDict["ExternalStatus"]
             # ExternalID is normally a string
-            if taskDict["ExternalID"] and int(taskDict["ExternalID"]):
-                newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"])
-                if not newStatus["OK"]:
-                    log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn
-                    log(
-                        "getSubmittedTaskStatus: Failed to get requestID for request",
-                        newStatus["Message"],
-                        transID=taskDict["TransformationID"],
-                    )
-                else:
-                    newStatus = newStatus["Value"]
-                    # We don't care updating the tasks to Assigned while the request is being processed
-                    if newStatus != oldStatus and newStatus != "Assigned":
-                        updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])
+
+            newStatus = new_statuses.get(int(taskDict["ExternalID"] or 0))  # "or 0" guards against an empty ID
+            if not newStatus:
+                self._logVerbose(
+                    "getSubmittedTaskStatus: Failed to get status for request",
+                    f"No such RequestID {taskDict['ExternalID']}",
+                    transID=taskDict["TransformationID"],
+                )
             else:
-                badRequestID += 1
+                # We do not update the task status if the Request is Assigned, as it is a very temporary status
+                if newStatus != oldStatus and newStatus != "Assigned":
+                    updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])
+
         if badRequestID:
             self._logWarn("%d requests have identifier 0" % badRequestID)
         return S_OK(updateDict)
@@ -355,13 +377,38 @@ def getSubmittedFileStatus(self, fileDicts):
         requestFiles = {}
         for taskDict in res["Value"]:
             taskID = taskDict["TaskID"]
-            externalID = taskDict["ExternalID"]
+            externalID = int(taskDict["ExternalID"] or 0)  # "or 0" guards against an empty ID
             # Only consider tasks that are submitted, ExternalID is a string
             if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID):
                 requestFiles[externalID] = taskFiles[taskID]
 
+        res = self.requestClient.getBulkRequestStatus(list(requestFiles))
+        if not res["OK"]:
+            self._logWarn(
+                "Failed to get request status",
+                res["Message"],
+                transID=transID,
+                method="getSubmittedFileStatus",
+            )
+            return S_OK({})
+        reqStatuses = res["Value"]
+
         updateDict = {}
         for requestID, lfnList in requestFiles.items():
+            # We only take requests in a final state, to avoid race conditions
+            # https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414
+            reqStatus = reqStatuses.get(requestID)
+            if not reqStatus:
+                self._logVerbose(
+                    "Failed to get request status",
+                    f"Request {requestID} does not exist",
+                    transID=transID,
+                    method="getSubmittedFileStatus",
+                )
+                continue
+            if reqStatus not in Request.FINAL_STATES:
+                continue
+
             statusDict = self.requestClient.getRequestFileStatus(requestID, lfnList)
             if not statusDict["OK"]:
                 log = self._logVerbose if "not exist" in statusDict["Message"] else self._logWarn
                 log(
                     "Failed to get files status for request",
                     statusDict["Message"],
                     transID=transID,
                     method="getSubmittedFileStatus",
                 )
-            else:
-                for lfn, newStatus in statusDict["Value"].items():
-                    if newStatus == "Done":
-                        updateDict[lfn] = TransformationFilesStatus.PROCESSED
-                    elif newStatus == "Failed":
-                        updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
+                continue
+
+            # If we are here, it means the Request is in a final state.
+            # In principle, you could expect every file to also be in a final state,
+            # but this is only true for simple Requests.
+            # Hence, the file is marked as PROCESSED only if the file status is Done;
+            # in any other case, we mark it PROBLEMATIC.
+            # This is dangerous though, as complex requests may not be re-entrant.
+            # We would need a way to make sure it is safe to do so.
+            # See https://github.com/DIRACGrid/DIRAC/issues/7116 for more details
+            for lfn, newStatus in statusDict["Value"].items():
+                if newStatus == "Done":
+                    updateDict[lfn] = TransformationFilesStatus.PROCESSED
+                else:
+                    updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
         return S_OK(updateDict)
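The net behaviour of the reworked ``getSubmittedFileStatus`` condenses to one mapping rule. A sketch, where ``FINAL_STATES`` stands in for ``Request.FINAL_STATES`` (its values here are assumed, for illustration only)::

    FINAL_STATES = ("Done", "Failed", "Canceled")  # assumed, mirrors Request.FINAL_STATES

    def map_file_statuses(request_status, file_statuses):
        """Map RMS file statuses to TS file statuses for one request."""
        # Requests still in flight are skipped entirely, to avoid the race
        # described in DIRACGrid/DIRAC#7116
        if request_status not in FINAL_STATES:
            return {}
        # Once the request is final, anything not Done is considered problematic
        return {
            lfn: "PROCESSED" if status == "Done" else "PROBLEMATIC"
            for lfn, status in file_statuses.items()
        }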