Skip to content

Commit

Permalink
sweep: DIRACGrid#7104 fix: MultiVO proxy renewal in AREXCE
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr authored and web-flow committed Aug 2, 2023
1 parent 7b5fedf commit 45a3b9e
Showing 1 changed file with 89 additions and 42 deletions.
131 changes: 89 additions & 42 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ def __init__(self, ceUniqueID):
self.restVersion = "1.0"
# Time left before proxy renewal: 3 hours is a good default
self.proxyTimeLeftBeforeRenewal = 10800
# Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
self._delegationID = None
# Timeout
self.timeout = 5.0
# Request session
Expand Down Expand Up @@ -179,6 +177,10 @@ def _checkSession(self):
self.headers.pop("Authorization", None)

# Get a proxy: still mandatory, even if tokens are used to authenticate
if not self.proxy:
self.log.error("Proxy not set")
return S_ERROR("Proxy not set")

result = self._prepareProxy()
if not result["OK"]:
self.log.error("Failed to set up proxy", result["Message"])
Expand All @@ -190,7 +192,7 @@ def _checkSession(self):
return S_OK()

# Attach the proxy to the session, only if the token is unavailable
self.session.cert = Locations.getProxyLocation()
self.session.cert = os.environ["X509_USER_PROXY"]
return S_OK()

#############################################################################
Expand Down Expand Up @@ -233,15 +235,8 @@ def __uploadCertificate(self, delegationID, csrContent):
headers = {"Content-Type": "x-pem-file"}
query = self._urlJoin(os.path.join("delegations", delegationID))

# Get a proxy and sign the CSR
proxy = X509Chain()
proxyFile = Locations.getProxyLocation()
if not proxyFile:
return S_ERROR(f"No proxy available")
result = proxy.loadProxyFromFile(proxyFile)
if not result["OK"]:
return S_ERROR(f"Can't load {proxyFile}: {result['Message']}")
result = proxy.generateChainFromRequestString(csrContent)
# Sign the CSR
result = self.proxy.generateChainFromRequestString(csrContent)
if not result["OK"]:
self.log.error("Problem with the Certificate Signing Request:", result["Message"])
return S_ERROR("Problem with the Certificate Signing Request")
Expand Down Expand Up @@ -314,6 +309,32 @@ def _getDelegationIDs(self):
delegationIDs = [delegationContent["id"] for delegationContent in delegations]
return S_OK(delegationIDs)

def _getProxyFromDelegationID(self, delegationID):
"""Get proxy stored within the delegation
:param str delegationID: delegation ID
"""
query = self._urlJoin(os.path.join("delegations", delegationID))
params = {"action": "get"}

# Submit the POST request to get the delegation
result = self._request("post", query, params=params)
if not result["OK"]:
self.log.error("Issue while interacting with the delegations.", result["Message"])
return S_ERROR("Issue while interacting with the delegations")
response = result["Value"]

proxyContent = response.text
proxy = X509Chain()
result = proxy.loadChainFromString(proxyContent)
if not result["OK"]:
self.log.error(
"Issue while trying to load proxy content from delegation", f"{delegationID}: {result['Message']}"
)
return S_ERROR("Issue while trying to load proxy content from delegation")

return S_OK(proxy)

#############################################################################

def _getArcJobID(self, executableFile, inputs, outputs, delegation):
Expand Down Expand Up @@ -398,18 +419,33 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")

delegationIDs = result["Value"]
if not delegationIDs:

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)
if not result["OK"]:
return result
proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
return S_ERROR("Could not get a new delegation")
self._delegationID = result["Value"]
else:
self._delegationID = delegationIDs[0]
delegation = f"\n(delegationid={self._delegationID})"
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

if not inputs:
inputs = []
Expand Down Expand Up @@ -554,10 +590,14 @@ def getCEStatus(self):
self.log.error("Cannot get CE Status", result["Message"])
return result

# Try to find out which VO we are running for.
# Find out which VO we are running for.
# Essential now for REST interface.
res = getVOfromProxyGroup()
vo = res["Value"] if res["OK"] else ""
result = getVOfromProxyGroup()
if not result["OK"]:
return result
if not result["Value"]:
return S_ERROR("Could not get VO value from the proxy group")
vo = result["Value"]

