diff --git a/.travis.yml b/.travis.yml index 1a1d2f534..1d9162a62 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,11 @@ language: python python: - - '3.6' + - '3.7' install: - sudo mkdir -p /cse && sudo chmod a+rwx /cse - - pip install -r requirements.txt - - python setup.py develop + - pip install . - pip install tox script: diff --git a/container_service_extension/abstract_broker.py b/container_service_extension/abstract_broker.py index 0cb34d8a4..56e2c6f4b 100644 --- a/container_service_extension/abstract_broker.py +++ b/container_service_extension/abstract_broker.py @@ -49,11 +49,9 @@ def get_cluster_info(self): """ @abc.abstractmethod - def get_cluster_config(self, cluster_name): + def get_cluster_config(self): """Get the configuration for the cluster. - :param: str cluster_name: Name of the cluster. - :return: Configuration of cluster :rtype: dict @@ -66,7 +64,7 @@ def list_clusters(self): :return: response object - :rtype: dict + :rtype: list """ pass diff --git a/container_service_extension/broker_manager.py b/container_service_extension/broker_manager.py index 8a078f7ef..21ca1091d 100644 --- a/container_service_extension/broker_manager.py +++ b/container_service_extension/broker_manager.py @@ -8,7 +8,6 @@ import container_service_extension.ovdc_utils as ovdc_utils from container_service_extension.pksbroker import PKSBroker from container_service_extension.pksbroker_manager import PksBrokerManager -from container_service_extension.server_constants import CseOperation from container_service_extension.server_constants import K8S_PROVIDER_KEY from container_service_extension.server_constants import K8sProvider from container_service_extension.server_constants import PKS_CLUSTER_DOMAIN_KEY @@ -21,19 +20,12 @@ # TODO(Constants) -# 1. Scan and classify all broker-related constants in server code into -# either common, vcd_broker_specific, pks_broker_specific constants. -# Design and refactor them into one or more relevant files. -# 2. Scan through both CSE client and server to identify HTTP request/response -# body params and define all of them as constants into a file -# from where both client and server can access them. -# 3. Refactor both client and server code accordingly -# 4. As part of refactoring, avoid accessing HTTP request body directly +# As part of refactoring, avoid accessing HTTP request body directly # from VcdBroker and PksBroker. We should try to limit processing request to # request_processor.py and broker_manager.py. -class BrokerManager(object): +class BrokerManager: """Manage calls to vCD and PKS brokers. Handles: @@ -45,126 +37,37 @@ def __init__(self, tenant_auth_token, request_spec): self.tenant_auth_token = tenant_auth_token self.req_spec = request_spec self.pks_cache = get_pks_cache() - self.vcdbroker_manager = VcdBrokerManager( - tenant_auth_token, request_spec) - self.pksbroker_manager = PksBrokerManager( - tenant_auth_token, request_spec) - self.is_ovdc_present_in_request = False + self.vcdbroker_manager = VcdBrokerManager(tenant_auth_token, request_spec) # noqa: E501 + self.pksbroker_manager = PksBrokerManager(tenant_auth_token, request_spec) # noqa: E501 + self.is_ovdc_present_in_request = bool(request_spec.get(RequestKey.OVDC_NAME)) # noqa: E501 - def invoke(self, op): - """Invoke right broker(s) to perform the operation requested. + def _get_cluster_config(self): + _, broker = self._get_cluster_info() + return broker.get_cluster_config() - Might result in further (pre/post)processing on the request/result(s). - - - Depending on the operation requested, this method may do one or more - of below mentioned points. - 1. Extract and construct the relevant params for the operation. - 2. Choose right broker to perform the operation/method requested. - 3. Scan through available brokers to aggregate (or) filter results. - - :param CseOperation op: Operation to be performed by one of the - brokers. - - :return result: result of the operation. - - :rtype: dict - """ - result = {} - self.is_ovdc_present_in_request = bool(self.req_spec.get(RequestKey.OVDC_NAME)) # noqa: E501 - - if op == CseOperation.CLUSTER_CONFIG: - cluster_spec = \ - {'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME)} - result = self._get_cluster_config(**cluster_spec) - elif op == CseOperation.CLUSTER_CREATE: - # TODO(ClusterSpec) Create an inner class "ClusterSpec" - # in abstract_broker.py and have subclasses define and use it - # as instance variable. - # Method 'Create_cluster' in VcdBroker and PksBroker should take - # ClusterSpec either as a param (or) - # read from instance variable (if needed only). - cluster_spec = { - 'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME), - 'vdc_name': self.req_spec.get(RequestKey.OVDC_NAME), - 'org_name': self.req_spec.get(RequestKey.ORG_NAME), - 'node_count': self.req_spec.get(RequestKey.NUM_WORKERS), - 'storage_profile': self.req_spec.get(RequestKey.STORAGE_PROFILE_NAME), # noqa: E501 - 'network_name': self.req_spec.get(RequestKey.NETWORK_NAME), - 'template': self.req_spec.get(RequestKey.TEMPLATE_NAME), - } - result = self._create_cluster(**cluster_spec) - elif op == CseOperation.CLUSTER_DELETE: - cluster_spec = \ - {'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME)} - result = self._delete_cluster(**cluster_spec) - elif op == CseOperation.CLUSTER_INFO: - cluster_spec = \ - {'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME)} - result = self._get_cluster_info(**cluster_spec)[0] - elif op == CseOperation.CLUSTER_LIST: - result = self._list_clusters() - elif op == CseOperation.CLUSTER_RESIZE: - cluster_spec = { - 'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME), - 'node_count': self.req_spec.get(RequestKey.NUM_WORKERS) - } - result = self._resize_cluster(**cluster_spec) - elif op == CseOperation.NODE_CREATE: - # Currently node create is a vCD only operation. - broker = VcdBroker(self.tenant_auth_token, self.req_spec) - result = broker.create_nodes() - elif op == CseOperation.NODE_DELETE: - # Currently node delete is a vCD only operation. - broker = VcdBroker(self.tenant_auth_token, self.req_spec) - result = broker.delete_nodes() - elif op == CseOperation.NODE_INFO: - cluster_name = self.req_spec.get(RequestKey.CLUSTER_NAME) - node_name = self.req_spec.get(RequestKey.NODE_NAME) - # Currently node info is a vCD only operation. - broker = VcdBroker(self.tenant_auth_token, self.req_spec) - result = broker.get_node_info(cluster_name, node_name) - - return result - - def _get_cluster_config(self, **cluster_spec): - """Get the cluster configuration. - - :param str cluster_name: Name of cluster. - - :return: Cluster config. - - :rtype: str - """ - cluster, broker = self._get_cluster_info(**cluster_spec) - return broker.get_cluster_config(cluster_name=cluster['name']) - - def _create_cluster(self, **cluster_spec): - cluster_name = cluster_spec['cluster_name'] - vdc_name = cluster_spec.get('vdc_name') - org_name = cluster_spec.get('org_name') + def _create_cluster(self): + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] # 'is_org_admin_search' is used here to prevent cluster creation with # same cluster-name by users within org, which might span over to PKS. # If it is true, cluster list is filtered by the org name of the # logged-in user to check for duplicates. - cluster, _ = self._find_cluster_in_org(cluster_name, - is_org_admin_search=True) + cluster, _ = self._find_cluster_in_org(is_org_admin_search=True) if cluster: raise ClusterAlreadyExistsError(f"Cluster {cluster_name} " f"already exists.") - ctr_prov_ctx = ovdc_utils.get_ovdc_k8s_provider_metadata(org_name=org_name, ovdc_name=vdc_name, include_credentials=True, include_nsxt_info=True) # noqa: E501 + ctr_prov_ctx = ovdc_utils.get_ovdc_k8s_provider_metadata(org_name=self.req_spec[RequestKey.ORG_NAME], ovdc_name=self.req_spec[RequestKey.OVDC_NAME], include_credentials=True, include_nsxt_info=True) # noqa: E501 if ctr_prov_ctx.get(K8S_PROVIDER_KEY) == K8sProvider.PKS: - cluster_spec['pks_plan'] = ctr_prov_ctx[PKS_PLANS_KEY][0] - cluster_spec['pks_ext_host'] = f"{cluster_name}.{ctr_prov_ctx[PKS_CLUSTER_DOMAIN_KEY]}" # noqa: E501 + self.req_spec[RequestKey.PKS_PLAN_NAME] = ctr_prov_ctx[PKS_PLANS_KEY][0] # noqa: E501 + self.req_spec['pks_ext_host'] = f"{cluster_name}.{ctr_prov_ctx[PKS_CLUSTER_DOMAIN_KEY]}" # noqa: E501 broker = self._get_broker_based_on_ctr_prov_ctx(ctr_prov_ctx) - return broker.create_cluster(**cluster_spec) + return broker.create_cluster() - def _delete_cluster(self, **cluster_spec): - cluster, broker = self._get_cluster_info(**cluster_spec) - return broker.delete_cluster(cluster_name=cluster['name']) + def _delete_cluster(self): + _, broker = self._get_cluster_info() + return broker.delete_cluster() - def _get_cluster_info(self, **cluster_spec): + def _get_cluster_info(self): """Get cluster details directly from cloud provider. Logic of the method is as follows. @@ -180,12 +83,12 @@ def _get_cluster_info(self, **cluster_spec): :rtype: tuple """ - cluster_name = cluster_spec['cluster_name'] + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] if self.is_ovdc_present_in_request: broker = self._get_broker_based_on_vdc() - return broker.get_cluster_info(cluster_name=cluster_name), broker + return broker.get_cluster_info(), broker else: - cluster, broker = self._find_cluster_in_org(cluster_name) + cluster, broker = self._find_cluster_in_org() if cluster: return cluster, broker @@ -221,16 +124,15 @@ def _list_clusters(self): return result - def _resize_cluster(self, **cluster_spec): - cluster, broker = self._get_cluster_info(**cluster_spec) - return broker.resize_cluster(curr_cluster_info=cluster, **cluster_spec) + def _resize_cluster(self): + cluster, broker = self._get_cluster_info() + return broker.resize_cluster(curr_cluster_info=cluster) + + def _find_cluster_in_org(self, is_org_admin_search=False): + cluster, broker = self.vcdbroker_manager.find_cluster_in_org() - def _find_cluster_in_org(self, cluster_name, is_org_admin_search=False): - cluster, broker = self.vcdbroker_manager.find_cluster_in_org( - cluster_name, is_org_admin_search) if not cluster and is_pks_enabled(): - cluster, broker = self.pksbroker_manager.find_cluster_in_org( - cluster_name, is_org_admin_search) + cluster, broker = self.pksbroker_manager.find_cluster_in_org(is_org_admin_search=is_org_admin_search) # noqa: E501 return cluster, broker diff --git a/container_service_extension/pksbroker.py b/container_service_extension/pksbroker.py index f97e519e0..6545b15fb 100644 --- a/container_service_extension/pksbroker.py +++ b/container_service_extension/pksbroker.py @@ -57,8 +57,7 @@ from container_service_extension.server_constants import SYSTEM_ORG_NAME from container_service_extension.shared_constants import RequestKey from container_service_extension.uaaclient.uaaclient import UaaClient -from container_service_extension.utils import get_pks_cache - +import container_service_extension.utils as utils # Delimiter to append with user id context USER_ID_SEPARATOR = "---" @@ -105,7 +104,7 @@ def __init__(self, tenant_auth_token, request_spec, pks_ctx): if pks_ctx.get('proxy') else None self.compute_profile = pks_ctx.get(PKS_COMPUTE_PROFILE_KEY, None) self.nsxt_server = \ - get_pks_cache().get_nsxt_info(pks_ctx.get('vc')) + utils.get_pks_cache().get_nsxt_info(pks_ctx.get('vc')) if self.nsxt_server: self.nsxt_client = NSXTClient( host=self.nsxt_server.get('host'), @@ -261,7 +260,7 @@ def _list_clusters(self): return list_of_cluster_dicts @secure(required_rights=[CSE_PKS_DEPLOY_RIGHT_NAME]) - def create_cluster(self, **cluster_spec): + def create_cluster(self): """Create cluster in PKS environment. To retain the user context, user-id of the logged-in user is appended @@ -275,16 +274,25 @@ def create_cluster(self, **cluster_spec): :rtype: dict """ - cluster_name = cluster_spec['cluster_name'] + required = [ + RequestKey.CLUSTER_NAME, + RequestKey.NUM_WORKERS, + RequestKey.PKS_PLAN_NAME, + 'pks_ext_host' # noqa: E501 this should not be part of the request spec, check broker_manager._create_cluster(). request spec should not be getting modified ever + ] + utils.ensure_keys_in_dict(required, self.req_spec, dict_name='request') + + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] qualified_cluster_name = self._append_user_id(cluster_name) - cluster_spec['cluster_name'] = qualified_cluster_name + self.req_spec[RequestKey.CLUSTER_NAME] = qualified_cluster_name if not self.nsxt_server: raise CseServerError( "NSX-T server details not found for PKS server selected for " f"cluster : {cluster_name}. Aborting creation of cluster.") - cluster_info = self._create_cluster(**cluster_spec) + # this needs to be refactored + cluster_info = self._create_cluster(**self.req_spec) self._isolate_cluster(cluster_name, qualified_cluster_name, cluster_info.get('uuid')) @@ -313,12 +321,6 @@ def _create_cluster(self, cluster_name, node_count, pks_plan, pks_ext_host, :rtype: dict """ - # TODO(ClusterSpec) Create an inner class "ClusterSpec" - # in abstract_broker.py and have subclasses define and use it - # as instance variable. - # Method 'Create_cluster' in VcdBroker and PksBroker should take - # ClusterSpec either as a param (or) - # read from instance variable (if needed only). compute_profile = compute_profile \ if compute_profile else self.compute_profile cluster_api = ClusterApiV1Beta(api_client=self.client_v1beta) @@ -348,7 +350,7 @@ def _create_cluster(self, cluster_name, node_count, pks_plan, pks_ext_host, return cluster_dict - def get_cluster_info(self, cluster_name, **kwargs): + def get_cluster_info(self, **kwargs): """Get the details of a cluster with a given name in PKS environment. System administrator gets the given cluster information regardless of @@ -360,6 +362,8 @@ def get_cluster_info(self, cluster_name, **kwargs): :rtype: dict """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + if self.tenant_client.is_sysadmin() \ or is_org_admin(self.client_session) \ or kwargs.get('is_org_admin_search'): @@ -375,16 +379,18 @@ def get_cluster_info(self, cluster_name, **kwargs): if len(filtered_cluster_list) == 0: raise PksServerError(requests.codes.not_found, f"cluster {cluster_name} not found.") - cluster_info = filtered_cluster_list[0] - else: - cluster_info = \ - self._get_cluster_info(self._append_user_id(cluster_name)) - self._restore_original_name(cluster_info) - if not kwargs.get('is_admin_request'): - self._filter_pks_properties(cluster_info) + return filtered_cluster_list[0] + + cluster_info = \ + self._get_cluster_info(self._append_user_id(cluster_name)) + self._restore_original_name(cluster_info) + if not kwargs.get('is_admin_request'): + self._filter_pks_properties(cluster_info) return cluster_info + # this function is still being used by _list_clusters for some reason + # ideally we would like to merge this function with the above function def _get_cluster_info(self, cluster_name): """Get the details of a cluster with a given name in PKS environment. @@ -413,52 +419,42 @@ def _get_cluster_info(self, cluster_name): return cluster_dict - def get_cluster_config(self, cluster_name): + def get_cluster_config(self): """Get the configuration of the cluster with the given name in PKS. System administrator gets the given cluster config regardless of who is the owner of the cluster. Other users get config only on the cluster they own. - :param str cluster_name: Name of the cluster :return: Configuration of the cluster. :rtype: str """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + if self.tenant_client.is_sysadmin() or \ is_org_admin(self.client_session): - cluster_info = self.get_cluster_info(cluster_name) + cluster_info = self.get_cluster_info() qualified_cluster_name = cluster_info['pks_cluster_name'] else: qualified_cluster_name = self._append_user_id(cluster_name) self._check_cluster_isolation(cluster_name, qualified_cluster_name) - config_info = self._get_cluster_config(qualified_cluster_name) - return self.filter_traces_of_user_context(config_info) - - def _get_cluster_config(self, cluster_name): - """Get the configuration of the cluster with the given name in PKS. - - :param str cluster_name: Name of the cluster - :return: Configuration of the cluster. - - :rtype: str - """ cluster_api = ClusterApiV1(api_client=self.client_v1) LOGGER.debug(f"Sending request to PKS: {self.pks_host_uri} to get" f" detailed configuration of cluster with name: " f"{cluster_name}") - config = cluster_api.create_user(cluster_name=cluster_name) - + config = cluster_api.create_user(cluster_name=qualified_cluster_name) LOGGER.debug(f"Received response from PKS: {self.pks_host_uri} on " f"cluster: {cluster_name} with details: {config}") cluster_config = yaml.safe_dump(config, default_flow_style=False) - return cluster_config + + return self.filter_traces_of_user_context(cluster_config) @secure(required_rights=[CSE_PKS_DEPLOY_RIGHT_NAME]) - def delete_cluster(self, cluster_name): + def delete_cluster(self): """Delete the cluster with a given name in PKS environment. System administrator can delete the given cluster regardless of @@ -467,6 +463,8 @@ def delete_cluster(self, cluster_name): :param str cluster_name: Name of the cluster """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + if self.tenant_client.is_sysadmin() \ or is_org_admin(self.client_session): cluster_info = self.get_cluster_info(cluster_name) @@ -474,7 +472,20 @@ def delete_cluster(self, cluster_name): else: qualified_cluster_name = self._append_user_id(cluster_name) - result = self._delete_cluster(qualified_cluster_name) + result = {} + cluster_api = ClusterApiV1(api_client=self.client_v1) + LOGGER.debug(f"Sending request to PKS: {self.pks_host_uri} to delete " + f"the cluster with name: {qualified_cluster_name}") + try: + cluster_api.delete_cluster(cluster_name=qualified_cluster_name) + except v1Exception as err: + LOGGER.debug(f"Deleting cluster {qualified_cluster_name} failed" + f" with error:\n {err}") + raise PksServerError(err.status, err.body) + LOGGER.debug(f"PKS: {self.pks_host_uri} accepted the request to delete" + f" the cluster: {qualified_cluster_name}") + result['name'] = qualified_cluster_name + result['task_status'] = 'in progress' # remove cluster network isolation LOGGER.debug(f"Removing network isolation of cluster {cluster_name}.") @@ -493,51 +504,22 @@ def delete_cluster(self, cluster_name): self._filter_pks_properties(result) return result - def _delete_cluster(self, cluster_name): - """Delete the cluster with a given name in PKS environment. - - Also deletes associated NSX-T Distributed Firewall rules that kept the - cluster network isolated from other clusters. - - :param str cluster_name: Name of the cluster - """ - result = {} - - cluster_api = ClusterApiV1(api_client=self.client_v1) - - LOGGER.debug(f"Sending request to PKS: {self.pks_host_uri} to delete " - f"the cluster with name: {cluster_name}") - try: - cluster_api.delete_cluster(cluster_name=cluster_name) - except v1Exception as err: - LOGGER.debug(f"Deleting cluster {cluster_name} failed with " - f"error:\n {err}") - raise PksServerError(err.status, err.body) - - LOGGER.debug(f"PKS: {self.pks_host_uri} accepted the request to delete" - f" the cluster: {cluster_name}") - - result['name'] = cluster_name - result['task_status'] = 'in progress' - return result - @secure(required_rights=[CSE_PKS_DEPLOY_RIGHT_NAME]) - def resize_cluster(self, **cluster_spec): + def resize_cluster(self): """Resize the cluster of a given name to given number of worker nodes. System administrator can resize the given cluster regardless of who is the owner of the cluster. Other users can only resize the cluster they own. - :param dict cluster_spec: named parameters that are required to - resize cluster (cluster_name, node_count) :return: response status :rtype: dict """ - cluster_name = cluster_spec['cluster_name'] + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + num_workers = self.req_spec[RequestKey.NUM_WORKERS] if self.tenant_client.is_sysadmin() \ or is_org_admin(self.client_session): @@ -548,39 +530,25 @@ def resize_cluster(self, **cluster_spec): self._check_cluster_isolation(cluster_name, qualified_cluster_name) - cluster_spec['cluster_name'] = qualified_cluster_name - result = self._resize_cluster(**cluster_spec) - self._restore_original_name(result) - self._filter_pks_properties(result) - return result - - def _resize_cluster(self, cluster_name, node_count, **kwargs): - """Resize the cluster of a given name to given number of worker nodes. - - :param str cluster_name: Name of the cluster - :param int node_count: New size of the worker nodes - """ result = {} cluster_api = ClusterApiV1(api_client=self.client_v1) LOGGER.debug(f"Sending request to PKS:{self.pks_host_uri} to resize " - f"the cluster with name: {cluster_name} to " - f"{node_count} worker nodes") - - resize_params = UpdateClusterParameters( - kubernetes_worker_instances=node_count) + f"the cluster with name: {qualified_cluster_name} to " + f"{num_workers} worker nodes") + resize_params = UpdateClusterParameters(kubernetes_worker_instances=num_workers) # noqa: E501 try: - cluster_api.update_cluster(cluster_name, body=resize_params) + cluster_api.update_cluster(qualified_cluster_name, body=resize_params) # noqa: E501 except v1Exception as err: - LOGGER.debug(f"Resizing cluster {cluster_name} failed with " - f"error:\n {err}") + LOGGER.debug(f"Resizing cluster {qualified_cluster_name} failed" + f" with error:\n {err}") raise PksServerError(err.status, err.body) - LOGGER.debug(f"PKS: {self.pks_host_uri} accepted the request to resize" - f" the cluster: {cluster_name}") + f" the cluster: {qualified_cluster_name}") - result['name'] = cluster_name + result['name'] = qualified_cluster_name result['task_status'] = 'in progress' - + self._restore_original_name(result) + self._filter_pks_properties(result) return result def _check_cluster_isolation(self, cluster_name, qualified_cluster_name): diff --git a/container_service_extension/pksbroker_manager.py b/container_service_extension/pksbroker_manager.py index 104c68608..18578356d 100644 --- a/container_service_extension/pksbroker_manager.py +++ b/container_service_extension/pksbroker_manager.py @@ -13,10 +13,11 @@ from container_service_extension.pyvcloud_utils import connect_vcd_user_via_token # noqa: E501 from container_service_extension.server_constants import K8S_PROVIDER_KEY from container_service_extension.server_constants import K8sProvider +from container_service_extension.shared_constants import RequestKey from container_service_extension.utils import get_pks_cache -class PksBrokerManager(object): +class PksBrokerManager: def __init__(self, tenant_auth_token, request_spec): self.tenant_auth_token = tenant_auth_token self.req_spec = request_spec @@ -38,7 +39,7 @@ def list_clusters(self): pks_clusters.append(pks_cluster) return pks_clusters - def find_cluster_in_org(self, cluster_name, is_org_admin_search=False): + def find_cluster_in_org(self, is_org_admin_search=False): """Invoke all PKS brokers in the org to find the cluster. 'is_org_admin_search' is used here to prevent cluster creation with @@ -51,15 +52,14 @@ def find_cluster_in_org(self, cluster_name, is_org_admin_search=False): Else: (None, None) if cluster not found. """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] pks_ctx_list = \ self.create_pks_context_for_all_accounts_in_org() for pks_ctx in pks_ctx_list: pksbroker = PKSBroker(self.tenant_auth_token, self.req_spec, pks_ctx) try: - return pksbroker.get_cluster_info( - cluster_name=cluster_name, - is_org_admin_search=is_org_admin_search), pksbroker + return pksbroker.get_cluster_info(is_org_admin_search=is_org_admin_search), pksbroker # noqa: E501 except PksClusterNotFoundError as err: # If a cluster is not found, then broker_manager will # decide if it wants to raise an error or ignore it if was it diff --git a/container_service_extension/request_handlers/cluster_handler.py b/container_service_extension/request_handlers/cluster_handler.py new file mode 100644 index 000000000..11dd5d9b0 --- /dev/null +++ b/container_service_extension/request_handlers/cluster_handler.py @@ -0,0 +1,181 @@ +from container_service_extension.broker_manager import BrokerManager +from container_service_extension.shared_constants import RequestKey +import container_service_extension.utils as utils +from container_service_extension.vcdbroker import VcdBroker + + +def cluster_create(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, ovdc_name, cluster_name, num_nodes. + Conditional data: if k8s_provider is 'native', num_cpu, mb_memory, + network_name, storage_profile_name, template_name, enable_nfs, + rollback are required (validation handled elsewhere). + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.OVDC_NAME, + RequestKey.CLUSTER_NAME, + RequestKey.NUM_WORKERS + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + broker_manager = BrokerManager(tenant_auth_token, request_dict) + return broker_manager._create_cluster() + + +def cluster_resize(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, cluster_name, num_nodes. + Conditional data: if k8s_provider is 'native', network_name, + rollback are required (validation handled elsewhere). + Optional data: ovdc_name. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + RequestKey.NUM_WORKERS + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + broker_manager = BrokerManager(tenant_auth_token, request_dict) + return broker_manager._resize_cluster() + + +def cluster_delete(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, cluster_name. + Optional data: ovdc_name. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + broker_manager = BrokerManager(tenant_auth_token, request_dict) + return broker_manager._delete_cluster() + + +def cluster_info(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, cluster_name. + Optional data: ovdc_name. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + broker_manager = BrokerManager(tenant_auth_token, request_dict) + return broker_manager._get_cluster_info()[0] + + +def cluster_config(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, cluster_name. + Optional data: ovdc_name. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + broker_manager = BrokerManager(tenant_auth_token, request_dict) + return broker_manager._get_cluster_config() + + +def cluster_list(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + :return: List + """ + broker_manager = BrokerManager(tenant_auth_token, request_dict) + return broker_manager._list_clusters() + + +def node_create(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org name, ovdc name, cluster name, num nodes, num cpu, + mb memory, network name, storage profile name, template name, + rollback, enable nfs. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + RequestKey.NUM_WORKERS, + RequestKey.NUM_CPU, + RequestKey.MB_MEMORY, + RequestKey.NETWORK_NAME, + RequestKey.STORAGE_PROFILE_NAME, + RequestKey.TEMPLATE_NAME, + RequestKey.ROLLBACK, + RequestKey.ENABLE_NFS, + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + # Currently node create is a vCD only operation. + # Different from resize because this can create nfs nodes + broker = VcdBroker(tenant_auth_token, request_dict) + return broker.create_nodes() + + +def node_delete(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, cluster_name, node_names_list. + Optional data: ssh_key_file, ovdc_name. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + RequestKey.NODE_NAMES_LIST + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + # Currently node delete is a vCD only operation. + # TODO remove once resize is able to scale down native clusters + broker = VcdBroker(tenant_auth_token, request_dict) + return broker.delete_nodes() + + +def node_info(request_dict, tenant_auth_token): + """Request handler for cluster operation. + + Required data: org_name, cluster_name, node_name. + Optional data: ovdc_name. + + :return: Dict + """ + required = [ + RequestKey.ORG_NAME, + RequestKey.CLUSTER_NAME, + RequestKey.NODE_NAME + ] + utils.ensure_keys_in_dict(required, request_dict, dict_name="request") + + # Currently node info is a vCD only operation. + broker = VcdBroker(tenant_auth_token, request_dict) + return broker.get_node_info(request_dict.get(RequestKey.CLUSTER_NAME), + request_dict.get(RequestKey.NODE_NAME)) diff --git a/container_service_extension/request_processor.py b/container_service_extension/request_processor.py index 513256d11..de02c90f6 100644 --- a/container_service_extension/request_processor.py +++ b/container_service_extension/request_processor.py @@ -12,6 +12,7 @@ from container_service_extension.exception_handler import handle_exception from container_service_extension.exceptions import CseRequestError from container_service_extension.logger import SERVER_LOGGER as LOGGER +import container_service_extension.request_handlers.cluster_handler as cluster_handler # noqa: E501 import container_service_extension.request_handlers.ovdc_handler as ovdc_handler # noqa: E501 import container_service_extension.request_handlers.system_handler as system_handler # noqa: E501 import container_service_extension.request_handlers.template_handler as template_handler # noqa: E501 @@ -46,6 +47,15 @@ """ # noqa: E501 OPERATION_TO_HANDLER = { + CseOperation.CLUSTER_CONFIG: cluster_handler.cluster_config, + CseOperation.CLUSTER_CREATE: cluster_handler.cluster_create, + CseOperation.CLUSTER_DELETE: cluster_handler.cluster_delete, + CseOperation.CLUSTER_INFO: cluster_handler.cluster_info, + CseOperation.CLUSTER_LIST: cluster_handler.cluster_list, + CseOperation.CLUSTER_RESIZE: cluster_handler.cluster_resize, + CseOperation.NODE_CREATE: cluster_handler.node_create, + CseOperation.NODE_DELETE: cluster_handler.node_delete, + CseOperation.NODE_INFO: cluster_handler.node_info, CseOperation.OVDC_UPDATE: ovdc_handler.ovdc_update, CseOperation.OVDC_INFO: ovdc_handler.ovdc_info, CseOperation.OVDC_LIST: ovdc_handler.ovdc_list, @@ -238,18 +248,9 @@ def process_request(body): }) # process the request - reply = {} - reply['status_code'] = operation.ideal_response_code - try: - reply['body'] = OPERATION_TO_HANDLER[operation](request_data, - tenant_auth_token) - LOGGER.debug(f"reply: {str(reply)}") - return reply - except KeyError: - pass - # TODO all cluster operations should route to cluster_handler.py - from container_service_extension.broker_manager import BrokerManager - broker_manager = BrokerManager(tenant_auth_token, request_data) - reply['body'] = broker_manager.invoke(op=operation) + reply = { + 'status_code': operation.ideal_response_code, + 'body': OPERATION_TO_HANDLER[operation](request_data, tenant_auth_token) # noqa: E501 + } LOGGER.debug(f"reply: {str(reply)}") return reply diff --git a/container_service_extension/vcdbroker.py b/container_service_extension/vcdbroker.py index b54996364..6dda4c46f 100644 --- a/container_service_extension/vcdbroker.py +++ b/container_service_extension/vcdbroker.py @@ -41,9 +41,7 @@ from container_service_extension.exceptions import NodeNotFoundError from container_service_extension.exceptions import WorkerNodeCreationError from container_service_extension.logger import SERVER_LOGGER as LOGGER -from container_service_extension.pyvcloud_utils import \ - get_org_name_from_ovdc_id -from container_service_extension.pyvcloud_utils import get_sys_admin_client +import container_service_extension.pyvcloud_utils as vcd_utils from container_service_extension.server_constants import \ CSE_NATIVE_DEPLOY_RIGHT_NAME from container_service_extension.server_constants import K8S_PROVIDER_KEY @@ -53,7 +51,7 @@ from container_service_extension.shared_constants import ERROR_MESSAGE_KEY from container_service_extension.shared_constants import ERROR_STACKTRACE_KEY from container_service_extension.shared_constants import RequestKey -from container_service_extension.utils import get_server_runtime_config +import container_service_extension.utils as utils OP_CREATE_CLUSTER = 'create_cluster' @@ -123,7 +121,7 @@ def __init__(self, tenant_auth_token, request_spec): self.daemon = False def _connect_sys_admin(self): - self.sys_admin_client = get_sys_admin_client() + self.sys_admin_client = vcd_utils.get_sys_admin_client() def _disconnect_sys_admin(self): if self.sys_admin_client is not None: @@ -177,7 +175,7 @@ def _is_valid_name(self, name): return all(allowed.match(x) for x in name.split(".")) def _get_template(self, name=None): - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() name = name or \ self.req_spec.get(RequestKey.TEMPLATE_NAME) or \ server_config['broker']['default_template_name'] @@ -201,7 +199,7 @@ def _get_nfs_exports(self, ip, vapp, node): """ # TODO(right template) find a right way to retrieve # the template from which nfs node was created. - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() template = server_config['broker']['templates'][0] script = f"#!/usr/bin/env bash\nshowmount -e {ip}" result = execute_script_in_nodes( @@ -225,7 +223,7 @@ def node_rollback(self, node_list): vapp = VApp(self.tenant_client, href=self.cluster['vapp_href']) template = self._get_template() try: - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() delete_nodes_from_cluster(server_config, vapp, template, node_list, force=True) except Exception: @@ -282,18 +280,20 @@ def list_clusters(self): 'vdc': c['vdc_name'], 'status': c['status'], 'vdc_id': c['vdc_id'], - 'org_name': get_org_name_from_ovdc_id(c['vdc_id']), + 'org_name': vcd_utils.get_org_name_from_ovdc_id(c['vdc_id']), K8S_PROVIDER_KEY: K8sProvider.NATIVE }) return clusters - def get_cluster_info(self, cluster_name): + def get_cluster_info(self, **kwargs): """Get the info of the cluster. :param cluster_name: (str): Name of the cluster :return: (dict): Info of the cluster. """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + self._connect_tenant() clusters = load_from_metadata( self.tenant_client, @@ -388,8 +388,10 @@ def get_node_info(self, cluster_name, node_name): f"cluster '{cluster_name}'") return node_info - def get_cluster_config(self, cluster_name): + def get_cluster_config(self): self._connect_tenant() + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + clusters = load_from_metadata( self.tenant_client, name=cluster_name, @@ -403,32 +405,30 @@ def get_cluster_config(self, cluster_name): vapp = VApp(self.tenant_client, href=clusters[0]['vapp_href']) template = self._get_template(name=clusters[0]['template']) - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() result = get_cluster_config(server_config, vapp, template['admin_password']) return result @secure(required_rights=[CSE_NATIVE_DEPLOY_RIGHT_NAME]) - def create_cluster(self, cluster_name, vdc_name, node_count, - storage_profile, network_name, template, **kwargs): - - # TODO(ClusterSpec) Create an inner class "ClusterSpec" - # in abstract_broker.py and have subclasses define and use it - # as instance variable. - # Method 'Create_cluster' in VcdBroker and PksBroker should take - # ClusterParams either as a param (or) - # read from instance variable (if needed only). - - if not network_name: - raise CseServerError(f"Cluster cannot be created. " - f"Please provide a valid value for org " - f"vDC network param.") - - LOGGER.debug(f"About to create cluster {cluster_name} on {vdc_name} " - f"with {node_count} nodes, sp={storage_profile}") - + def create_cluster(self): + required = [ + RequestKey.NETWORK_NAME + ] + utils.ensure_keys_in_dict(required, self.req_spec, dict_name='request') + + # check that requested/default template is valid + self._get_template(name=self.req_spec.get(RequestKey.TEMPLATE_NAME)) + + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + LOGGER.debug(f"About to create cluster {cluster_name} on " + f"{self.req_spec[RequestKey.OVDC_NAME]} with " + f"{self.req_spec[RequestKey.NUM_WORKERS]} worker nodes, " + f"storage profile=" + f"{self.req_spec[RequestKey.STORAGE_PROFILE_NAME]}") if not self._is_valid_name(cluster_name): raise CseServerError(f"Invalid cluster name '{cluster_name}'") + self._connect_tenant() self._connect_sys_admin() self.cluster_name = cluster_name @@ -492,7 +492,7 @@ def create_cluster_thread(self): f"({self.cluster_id})") vapp.reload() - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() try: add_nodes(1, template, NodeType.MASTER, server_config, self.tenant_client, org, vdc, vapp, self.req_spec) @@ -577,7 +577,8 @@ def create_cluster_thread(self): self._disconnect_sys_admin() @secure(required_rights=[CSE_NATIVE_DEPLOY_RIGHT_NAME]) - def delete_cluster(self, cluster_name): + def delete_cluster(self): + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] LOGGER.debug(f"About to delete cluster with name: {cluster_name}") self.cluster_name = cluster_name @@ -669,7 +670,7 @@ def create_nodes_thread(self): LOGGER.debug(f"About to add nodes to cluster with name: " f"{self.cluster_name}") try: - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() org_resource = self.tenant_client.get_org() org = Org(self.tenant_client, resource=org_resource) vdc = VDC(self.tenant_client, href=self.cluster['vdc_href']) @@ -789,7 +790,7 @@ def delete_nodes_thread(self): f" node(s) from " f"{self.cluster_name}({self.cluster_id})") try: - server_config = get_server_runtime_config() + server_config = utils.get_server_runtime_config() delete_nodes_from_cluster( server_config, vapp, @@ -838,7 +839,7 @@ def delete_nodes_thread(self): self._disconnect_sys_admin() @secure(required_rights=[CSE_NATIVE_DEPLOY_RIGHT_NAME]) - def resize_cluster(self, cluster_name, node_count, curr_cluster_info=None): + def resize_cluster(self, curr_cluster_info=None): """Resize the cluster of a given name to given number of worker nodes. :param str name: Name of the cluster @@ -849,20 +850,23 @@ def resize_cluster(self, cluster_name, node_count, curr_cluster_info=None): :return response: response returned by create_nodes() :rtype: dict """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] + num_workers = self.req_spec[RequestKey.NUM_WORKERS] + if curr_cluster_info: curr_worker_count = len(curr_cluster_info['nodes']) else: cluster = self.get_cluster_info(cluster_name=cluster_name) curr_worker_count = len(cluster['nodes']) - if curr_worker_count > node_count: + if curr_worker_count > num_workers: raise CseServerError(f"Automatic scale down is not supported for " f"vCD powered Kubernetes clusters. Use " f"'vcd cse delete node' command.") - elif curr_worker_count == node_count: + elif curr_worker_count == num_workers: raise CseServerError(f"Cluster - {cluster_name} is already at the " f"size of {curr_worker_count}.") - self.req_spec[RequestKey.NUM_WORKERS] = node_count - curr_worker_count + self.req_spec[RequestKey.NUM_WORKERS] = num_workers - curr_worker_count response = self.create_nodes() return response diff --git a/container_service_extension/vcdbroker_manager.py b/container_service_extension/vcdbroker_manager.py index 349b896ef..4ef15ea8f 100644 --- a/container_service_extension/vcdbroker_manager.py +++ b/container_service_extension/vcdbroker_manager.py @@ -5,10 +5,11 @@ from container_service_extension.exceptions import ClusterNotFoundError from container_service_extension.exceptions import CseDuplicateClusterError from container_service_extension.logger import SERVER_LOGGER as LOGGER +from container_service_extension.shared_constants import RequestKey from container_service_extension.vcdbroker import VcdBroker -class VcdBrokerManager(object): +class VcdBrokerManager: def __init__(self, tenant_auth_token, request_spec): self.tenant_auth_token = tenant_auth_token self.req_spec = request_spec @@ -20,22 +21,19 @@ def list_clusters(self): vcd_clusters.append(cluster) return vcd_clusters - def find_cluster_in_org(self, cluster_name, is_org_admin_search=False): + def find_cluster_in_org(self): """Invoke vCD broker to find the cluster in the org. - 'is_org_admin_search' is used here to prevent cluster creation with - same cluster-name by users within org. If it is true, - cluster list is filtered by the org name of the logged-in user. - If cluster found: Return a tuple of (cluster and the broker instance used to find the cluster) Else: (None, None) if cluster not found. """ + cluster_name = self.req_spec[RequestKey.CLUSTER_NAME] vcd_broker = VcdBroker(self.tenant_auth_token, self.req_spec) try: - return vcd_broker.get_cluster_info(cluster_name), vcd_broker + return vcd_broker.get_cluster_info(), vcd_broker except ClusterNotFoundError as err: # If a cluster is not found, then broker_manager will # decide if it wants to raise an error or ignore it if was it just diff --git a/system_tests/test_cse_client.py b/system_tests/test_cse_client.py index 2d20b2c5d..254010dff 100644 --- a/system_tests/test_cse_client.py +++ b/system_tests/test_cse_client.py @@ -180,11 +180,7 @@ def vcd_sys_admin(): yield - cmd = "logout" - result = env.CLI_RUNNER.invoke(vcd, cmd.split(), catch_exceptions=False) - assert result.exit_code == 0,\ - testutils.format_command_info('vcd', cmd, result.exit_code, - result.output) + result = env.CLI_RUNNER.invoke(vcd, ['logout']) @pytest.fixture @@ -218,9 +214,6 @@ def vcd_org_admin(): yield result = env.CLI_RUNNER.invoke(vcd, ['logout']) - assert result.exit_code == 0,\ - testutils.format_command_info('vcd', cmd, result.exit_code, - result.output) @pytest.fixture @@ -254,9 +247,6 @@ def vcd_vapp_author(): yield result = env.CLI_RUNNER.invoke(vcd, ['logout']) - assert result.exit_code == 0,\ - testutils.format_command_info('vcd', cmd, result.exit_code, - result.output) @pytest.fixture