From 5ac5af627f87591a6674991f4f3b8ab14c44b51c Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Mon, 18 Nov 2019 14:29:50 -0800 Subject: [PATCH 1/2] Refresh Kubernetes credentials if we want to talk to Kubernetes and the credentials are even a little old --- src/toil/batchSystems/kubernetes.py | 148 ++++++++++++++++++---------- 1 file changed, 98 insertions(+), 50 deletions(-) diff --git a/src/toil/batchSystems/kubernetes.py b/src/toil/batchSystems/kubernetes.py index 9aecfa04a3..7b668132e5 100644 --- a/src/toil/batchSystems/kubernetes.py +++ b/src/toil/batchSystems/kubernetes.py @@ -71,34 +71,14 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): # reveal on CI. logging.getLogger('kubernetes').setLevel(logging.ERROR) logging.getLogger('requests_oauthlib').setLevel(logging.ERROR) - - try: - # Load ~/.kube/config or KUBECONFIG - kubernetes.config.load_kube_config() - - # We loaded it; we need to figure out our namespace the config-file way - - # Find all contexts and the active context. - # The active context gets us our namespace. - contexts, activeContext = kubernetes.config.list_kube_config_contexts() - if not contexts: - raise RuntimeError("No Kubernetes contexts available in ~/.kube/config or $KUBECONFIG") - - # Identify the namespace to work in - self.namespace = activeContext.get('context', {}).get('namespace', 'default') - - except TypeError: - # Didn't work. Try pod-based credentials in case we are in a pod. - try: - kubernetes.config.load_incluster_config() - - # We got pod-based credentials. Our namespace comes from a particular file. 
- self.namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r').read().strip() - - except kubernetes.config.ConfigException: - raise RuntimeError('Could not load Kubernetes configuration from ~/.kube/config, $KUBECONFIG, or current pod.') - + # This will hold the last time our Kubernetes credentials were refreshed + self.credential_time = None + # And this will hold our cache of API objects + self._apis = {} + + # Get our namespace (and our Kubernetes credentials to make sure they exist) + self.namespace = self._api('namespace') # Make a Kubernetes-acceptable version of our username: not too long, # and all lowercase letters, numbers, or - or . @@ -125,11 +105,79 @@ def __init__(self, config, maxCores, maxMemory, maxDisk): # Set this to True to enable the experimental wait-for-job-update code self.enableWatching = True - # Required APIs needed from kubernetes - self.batchApi = kubernetes.client.BatchV1Api() - self.coreApi = kubernetes.client.CoreV1Api() - self.jobIds = set() + + def _api(self, kind, max_age_seconds = 5 * 60): + """ + The Kubernetes module isn't clever enough to renew its credentials when + they are about to expire. See + https://github.com/kubernetes-client/python/issues/741. + + We work around this by making sure that every time we are about to talk + to Kubernetes, we have fresh credentials. And we do that by reloading + the config and replacing our Kubernetes API objects before we do any + Kubernetes things. + + TODO: We can still get in trouble if a single watch or listing loop + goes on longer than our credentials last, though. + + This method is the Right Way to get any Kubernetes API. You call it + with the API you want ('batch' or 'core') and it returns an API object + with guaranteed fresh credentials. + + It also recognizes 'namespace' and returns our namespace as a string. + + max_age_seconds needs to be << your cluster's credential expiry time. 
+ """ + + now = datetime.datetime.now() + + if self.credential_time is None or (now - self.credential_time).total_seconds() > max_age_seconds: + # Credentials need a refresh + try: + # Load ~/.kube/config or KUBECONFIG + kubernetes.config.load_kube_config() + # Worked. We're using kube config + config_source = 'kube' + except TypeError: + # Didn't work. Try pod-based credentials in case we are in a pod. + try: + kubernetes.config.load_incluster_config() + # Worked. We're using in_cluster config + config_source = 'in_cluster' + except kubernetes.config.ConfigException: + raise RuntimeError('Could not load Kubernetes configuration from ~/.kube/config, $KUBECONFIG, or current pod.') + + + # Now fill in the API objects with these credentials + self._apis['batch'] = kubernetes.client.BatchV1Api() + self._apis['core'] = kubernetes.client.CoreV1Api() + + # And save the time + self.credential_time = now + + if kind == 'namespace': + # We just need the namespace string + if config_source == 'in_cluster': + # Our namespace comes from a particular file. + return open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r').read().strip() + else: + # Find all contexts and the active context. + # The active context gets us our namespace. 
+ contexts, activeContext = kubernetes.config.list_kube_config_contexts() + if not contexts: + raise RuntimeError("No Kubernetes contexts available in ~/.kube/config or $KUBECONFIG") + + # Identify the namespace to work in + return activeContext.get('context', {}).get('namespace', 'default') + + else: + # We need an API object + try: + return self._apis[kind] + except KeyError: + raise RuntimeError("Unknown Kubernetes API type: {}".format(kind)) + def setUserScript(self, userScript): logger.info('Setting user script for deployment: {}'.format(userScript)) @@ -240,7 +288,7 @@ def issueBatchJob(self, jobNode): kind="Job") # Make the job - launched = self.batchApi.create_namespaced_job(self.namespace, job) + launched = self._api('batch').create_namespaced_job(self.namespace, job) logger.debug('Launched job: %s', jobName) @@ -303,7 +351,7 @@ def _ourJobObjects(self, onlySucceeded=False, limit=None): kwargs['field_selector'] = 'status.successful==1' if token is not None: kwargs['_continue'] = token - results = self.batchApi.list_namespaced_job(self.namespace, **kwargs) + results = self._api('batch').list_namespaced_job(self.namespace, **kwargs) for job in results.items: if self._isJobOurs(job): @@ -348,7 +396,7 @@ def _getPodForJob(self, jobObject): kwargs = {'label_selector': query} if token is not None: kwargs['_continue'] = token - results = self.coreApi.list_namespaced_pod(self.namespace, **kwargs) + results = self._api('core').list_namespaced_pod(self.namespace, **kwargs) for pod in results.items: # Return the first pod we find @@ -376,8 +424,8 @@ def _getLogForPod(self, podObject): """ - return self.coreApi.read_namespaced_pod_log(podObject.metadata.name, - namespace=self.namespace) + return self._api('core').read_namespaced_pod_log(podObject.metadata.name, + namespace=self.namespace) def _getIDForOurJob(self, jobObject): """ @@ -411,7 +459,7 @@ def getUpdatedBatchJob(self, maxWait): if self.enableWatching: for j in self._ourJobObjects(): - for event in 
w.stream(self.coreApi.list_namespaced_pod, self.namespace, timeout_seconds=maxWait): + for event in w.stream(self._api('core').list_namespaced_pod, self.namespace, timeout_seconds=maxWait): pod = event['object'] if pod.metadata.name.startswith(self.jobPrefix): if pod.status.phase == 'Failed' or pod.status.phase == 'Succeeded': @@ -426,9 +474,9 @@ def getUpdatedBatchJob(self, maxWait): terminated = pod.status.container_statuses[0].state.terminated runtime = (terminated.finished_at - terminated.started_at).total_seconds() result = (jobID, terminated.exit_code, runtime) - self.batchApi.delete_namespaced_job(pod.metadata.owner_references[0].name, - self.namespace, - propagation_policy='Foreground') + self._api('batch').delete_namespaced_job(pod.metadata.owner_references[0].name, + self.namespace, + propagation_policy='Foreground') self._waitForJobDeath(pod.metadata.owner_references[0].name) return result @@ -598,9 +646,9 @@ def _getUpdatedBatchJobImmediately(self): try: # Delete the job and all dependents (pods) - self.batchApi.delete_namespaced_job(jobObject.metadata.name, - self.namespace, - propagation_policy='Foreground') + self._api('batch').delete_namespaced_job(jobObject.metadata.name, + self.namespace, + propagation_policy='Foreground') # That just kicks off the deletion process. Foreground doesn't # actually block. 
See @@ -632,7 +680,7 @@ def _waitForJobDeath(self, jobName): while True: try: # Look for the job - self.batchApi.read_namespaced_job(jobName, self.namespace) + self._api('batch').read_namespaced_job(jobName, self.namespace) # If we didn't 404, wait a bit with exponential backoff time.sleep(backoffTime) if backoffTime < maxBackoffTime: @@ -662,9 +710,9 @@ def shutdown(self): # Kill jobs whether they succeeded or failed try: # Delete with background poilicy so we can quickly issue lots of commands - response = self.batchApi.delete_namespaced_job(jobName, - self.namespace, - propagation_policy='Background') + response = self._api('batch').delete_namespaced_job(jobName, + self.namespace, + propagation_policy='Background') logger.debug('Killed job for shutdown: %s', jobName) except ApiException as e: logger.error("Exception when calling BatchV1Api->delte_namespaced_job: %s" % e) @@ -731,9 +779,9 @@ def killBatchJobs(self, jobIDs): # Delete the requested job in the foreground. # This doesn't block, but it does delete expeditiously. 
- response = self.batchApi.delete_namespaced_job(jobName, - self.namespace, - propagation_policy='Foreground') + response = self._api('batch').delete_namespaced_job(jobName, + self.namespace, + propagation_policy='Foreground') logger.debug('Killed job by request: %s', jobName) for jobID in jobIDs: From 116852a6d88a2d7baf0a3be90b00f16a08d0029e Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 19 Dec 2019 16:38:22 -0800 Subject: [PATCH 2/2] Use a with to clearly close our namespace file --- src/toil/batchSystems/kubernetes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/toil/batchSystems/kubernetes.py b/src/toil/batchSystems/kubernetes.py index 7b668132e5..d053c911bf 100644 --- a/src/toil/batchSystems/kubernetes.py +++ b/src/toil/batchSystems/kubernetes.py @@ -160,7 +160,8 @@ def _api(self, kind, max_age_seconds = 5 * 60): # We just need the namespace string if config_source == 'in_cluster': # Our namespace comes from a particular file. - return open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r').read().strip() + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r') as fh: + return fh.read().strip() else: # Find all contexts and the active context. # The active context gets us our namespace.