Skip to content

Commit

Permalink
feat: removed JobDB' SiteMask and Logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Aug 26, 2024
1 parent fde57b2 commit 490a4e1
Show file tree
Hide file tree
Showing 7 changed files with 1 addition and 462 deletions.
1 change: 0 additions & 1 deletion src/DIRAC/ResourceStatusSystem/Client/ResourceStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ def __setCSElementStatus(self, elementName, elementType, statusType, status):

return res


def isStorageElementAlwaysBanned(self, seName, statusType):
"""Checks if the AlwaysBanned policy is applied to the SE given as parameter
Expand Down
4 changes: 1 addition & 3 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,7 @@ def _buildQueueDict(
self.log.error("Can not get the status of computing elements: ", result["Message"])
return result
# Try to get CEs which have been probed and those unprobed (vO='all').
ceMaskList = [
ceName for ceName in result["Value"] if result["Value"][ceName]["all"] in ("Active", "Degraded")
]
ceMaskList = [ceName for ceName in result["Value"] if result["Value"][ceName]["all"] in ("Active", "Degraded")]

# Filter the unusable queues
for queueName in list(self.queueDict.keys()):
Expand Down
14 changes: 0 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ Services
{
Default = Operator
getJobPilotOutput = authenticated
getSiteMask = authenticated
getSiteMaskStatus = authenticated
ping = authenticated
allowSite = SiteManager
allowSite += Operator
banSite = SiteManager
banSite += Operator
}
}
##BEGIN TornadoWMSAdministrator
Expand All @@ -111,13 +104,6 @@ Services
{
Default = Operator
getJobPilotOutput = authenticated
getSiteMask = authenticated
getSiteMaskStatus = authenticated
ping = authenticated
allowSite = SiteManager
allowSite += Operator
banSite = SiteManager
banSite += Operator
}
}
##END
Expand Down
256 changes: 0 additions & 256 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSiteTier
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
Expand Down Expand Up @@ -1220,261 +1219,6 @@ def rescheduleJob(self, jobID):

return retVal

#############################################################################
def getUserSitesTuple(self, sites):
"""Returns tuple of active/banned/invalid sties from a user provided list."""
ret = self._escapeValues(sites)
if not ret["OK"]:
return ret

sites = set(sites)
sitesSql = ret["Value"]
sitesSql[0] = f"SELECT {sitesSql[0]} AS Site"
sitesSql = " UNION SELECT ".join(sitesSql)
cmd = f"SELECT Site FROM ({sitesSql}) "
cmd += "AS tmptable WHERE Site NOT IN (SELECT Site FROM SiteMask WHERE Status='Active')"
result = self._query(cmd)
if not result["OK"]:
return result
nonActiveSites = {x[0] for x in result["Value"]}
activeSites = sites.difference(nonActiveSites)
bannedSites = nonActiveSites.intersection(set(self.getSiteMask("Banned")))
invalidSites = nonActiveSites.difference(bannedSites)
return S_OK((activeSites, bannedSites, invalidSites))

#############################################################################
def getSiteMask(self, siteState="Active"):
"""Get the currently active site list"""

ret = self._escapeString(siteState)
if not ret["OK"]:
return ret
siteState = ret["Value"]

if siteState == "All":
cmd = "SELECT Site FROM SiteMask"
else:
cmd = f"SELECT Site FROM SiteMask WHERE Status={siteState}"

result = self._query(cmd)
siteList = []
if result["OK"]:
siteList = [x[0] for x in result["Value"]]
else:
return S_ERROR(DErrno.EMYSQL, f"SQL query failed: {cmd}")

return S_OK(siteList)

#############################################################################
def getSiteMaskStatus(self, sites=None):
"""Get the current site mask status
:param sites: A string for a single site to check, or a list to check multiple sites.
:returns: If input was a list, a dictionary of sites, keys are site
names and values are the site statuses. Unknown sites are
not included in the output dictionary.
If input was a string, then a single value with that site's
status, or S_ERROR if the site does not exist in the DB.
"""
if isinstance(sites, list):
safeSites = []
for site in sites:
res = self._escapeString(site)
if not res["OK"]:
return res
safeSites.append(res["Value"])
sitesString = ",".join(safeSites)
cmd = f"SELECT Site, Status FROM SiteMask WHERE Site in ({sitesString})"

result = self._query(cmd)
return S_OK(dict(result["Value"]))

elif isinstance(sites, str):
ret = self._escapeString(sites)
if not ret["OK"]:
return ret
cmd = f"SELECT Status FROM SiteMask WHERE Site={ret['Value']}"
result = self._query(cmd)
if result["Value"]:
return S_OK(result["Value"][0][0])
return S_ERROR(f"Unknown site {sites}")

else:
cmd = "SELECT Site,Status FROM SiteMask"

result = self._query(cmd)
siteDict = {}
if result["OK"]:
for site, status in result["Value"]:
siteDict[site] = status
else:
return S_ERROR(DErrno.EMYSQL, f"SQL query failed: {cmd}")

return S_OK(siteDict)

#############################################################################
def getAllSiteMaskStatus(self):
"""Get the everything from site mask status"""
cmd = "SELECT Site,Status,LastUpdateTime,Author,Comment FROM SiteMask"

result = self._query(cmd)

if not result["OK"]:
return result["Message"]

siteDict = {}
if result["OK"]:
for site, status, lastUpdateTime, author, comment in result["Value"]:
try:
# TODO: This is only needed in DIRAC v8.0.x while moving from BLOB -> TEXT
comment = comment.decode()
except AttributeError:
pass
siteDict[site] = status, lastUpdateTime, author, comment

return S_OK(siteDict)

#############################################################################
def setSiteMask(self, siteMaskList, author="Unknown", comment="No comment"):
"""Set the Site Mask to the given mask in a form of a list of tuples (site,status)"""

for site, status in siteMaskList:
result = self.__setSiteStatusInMask(site, status, author, comment)
if not result["OK"]:
return result

return S_OK()

#############################################################################
def __setSiteStatusInMask(self, site, status, author="Unknown", comment="No comment"):
"""Set the given site status to 'status' or add a new active site"""

result = self._escapeString(site)
if not result["OK"]:
return result
site = result["Value"]

result = self._escapeString(status)
if not result["OK"]:
return result
status = result["Value"]

result = self._escapeString(author)
if not result["OK"]:
return result
author = result["Value"]

result = self._escapeString(comment)
if not result["OK"]:
return result
comment = result["Value"]

req = f"SELECT Status FROM SiteMask WHERE Site={site}"
result = self._query(req)
if result["OK"]:
if result["Value"]:
current_status = result["Value"][0][0]
if current_status == status:
return S_OK()
else:
req = (
"UPDATE SiteMask SET Status=%s,LastUpdateTime=UTC_TIMESTAMP(),"
"Author=%s, Comment=%s WHERE Site=%s"
)
req = req % (status, author, comment, site)
else:
req = f"INSERT INTO SiteMask VALUES ({site},{status},UTC_TIMESTAMP(),{author},{comment})"
result = self._update(req)
if not result["OK"]:
return S_ERROR("Failed to update the Site Mask")
# update the site mask logging record
req = f"INSERT INTO SiteMaskLogging VALUES ({site},{status},UTC_TIMESTAMP(),{author},{comment})"
result = self._update(req)
if not result["OK"]:
self.log.warn("Failed to update site mask logging", f"for {site}")
else:
return S_ERROR("Failed to get the Site Status from the Mask")

return S_OK()

#############################################################################
def banSiteInMask(self, site, author="Unknown", comment="No comment"):
"""Forbid the given site in the Site Mask"""

return self.__setSiteStatusInMask(site, "Banned", author, comment)

#############################################################################
def allowSiteInMask(self, site, author="Unknown", comment="No comment"):
"""Forbid the given site in the Site Mask"""

return self.__setSiteStatusInMask(site, "Active", author, comment)

#############################################################################
def removeSiteFromMask(self, site=None):
"""Remove the given site from the mask"""
if not site:
req = "DELETE FROM SiteMask"
else:
ret = self._escapeString(site)
if not ret["OK"]:
return ret
site = ret["Value"]
req = f"DELETE FROM SiteMask WHERE Site={site}"

return self._update(req)

#############################################################################
def getSiteMaskLogging(self, siteList):
"""Get the site mask logging history for the list if site names"""

if siteList:
siteString = ",".join(["'" + x + "'" for x in siteList])
req = f"SELECT Site,Status,UpdateTime,Author,Comment FROM SiteMaskLogging WHERE Site in ({siteString})"
else:
req = "SELECT Site,Status,UpdateTime,Author,Comment FROM SiteMaskLogging"
req += " ORDER BY UpdateTime ASC"

result = self._query(req)
if not result["OK"]:
return result

availableSiteList = []
for site, status, utime, author, comment in result["Value"]:
availableSiteList.append(site)

resultDict = {}
for site in siteList:
if not result["Value"] or site not in availableSiteList:
ret = self._escapeString(site)
if not ret["OK"]:
continue
e_site = ret["Value"]
req = f"SELECT Status Site,Status,LastUpdateTime,Author,Comment FROM SiteMask WHERE Site={e_site}"
resSite = self._query(req)
if resSite["OK"]:
if resSite["Value"]:
site, status, lastUpdate, author, comment = resSite["Value"][0]
try:
# TODO: This is only needed in DIRAC v8.0.x while moving from BLOB -> TEXT
comment = comment.decode()
except AttributeError:
pass
resultDict[site] = [[status, str(lastUpdate), author, comment]]
else:
resultDict[site] = [["Unknown", "", "", "Site not present in logging table"]]

for site, status, utime, author, comment in result["Value"]:
if site not in resultDict:
resultDict[site] = []
try:
# TODO: This is only needed in DIRAC v8.0.x while moving from BLOB -> TEXT
comment = comment.decode()
except AttributeError:
pass
resultDict[site].append([status, str(utime), author, comment])

return S_OK(resultDict)

#############################################################################
def getSiteSummary(self):
"""Get the summary of jobs in a given status on all the sites"""
Expand Down
21 changes: 0 additions & 21 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,6 @@ CREATE TABLE `AtticJobParameters` (
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

-- ------------------------------------------------------------------------------
DROP TABLE IF EXISTS `SiteMask`;
CREATE TABLE `SiteMask` (
`Site` VARCHAR(64) NOT NULL,
`Status` VARCHAR(64) NOT NULL,
`LastUpdateTime` DATETIME NOT NULL,
`Author` VARCHAR(255) NOT NULL,
`Comment` TEXT NOT NULL,
PRIMARY KEY (`Site`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

DROP TABLE IF EXISTS `SiteMaskLogging`;
CREATE TABLE `SiteMaskLogging` (
`Site` VARCHAR(64) NOT NULL,
`Status` VARCHAR(64) NOT NULL,
`UpdateTime` DATETIME NOT NULL,
`Author` VARCHAR(255) NOT NULL,
`Comment` TEXT NOT NULL,
PRIMARY KEY (`Site`,`UpdateTime`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

-- ------------------------------------------------------------------------------
DROP TABLE IF EXISTS `HeartBeatLoggingInfo`;
CREATE TABLE `HeartBeatLoggingInfo` (
Expand Down
Loading

0 comments on commit 490a4e1

Please sign in to comment.