Skip to content

Commit

Permalink
Add cluster request handler and refactor brokers to use request dicti…
Browse files Browse the repository at this point in the history
…onary (vmware#392)

Add cluster handler and refactor brokers to only use request dictionary
  • Loading branch information
andrew-ni authored Jul 31, 2019
1 parent 1b84cfd commit a0aeee5
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 304 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
language: python

python:
- '3.6'
- '3.7'

install:
- sudo mkdir -p /cse && sudo chmod a+rwx /cse
- pip install -r requirements.txt
- python setup.py develop
- pip install .
- pip install tox

script:
Expand Down
6 changes: 2 additions & 4 deletions container_service_extension/abstract_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ def get_cluster_info(self):
"""

@abc.abstractmethod
def get_cluster_config(self, cluster_name):
def get_cluster_config(self):
"""Get the configuration for the cluster.
:param: str cluster_name: Name of the cluster.
:return: Configuration of cluster
:rtype: dict
Expand All @@ -66,7 +64,7 @@ def list_clusters(self):
:return: response object
:rtype: dict
:rtype: list
"""
pass
Expand Down
156 changes: 29 additions & 127 deletions container_service_extension/broker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import container_service_extension.ovdc_utils as ovdc_utils
from container_service_extension.pksbroker import PKSBroker
from container_service_extension.pksbroker_manager import PksBrokerManager
from container_service_extension.server_constants import CseOperation
from container_service_extension.server_constants import K8S_PROVIDER_KEY
from container_service_extension.server_constants import K8sProvider
from container_service_extension.server_constants import PKS_CLUSTER_DOMAIN_KEY
Expand All @@ -21,19 +20,12 @@


# TODO(Constants)
# 1. Scan and classify all broker-related constants in server code into
# either common, vcd_broker_specific, pks_broker_specific constants.
# Design and refactor them into one or more relevant files.
# 2. Scan through both CSE client and server to identify HTTP request/response
# body params and define all of them as constants into a file
# from where both client and server can access them.
# 3. Refactor both client and server code accordingly
# 4. As part of refactoring, avoid accessing HTTP request body directly
# As part of refactoring, avoid accessing HTTP request body directly
# from VcdBroker and PksBroker. We should try to limit processing request to
# request_processor.py and broker_manager.py.


class BrokerManager(object):
class BrokerManager:
"""Manage calls to vCD and PKS brokers.
Handles:
Expand All @@ -45,126 +37,37 @@ def __init__(self, tenant_auth_token, request_spec):
self.tenant_auth_token = tenant_auth_token
self.req_spec = request_spec
self.pks_cache = get_pks_cache()
self.vcdbroker_manager = VcdBrokerManager(
tenant_auth_token, request_spec)
self.pksbroker_manager = PksBrokerManager(
tenant_auth_token, request_spec)
self.is_ovdc_present_in_request = False
self.vcdbroker_manager = VcdBrokerManager(tenant_auth_token, request_spec) # noqa: E501
self.pksbroker_manager = PksBrokerManager(tenant_auth_token, request_spec) # noqa: E501
self.is_ovdc_present_in_request = bool(request_spec.get(RequestKey.OVDC_NAME)) # noqa: E501

def invoke(self, op):
"""Invoke right broker(s) to perform the operation requested.
def _get_cluster_config(self):
_, broker = self._get_cluster_info()
return broker.get_cluster_config()

Might result in further (pre/post)processing on the request/result(s).
Depending on the operation requested, this method may do one or more
of below mentioned points.
1. Extract and construct the relevant params for the operation.
2. Choose right broker to perform the operation/method requested.
3. Scan through available brokers to aggregate (or) filter results.
:param CseOperation op: Operation to be performed by one of the
brokers.
:return result: result of the operation.
:rtype: dict
"""
result = {}
self.is_ovdc_present_in_request = bool(self.req_spec.get(RequestKey.OVDC_NAME)) # noqa: E501

if op == CseOperation.CLUSTER_CONFIG:
cluster_spec = \
{'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME)}
result = self._get_cluster_config(**cluster_spec)
elif op == CseOperation.CLUSTER_CREATE:
# TODO(ClusterSpec) Create an inner class "ClusterSpec"
# in abstract_broker.py and have subclasses define and use it
# as instance variable.
# Method 'Create_cluster' in VcdBroker and PksBroker should take
# ClusterSpec either as a param (or)
# read from instance variable (if needed only).
cluster_spec = {
'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME),
'vdc_name': self.req_spec.get(RequestKey.OVDC_NAME),
'org_name': self.req_spec.get(RequestKey.ORG_NAME),
'node_count': self.req_spec.get(RequestKey.NUM_WORKERS),
'storage_profile': self.req_spec.get(RequestKey.STORAGE_PROFILE_NAME), # noqa: E501
'network_name': self.req_spec.get(RequestKey.NETWORK_NAME),
'template': self.req_spec.get(RequestKey.TEMPLATE_NAME),
}
result = self._create_cluster(**cluster_spec)
elif op == CseOperation.CLUSTER_DELETE:
cluster_spec = \
{'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME)}
result = self._delete_cluster(**cluster_spec)
elif op == CseOperation.CLUSTER_INFO:
cluster_spec = \
{'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME)}
result = self._get_cluster_info(**cluster_spec)[0]
elif op == CseOperation.CLUSTER_LIST:
result = self._list_clusters()
elif op == CseOperation.CLUSTER_RESIZE:
cluster_spec = {
'cluster_name': self.req_spec.get(RequestKey.CLUSTER_NAME),
'node_count': self.req_spec.get(RequestKey.NUM_WORKERS)
}
result = self._resize_cluster(**cluster_spec)
elif op == CseOperation.NODE_CREATE:
# Currently node create is a vCD only operation.
broker = VcdBroker(self.tenant_auth_token, self.req_spec)
result = broker.create_nodes()
elif op == CseOperation.NODE_DELETE:
# Currently node delete is a vCD only operation.
broker = VcdBroker(self.tenant_auth_token, self.req_spec)
result = broker.delete_nodes()
elif op == CseOperation.NODE_INFO:
cluster_name = self.req_spec.get(RequestKey.CLUSTER_NAME)
node_name = self.req_spec.get(RequestKey.NODE_NAME)
# Currently node info is a vCD only operation.
broker = VcdBroker(self.tenant_auth_token, self.req_spec)
result = broker.get_node_info(cluster_name, node_name)

return result

def _get_cluster_config(self, **cluster_spec):
"""Get the cluster configuration.
:param str cluster_name: Name of cluster.
:return: Cluster config.
:rtype: str
"""
cluster, broker = self._get_cluster_info(**cluster_spec)
return broker.get_cluster_config(cluster_name=cluster['name'])

def _create_cluster(self, **cluster_spec):
cluster_name = cluster_spec['cluster_name']
vdc_name = cluster_spec.get('vdc_name')
org_name = cluster_spec.get('org_name')
def _create_cluster(self):
cluster_name = self.req_spec[RequestKey.CLUSTER_NAME]
# 'is_org_admin_search' is used here to prevent cluster creation with
# same cluster-name by users within org, which might span over to PKS.
# If it is true, cluster list is filtered by the org name of the
# logged-in user to check for duplicates.
cluster, _ = self._find_cluster_in_org(cluster_name,
is_org_admin_search=True)
cluster, _ = self._find_cluster_in_org(is_org_admin_search=True)
if cluster:
raise ClusterAlreadyExistsError(f"Cluster {cluster_name} "
f"already exists.")

ctr_prov_ctx = ovdc_utils.get_ovdc_k8s_provider_metadata(org_name=org_name, ovdc_name=vdc_name, include_credentials=True, include_nsxt_info=True) # noqa: E501
ctr_prov_ctx = ovdc_utils.get_ovdc_k8s_provider_metadata(org_name=self.req_spec[RequestKey.ORG_NAME], ovdc_name=self.req_spec[RequestKey.OVDC_NAME], include_credentials=True, include_nsxt_info=True) # noqa: E501
if ctr_prov_ctx.get(K8S_PROVIDER_KEY) == K8sProvider.PKS:
cluster_spec['pks_plan'] = ctr_prov_ctx[PKS_PLANS_KEY][0]
cluster_spec['pks_ext_host'] = f"{cluster_name}.{ctr_prov_ctx[PKS_CLUSTER_DOMAIN_KEY]}" # noqa: E501
self.req_spec[RequestKey.PKS_PLAN_NAME] = ctr_prov_ctx[PKS_PLANS_KEY][0] # noqa: E501
self.req_spec['pks_ext_host'] = f"{cluster_name}.{ctr_prov_ctx[PKS_CLUSTER_DOMAIN_KEY]}" # noqa: E501
broker = self._get_broker_based_on_ctr_prov_ctx(ctr_prov_ctx)
return broker.create_cluster(**cluster_spec)
return broker.create_cluster()

def _delete_cluster(self, **cluster_spec):
cluster, broker = self._get_cluster_info(**cluster_spec)
return broker.delete_cluster(cluster_name=cluster['name'])
def _delete_cluster(self):
_, broker = self._get_cluster_info()
return broker.delete_cluster()

def _get_cluster_info(self, **cluster_spec):
def _get_cluster_info(self):
"""Get cluster details directly from cloud provider.
Logic of the method is as follows.
Expand All @@ -180,12 +83,12 @@ def _get_cluster_info(self, **cluster_spec):
:rtype: tuple
"""
cluster_name = cluster_spec['cluster_name']
cluster_name = self.req_spec[RequestKey.CLUSTER_NAME]
if self.is_ovdc_present_in_request:
broker = self._get_broker_based_on_vdc()
return broker.get_cluster_info(cluster_name=cluster_name), broker
return broker.get_cluster_info(), broker
else:
cluster, broker = self._find_cluster_in_org(cluster_name)
cluster, broker = self._find_cluster_in_org()
if cluster:
return cluster, broker

Expand Down Expand Up @@ -221,16 +124,15 @@ def _list_clusters(self):

return result

def _resize_cluster(self, **cluster_spec):
cluster, broker = self._get_cluster_info(**cluster_spec)
return broker.resize_cluster(curr_cluster_info=cluster, **cluster_spec)
def _resize_cluster(self):
cluster, broker = self._get_cluster_info()
return broker.resize_cluster(curr_cluster_info=cluster)

def _find_cluster_in_org(self, is_org_admin_search=False):
cluster, broker = self.vcdbroker_manager.find_cluster_in_org()

def _find_cluster_in_org(self, cluster_name, is_org_admin_search=False):
cluster, broker = self.vcdbroker_manager.find_cluster_in_org(
cluster_name, is_org_admin_search)
if not cluster and is_pks_enabled():
cluster, broker = self.pksbroker_manager.find_cluster_in_org(
cluster_name, is_org_admin_search)
cluster, broker = self.pksbroker_manager.find_cluster_in_org(is_org_admin_search=is_org_admin_search) # noqa: E501

return cluster, broker

Expand Down
Loading

0 comments on commit a0aeee5

Please sign in to comment.