diff --git a/dynamic/__init__.py b/dynamic/__init__.py new file mode 100644 index 00000000..91ba0501 --- /dev/null +++ b/dynamic/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .client import * # NOQA diff --git a/dynamic/client.py b/dynamic/client.py new file mode 100644 index 00000000..02bb984b --- /dev/null +++ b/dynamic/client.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python + +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six +import json + +from kubernetes import watch +from kubernetes.client.rest import ApiException + +from .discovery import EagerDiscoverer, LazyDiscoverer +from .exceptions import api_exception, KubernetesValidateMissing +from .resource import Resource, ResourceList, Subresource, ResourceInstance, ResourceField + +try: + import kubernetes_validate + HAS_KUBERNETES_VALIDATE = True +except ImportError: + HAS_KUBERNETES_VALIDATE = False + +try: + from kubernetes_validate.utils import VersionNotSupportedError +except ImportError: + class VersionNotSupportedError(NotImplementedError): + pass + +__all__ = [ + 'DynamicClient', + 'ResourceInstance', + 'Resource', + 'ResourceList', + 'Subresource', + 'EagerDiscoverer', + 'LazyDiscoverer', + 'ResourceField', +] + + +def meta_request(func): + """ Handles parsing response structure and translating API Exceptions """ + def inner(self, *args, **kwargs): + serialize_response = kwargs.pop('serialize', True) + serializer = kwargs.pop('serializer', ResourceInstance) + try: + resp = func(self, *args, **kwargs) + except ApiException as e: + raise api_exception(e) + if serialize_response: + try: + if six.PY2: + return serializer(self, json.loads(resp.data)) + return serializer(self, json.loads(resp.data.decode('utf8'))) + except ValueError: + if six.PY2: + return resp.data + return resp.data.decode('utf8') + return resp + + return inner + + +class DynamicClient(object): + """ A kubernetes client that dynamically discovers and interacts with + the kubernetes API + """ + + def __init__(self, client, cache_file=None, discoverer=None): + # Setting default here to delay evaluation of LazyDiscoverer class + # until constructor is called + discoverer = discoverer or LazyDiscoverer + + self.client = client + self.configuration = client.configuration + self.__discoverer = discoverer(self, cache_file) + + @property + def resources(self): + return self.__discoverer + + @property + def version(self): + return self.__discoverer.version + + def ensure_namespace(self, resource, namespace, body): + namespace = namespace or body.get('metadata', {}).get('namespace') + if not namespace: + raise ValueError("Namespace is required for {}.{}".format(resource.group_version, resource.kind)) + return namespace + + def serialize_body(self, body): + if hasattr(body, 'to_dict'): + return body.to_dict() + return body or {} + + def get(self, resource, name=None, namespace=None, **kwargs): + path = resource.path(name=name, namespace=namespace) + return self.request('get', path, **kwargs) + + def create(self, resource, body=None, namespace=None, **kwargs): + body = self.serialize_body(body) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + path = resource.path(namespace=namespace) + return self.request('post', path, body=body, **kwargs) + + def delete(self, resource, name=None, namespace=None, body=None, label_selector=None, field_selector=None, **kwargs): + if not (name or label_selector or field_selector): + raise ValueError("At least one of name|label_selector|field_selector is required") + if resource.namespaced and not (label_selector or field_selector or namespace): + raise ValueError("At least one of namespace|label_selector|field_selector is required") + path = resource.path(name=name, namespace=namespace) + return self.request('delete', path, body=body, label_selector=label_selector, field_selector=field_selector, **kwargs) + + def replace(self, resource, body=None, name=None, namespace=None, **kwargs): + body = self.serialize_body(body) + name = name or body.get('metadata', {}).get('name') + if not name: + raise ValueError("name is required to replace {}.{}".format(resource.group_version, resource.kind)) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + path = resource.path(name=name, namespace=namespace) + return self.request('put', path, body=body, **kwargs) + + def patch(self, resource, body=None, name=None, namespace=None, **kwargs): + body = self.serialize_body(body) + name = name or body.get('metadata', {}).get('name') + if not name: + raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind)) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + + content_type = kwargs.pop('content_type', 'application/strategic-merge-patch+json') + path = resource.path(name=name, namespace=namespace) + + return self.request('patch', path, body=body, content_type=content_type, **kwargs) + + def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None): + """ + Stream events for a resource from the Kubernetes API + + :param resource: The API resource object that will be used to query the API + :param namespace: The namespace to query + :param name: The name of the resource instance to query + :param label_selector: The label selector with which to filter results + :param field_selector: The field selector with which to filter results + :param resource_version: The version with which to filter results. Only events with + a resource_version greater than this value will be returned + :param timeout: The amount of time in seconds to wait before terminating the stream + + :return: Event object with these keys: + 'type': The type of event such as "ADDED", "DELETED", etc. + 'raw_object': a dict representing the watched object. + 'object': A ResourceInstance wrapping raw_object. + + Example: + client = DynamicClient(k8s_client) + v1_pods = client.resources.get(api_version='v1', kind='Pod') + + for e in v1_pods.watch(resource_version=0, namespace=default, timeout=5): + print(e['type']) + print(e['object'].metadata) + """ + watcher = watch.Watch() + for event in watcher.stream( + resource.get, + namespace=namespace, + name=name, + field_selector=field_selector, + label_selector=label_selector, + resource_version=resource_version, + serialize=False, + timeout_seconds=timeout + ): + event['object'] = ResourceInstance(resource, event['object']) + yield event + + @meta_request + def request(self, method, path, body=None, **params): + if not path.startswith('/'): + path = '/' + path + + path_params = params.get('path_params', {}) + query_params = params.get('query_params', []) + if params.get('pretty') is not None: + query_params.append(('pretty', params['pretty'])) + if params.get('_continue') is not None: + query_params.append(('continue', params['_continue'])) + if params.get('include_uninitialized') is not None: + query_params.append(('includeUninitialized', params['include_uninitialized'])) + if params.get('field_selector') is not None: + query_params.append(('fieldSelector', params['field_selector'])) + if params.get('label_selector') is not None: + query_params.append(('labelSelector', params['label_selector'])) + if params.get('limit') is not None: + query_params.append(('limit', params['limit'])) + if params.get('resource_version') is not None: + query_params.append(('resourceVersion', params['resource_version'])) + if params.get('timeout_seconds') is not None: + query_params.append(('timeoutSeconds', params['timeout_seconds'])) + if params.get('watch') is not None: + query_params.append(('watch', params['watch'])) + if params.get('grace_period_seconds') is not None: + query_params.append(('gracePeriodSeconds', params['grace_period_seconds'])) + if params.get('propagation_policy') is not None: + query_params.append(('propagationPolicy', params['propagation_policy'])) + if params.get('orphan_dependents') is not None: + query_params.append(('orphanDependents', params['orphan_dependents'])) + + header_params = params.get('header_params', {}) + form_params = [] + local_var_files = {} + # HTTP header `Accept` + header_params['Accept'] = self.client.select_header_accept([ + 'application/json', + 'application/yaml', + ]) + + # HTTP header `Content-Type` + if params.get('content_type'): + header_params['Content-Type'] = params['content_type'] + else: + header_params['Content-Type'] = self.client.select_header_content_type(['*/*']) + + # Authentication setting + auth_settings = ['BearerToken'] + + return self.client.call_api( + path, + method.upper(), + path_params, + query_params, + header_params, + body=body, + post_params=form_params, + async_req=params.get('async_req'), + files=local_var_files, + auth_settings=auth_settings, + _preload_content=False, + _return_http_data_only=params.get('_return_http_data_only', True) + ) + + def validate(self, definition, version=None, strict=False): + """validate checks a kubernetes resource definition + + Args: + definition (dict): resource definition + version (str): version of kubernetes to validate against + strict (bool): whether unexpected additional properties should be considered errors + + Returns: + warnings (list), errors (list): warnings are missing validations, errors are validation failures + """ + if not HAS_KUBERNETES_VALIDATE: + raise KubernetesValidateMissing() + + errors = list() + warnings = list() + try: + if version is None: + try: + version = self.version['kubernetes']['gitVersion'] + except KeyError: + version = kubernetes_validate.latest_version() + kubernetes_validate.validate(definition, version, strict) + except kubernetes_validate.utils.ValidationError as e: + errors.append("resource definition validation error at %s: %s" % ('.'.join([str(item) for item in e.path]), e.message)) # noqa: B306 + except VersionNotSupportedError: + errors.append("Kubernetes version %s is not supported by kubernetes-validate" % version) + except kubernetes_validate.utils.SchemaNotFoundError as e: + warnings.append("Could not find schema for object kind %s with API version %s in Kubernetes version %s (possibly Custom Resource?)" % + (e.kind, e.api_version, e.version)) + return warnings, errors diff --git a/dynamic/discovery.py b/dynamic/discovery.py new file mode 100644 index 00000000..a646a96a --- /dev/null +++ b/dynamic/discovery.py @@ -0,0 +1,420 @@ +#!/usr/bin/env python + +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import six +import json +import hashlib +import tempfile +from collections import defaultdict +from abc import abstractmethod, abstractproperty + +from urllib3.exceptions import ProtocolError, MaxRetryError + +from kubernetes import __version__ +from .exceptions import NotFoundError, ResourceNotFoundError, ResourceNotUniqueError, ApiException +from .resource import Resource, ResourceList + + +DISCOVERY_PREFIX = 'apis' + + +class Discoverer(object): + """ + A convenient container for storing discovered API resources. Allows + easy searching and retrieval of specific resources. + + Subclasses implement the abstract methods with different loading strategies. + """ + + def __init__(self, client, cache_file): + self.client = client + default_cache_id = self.client.configuration.host + if six.PY3: + default_cache_id = default_cache_id.encode('utf-8') + default_cachefile_name = 'osrcp-{0}.json'.format(hashlib.md5(default_cache_id).hexdigest()) + self.__cache_file = cache_file or os.path.join(tempfile.gettempdir(), default_cachefile_name) + self.__init_cache() + + def __init_cache(self, refresh=False): + if refresh or not os.path.exists(self.__cache_file): + self._cache = {'library_version': __version__} + refresh = True + else: + try: + with open(self.__cache_file, 'r') as f: + self._cache = json.load(f, cls=CacheDecoder(self.client)) + if self._cache.get('library_version') != __version__: + # Version mismatch, need to refresh cache + self.invalidate_cache() + except Exception: + self.invalidate_cache() + self._load_server_info() + self.discover() + if refresh: + self._write_cache() + + def _write_cache(self): + try: + with open(self.__cache_file, 'w') as f: + json.dump(self._cache, f, cls=CacheEncoder) + except Exception: + # Failing to write the cache isn't a big enough error to crash on + pass + + def invalidate_cache(self): + self.__init_cache(refresh=True) + + @abstractproperty + def api_groups(self): + pass + + @abstractmethod + def search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + pass + + @abstractmethod + def discover(self): + pass + + @property + def version(self): + return self.__version + + def default_groups(self, request_resources=False): + groups = {} + groups['api'] = { '': { + 'v1': (ResourceGroup( True, resources=self.get_resources_for_api_version('api', '', 'v1', True) ) + if request_resources else ResourceGroup(True)) + }} + + groups[DISCOVERY_PREFIX] = {'': { + 'v1': ResourceGroup(True, resources = {"List": [ResourceList(self.client)]}) + }} + return groups + + def parse_api_groups(self, request_resources=False, update=False): + """ Discovers all API groups present in the cluster """ + if not self._cache.get('resources') or update: + self._cache['resources'] = self._cache.get('resources', {}) + groups_response = self.client.request('GET', '/{}'.format(DISCOVERY_PREFIX)).groups + + groups = self.default_groups(request_resources=request_resources) + + for group in groups_response: + new_group = {} + for version_raw in group['versions']: + version = version_raw['version'] + resource_group = self._cache.get('resources', {}).get(DISCOVERY_PREFIX, {}).get(group['name'], {}).get(version) + preferred = version_raw == group['preferredVersion'] + resources = resource_group.resources if resource_group else {} + if request_resources: + resources = self.get_resources_for_api_version(DISCOVERY_PREFIX, group['name'], version, preferred) + new_group[version] = ResourceGroup(preferred, resources=resources) + groups[DISCOVERY_PREFIX][group['name']] = new_group + self._cache['resources'].update(groups) + self._write_cache() + + return self._cache['resources'] + + def _load_server_info(self): + def just_json(_, serialized): + return serialized + + if not self._cache.get('version'): + try: + self._cache['version'] = { + 'kubernetes': self.client.request('get', '/version', serializer=just_json) + } + except (ValueError, MaxRetryError) as e: + if isinstance(e, MaxRetryError) and not isinstance(e.reason, ProtocolError): + raise + if not self.client.configuration.host.startswith("https://"): + raise ValueError("Host value %s should start with https:// when talking to HTTPS endpoint" % + self.client.configuration.host) + else: + raise + + self.__version = self._cache['version'] + + def get_resources_for_api_version(self, prefix, group, version, preferred): + """ returns a dictionary of resources associated with provided (prefix, group, version)""" + + resources = defaultdict(list) + subresources = {} + + path = '/'.join(filter(None, [prefix, group, version])) + resources_response = self.client.request('GET', path).resources or [] + + resources_raw = list(filter(lambda resource: '/' not in resource['name'], resources_response)) + subresources_raw = list(filter(lambda resource: '/' in resource['name'], resources_response)) + for subresource in subresources_raw: + resource, name = subresource['name'].split('/') + if not subresources.get(resource): + subresources[resource] = {} + subresources[resource][name] = subresource + + for resource in resources_raw: + # Prevent duplicate keys + for key in ('prefix', 'group', 'api_version', 'client', 'preferred'): + resource.pop(key, None) + + resourceobj = Resource( + prefix=prefix, + group=group, + api_version=version, + client=self.client, + preferred=preferred, + subresources=subresources.get(resource['name']), + **resource + ) + resources[resource['kind']].append(resourceobj) + + resource_list = ResourceList(self.client, group=group, api_version=version, base_kind=resource['kind']) + resources[resource_list.kind].append(resource_list) + return resources + + def get(self, **kwargs): + """ Same as search, but will throw an error if there are multiple or no + results. If there are multiple results and only one is an exact match + on api_version, that resource will be returned. + """ + results = self.search(**kwargs) + # If there are multiple matches, prefer exact matches on api_version + if len(results) > 1 and kwargs.get('api_version'): + results = [ + result for result in results if result.group_version == kwargs['api_version'] + ] + # If there are multiple matches, prefer non-List kinds + if len(results) > 1 and not all([isinstance(x, ResourceList) for x in results]): + results = [result for result in results if not isinstance(result, ResourceList)] + if len(results) == 1: + return results[0] + elif not results: + raise ResourceNotFoundError('No matches found for {}'.format(kwargs)) + else: + raise ResourceNotUniqueError('Multiple matches found for {}: {}'.format(kwargs, results)) + + +class LazyDiscoverer(Discoverer): + """ A convenient container for storing discovered API resources. Allows + easy searching and retrieval of specific resources. + + Resources for the cluster are loaded lazily. + """ + + def __init__(self, client, cache_file): + Discoverer.__init__(self, client, cache_file) + self.__update_cache = False + + def discover(self): + self.__resources = self.parse_api_groups(request_resources=False) + + def __maybe_write_cache(self): + if self.__update_cache: + self._write_cache() + self.__update_cache = False + + @property + def api_groups(self): + return self.parse_api_groups(request_resources=False, update=True)['apis'].keys() + + def search(self, **kwargs): + results = self.__search(self.__build_search(**kwargs), self.__resources, []) + if not results: + self.invalidate_cache() + results = self.__search(self.__build_search(**kwargs), self.__resources, []) + self.__maybe_write_cache() + return results + + def __search(self, parts, resources, reqParams): + part = parts[0] + if part != '*': + + resourcePart = resources.get(part) + if not resourcePart: + return [] + elif isinstance(resourcePart, ResourceGroup): + if len(reqParams) != 2: + raise ValueError("prefix and group params should be present, have %s" % reqParams) + # Check if we've requested resources for this group + if not resourcePart.resources: + prefix, group, version = reqParams[0], reqParams[1], part + try: + resourcePart.resources = self.get_resources_for_api_version(prefix, + group, part, resourcePart.preferred) + except NotFoundError: + raise ResourceNotFoundError + self._cache['resources'][prefix][group][version] = resourcePart + self.__update_cache=True + return self.__search(parts[1:], resourcePart.resources, reqParams) + elif isinstance(resourcePart, dict): + # In this case parts [0] will be a specified prefix, group, version + # as we recurse + return self.__search(parts[1:], resourcePart, reqParams + [part] ) + else: + if parts[1] != '*' and isinstance(parts[1], dict): + for _resource in resourcePart: + for term, value in parts[1].items(): + if getattr(_resource, term) == value: + return [_resource] + + return [] + else: + return resourcePart + else: + matches = [] + for key in resources.keys(): + matches.extend(self.__search([key] + parts[1:], resources, reqParams)) + return matches + + def __build_search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + if not group and api_version and '/' in api_version: + group, api_version = api_version.split('/') + + items = [prefix, group, api_version, kind, kwargs] + return list(map(lambda x: x or '*', items)) + + def __iter__(self): + for prefix, groups in self.__resources.items(): + for group, versions in groups.items(): + for version, rg in versions.items(): + # Request resources for this groupVersion if we haven't yet + if not rg.resources: + rg.resources = self.get_resources_for_api_version( + prefix, group, version, rg.preferred) + self._cache['resources'][prefix][group][version] = rg + self.__update_cache = True + for _, resource in six.iteritems(rg.resources): + yield resource + self.__maybe_write_cache() + + +class EagerDiscoverer(Discoverer): + """ A convenient container for storing discovered API resources. Allows + easy searching and retrieval of specific resources. + + All resources are discovered for the cluster upon object instantiation. + """ + + def update(self, resources): + self.__resources = resources + + def __init__(self, client, cache_file): + Discoverer.__init__(self, client, cache_file) + + def discover(self): + self.__resources = self.parse_api_groups(request_resources=True) + + @property + def api_groups(self): + """ list available api groups """ + return self.parse_api_groups(request_resources=True, update=True)['apis'].keys() + + + def search(self, **kwargs): + """ Takes keyword arguments and returns matching resources. The search + will happen in the following order: + prefix: The api prefix for a resource, ie, /api, /oapi, /apis. Can usually be ignored + group: The api group of a resource. Will also be extracted from api_version if it is present there + api_version: The api version of a resource + kind: The kind of the resource + arbitrary arguments (see below), in random order + + The arbitrary arguments can be any valid attribute for an Resource object + """ + results = self.__search(self.__build_search(**kwargs), self.__resources) + if not results: + self.invalidate_cache() + results = self.__search(self.__build_search(**kwargs), self.__resources) + return results + + def __build_search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + if not group and api_version and '/' in api_version: + group, api_version = api_version.split('/') + + items = [prefix, group, api_version, kind, kwargs] + return list(map(lambda x: x or '*', items)) + + def __search(self, parts, resources): + part = parts[0] + resourcePart = resources.get(part) + + if part != '*' and resourcePart: + if isinstance(resourcePart, ResourceGroup): + return self.__search(parts[1:], resourcePart.resources) + elif isinstance(resourcePart, dict): + return self.__search(parts[1:], resourcePart) + else: + if parts[1] != '*' and isinstance(parts[1], dict): + for _resource in resourcePart: + for term, value in parts[1].items(): + if getattr(_resource, term) == value: + return [_resource] + return [] + else: + return resourcePart + elif part == '*': + matches = [] + for key in resources.keys(): + matches.extend(self.__search([key] + parts[1:], resources)) + return matches + return [] + + def __iter__(self): + for _, groups in self.__resources.items(): + for _, versions in groups.items(): + for _, resources in versions.items(): + for _, resource in resources.items(): + yield resource + + +class ResourceGroup(object): + """Helper class for Discoverer container""" + def __init__(self, preferred, resources=None): + self.preferred = preferred + self.resources = resources or {} + + def to_dict(self): + return { + '_type': 'ResourceGroup', + 'preferred': self.preferred, + 'resources': self.resources, + } + + +class CacheEncoder(json.JSONEncoder): + + def default(self, o): + return o.to_dict() + + +class CacheDecoder(json.JSONDecoder): + def __init__(self, client, *args, **kwargs): + self.client = client + json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs) + + def object_hook(self, obj): + if '_type' not in obj: + return obj + _type = obj.pop('_type') + if _type == 'Resource': + return Resource(client=self.client, **obj) + elif _type == 'ResourceList': + return ResourceList(self.client, **obj) + elif _type == 'ResourceGroup': + return ResourceGroup(obj['preferred'], resources=self.object_hook(obj['resources'])) + return obj diff --git a/dynamic/exceptions.py b/dynamic/exceptions.py new file mode 100644 index 00000000..d940d429 --- /dev/null +++ b/dynamic/exceptions.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python + +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import sys +import traceback + +from kubernetes.client.rest import ApiException + + +def api_exception(e): + """ + Returns the proper Exception class for the given kubernetes.client.rest.ApiException object + https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#success-codes + """ + _, _, exc_traceback = sys.exc_info() + tb = '\n'.join(traceback.format_tb(exc_traceback)) + return { + 400: BadRequestError, + 401: UnauthorizedError, + 403: ForbiddenError, + 404: NotFoundError, + 405: MethodNotAllowedError, + 409: ConflictError, + 410: GoneError, + 422: UnprocessibleEntityError, + 429: TooManyRequestsError, + 500: InternalServerError, + 503: ServiceUnavailableError, + 504: ServerTimeoutError, + }.get(e.status, DynamicApiError)(e, tb) + + +class DynamicApiError(ApiException): + """ Generic API Error for the dynamic client """ + def __init__(self, e, tb=None): + self.status = e.status + self.reason = e.reason + self.body = e.body + self.headers = e.headers + self.original_traceback = tb + + def __str__(self): + error_message = [str(self.status), "Reason: {}".format(self.reason)] + if self.headers: + error_message.append("HTTP response headers: {}".format(self.headers)) + + if self.body: + error_message.append("HTTP response body: {}".format(self.body)) + + if self.original_traceback: + error_message.append("Original traceback: \n{}".format(self.original_traceback)) + + return '\n'.join(error_message) + + def summary(self): + if self.body: + if self.headers and self.headers.get('Content-Type') == 'application/json': + message = json.loads(self.body).get('message') + if message: + return message + + return self.body + else: + return "{} Reason: {}".format(self.status, self.reason) + +class ResourceNotFoundError(Exception): + """ Resource was not found in available APIs """ +class ResourceNotUniqueError(Exception): + """ Parameters given matched multiple API resources """ + +class KubernetesValidateMissing(Exception): + """ kubernetes-validate is not installed """ + +# HTTP Errors +class BadRequestError(DynamicApiError): + """ 400: StatusBadRequest """ +class UnauthorizedError(DynamicApiError): + """ 401: StatusUnauthorized """ +class ForbiddenError(DynamicApiError): + """ 403: StatusForbidden """ +class NotFoundError(DynamicApiError): + """ 404: StatusNotFound """ +class MethodNotAllowedError(DynamicApiError): + """ 405: StatusMethodNotAllowed """ +class ConflictError(DynamicApiError): + """ 409: StatusConflict """ +class GoneError(DynamicApiError): + """ 410: StatusGone """ +class UnprocessibleEntityError(DynamicApiError): + """ 422: StatusUnprocessibleEntity """ +class TooManyRequestsError(DynamicApiError): + """ 429: StatusTooManyRequests """ +class InternalServerError(DynamicApiError): + """ 500: StatusInternalServer """ +class ServiceUnavailableError(DynamicApiError): + """ 503: StatusServiceUnavailable """ +class ServerTimeoutError(DynamicApiError): + """ 504: StatusServerTimeout """ diff --git a/dynamic/resource.py b/dynamic/resource.py new file mode 100644 index 00000000..3e2897cd --- /dev/null +++ b/dynamic/resource.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python + +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import yaml +from functools import partial + +from pprint import pformat + + +class Resource(object): + """ Represents an API resource type, containing the information required to build urls for requests """ + + def __init__(self, prefix=None, group=None, api_version=None, kind=None, + namespaced=False, verbs=None, name=None, preferred=False, client=None, + singularName=None, shortNames=None, categories=None, subresources=None, **kwargs): + + if None in (api_version, kind, prefix): + raise ValueError("At least prefix, kind, and api_version must be provided") + + self.prefix = prefix + self.group = group + self.api_version = api_version + self.kind = kind + self.namespaced = namespaced + self.verbs = verbs + self.name = name + self.preferred = preferred + self.client = client + self.singular_name = singularName or (name[:-1] if name else "") + self.short_names = shortNames + self.categories = categories + self.subresources = { + k: Subresource(self, **v) for k, v in (subresources or {}).items() + } + + self.extra_args = kwargs + + def to_dict(self): + return { + '_type': 'Resource', + 'prefix': self.prefix, + 'group': self.group, + 'api_version': self.api_version, + 'kind': self.kind, + 'namespaced': self.namespaced, + 'verbs': self.verbs, + 'name': self.name, + 'preferred': self.preferred, + 'singular_name': self.singular_name, + 'short_names': self.short_names, + 'categories': self.categories, + 'subresources': {k: sr.to_dict() for k, sr in self.subresources.items()}, + 'extra_args': self.extra_args, + } + + @property + def group_version(self): + if self.group: + return '{}/{}'.format(self.group, self.api_version) + return self.api_version + + def __repr__(self): + return '<{}({}/{})>'.format(self.__class__.__name__, self.group_version, self.name) + + @property + def urls(self): + full_prefix = '{}/{}'.format(self.prefix, self.group_version) + resource_name = self.name.lower() + return { + 'base': '/{}/{}'.format(full_prefix, resource_name), + 'namespaced_base': '/{}/namespaces/{{namespace}}/{}'.format(full_prefix, resource_name), + 'full': '/{}/{}/{{name}}'.format(full_prefix, resource_name), + 'namespaced_full': '/{}/namespaces/{{namespace}}/{}/{{name}}'.format(full_prefix, resource_name) + } + + def path(self, name=None, namespace=None): + url_type = [] + path_params = {} + if self.namespaced and namespace: + url_type.append('namespaced') + path_params['namespace'] = namespace + if name: + url_type.append('full') + path_params['name'] = name + else: + url_type.append('base') + return self.urls['_'.join(url_type)].format(**path_params) + + def __getattr__(self, name): + if name in self.subresources: + return self.subresources[name] + return partial(getattr(self.client, name), self) + + +class ResourceList(Resource): + """ Represents a list of API objects """ + + def __init__(self, client, group='', api_version='v1', base_kind='', kind=None): + self.client = client + self.group = group + self.api_version = api_version + self.kind = kind or '{}List'.format(base_kind) + self.base_kind = base_kind + self.__base_resource = None + + def base_resource(self): + if self.__base_resource: + return self.__base_resource + elif self.base_kind: + self.__base_resource = self.client.resources.get(group=self.group, api_version=self.api_version, kind=self.base_kind) + return self.__base_resource + return None + + def _items_to_resources(self, body): + """ Takes a List body and return a dictionary with the following structure: + { + 'api_version': str, + 'kind': str, + 'items': [{ + 'resource': Resource, + 'name': str, + 'namespace': str, + }] + } + """ + if body is None: + raise ValueError("You must provide a body when calling methods on a ResourceList") + + api_version = body['apiVersion'] + kind = body['kind'] + items = body.get('items') + if not items: + raise ValueError('The `items` field in the body must be populated when calling methods on a ResourceList') + + if self.kind != kind: + raise ValueError('Methods on a {} must be called with a body containing the same kind. Receieved {} instead'.format(self.kind, kind)) + + return { + 'api_version': api_version, + 'kind': kind, + 'items': [self._item_to_resource(item) for item in items] + } + + def _item_to_resource(self, item): + metadata = item.get('metadata', {}) + resource = self.base_resource() + if not resource: + api_version = item.get('apiVersion', self.api_version) + kind = item.get('kind', self.base_kind) + resource = self.client.resources.get(api_version=api_version, kind=kind) + return { + 'resource': resource, + 'definition': item, + 'name': metadata.get('name'), + 'namespace': metadata.get('namespace') + } + + def get(self, body, name=None, namespace=None, **kwargs): + if name: + raise ValueError('Operations on ResourceList objects do not support the `name` argument') + resource_list = self._items_to_resources(body) + response = copy.deepcopy(body) + + response['items'] = [ + item['resource'].get(name=item['name'], namespace=item['namespace'] or namespace, **kwargs).to_dict() + for item in resource_list['items'] + ] + return ResourceInstance(self, response) + + def delete(self, body, name=None, namespace=None, **kwargs): + if name: + raise ValueError('Operations on ResourceList objects do not support the `name` argument') + resource_list = self._items_to_resources(body) + response = copy.deepcopy(body) + + response['items'] = [ + item['resource'].delete(name=item['name'], namespace=item['namespace'] or namespace, **kwargs).to_dict() + for item in resource_list['items'] + ] + return ResourceInstance(self, response) + + def verb_mapper(self, verb, body, **kwargs): + resource_list = self._items_to_resources(body) + response = copy.deepcopy(body) + response['items'] = [ + getattr(item['resource'], verb)(body=item['definition'], **kwargs).to_dict() + for item in resource_list['items'] + ] + return ResourceInstance(self, response) + + def create(self, *args, **kwargs): + return self.verb_mapper('create', *args, **kwargs) + + def replace(self, *args, **kwargs): + return self.verb_mapper('replace', *args, **kwargs) + + def patch(self, *args, **kwargs): + return self.verb_mapper('patch', *args, **kwargs) + + def to_dict(self): + return { + '_type': 'ResourceList', + 'group': self.group, + 'api_version': self.api_version, + 'kind': self.kind, + 'base_kind': self.base_kind + } + + def __getattr__(self, name): + if self.base_resource(): + return getattr(self.base_resource(), name) + return None + + +class Subresource(Resource): + """ Represents a subresource of an API resource. This generally includes operations + like scale, as well as status objects for an instantiated resource + """ + + def __init__(self, parent, **kwargs): + self.parent = parent + self.prefix = parent.prefix + self.group = parent.group + self.api_version = parent.api_version + self.kind = kwargs.pop('kind') + self.name = kwargs.pop('name') + self.subresource = self.name.split('/')[1] + self.namespaced = kwargs.pop('namespaced', False) + self.verbs = kwargs.pop('verbs', None) + self.extra_args = kwargs + + #TODO(fabianvf): Determine proper way to handle differences between resources + subresources + def create(self, body=None, name=None, namespace=None, **kwargs): + name = name or body.get('metadata', {}).get('name') + body = self.parent.client.serialize_body(body) + if self.parent.namespaced: + namespace = self.parent.client.ensure_namespace(self.parent, namespace, body) + path = self.path(name=name, namespace=namespace) + return self.parent.client.request('post', path, body=body, **kwargs) + + @property + def urls(self): + full_prefix = '{}/{}'.format(self.prefix, self.group_version) + return { + 'full': '/{}/{}/{{name}}/{}'.format(full_prefix, self.parent.name, self.subresource), + 'namespaced_full': '/{}/namespaces/{{namespace}}/{}/{{name}}/{}'.format(full_prefix, self.parent.name, self.subresource) + } + + def __getattr__(self, name): + return partial(getattr(self.parent.client, name), self) + + def to_dict(self): + return { + 'kind': self.kind, + 'name': self.name, + 'subresource': self.subresource, + 'namespaced': self.namespaced, + 'verbs': self.verbs, + 'extra_args': self.extra_args, + } + + +class ResourceInstance(object): + """ A parsed instance of an API resource. It exists solely to + ease interaction with API objects by allowing attributes to + be accessed with '.' notation. + """ + + def __init__(self, client, instance): + self.client = client + # If we have a list of resources, then set the apiVersion and kind of + # each resource in 'items' + kind = instance['kind'] + if kind.endswith('List') and 'items' in instance: + kind = instance['kind'][:-4] + for item in instance['items']: + if 'apiVersion' not in item: + item['apiVersion'] = instance['apiVersion'] + if 'kind' not in item: + item['kind'] = kind + + self.attributes = self.__deserialize(instance) + self.__initialised = True + + def __deserialize(self, field): + if isinstance(field, dict): + return ResourceField(**{ + k: self.__deserialize(v) for k, v in field.items() + }) + elif isinstance(field, (list, tuple)): + return [self.__deserialize(item) for item in field] + else: + return field + + def __serialize(self, field): + if isinstance(field, ResourceField): + return { + k: self.__serialize(v) for k, v in field.__dict__.items() + } + elif isinstance(field, (list, tuple)): + return [self.__serialize(item) for item in field] + elif isinstance(field, ResourceInstance): + return field.to_dict() + else: + return field + + def to_dict(self): + return self.__serialize(self.attributes) + + def to_str(self): + return repr(self) + + def __repr__(self): + return "ResourceInstance[{}]:\n {}".format( + self.attributes.kind, + ' '.join(yaml.safe_dump(self.to_dict()).splitlines(True)) + ) + + def __getattr__(self, name): + if not '_ResourceInstance__initialised' in self.__dict__: + return super(ResourceInstance, self).__getattr__(name) + return getattr(self.attributes, name) + + def __setattr__(self, name, value): + if not '_ResourceInstance__initialised' in self.__dict__: + return super(ResourceInstance, self).__setattr__(name, value) + elif name in self.__dict__: + return super(ResourceInstance, self).__setattr__(name, value) + else: + self.attributes[name] = value + + def __getitem__(self, name): + return self.attributes[name] + + def __setitem__(self, name, value): + self.attributes[name] = value + + def __dir__(self): + return dir(type(self)) + list(self.attributes.__dict__.keys()) + + +class ResourceField(object): + """ A parsed instance of an API resource attribute. It exists + solely to ease interaction with API objects by allowing + attributes to be accessed with '.' notation + """ + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + def __repr__(self): + return pformat(self.__dict__) + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __getitem__(self, name): + return self.__dict__.get(name) + + # Here resource.items will return items if available or resource.__dict__.items function if not + # resource.get will call resource.__dict__.get after attempting resource.__dict__.get('get') + def __getattr__(self, name): + return self.__dict__.get(name, getattr(self.__dict__, name, None)) + + def __setattr__(self, name, value): + self.__dict__[name] = value + + def __dir__(self): + return dir(type(self)) + list(self.__dict__.keys()) + + def __iter__(self): + for k, v in self.__dict__.items(): + yield (k, v) diff --git a/dynamic/test_client.py b/dynamic/test_client.py new file mode 100644 index 00000000..d6d65c6d --- /dev/null +++ b/dynamic/test_client.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python + +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest +import uuid + +from kubernetes.e2e_test import base +from kubernetes.client import api_client + +from . import DynamicClient +from .exceptions import ResourceNotFoundError + + +def short_uuid(): + id = str(uuid.uuid4()) + return id[-12:] + + +class TestDynamicClient(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.config = base.get_e2e_configuration() + + def test_cluster_custom_resources(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + + with self.assertRaises(ResourceNotFoundError): + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ClusterChangeMe') + + crd_api = client.resources.get(kind='CustomResourceDefinition') + name = 'clusterchangemes.apps.example.com' + crd_manifest = { + 'apiVersion': 'apiextensions.k8s.io/v1beta1', + 'kind': 'CustomResourceDefinition', + 'metadata': { + 'name': name, + }, + 'spec': { + 'group': 'apps.example.com', + 'names': { + 'kind': 'ClusterChangeMe', + 'listKind': 'ClusterChangeMeList', + 'plural': 'clusterchangemes', + 'singular': 'clusterchangeme', + }, + 'scope': 'Cluster', + 'version': 'v1', + 'subresources': { + 'status': {} + } + } + } + resp = crd_api.create(crd_manifest) + + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + resp = crd_api.get( + name=name, + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + try: + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ClusterChangeMe') + except ResourceNotFoundError: + # Need to wait a sec for the discovery layer to get updated + time.sleep(2) + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ClusterChangeMe') + resp = changeme_api.get() + self.assertEqual(resp.items, []) + changeme_name = 'custom-resource' + short_uuid() + changeme_manifest = { + 'apiVersion': 'apps.example.com/v1', + 'kind': 'ClusterChangeMe', + 'metadata': { + 'name': changeme_name, + }, + 'spec': {} + } + + resp = changeme_api.create(body=changeme_manifest) + self.assertEqual(resp.metadata.name, changeme_name) + + resp = changeme_api.get(name=changeme_name) + self.assertEqual(resp.metadata.name, changeme_name) + + changeme_manifest['spec']['size'] = 3 + resp = changeme_api.patch( + body=changeme_manifest, + content_type='application/merge-patch+json' + ) + self.assertEqual(resp.spec.size, 3) + + resp = changeme_api.get(name=changeme_name) + self.assertEqual(resp.spec.size, 3) + + resp = changeme_api.get() + self.assertEqual(len(resp.items), 1) + + resp = changeme_api.delete( + name=changeme_name, + ) + + resp = changeme_api.get() + self.assertEqual(len(resp.items), 0) + + resp = crd_api.delete( + name=name, + ) + + time.sleep(2) + client.resources.invalidate_cache() + with self.assertRaises(ResourceNotFoundError): + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ClusterChangeMe') + + def test_namespaced_custom_resources(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + + with self.assertRaises(ResourceNotFoundError): + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ChangeMe') + + crd_api = client.resources.get(kind='CustomResourceDefinition') + name = 'changemes.apps.example.com' + crd_manifest = { + 'apiVersion': 'apiextensions.k8s.io/v1beta1', + 'kind': 'CustomResourceDefinition', + 'metadata': { + 'name': name, + }, + 'spec': { + 'group': 'apps.example.com', + 'names': { + 'kind': 'ChangeMe', + 'listKind': 'ChangeMeList', + 'plural': 'changemes', + 'singular': 'changeme', + }, + 'scope': 'Namespaced', + 'version': 'v1', + 'subresources': { + 'status': {} + } + } + } + resp = crd_api.create(crd_manifest) + + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + resp = crd_api.get( + name=name, + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + try: + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ChangeMe') + except ResourceNotFoundError: + # Need to wait a sec for the discovery layer to get updated + time.sleep(2) + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ChangeMe') + resp = changeme_api.get() + self.assertEqual(resp.items, []) + changeme_name = 'custom-resource' + short_uuid() + changeme_manifest = { + 'apiVersion': 'apps.example.com/v1', + 'kind': 'ChangeMe', + 'metadata': { + 'name': changeme_name, + }, + 'spec': {} + } + + resp = changeme_api.create(body=changeme_manifest, namespace='default') + self.assertEqual(resp.metadata.name, changeme_name) + + resp = changeme_api.get(name=changeme_name, namespace='default') + self.assertEqual(resp.metadata.name, changeme_name) + + changeme_manifest['spec']['size'] = 3 + resp = changeme_api.patch( + body=changeme_manifest, + namespace='default', + content_type='application/merge-patch+json' + ) + self.assertEqual(resp.spec.size, 3) + + resp = changeme_api.get(name=changeme_name, namespace='default') + self.assertEqual(resp.spec.size, 3) + + resp = changeme_api.get(namespace='default') + self.assertEqual(len(resp.items), 1) + + resp = changeme_api.get() + self.assertEqual(len(resp.items), 1) + + resp = changeme_api.delete( + name=changeme_name, + namespace='default' + ) + + resp = changeme_api.get(namespace='default') + self.assertEqual(len(resp.items), 0) + + resp = changeme_api.get() + self.assertEqual(len(resp.items), 0) + + resp = crd_api.delete( + name=name, + ) + + time.sleep(2) + client.resources.invalidate_cache() + with self.assertRaises(ResourceNotFoundError): + changeme_api = client.resources.get( + api_version='apps.example.com/v1', kind='ChangeMe') + + def test_service_apis(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + api = client.resources.get(api_version='v1', kind='Service') + + name = 'frontend-' + short_uuid() + service_manifest = {'apiVersion': 'v1', + 'kind': 'Service', + 'metadata': {'labels': {'name': name}, + 'name': name, + 'resourceversion': 'v1'}, + 'spec': {'ports': [{'name': 'port', + 'port': 80, + 'protocol': 'TCP', + 'targetPort': 80}], + 'selector': {'name': name}}} + + resp = api.create( + body=service_manifest, + namespace='default' + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + resp = api.get( + name=name, + namespace='default' + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + service_manifest['spec']['ports'] = [{'name': 'new', + 'port': 8080, + 'protocol': 'TCP', + 'targetPort': 8080}] + resp = api.patch( + body=service_manifest, + name=name, + namespace='default' + ) + self.assertEqual(2, len(resp.spec.ports)) + self.assertTrue(resp.status) + + resp = api.delete( + name=name, body={}, + namespace='default' + ) + + def test_replication_controller_apis(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + api = client.resources.get( + api_version='v1', kind='ReplicationController') + + name = 'frontend-' + short_uuid() + rc_manifest = { + 'apiVersion': 'v1', + 'kind': 'ReplicationController', + 'metadata': {'labels': {'name': name}, + 'name': name}, + 'spec': {'replicas': 2, + 'selector': {'name': name}, + 'template': {'metadata': { + 'labels': {'name': name}}, + 'spec': {'containers': [{ + 'image': 'nginx', + 'name': 'nginx', + 'ports': [{'containerPort': 80, + 'protocol': 'TCP'}]}]}}}} + + resp = api.create( + body=rc_manifest, namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertEqual(2, resp.spec.replicas) + + resp = api.get( + name=name, namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertEqual(2, resp.spec.replicas) + + resp = api.delete( + name=name, body={}, namespace='default') + + def test_configmap_apis(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + api = client.resources.get(api_version='v1', kind='ConfigMap') + + name = 'test-configmap-' + short_uuid() + test_configmap = { + "kind": "ConfigMap", + "apiVersion": "v1", + "metadata": { + "name": name, + }, + "data": { + "config.json": "{\"command\":\"/usr/bin/mysqld_safe\"}", + "frontend.cnf": "[mysqld]\nbind-address = 10.0.0.3\n" + } + } + + resp = api.create( + body=test_configmap, namespace='default' + ) + self.assertEqual(name, resp.metadata.name) + + resp = api.get( + name=name, namespace='default') + self.assertEqual(name, resp.metadata.name) + + test_configmap['data']['config.json'] = "{}" + resp = api.patch( + name=name, namespace='default', body=test_configmap) + + resp = api.delete( + name=name, body={}, namespace='default') + + resp = api.get(namespace='default', pretty=True) + self.assertEqual([], resp.items) + + def test_node_apis(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + api = client.resources.get(api_version='v1', kind='Node') + + for item in api.get().items: + node = api.get(name=item.metadata.name) + self.assertTrue(len(dict(node.metadata.labels)) > 0)