Skip to content

Commit

Permalink
Refresh Kubernetes credentials if we want to talk to Kubernetes and t…
Browse files Browse the repository at this point in the history
…he credentials are even a little old
  • Loading branch information
adamnovak committed Nov 22, 2019
1 parent 91eab2b commit 9508391
Showing 1 changed file with 98 additions and 50 deletions.
148 changes: 98 additions & 50 deletions src/toil/batchSystems/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,14 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
# reveal on CI.
logging.getLogger('kubernetes').setLevel(logging.ERROR)
logging.getLogger('requests_oauthlib').setLevel(logging.ERROR)

try:
# Load ~/.kube/config or KUBECONFIG
kubernetes.config.load_kube_config()

# We loaded it; we need to figure out our namespace the config-file way

# Find all contexts and the active context.
# The active context gets us our namespace.
contexts, activeContext = kubernetes.config.list_kube_config_contexts()
if not contexts:
raise RuntimeError("No Kubernetes contexts available in ~/.kube/config or $KUBECONFIG")

# Identify the namespace to work in
self.namespace = activeContext.get('context', {}).get('namespace', 'default')

except TypeError:
# Didn't work. Try pod-based credentials in case we are in a pod.
try:
kubernetes.config.load_incluster_config()

# We got pod-based credentials. Our namespace comes from a particular file.
self.namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r').read().strip()

except kubernetes.config.ConfigException:
raise RuntimeError('Could not load Kubernetes configuration from ~/.kube/config, $KUBECONFIG, or current pod.')


# This will hold the last time our Kubernetes credentials were refreshed
self.credential_time = None
# And this will hold our cache of API objects
self._apis = {}

# Get our namespace (and our Kubernetes credentials to make sure they exist)
self.namespace = self._api('namespace')

# Make a Kubernetes-acceptable version of our username: not too long,
# and all lowercase letters, numbers, or - or .
Expand All @@ -125,11 +105,79 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
# Set this to True to enable the experimental wait-for-job-update code
self.enableWatching = True

# Required APIs needed from kubernetes
self.batchApi = kubernetes.client.BatchV1Api()
self.coreApi = kubernetes.client.CoreV1Api()

self.jobIds = set()

def _api(self, kind, max_age_seconds = 5 * 60):
"""
The Kubernetes module isn't clever enough to renew its credentials when
they are about to expire. See
https://github.com/kubernetes-client/python/issues/741.
We work around this by making sure that every time we are about to talk
to Kubernetes, we have fresh credentials. And we do that by reloading
the config and replacing our Kubernetes API objects before we do any
Kubernetes things.
TODO: We can still get in trouble if a single watch or listing loop
goes on longer than our credentials last, though.
This method is the Right Way to get any Kubernetes API. You call it
with the API you want ('batch' or 'core') and it returns an API object
with guaranteed fresh credentials.
It also recognizes 'namespace' and returns our namespace as a string.
max_age_seconds needs to be << your cluster's credential expiry time.
"""

now = datetime.datetime.now()

if self.credential_time is None or (now - self.credential_time).total_seconds() > max_age_seconds:
# Credentials need a refresh
try:
# Load ~/.kube/config or KUBECONFIG
kubernetes.config.load_kube_config()
# Worked. We're using kube config
config_source = 'kube'
except TypeError:
# Didn't work. Try pod-based credentials in case we are in a pod.
try:
kubernetes.config.load_incluster_config()
# Worked. We're using in_cluster config
config_source = 'in_cluster'
except kubernetes.config.ConfigException:
raise RuntimeError('Could not load Kubernetes configuration from ~/.kube/config, $KUBECONFIG, or current pod.')


# Now fill in the API objects with these credentials
self._apis['batch'] = kubernetes.client.BatchV1Api()
self._apis['core'] = kubernetes.client.CoreV1Api()

# And save the time
self.credential_time = now

if kind == 'namespace':
# We just need the namespace string
if config_source == 'in_cluster':
# Our namespace comes from a particular file.
return open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r').read().strip()
else:
# Find all contexts and the active context.
# The active context gets us our namespace.
contexts, activeContext = kubernetes.config.list_kube_config_contexts()
if not contexts:
raise RuntimeError("No Kubernetes contexts available in ~/.kube/config or $KUBECONFIG")

# Identify the namespace to work in
return activeContext.get('context', {}).get('namespace', 'default')

else:
# We need an API object
try:
return self._apis[kind]
except KeyError:
raise RuntimeError("Unknown Kubernetes API type: {}".format(kind))


def setUserScript(self, userScript):
logger.info('Setting user script for deployment: {}'.format(userScript))
Expand Down Expand Up @@ -240,7 +288,7 @@ def issueBatchJob(self, jobNode):
kind="Job")

