From 048a3307fd210461b5d77f47816f24b52ee87b82 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Wed, 31 Jul 2024 13:25:46 +0200
Subject: [PATCH 1/9] docs: more scaling doc

---
 .../ServerInstallations/scalingAndLimitations.rst | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst b/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst
index 91d78261d77..c6a5dc6b1ff 100644
--- a/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst
+++ b/docs/source/AdministratorGuide/ServerInstallations/scalingAndLimitations.rst
@@ -62,6 +62,15 @@ Databases
 Every now and then, it is interesting to look at the fragmentation status of your database. This is done by using the ``analyze table`` statement (https://dev.mysql.com/doc/refman/8.4/en/analyze-table.html) possibly followed by the ``optimize table`` statement (https://dev.mysql.com/doc/refman/8.4/en/optimize-table.html).
 
+To know whether your tables are fragmented::
+
+  select table_schema,table_name, sys.format_bytes(data_length) table_size, sys.format_bytes(data_free) empty_space from information_schema.tables where data_length >= (1024*1024*1024) order by data_length desc;
+
+
+The fragmented space should be very small with respect to the overall table size.
+
+
+
 Duplications
 ============
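As an illustration only (the schema and table names below are placeholders, not actual DIRAC tables), a table reported by the query above with a large ``empty_space`` could then be defragmented along these lines::

    -- refresh the table statistics
    ANALYZE TABLE MyDIRACDB.MyLargeTable;
    -- rebuild the table to reclaim the free space; for InnoDB tables this is mapped to
    -- ALTER TABLE ... FORCE and rewrites the whole table, so it can take a while on large tables
    OPTIMIZE TABLE MyDIRACDB.MyLargeTable;
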
From 5ecd70e9d309bbb731d8864c110053c56a1ee893 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Wed, 31 Jul 2024 14:07:20 +0200
Subject: [PATCH 2/9] fix (TS): only look at requests in a final status to update FileTasks status

---
 .../TransformationSystem/Client/RequestTasks.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/DIRAC/TransformationSystem/Client/RequestTasks.py b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
index a3fee6a7202..858994d95f8 100644
--- a/src/DIRAC/TransformationSystem/Client/RequestTasks.py
+++ b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
@@ -370,6 +370,22 @@ def getSubmittedFileStatus(self, fileDicts):
 
         updateDict = {}
         for requestID, lfnList in requestFiles.items():
+            # We only take request in final state to avoid race conditions
+            # https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414
+            reqStatus = self.requestClient.getRequestStatus(requestID)
+            if not reqStatus["OK"]:
+                log = self._logVerbose if "not exist" in reqStatus["Message"] else self._logWarn
+                log(
+                    "Failed to get request status",
+                    reqStatus["Message"],
+                    transID=transID,
+                    method="getSubmittedFileStatus",
+                )
+                continue
+            reqStatus = reqStatus["Value"]
+            if reqStatus not in Request.FINAL_STATES:
+                continue
+
             statusDict = self.requestClient.getRequestFileStatus(requestID, lfnList)
             if not statusDict["OK"]:
                 log = self._logVerbose if "not exist" in statusDict["Message"] else self._logWarn

From e1a84c172df29777e16487fe800e5526cddf7404 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Wed, 31 Jul 2024 15:49:47 +0200
Subject: [PATCH 3/9] fix (TS): set Problematic files that are only partially processed

---
 .../Client/ReqClient.py    |  2 +-
 .../Client/RequestTasks.py | 21 +++++++++++++------
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/src/DIRAC/RequestManagementSystem/Client/ReqClient.py b/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
index f0a1c55a970..bb9f64e9d9c 100755
--- a/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
+++ b/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
@@ -258,7 +258,7 @@ def getRequestStatus(self, requestID):
         self.log.debug("getRequestStatus: attempting to get status for '%d' request." % requestID)
         requestStatus = self._getRPC().getRequestStatus(requestID)
         if not requestStatus["OK"]:
-            self.log.error(
+            self.log.verbose(
                 "getRequestStatus: unable to get status for request",
                 ": '%d' %s" % (requestID, requestStatus["Message"]),
             )
diff --git a/src/DIRAC/TransformationSystem/Client/RequestTasks.py b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
index 858994d95f8..762b85d9d06 100644
--- a/src/DIRAC/TransformationSystem/Client/RequestTasks.py
+++ b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
@@ -395,10 +395,19 @@ def getSubmittedFileStatus(self, fileDicts):
                     transID=transID,
                     method="getSubmittedFileStatus",
                 )
-            else:
-                for lfn, newStatus in statusDict["Value"].items():
-                    if newStatus == "Done":
-                        updateDict[lfn] = TransformationFilesStatus.PROCESSED
-                    elif newStatus == "Failed":
-                        updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
+                continue
+
+            # If we are here, it means the Request is in a final state.
+            # In principle, you could expect everyfile also be in a final state
+            # but this is only true for simple Request.
+            # Hence, the file is marked as PROCESSED only if the file status is Done
+            # In any other case, we mark it problematic
+            # This is dangerous though, as complex request may not be re-entrant
+            # We would need a way to make sure it is safe to do so.
+            # See https://github.com/DIRACGrid/DIRAC/issues/7116 for more details
+            for lfn, newStatus in statusDict["Value"].items():
+                if newStatus == "Done":
+                    updateDict[lfn] = TransformationFilesStatus.PROCESSED
+                else:
+                    updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
 
         return S_OK(updateDict)
From 62c03e7b37124f3d199c66f7bbed3b897a80b27a Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Mon, 12 Aug 2024 15:08:19 +0200
Subject: [PATCH 4/9] feat (RMS): introduce getBulkRequestStatus

---
 .../Client/ReqClient.py                     | 18 ++++++++++++++++++
 .../RequestManagementSystem/DB/RequestDB.py | 10 ++++++++++
 .../Service/ReqManagerHandler.py            | 10 ++++++++++
 3 files changed, 38 insertions(+)

diff --git a/src/DIRAC/RequestManagementSystem/Client/ReqClient.py b/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
index bb9f64e9d9c..7c73c81fc7f 100755
--- a/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
+++ b/src/DIRAC/RequestManagementSystem/Client/ReqClient.py
@@ -5,6 +5,7 @@
 :synopsis: implementation of client for RequestDB using DISET framework
 """
+
 import os
 import time
 import random
@@ -470,6 +471,23 @@ def resetFailedRequest(self, requestID, allR=False):
                 return self.putRequest(req)
         return S_OK("Not reset")
 
+    @ignoreEncodeWarning
+    def getBulkRequestStatus(self, requestIDs: list[int]):
+        """get the Status for the supplied request IDs.
+
+        :param self: self reference
+        :param list requestIDs: list of request IDs (integers)
+        :return: S_ERROR or S_OK( { reqID1:status1, reqID2:status2, ... })
+        """
+        res = self._getRPC().getBulkRequestStatus(requestIDs)
+        if not res["OK"]:
+            return res
+
+        # Cast the requestIDs back to int
+        statuses = strToIntDict(res["Value"])
+
+        return S_OK(statuses)
+
 
 # ============= Some useful functions to be shared ===========
diff --git a/src/DIRAC/RequestManagementSystem/DB/RequestDB.py b/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
index 5bd6b372923..978bb8bd06f 100644
--- a/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
+++ b/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
@@ -869,6 +869,16 @@ def getRequestStatus(self, requestID):
             session.close()
         return S_OK(status[0])
 
+    def getBulkRequestStatus(self, requestIDs):
+        """get requests statuses for given request IDs"""
+        session = self.DBSession()
+        try:
+            statuses = session.query(Request.RequestID, Request._Status).filter(Request.RequestID.in_(requestIDs)).all()
+            status_dict = {req_id: req_status for req_id, req_status in statuses}
+        finally:
+            session.close()
+        return S_OK(status_dict)
+
     def getRequestFileStatus(self, requestID, lfnList):
         """get status for files in request given its id
diff --git a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
index 9d51ba7a17d..ca616c17306 100755
--- a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
+++ b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
@@ -340,6 +340,16 @@ def export_getRequestStatus(cls, requestID):
             gLogger.error(f"getRequestStatus: {status['Message']}")
         return status
 
+    types_getBulkRequestStatus = [list]
+
+    @classmethod
+    def export_getBulkRequestStatus(cls, requestIDs):
+        """get requests statuses given their ids"""
+        res = cls.__requestDB.getBulkRequestStatus(requestIDs)
+        if not res["OK"]:
+            gLogger.error(f"getRequestStatus: {res['Message']}")
+        return res
+
     types_getRequestFileStatus = [int, [str, list]]
 
     @classmethod
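As a usage sketch of the new client call (the request IDs below are placeholders, and an initialized DIRAC environment with a valid proxy is assumed), the bulk query returns one dictionary keyed by RequestID instead of requiring one RPC call per request::

    from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient

    reqClient = ReqClient()
    res = reqClient.getBulkRequestStatus([1234, 1235, 1236])  # placeholder IDs
    if not res["OK"]:
        print("Failed to get bulk request status:", res["Message"])
    else:
        # res["Value"] maps each RequestID (int) to its status string, e.g. "Done" or "Failed"
        for reqID, status in res["Value"].items():
            print(reqID, status)
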
From ffed84c259eba68ef7c3e4c3f7b57c95b47b9174 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Mon, 12 Aug 2024 15:40:16 +0200
Subject: [PATCH 5/9] feat (TS): RequestTasks use getBulkRequestStatus

---
 .../RequestManagementSystem/DB/RequestDB.py |  3 +
 .../Service/ReqManagerHandler.py            |  2 +-
 .../Client/RequestTasks.py                  | 79 +++++++++++++------
 3 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/src/DIRAC/RequestManagementSystem/DB/RequestDB.py b/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
index 978bb8bd06f..5781a1007e7 100644
--- a/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
+++ b/src/DIRAC/RequestManagementSystem/DB/RequestDB.py
@@ -875,6 +875,9 @@ def getBulkRequestStatus(self, requestIDs):
         try:
             statuses = session.query(Request.RequestID, Request._Status).filter(Request.RequestID.in_(requestIDs)).all()
             status_dict = {req_id: req_status for req_id, req_status in statuses}
+        except Exception as e:
+            # log as well?
+            return S_ERROR(f"Failed to getBulkRequestStatus {e!r}")
         finally:
             session.close()
         return S_OK(status_dict)
diff --git a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
index ca616c17306..45a86f05e94 100755
--- a/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
+++ b/src/DIRAC/RequestManagementSystem/Service/ReqManagerHandler.py
@@ -347,7 +347,7 @@ def export_getBulkRequestStatus(cls, requestIDs):
         """get requests statuses given their ids"""
         res = cls.__requestDB.getBulkRequestStatus(requestIDs)
         if not res["OK"]:
-            gLogger.error(f"getRequestStatus: {res['Message']}")
+            gLogger.error("getBulkRequestStatus", res["Message"])
         return res
diff --git a/src/DIRAC/TransformationSystem/Client/RequestTasks.py b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
index 762b85d9d06..b34cf191c5a 100644
--- a/src/DIRAC/TransformationSystem/Client/RequestTasks.py
+++ b/src/DIRAC/TransformationSystem/Client/RequestTasks.py
@@ -317,26 +317,48 @@ def getSubmittedTaskStatus(self, taskDicts):
         Check if tasks changed status, and return a list of tasks per new status
         """
         updateDict = {}
-        badRequestID = 0
+        externalIDs = [
+            int(taskDict["ExternalID"])
+            for taskDict in taskDicts
+            if taskDict["ExternalID"] and int(taskDict["ExternalID"])
+        ]
+        # Count how many tasks don't have a valid external ID
+        badRequestID = len(taskDicts) - len(externalIDs)
+
+        res = self.requestClient.getBulkRequestStatus(externalIDs)
+        if not res["OK"]:
+            # We need a transformationID for the log, and although we expect a single one,
+            # do things ~ properly
+            tids = list({taskDict["TransformationID"] for taskDict in taskDicts})
+            try:
+                tid = tids[0]
+            except IndexError:
+                tid = 0
+
+            self._logWarn(
+                "getSubmittedTaskStatus: Failed to get bulk requestIDs",
+                res["Message"],
+                transID=tid,
+            )
+            return S_OK({})
+        new_statuses = res["Value"]
+
         for taskDict in taskDicts:
             oldStatus = taskDict["ExternalStatus"]
             # ExternalID is normally a string
-            if taskDict["ExternalID"] and int(taskDict["ExternalID"]):
-                newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"])
-                if not newStatus["OK"]:
-                    log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn
-                    log(
-                        "getSubmittedTaskStatus: Failed to get requestID for request",
-                        newStatus["Message"],
-                        transID=taskDict["TransformationID"],
-                    )
-                else:
-                    newStatus = newStatus["Value"]
-                    # We don't care updating the tasks to Assigned while the request is being processed
-                    if newStatus != oldStatus and newStatus != "Assigned":
-                        updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])
+
+            newStatus = new_statuses.get(int(taskDict["ExternalID"]))
+            if not newStatus:
+                self._logVerbose(
+                    "getSubmittedTaskStatus: Failed to get requestID for request",
+                    f"No such RequestID {taskDict['ExternalID']}",
+                    transID=taskDict["TransformationID"],
+                )
             else:
-                badRequestID += 1
+                # We do not update the tasks status if the Request is Assigned, as it is a very temporary status
+                if newStatus != oldStatus and newStatus != "Assigned":
+                    updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])
 
         if badRequestID:
             self._logWarn("%d requests have identifier 0" % badRequestID)
         return S_OK(updateDict)
@@ -363,26 +385,35 @@ def getSubmittedFileStatus(self, fileDicts):
         requestFiles = {}
         for taskDict in res["Value"]:
             taskID = taskDict["TaskID"]
-            externalID = taskDict["ExternalID"]
+            externalID = int(taskDict["ExternalID"])
             # Only consider tasks that are submitted, ExternalID is a string
             if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID):
                 requestFiles[externalID] = taskFiles[taskID]
 
+        res = self.requestClient.getBulkRequestStatus(list(requestFiles))
+        if not res["OK"]:
+            self._logWarn(
+                "Failed to get request status",
+                res["Message"],
+                transID=transID,
+                method="getSubmittedFileStatus",
+            )
+            return S_OK({})
+        reqStatuses = res["Value"]
+
         updateDict = {}
         for requestID, lfnList in requestFiles.items():
             # We only take request in final state to avoid race conditions
             # https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414
-            reqStatus = self.requestClient.getRequestStatus(requestID)
-            if not reqStatus["OK"]:
-                log = self._logVerbose if "not exist" in reqStatus["Message"] else self._logWarn
-                log(
+            reqStatus = reqStatuses.get(requestID)
+            if not reqStatus:
+                self._logVerbose(
                     "Failed to get request status",
-                    reqStatus["Message"],
+                    f"Request {requestID} does not exist",
                     transID=transID,
                     method="getSubmittedFileStatus",
                 )
                 continue
-            reqStatus = reqStatus["Value"]
             if reqStatus not in Request.FINAL_STATES:
                 continue
 
@@ -398,7 +429,7 @@ def getSubmittedFileStatus(self, fileDicts):
                 continue
 
             # If we are here, it means the Request is in a final state.
-            # In principle, you could expect everyfile also be in a final state
+            # In principle, you could expect every file also be in a final state
             # but this is only true for simple Request.
             # Hence, the file is marked as PROCESSED only if the file status is Done
             # In any other case, we mark it problematic
From 2cda3d36de50beca6f691046d66c1651c2c89c51 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Fri, 23 Aug 2024 17:12:10 +0200
Subject: [PATCH 6/9] feat (FTS): only take active job for active operation (speedup)

---
 src/DIRAC/DataManagementSystem/DB/FTS3DB.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/DIRAC/DataManagementSystem/DB/FTS3DB.py b/src/DIRAC/DataManagementSystem/DB/FTS3DB.py
index 5dd87228493..c944436e4b8 100644
--- a/src/DIRAC/DataManagementSystem/DB/FTS3DB.py
+++ b/src/DIRAC/DataManagementSystem/DB/FTS3DB.py
@@ -311,6 +311,7 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
             session.query(FTS3Job)
             .join(FTS3Operation)
             .filter(FTS3Job.status.in_(FTS3Job.NON_FINAL_STATES))
+            .filter(FTS3Operation.status == "Active")
            .filter(FTS3Job.assignment.is_(None))
             .filter(FTS3Operation.assignment.is_(None))
         )

From be635b65851e18f830479542d972f89dddbf07e3 Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Fri, 30 Aug 2024 15:08:39 +0200
Subject: [PATCH 7/9] feat (FTS): do not retry files if the failure is irrecoverable

---
 src/DIRAC/DataManagementSystem/Client/FTS3Job.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
index 708edfd9d30..83f2724027a 100644
--- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
@@ -207,6 +207,11 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
                 filesStatus[file_id]["ftsGUID"] = None
                 # TODO: update status to defunct if not recoverable here ?
 
+            # If the file is failed, check if it is recoverable
+            if file_state in FTS3File.FTS_FAILED_STATES:
+                if not fileDict.get("Recoverable", True):
+                    filesStatus[file_id]["status"] = "Defunct"
+
             # If the file is not in a final state, but the job is, we return an error
             # FTS can have inconsistencies where the FTS Job is in a final state
             # but not all the files.
From 7610e2d2a94930c8121ba7961796475e331e4b4f Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Fri, 30 Aug 2024 15:08:57 +0200
Subject: [PATCH 8/9] feat (FTS): make the monitoring batch size an agent option

---
 src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py | 6 ++++--
 src/DIRAC/DataManagementSystem/ConfigTemplate.cfg | 6 ++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
index 4151a3fffb8..4e009158d43 100644
--- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
+++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -106,6 +106,8 @@ def __readConf(self):
         # lifetime of the proxy we download to delegate to FTS
         self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)
 
+        self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE)
+
         return S_OK()
 
     def initialize(self):
@@ -318,7 +320,7 @@ def monitorJobsLoop(self):
             log.info("Getting next batch of jobs to monitor", f"{loopId}/{nbOfLoops}")
             # get jobs from DB
             res = self.fts3db.getActiveJobs(
-                limit=JOB_MONITORING_BATCH_SIZE, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
+                limit=self.jobMonitoringBatchSize, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
             )
 
             if not res["OK"]:
@@ -353,7 +355,7 @@ def monitorJobsLoop(self):
 
             # If we got less to monitor than what we asked,
             # stop looping
-            if len(activeJobs) < JOB_MONITORING_BATCH_SIZE:
+            if len(activeJobs) < self.jobMonitoringBatchSize:
                 break
         # Commit records after each loop
         self.dataOpSender.concludeSending()
diff --git a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
index 406f9d90bc7..c4b9636dab2 100644
--- a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
+++ b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
@@ -143,6 +143,12 @@ Agents
       OperationBulkSize = 20
       # How many Job we will monitor in one loop
       JobBulkSize = 20
+      # Split JobBulkSize into several chunks
+      # Bigger numbers (like 100) are efficient when there is a single agent
+      # When there are multiple agents, it may slow down the overall processing
+      # because of locks and race conditions
+      # (This number should of course be smaller than or equal to JobBulkSize)
+      JobMonitoringBatchSize = 20
       # Max number of files to go in a single job
       MaxFilesPerJob = 100
       # Max number of attempt per file
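As an illustration (the ``Production`` instance name below is a placeholder for the actual server setup), an installation running several FTS3Agents in parallel could tune the new option in its configuration like any other agent option::

    Systems
    {
      DataManagement
      {
        Production
        {
          Agents
          {
            FTS3Agent
            {
              JobBulkSize = 100
              # keep the monitoring batches small when several agents run in parallel
              JobMonitoringBatchSize = 20
            }
          }
        }
      }
    }
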
From 36520977c623c3b0aa79c4f0a329e9c7d04862ce Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Fri, 20 Sep 2024 15:56:47 +0200
Subject: [PATCH 9/9] feat (TS): further speedups

---
 src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py | 2 +-
 .../TransformationSystem/Agent/TransformationAgent.py | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py b/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
index dd5d3149459..0eccbec8fdb 100644
--- a/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
+++ b/src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
@@ -1,6 +1,7 @@
 """ This module allows to resolve output SEs for Job based on SE and site/country association """
+
 from random import shuffle
 
 from DIRAC import gLogger, gConfig
@@ -70,7 +71,6 @@ def getDestinationSEList(outputSE, site, outputmode="Any"):
         raise RuntimeError(localSEs["Message"])
     localSEs = localSEs["Value"]
     sLog.verbose("Local SE list is:", ", ".join(localSEs))
-    # There is an alias defined for this Site
     associatedSEs = gConfig.getValue(f"/Resources/Sites/{prefix}/{site}/AssociatedSEs/{outputSE}", [])
     if associatedSEs:
diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
index b1b5a45ad11..02da4ea5b5d 100644
--- a/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
+++ b/src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
@@ -8,6 +8,7 @@
   :dedent: 2
   :caption: TransformationAgent options
 """
+
 import time
 import os
 import datetime
@@ -241,7 +242,7 @@ def processTransformation(self, transDict, clients):
             if transID not in self.replicaCache:
                 self.__readCache(transID)
         transFiles = transFiles["Value"]
-        unusedLfns = [f["LFN"] for f in transFiles]
+        unusedLfns = {f["LFN"] for f in transFiles}
         unusedFiles = len(unusedLfns)
 
         plugin = transDict.get("Plugin", "Standard")
@@ -250,7 +251,7 @@ def processTransformation(self, transDict, clients):
         maxFiles = Operations().getValue(f"TransformationPlugins/{plugin}/MaxFilesToProcess", 0)
         # Get plugin-specific limit in number of files (0 means no limit)
         totLfns = len(unusedLfns)
-        lfnsToProcess = self.__applyReduction(unusedLfns, maxFiles=maxFiles)
+        lfnsToProcess = set(self.__applyReduction(unusedLfns, maxFiles=maxFiles))
         if len(lfnsToProcess) != totLfns:
             self._logInfo(
                 "Reduced number of files from %d to %d" % (totLfns, len(lfnsToProcess)),
@@ -533,8 +534,10 @@ def _getDataReplicasDM(self, transID, lfns, clients, forJobs=True, ignoreMissing
                 method=method,
                 transID=transID,
             )
+        successful_set = set(replicas["Successful"])
+        failed_set = set(replicas["Failed"])
         # If files are neither Successful nor Failed, they are set problematic in the FC
-        problematicLfns = [lfn for lfn in lfns if lfn not in replicas["Successful"] and lfn not in replicas["Failed"]]
+        problematicLfns = [lfn for lfn in lfns if lfn not in successful_set and lfn not in failed_set]
         if problematicLfns:
             self._logInfo(f"{len(problematicLfns)} files found problematic in the catalog, set ProbInFC")
             res = clients["TransformationClient"].setFileStatusForTransformation(