# Prepare the command
params = {"schema": "glue2"}
Expand Down Expand Up @@ -591,33 +631,36 @@ def getCEStatus(self):

#############################################################################

def _renewDelegation(self):
"""Renew the delegations"""
def _renewDelegation(self, delegationID):
"""Renew the delegation
:params delegationID: delegation ID to renew
"""
# Prepare the command
params = {"action": "get"}
query = self._urlJoin(os.path.join("delegations", self._delegationID))
query = self._urlJoin(os.path.join("delegations", delegationID))

# Submit the POST request to get the proxy
result = self._request("post", query, params=params)
if not result["OK"]:
self.log.error("Could not get a proxy for", f"delegation {self._delegationID}: {result['Message']}")
return S_ERROR(f"Could not get a proxy for delegation {self._delegationID}")
self.log.error("Could not get a proxy for", f"delegation {delegationID}: {result['Message']}")
return S_ERROR(f"Could not get a proxy for delegation {delegationID}")
response = result["Value"]

proxy = X509Chain()
result = proxy.loadChainFromString(response.text)
if not result["OK"]:
self.log.error("Could not load proxy for", f"delegation {self._delegationID}: {result['Message']}")
return S_ERROR(f"Could not load proxy for delegation {self._delegationID}")
self.log.error("Could not load proxy for", f"delegation {delegationID}: {result['Message']}")
return S_ERROR(f"Could not load proxy for delegation {delegationID}")

# Now test and renew the proxy
result = proxy.getRemainingSecs()
if not result["OK"]:
self.log.error(
"Could not get remaining time from the proxy for",
f"delegation {self._delegationID}: {result['Message']}",
f"delegation {delegationID}: {result['Message']}",
)
return S_ERROR(f"Could not get remaining time from the proxy for delegation {self._delegationID}")
return S_ERROR(f"Could not get remaining time from the proxy for delegation {delegationID}")
timeLeft = result["Value"]

if timeLeft >= self.proxyTimeLeftBeforeRenewal:
Expand All @@ -626,31 +669,31 @@ def _renewDelegation(self):

self.log.verbose(
"Renewing delegation",
f"{self._delegationID} whose proxy expires at {timeLeft}",
f"{delegationID} whose proxy expires at {timeLeft}",
)
# Proxy needs to be renewed - try to renew it
# First, get a new CSR from the delegation
params = {"action": "renew"}
query = self._urlJoin(os.path.join("delegations", self._delegationID))
query = self._urlJoin(os.path.join("delegations", delegationID))
result = self._request("post", query, params=params)
if not result["OK"]:
self.log.error(
"Proxy not renewed, failed to get CSR",
f"for delegation {self._delegationID}",
f"for delegation {delegationID}",
)
return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {self._delegationID}")
return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {delegationID}")
response = result["Value"]

# Then, sign and upload the certificate
result = self.__uploadCertificate(self._delegationID, response.text)
result = self.__uploadCertificate(delegationID, response.text)
if not result["OK"]:
self.log.error(
"Proxy not renewed, failed to send renewed proxy",
f"delegation {self._delegationID}: {result['Message']}",
f"delegation {delegationID}: {result['Message']}",
)
return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {self._delegationID}")
return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {delegationID}")

self.log.verbose("Proxy successfully renewed", f"for delegation {self._delegationID}")
self.log.verbose("Proxy successfully renewed", f"for delegation {delegationID}")

return S_OK()

Expand Down Expand Up @@ -706,12 +749,16 @@ def getJobStatus(self, jobIDList):
jobsToCancel.append(arcJob["id"])
self.log.debug(f"Killing held job {jobID}")

# Renew delegation to renew the proxies of the jobs
if self._delegationID:
result = self._renewDelegation()
# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
if not result["OK"]:
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{self._delegationID}: {result['Message']}")
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")

# Kill held jobs
if jobsToCancel:
Expand Down

0 comments on commit 45a3b9e

Please sign in to comment.