# Make the job
launched = self.batchApi.create_namespaced_job(self.namespace, job)
launched = self._api('batch').create_namespaced_job(self.namespace, job)

logger.debug('Launched job: %s', jobName)

Expand Down Expand Up @@ -303,7 +351,7 @@ def _ourJobObjects(self, onlySucceeded=False, limit=None):
kwargs['field_selector'] = 'status.successful==1'
if token is not None:
kwargs['_continue'] = token
results = self.batchApi.list_namespaced_job(self.namespace, **kwargs)
results = self._api('batch').list_namespaced_job(self.namespace, **kwargs)

for job in results.items:
if self._isJobOurs(job):
Expand Down Expand Up @@ -348,7 +396,7 @@ def _getPodForJob(self, jobObject):
kwargs = {'label_selector': query}
if token is not None:
kwargs['_continue'] = token
results = self.coreApi.list_namespaced_pod(self.namespace, **kwargs)
results = self._api('core').list_namespaced_pod(self.namespace, **kwargs)

for pod in results.items:
# Return the first pod we find
Expand Down Expand Up @@ -376,8 +424,8 @@ def _getLogForPod(self, podObject):
"""

return self.coreApi.read_namespaced_pod_log(podObject.metadata.name,
namespace=self.namespace)
return self._api('core').read_namespaced_pod_log(podObject.metadata.name,
namespace=self.namespace)

def _getIDForOurJob(self, jobObject):
"""
Expand Down Expand Up @@ -411,7 +459,7 @@ def getUpdatedBatchJob(self, maxWait):

if self.enableWatching:
for j in self._ourJobObjects():
for event in w.stream(self.coreApi.list_namespaced_pod, self.namespace, timeout_seconds=maxWait):
for event in w.stream(self._api('core').list_namespaced_pod, self.namespace, timeout_seconds=maxWait):
pod = event['object']
if pod.metadata.name.startswith(self.jobPrefix):
if pod.status.phase == 'Failed' or pod.status.phase == 'Succeeded':
Expand All @@ -426,9 +474,9 @@ def getUpdatedBatchJob(self, maxWait):
terminated = pod.status.container_statuses[0].state.terminated
runtime = (terminated.finished_at - terminated.started_at).total_seconds()
result = (jobID, terminated.exit_code, runtime)
self.batchApi.delete_namespaced_job(pod.metadata.owner_references[0].name,
self.namespace,
propagation_policy='Foreground')
self._api('batch').delete_namespaced_job(pod.metadata.owner_references[0].name,
self.namespace,
propagation_policy='Foreground')

self._waitForJobDeath(pod.metadata.owner_references[0].name)
return result
Expand Down Expand Up @@ -598,9 +646,9 @@ def _getUpdatedBatchJobImmediately(self):

try:
# Delete the job and all dependents (pods)
self.batchApi.delete_namespaced_job(jobObject.metadata.name,
self.namespace,
propagation_policy='Foreground')
self._api('batch').delete_namespaced_job(jobObject.metadata.name,
self.namespace,
propagation_policy='Foreground')

# That just kicks off the deletion process. Foreground doesn't
# actually block. See
Expand Down Expand Up @@ -632,7 +680,7 @@ def _waitForJobDeath(self, jobName):
while True:
try:
# Look for the job
self.batchApi.read_namespaced_job(jobName, self.namespace)
self._api('batch').read_namespaced_job(jobName, self.namespace)
# If we didn't 404, wait a bit with exponential backoff
time.sleep(backoffTime)
if backoffTime < maxBackoffTime:
Expand Down Expand Up @@ -662,9 +710,9 @@ def shutdown(self):
# Kill jobs whether they succeeded or failed
try:
# Delete with background poilicy so we can quickly issue lots of commands
response = self.batchApi.delete_namespaced_job(jobName,
self.namespace,
propagation_policy='Background')
response = self._api('batch').delete_namespaced_job(jobName,
self.namespace,
propagation_policy='Background')
logger.debug('Killed job for shutdown: %s', jobName)
except ApiException as e:
logger.error("Exception when calling BatchV1Api->delte_namespaced_job: %s" % e)
Expand Down Expand Up @@ -731,9 +779,9 @@ def killBatchJobs(self, jobIDs):

# Delete the requested job in the foreground.
# This doesn't block, but it does delete expeditiously.
response = self.batchApi.delete_namespaced_job(jobName,
self.namespace,
propagation_policy='Foreground')
response = self._api('batch').delete_namespaced_job(jobName,
self.namespace,
propagation_policy='Foreground')
logger.debug('Killed job by request: %s', jobName)

for jobID in jobIDs:
Expand Down

0 comments on commit 9508391

Please sign in to comment.