From 6e70ef8e566f53b98c2a6abab4704ce67bd6ceda Mon Sep 17 00:00:00 2001 From: Daniel Chaffelson Date: Wed, 15 Sep 2021 10:49:49 +0100 Subject: [PATCH] Move DFX Beta implementation to GA process Cdpy: * Rework cdpy/df to use the 'service' nomenclature instead of 'environment' for DataFlow * Allow separate submission of a name, env_crn, or df_crn for filtering the list of enabled DataFlow services * Allow submission of an env_crn or df_crn for the description of a Dataflow Service, as you cannot know the df_crn during creation until it is actually created * Standardise on use of df_crn for the service crn in DataFlow to provide differentiation from the env crn and other crns in CDP * Add new service enablement parameters to the dfx enable_service call * Add new terminate option to the DFX disable_service call * Change delete_environment to reset_service to reflect new usage * Squelch 403 response on describing a recently deleted Dataflow Service while it is being cleaned up cloudera.cloud: * renamed df.py to df_service.py to reflect that other DataFlow modules for Deployments will be needed in future * Include changes to submission parameters and method names for 'cdp df' calls in cdpy * Modify enable and disable service logic to match new methods and parameters * df_info now supports name, df_crn, or env_crn for filtering results * env_info once again checks for DataFlow during detailed Environment information collection * Added input validation for DataFlow service creation cloudera.exe: * Added defaults handling for new DataFlow submission parameters * renamed various methods and parameters to reflect updated DataFlow service naming and options Signed-off-by: Daniel Chaffelson --- src/cdpy/df.py | 63 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/src/cdpy/df.py b/src/cdpy/df.py index c18c1ab..40156a5 100644 --- a/src/cdpy/df.py +++ b/src/cdpy/df.py @@ -7,11 +7,11 @@ class CdpyDf(CdpSdkBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - def list_environments(self, only_enabled=False, name=None): + def list_services(self, only_enabled=False, env_crn=None, df_crn=None, name=None): result = self.sdk.call( - svc='df', func='list_environments', ret_field='environments', squelch=[ + svc='df', func='list_services', ret_field='services', squelch=[ Squelch(value='NOT_FOUND', default=list(), - warning='No Environments for DataFlow found in Tenant') + warning='No DataFlow Deployments found') ], pageSize=self.sdk.DEFAULT_PAGE_SIZE ) @@ -19,36 +19,57 @@ def list_environments(self, only_enabled=False, name=None): result = [x for x in result if x['status']['state'] not in ['NOT_ENABLED']] if name is not None: result = [x for x in result if x['name'] == name] + if df_crn is not None: + result = [x for x in result if x['crn'] == df_crn] + if env_crn is not None: + result = [x for x in result if x['environmentCrn'] == env_crn] return result - def describe_environment(self, env_crn: str = None): + def describe_service(self, df_crn: str = None, env_crn: str = None): + if df_crn is not None: + resolved_df_crn = df_crn + elif env_crn is not None: + services = self.list_services(env_crn=env_crn) + if len(services) == 0: + return None + elif len(services) == 1: + resolved_df_crn = services[0]['crn'] + else: + self.sdk.throw_error( + CdpError('More than one DataFlow service found for env_crn, please try list instead') + ) + else: + self.sdk.throw_error(CdpError("Either df_crn or env_crn must be supplied to df.describe_service")) return self.sdk.call( - svc='df', func='get_environment', ret_field='environment', squelch=[ + svc='df', func='describe_service', ret_field='service', squelch=[ Squelch(value='NOT_FOUND', - warning='No Environment with crn %s for DataFlow found in Tenant' % env_crn) + warning='No DataFlow Deployment with crn %s found' % df_crn), + Squelch(value='PERMISSION_DENIED') # DF GRPC sometimes returns 403 when finishing deletion ], - crn=env_crn + serviceCrn=resolved_df_crn ) - def enable_environment(self, env_crn: str, authorized_ips: list = None, min_nodes: int = 3, max_nodes: int = 3, - enable_public_ip: bool = True): + def enable_service(self, env_crn: str, lb_ips: list = None, min_nodes: int = 3, max_nodes: int = 3, + enable_public_ip: bool = True, kube_ips: list = None, cluster_subnets: list = None, + lb_subnets: list = None): self.sdk.validate_crn(env_crn) return self.sdk.call( - svc='df', func='enable_environment', ret_field='environment', - crn=env_crn, minK8sNodeCount=min_nodes, maxK8sNodeCount=max_nodes, - usePublicLoadBalancer=enable_public_ip, authorizedIpRanges=authorized_ips + svc='df', func='enable_service', ret_field='service', + environmentCrn=env_crn, minK8sNodeCount=min_nodes, maxK8sNodeCount=max_nodes, + usePublicLoadBalancer=enable_public_ip, kubeApiAuthorizedIpRanges=kube_ips, + loadBalancerAuthorizedIpRanges=lb_ips, clusterSubnets=cluster_subnets, loadBalancerSubnets=lb_subnets ) - def disable_environment(self, env_crn: str, persist: bool = False): - self.sdk.validate_crn(env_crn) + def disable_service(self, df_crn: str, persist: bool = False, terminate=False): + self.sdk.validate_crn(df_crn) return self.sdk.call( - svc='df', func='disable_environment', ret_field='status', ret_error=True, - crn=env_crn, persist=persist + svc='df', func='disable_service', ret_field='status', ret_error=True, + serviceCrn=df_crn, persist=persist, terminateDeployments=terminate ) - def delete_environment(self, env_crn: str): - self.sdk.validate_crn(env_crn) + def reset_service(self, df_crn: str): + self.sdk.validate_crn(df_crn) return self.sdk.call( - svc='df', func='delete_environment', - crn=env_crn - ) \ No newline at end of file + svc='df', func='reset_service', + serviceCrn=df_crn + )