diff --git a/.gitignore b/.gitignore index 8879f1230..cbd705191 100644 --- a/.gitignore +++ b/.gitignore @@ -101,3 +101,6 @@ _site/ cse_cache tests/private.config.yml + +# intellij metadata +.idea/* diff --git a/cluster_scripts/v2_x_tkgm/control_plane.sh b/cluster_scripts/v2_x_tkgm/control_plane.sh index e8ffde3cd..ee0d6d9d2 100644 --- a/cluster_scripts/v2_x_tkgm/control_plane.sh +++ b/cluster_scripts/v2_x_tkgm/control_plane.sh @@ -1,37 +1,102 @@ #!/usr/bin/env bash + +catch() {{ + vmtoolsd --cmd "info-set guestinfo.post_customization_script_execution_status $?" + error_message="$(date) $(caller): $BASH_COMMAND" + echo "$error_message" &>> /var/log/cse/customization/error.log + vmtoolsd --cmd "info-set guestinfo.post_customization_script_execution_failure_reason $error_message" +}} + +mkdir -p /var/log/cse/customization + +trap 'catch $? $LINENO' ERR + set -e +echo "$(date) This script was called with $1" &>> /var/log/cse/customization/status.log + +if [ "$1" == "precustomization" ] +then + echo "$(date) Exiting early since phase is [$1]" &>> /var/log/cse/customization/status.log + exit 0 +elif [ "$1" != "postcustomization" ] +then + echo "$(date) Exiting early since phase is [$1]" &>> /var/log/cse/customization/status.log + exit 0 +fi + +echo "$(date) Post Customization script execution in progress" &>> /var/log/cse/customization/status.log + kubeadm_config_path=/root/kubeadm-defaults.conf -vcloud_basic_auth_path=/root/vcloud-basic-auth.yaml vcloud_configmap_path=/root/vcloud-configmap.yaml vcloud_ccm_path=/root/cloud-director-ccm.yaml csi_driver_path=/root/csi-driver.yaml csi_controller_path=/root/csi-controller.yaml csi_node_path=/root/csi-node.yaml -# tag images -coredns_image_version="" -etcd_image_version="" -kubernetes_version="" -for image in "coredns" "etcd" "kube-proxy" "kube-apiserver" "kube-controller-manager" "kube-scheduler" -do - image_ref=$(ctr -n=k8s.io image list | cut -d" " -f1 | grep $image) - ref_path=$(echo $image_ref | sed 's/:.*//') - new_tag_version=$(echo $image_ref | sed 's/.*://' | sed 's/_/-/') - ctr -n=k8s.io image tag $image_ref $ref_path:$new_tag_version - - # save image tags for later - if [[ "$image" = "coredns" ]]; then - coredns_image_version=$new_tag_version - elif [[ "$image" = "etcd" ]]; then - etcd_image_version=$new_tag_version - elif [[ "$image" = "kube-proxy" ]]; then # selecting other kube-* images would work too - kubernetes_version=$new_tag_version + +# This is a simple command but its execution is crucial to kubeadm join. There are a few versions of ubuntu +# where the dbus.service is not started in a timely enough manner to set the hostname correctly. Hence +# this needs to be set by us. +vmtoolsd --cmd "info-set guestinfo.postcustomization.hostname.status in_progress" + hostnamectl set-hostname {vm_host_name} +vmtoolsd --cmd "info-set guestinfo.postcustomization.hostname.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.networkconfiguration.status in_progress" + echo 'net.ipv6.conf.all.disable_ipv6 = 1' >> /etc/sysctl.conf + echo 'net.ipv6.conf.default.disable_ipv6 = 1' >> /etc/sysctl.conf + echo 'net.ipv6.conf.lo.disable_ipv6 = 1' >> /etc/sysctl.conf + sudo sysctl -p + + # also remove ipv6 localhost entry from /etc/hosts + sed -i 's/::1/127.0.0.1/g' /etc/hosts || true +vmtoolsd --cmd "info-set guestinfo.postcustomization.networkconfiguration.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.store.sshkey.status in_progress" + ssh_key="{ssh_key}" + if [[ ! 
-z "$ssh_key" ]]; + then + mkdir -p /root/.ssh + echo $ssh_key >> /root/.ssh/authorized_keys + chmod -R go-rwx /root/.ssh fi -done +vmtoolsd --cmd "info-set guestinfo.postcustomization.store.sshkey.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.nameserverconfig.resolvconf.status in_progress" + echo 'nameserver 8.8.8.8 +nameserver 8.8.4.4 +nameserver 10.16.188.210 +nameserver 10.118.254.1' > /etc/resolv.conf +vmtoolsd --cmd "info-set guestinfo.postcustomization.nameserverconfig.resolvconf.status successful" -# create /root/kubeadm-defaults.conf -echo "--- + +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeinit.status in_progress" + # tag images + coredns_image_version="" + etcd_image_version="" + kubernetes_version="" + for image in "coredns" "etcd" "kube-proxy" "kube-apiserver" "kube-controller-manager" "kube-scheduler" + do + image_ref=$(ctr -n=k8s.io image list | cut -d" " -f1 | grep $image) + ref_path=$(echo $image_ref | sed 's/:.*//') + new_tag_version=$(echo $image_ref | sed 's/.*://' | sed 's/_/-/') + ctr -n=k8s.io image tag $image_ref $ref_path:$new_tag_version + + # save image tags for later + if [[ "$image" = "coredns" ]]; then + coredns_image_version=$new_tag_version + elif [[ "$image" = "etcd" ]]; then + etcd_image_version=$new_tag_version + elif [[ "$image" = "kube-proxy" ]]; then # selecting other kube-* images would work too + kubernetes_version=$new_tag_version + fi + done + + # create /root/kubeadm-defaults.conf + echo "--- apiVersion: kubeadm.k8s.io/v1beta2 kind: InitConfiguration bootstrapTokens: @@ -48,6 +113,7 @@ nodeRegistration: --- apiVersion: kubeadm.k8s.io/v1beta2 kind: ClusterConfiguration +controlPlaneEndpoint: "{control_plane_endpoint}" dns: type: CoreDNS imageRepository: projects.registry.vmware.com/tkg @@ -62,35 +128,43 @@ networking: imageRepository: projects.registry.vmware.com/tkg kubernetesVersion: $kubernetes_version ---" > /root/kubeadm-defaults.conf -kubeadm init --config $kubeadm_config_path > /root/kubeadm-init.out - -mkdir -p /root/.kube -cp -f /etc/kubernetes/admin.conf /root/.kube/config -chown $(id -u):$(id -g) /root/.kube/config - -kubectl apply -f https://github.com/vmware-tanzu/antrea/releases/download/v0.11.3/antrea.yml -systemctl restart kubelet -while [[ `systemctl is-active kubelet` != 'active' ]]; do echo 'waiting for kubelet'; sleep 5; done - -# Download cpi and csi yaml -#wget -O /root/vcloud-basic-auth.yaml https://raw.githubusercontent.com/vmware/cloud-provider-for-cloud-director/main/manifests/vcloud-basic-auth.yaml -#wget -O /root/vcloud-configmap.yaml https://raw.githubusercontent.com/vmware/cloud-provider-for-cloud-director/main/manifests/vcloud-configmap.yaml -#wget -O /root/cloud-director-ccm.yaml https://raw.githubusercontent.com/vmware/cloud-provider-for-cloud-director/main/manifests/cloud-director-ccm.yaml -# TODO: change to use main branch links -wget -O $vcloud_basic_auth_path https://raw.githubusercontent.com/ltimothy7/cloud-provider-for-cloud-director/auth_mount_internal/manifests/vcloud-basic-auth.yaml -wget -O $vcloud_configmap_path https://raw.githubusercontent.com/ltimothy7/cloud-provider-for-cloud-director/auth_mount_internal/manifests/vcloud-configmap.yaml -wget -O $vcloud_ccm_path https://raw.githubusercontent.com/ltimothy7/cloud-provider-for-cloud-director/auth_mount_internal/manifests/cloud-director-ccm.yaml -wget -O $csi_driver_path https://github.com/vmware/cloud-director-named-disk-csi-driver/raw/main/manifests/csi-driver.yaml -wget -O $csi_controller_path 
https://github.com/vmware/cloud-director-named-disk-csi-driver/raw/main/manifests/csi-controller.yaml -wget -O $csi_node_path https://github.com/vmware/cloud-director-named-disk-csi-driver/raw/main/manifests/csi-node.yaml - -# TODO: look into if not https vcd host -sed -i 's/BASE64_USERNAME/{base64_username}/; s/BASE64_PASSWORD/{base64_password}/' $vcloud_basic_auth_path -sed -i 's/VCD_HOST/"https:\/\/{vcd_host}"/; s/ORG/"{org}"/; s/OVDC/"{ovdc}"/; s/NETWORK/"{ovdc_network}"/; s/VIP_SUBNET_CIDR/"{vip_subnet_cidr_ip}\/{vip_subnet_cidr_suffix}"/; s/CLUSTER_ID/"{cluster_id}"/' $vcloud_configmap_path - -kubectl apply -f $vcloud_basic_auth_path -kubectl apply -f $vcloud_configmap_path -kubectl apply -f $vcloud_ccm_path -kubectl apply -f $csi_driver_path -kubectl apply -f $csi_controller_path -kubectl apply -f $csi_node_path + kubeadm init --config $kubeadm_config_path > /root/kubeadm-init.out + export KUBECONFIG=/etc/kubernetes/admin.conf +vmtoolsd --cmd "info-set guestinfo.kubeconfig $(cat /etc/kubernetes/admin.conf | base64 | tr -d '\n')" +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeinit.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubectl.apply.cni.status in_progress" + kubectl apply -f https://github.com/vmware-tanzu/antrea/releases/download/{antrea_cni_version}/antrea.yml +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubectl.apply.cni.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubectl.cpi.install.status in_progress" + # TODO: change to use main branch links + wget -O $vcloud_configmap_path https://raw.githubusercontent.com/ltimothy7/cloud-provider-for-cloud-director/auth_mount_internal/manifests/vcloud-configmap.yaml + wget -O $vcloud_ccm_path https://raw.githubusercontent.com/ltimothy7/cloud-provider-for-cloud-director/auth_mount_internal/manifests/cloud-director-ccm.yaml + + kubectl apply -f $vcloud_configmap_path + kubectl apply -f $vcloud_ccm_path +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubectl.cpi.install.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubectl.csi.install.status in_progress" + wget -O $csi_driver_path https://github.com/vmware/cloud-director-named-disk-csi-driver/raw/main/manifests/csi-driver.yaml + wget -O $csi_controller_path https://github.com/vmware/cloud-director-named-disk-csi-driver/raw/main/manifests/csi-controller.yaml + wget -O $csi_node_path https://github.com/vmware/cloud-director-named-disk-csi-driver/raw/main/manifests/csi-node.yaml + + kubectl apply -f $csi_driver_path + kubectl apply -f $csi_controller_path + kubectl apply -f $csi_node_path +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubectl.csi.install.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeadm.token.generate.status in_progress" + kubeadm_join_info=$(kubeadm token create --print-join-command 2> /dev/null) + vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeadm.token.info $kubeadm_join_info" +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeadm.token.generate.status successful" + + +echo "$(date) post customization script execution completed" &>> /var/log/cse/customization/status.log +exit 0 diff --git a/cluster_scripts/v2_x_tkgm/node.sh b/cluster_scripts/v2_x_tkgm/node.sh index e5e9e6565..40b2140ad 100644 --- a/cluster_scripts/v2_x_tkgm/node.sh +++ b/cluster_scripts/v2_x_tkgm/node.sh @@ -1,9 +1,96 @@ #!/usr/bin/env bash + +catch() {{ + vmtoolsd --cmd "info-set 
guestinfo.post_customization_script_execution_status $?" + error_message="$(date) $(caller): $BASH_COMMAND" + echo "$error_message" &>> /var/log/cse/customization/error.log + vmtoolsd --cmd "info-set guestinfo.post_customization_script_execution_failure_reason $error_message" +}} + +mkdir -p /var/log/cse/customization + +trap 'catch $? $LINENO' ERR + set -e -kubeadm_config_path=/root/kubeadm-defaults-join.conf +echo "$(date) This script was called with $1" &>> /var/log/cse/customization/status.log + +if [ "$1" == "precustomization" ] +then + echo "$(date) Exiting early since phase is [$1]" &>> /var/log/cse/customization/status.log + vmtoolsd --cmd "info-set guestinfo.precustomization.script.status successful" + exit 0 +elif [ "$1" != "postcustomization" ] +then + echo "$(date) Exiting early since phase is [$1]" &>> /var/log/cse/customization/status.log + exit 0 +fi + +echo "$(date) Post Customization script execution in progress" &>> /var/log/cse/customization/status.log + +# This is a simple command but its execution is crucial to kubeadm join. There are a few versions of ubuntu +# where the dbus.service is not started in a timely enough manner to set the hostname correctly. Hence +# this needs to be set by us. +vmtoolsd --cmd "info-set guestinfo.postcustomization.hostname.status in_progress" +hostnamectl set-hostname {vm_host_name} +vmtoolsd --cmd "info-set guestinfo.postcustomization.hostname.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.networkconfiguration.status in_progress" + echo 'net.ipv6.conf.all.disable_ipv6 = 1' >> /etc/sysctl.conf + echo 'net.ipv6.conf.default.disable_ipv6 = 1' >> /etc/sysctl.conf + echo 'net.ipv6.conf.lo.disable_ipv6 = 1' >> /etc/sysctl.conf + sudo sysctl -p + + # also remove ipv6 localhost entry from /etc/hosts + sed -i 's/::1/127.0.0.1/g' /etc/hosts || true +vmtoolsd --cmd "info-set guestinfo.postcustomization.networkconfiguration.status successful" + -echo "--- +vmtoolsd --cmd "info-set guestinfo.postcustomization.store.sshkey.status in_progress" + ssh_key="{ssh_key}" + if [[ ! 
-z "$ssh_key" ]]; + then + mkdir -p /root/.ssh + echo $ssh_key >> /root/.ssh/authorized_keys + chmod -R go-rwx /root/.ssh + fi +vmtoolsd --cmd "info-set guestinfo.postcustomization.store.sshkey.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.nameserverconfig.resolvconf.status in_progress" + echo 'nameserver 8.8.8.8 +nameserver 8.8.4.4 +nameserver 10.16.188.210 +nameserver 10.118.254.1' > /etc/resolv.conf +vmtoolsd --cmd "info-set guestinfo.postcustomization.nameserverconfig.resolvconf.status successful" + + +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeadm.node.join.status in_progress" + kubeadm_config_path=/root/kubeadm-defaults-join.conf + + # tag images + coredns_image_version="" + etcd_image_version="" + kubernetes_version="" + for image in "coredns" "etcd" "kube-proxy" "kube-apiserver" "kube-controller-manager" "kube-scheduler" + do + image_ref=$(ctr -n=k8s.io image list | cut -d" " -f1 | grep $image) + ref_path=$(echo $image_ref | sed 's/:.*//') + new_tag_version=$(echo $image_ref | sed 's/.*://' | sed 's/_/-/') + ctr -n=k8s.io image tag $image_ref $ref_path:$new_tag_version + + # save image tags for later + if [[ "$image" = "coredns" ]]; then + coredns_image_version=$new_tag_version + elif [[ "$image" = "etcd" ]]; then + etcd_image_version=$new_tag_version + elif [[ "$image" = "kube-proxy" ]]; then # selecting other kube-* images would work too + kubernetes_version=$new_tag_version + fi + done + + echo "--- apiVersion: kubeadm.k8s.io/v1beta2 kind: JoinConfiguration caCertPath: /etc/kubernetes/pki/ca.crt @@ -18,6 +105,12 @@ nodeRegistration: criSocket: /run/containerd/containerd.sock kubeletExtraArgs: cloud-provider: external -" > /root/kubeadm-defaults-join.conf +---" > /root/kubeadm-defaults-join.conf + + kubeadm join --config $kubeadm_config_path +vmtoolsd --cmd "info-set guestinfo.postcustomization.kubeadm.node.join.status successful" + +echo "$(date) post customization script execution completed" &>> /var/log/cse/customization/status.log + +exit 0 -kubeadm join --config $kubeadm_config_path diff --git a/container_service_extension/common/constants/server_constants.py b/container_service_extension/common/constants/server_constants.py index 1e0324bdb..7778e2f7a 100644 --- a/container_service_extension/common/constants/server_constants.py +++ b/container_service_extension/common/constants/server_constants.py @@ -768,10 +768,25 @@ class PostCustomizationStatus(Enum): SUCCESSFUL = 'successful' +@unique +class ToolsDeployPkgCustomizationStatus(Enum): + NONE = None + IN_PROGRESS = 'Started' + SUCCESSFUL = 'Successful' + + +@unique +class PreCustomizationPhase(Enum): + POST_BOOT_CUSTOMIZATION_SERVICE_SETUP = 'guestinfo.gc.status' + + @unique class PostCustomizationPhase(Enum): + HOSTNAME_SETUP = 'guestinfo.postcustomization.hostname.status' + NETWORK_CONFIGURATION = 'guestinfo.postcustomization.networkconfiguration.status' STORE_SSH_KEY = 'guestinfo.postcustomization.store.sshkey.status' KUBEADM_INIT = 'guestinfo.postcustomization.kubeinit.status' + NAMESERVER_SETUP = 'guestinfo.postcustomization.nameserverconfig.resolvconf.status' KUBECTL_APPLY_CNI = 'guestinfo.postcustomization.kubectl.apply.cni.status' # noqa: E501 KUBEADM_TOKEN_GENERATE = 'guestinfo.postcustomization.kubeadm.token.generate.status' # noqa: E501 KUBEADM_NODE_JOIN = 'guestinfo.postcustomization.kubeadm.node.join.status' diff --git a/container_service_extension/common/constants/shared_constants.py b/container_service_extension/common/constants/shared_constants.py index 
b9ab5eb94..7e000a819 100644 --- a/container_service_extension/common/constants/shared_constants.py +++ b/container_service_extension/common/constants/shared_constants.py @@ -40,7 +40,6 @@ class ClusterEntityKind(Enum): ClusterEntityKind.TKG_PLUS.value, ClusterEntityKind.TKG_M.value] - # Cluster runtimes and placement policies NATIVE_CLUSTER_RUNTIME_INTERNAL_NAME = 'native' TKG_PLUS_CLUSTER_RUNTIME_INTERNAL_NAME = 'tkgplus' @@ -60,7 +59,7 @@ class ClusterEntityKind(Enum): RUNTIME_INTERNAL_NAME_TO_DISPLAY_NAME_MAP = { NATIVE_CLUSTER_RUNTIME_INTERNAL_NAME: ClusterEntityKind.NATIVE.value, TKG_PLUS_CLUSTER_RUNTIME_INTERNAL_NAME: ClusterEntityKind.TKG_PLUS.value, - TKG_M_CLUSTER_RUNTIME_INTERNAL_NAME: ClusterEntityKind.TKG_M.value + TKG_M_CLUSTER_RUNTIME_INTERNAL_NAME: ClusterEntityKind.TKG_M.value, } # CSE Server Busy string diff --git a/container_service_extension/rde/backend/cluster_service_2_x_tkgm.py b/container_service_extension/rde/backend/cluster_service_2_x_tkgm.py index e7a1430c2..4a340096b 100644 --- a/container_service_extension/rde/backend/cluster_service_2_x_tkgm.py +++ b/container_service_extension/rde/backend/cluster_service_2_x_tkgm.py @@ -1,13 +1,14 @@ # container-service-extension # Copyright (c) 2021 VMware, Inc. All Rights Reserved. # SPDX-License-Identifier: BSD-2-Clause +import base64 from dataclasses import asdict import random import re import string import threading import time -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple import urllib import pkg_resources @@ -16,19 +17,20 @@ import pyvcloud.vcd.vapp as vcd_vapp from pyvcloud.vcd.vdc import VDC import pyvcloud.vcd.vm as vcd_vm -import semantic_version as semver from container_service_extension.common.constants.server_constants import CLUSTER_ENTITY # noqa: E501 +from container_service_extension.common.constants.server_constants import ToolsDeployPkgCustomizationStatus # noqa: E501 from container_service_extension.common.constants.server_constants import ClusterMetadataKey # noqa: E501 -from container_service_extension.common.constants.server_constants import ClusterScriptFile, TemplateScriptFile # noqa: E501 -from container_service_extension.common.constants.server_constants import CSE_CLUSTER_KUBECONFIG_PATH # noqa: E501 -from container_service_extension.common.constants.server_constants import DEFAULT_SUBNET_CIDR_IP # noqa: E501 -from container_service_extension.common.constants.server_constants import DEFAULT_SUBNET_CIDR_SUFFIX # noqa: E501 +from container_service_extension.common.constants.server_constants import ClusterScriptFile from container_service_extension.common.constants.server_constants import DefEntityOperation # noqa: E501 from container_service_extension.common.constants.server_constants import DefEntityOperationStatus # noqa: E501 from container_service_extension.common.constants.server_constants import DefEntityPhase # noqa: E501 +from container_service_extension.common.constants.server_constants import KUBE_CONFIG # noqa: E501 +from container_service_extension.common.constants.server_constants import KUBEADM_TOKEN_INFO # noqa: E501 from container_service_extension.common.constants.server_constants import LocalTemplateKey # noqa: E501 from container_service_extension.common.constants.server_constants import NodeType # noqa: E501 +from container_service_extension.common.constants.server_constants import PreCustomizationPhase # noqa: E501 +from container_service_extension.common.constants.server_constants import PostCustomizationPhase # noqa: E501 from 
container_service_extension.common.constants.server_constants import ThreadLocalData # noqa: E501 from container_service_extension.common.constants.server_constants import TKGM_DEFAULT_POD_NETWORK_CIDR # noqa: E501 from container_service_extension.common.constants.server_constants import TKGM_DEFAULT_SERVICE_CIDR # noqa: E501 @@ -42,9 +44,7 @@ from container_service_extension.common.utils.script_utils import get_cluster_script_file_contents # noqa: E501 import container_service_extension.common.utils.server_utils as server_utils import container_service_extension.common.utils.thread_utils as thread_utils -import container_service_extension.common.utils.vsphere_utils as vs_utils import container_service_extension.exception.exceptions as exceptions -import container_service_extension.installer.templates.local_template_manager as ltm # noqa: E501 import container_service_extension.lib.telemetry.constants as telemetry_constants # noqa: E501 import container_service_extension.lib.telemetry.telemetry_handler as telemetry_handler # noqa: E501 from container_service_extension.logging.logger import SERVER_LOGGER as LOGGER @@ -64,10 +64,13 @@ DEFAULT_API_VERSION = vcd_client.ApiVersion.VERSION_36.value +# Hardcode the Antrea CNI version until there's a better way to retrieve it +CNI_NAME = "antrea" +ANTREA_CNI_VERSION = 'v0.11.3' + CLUSTER_CREATE_OPERATION_MESSAGE = 'Cluster create' CLUSTER_RESIZE_OPERATION_MESSAGE = 'Cluster resize' CLUSTER_DELETE_OPERATION_MESSAGE = 'Cluster delete' -CLUSTER_UPGRADE_OPERATION_MESSAGE = 'Cluster upgrade' DOWNLOAD_KUBECONFIG_OPERATION_MESSAGE = 'Download kubeconfig' @@ -89,12 +92,13 @@ def __init__(self, ctx: RequestContext): self.mqtt_publisher: MQTTPublisher = ctx.mqtt_publisher cloudapi_client_v36 = self.context.get_cloudapi_client( api_version=DEFAULT_API_VERSION) - self.entity_svc = def_entity_svc.DefEntityService(cloudapi_client_v36) + self.entity_svc = def_entity_svc.DefEntityService( + cloudapi_client=cloudapi_client_v36) sysadmin_cloudapi_client_v36 = \ self.context.get_sysadmin_cloudapi_client( api_version=DEFAULT_API_VERSION) self.sysadmin_entity_svc = def_entity_svc.DefEntityService( - sysadmin_cloudapi_client_v36) + cloudapi_client=sysadmin_cloudapi_client_v36) def get_cluster_info(self, cluster_id: str) -> common_models.DefEntity: """Get the corresponding defined entity of the native cluster. 
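Note on the kubeconfig hand-off used by the hunks that follow: control_plane.sh publishes the cluster's admin kubeconfig through VMware Tools by base64-encoding /etc/kubernetes/admin.conf, stripping newlines, and writing the result to the guestinfo.kubeconfig key; the server side then stores it in the RDE and serves it from there instead of downloading the file over vSphere guest operations. A minimal sketch of the decoding half (the guestinfo read itself is assumed to happen elsewhere):

```python
import base64


def decode_guestinfo_kubeconfig(raw_value: str) -> str:
    # control_plane.sh stores base64(/etc/kubernetes/admin.conf) with
    # newlines stripped ("base64 | tr -d '\n'"); b64decode reverses it.
    return base64.b64decode(raw_value).decode('utf-8')
```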
@@ -195,36 +199,21 @@ def get_cluster_config(self, cluster_id: str): } ) - if curr_rde.externalId is None: - msg = f"Cannot find VApp href for cluster {curr_rde.name} " \ - f"with id {cluster_id}" - LOGGER.error(msg) - raise exceptions.CseServerError(msg) - - client_v36 = self.context.get_client(api_version=DEFAULT_API_VERSION) - vapp = vcd_vapp.VApp(client_v36, href=curr_rde.externalId) - control_plane_node_name = curr_native_entity.status.nodes.control_plane.name # noqa: E501 + # Get kube config from RDE + kube_config = None + if hasattr(curr_native_entity.status, + shared_constants.RDEProperty.PRIVATE.value) and \ + hasattr(curr_native_entity.status.private, + shared_constants.RDEProperty.KUBE_CONFIG.value): + kube_config = curr_native_entity.status.private.kube_config - LOGGER.debug(f"getting file from node {control_plane_node_name}") - password = vapp.get_admin_password(control_plane_node_name) - sysadmin_client_v36 = self.context.get_sysadmin_client( - api_version=DEFAULT_API_VERSION) - vs = vs_utils.get_vsphere(sysadmin_client_v36, vapp, - vm_name=control_plane_node_name, - logger=LOGGER) - vs.connect() - moid = vapp.get_vm_moid(control_plane_node_name) - vm = vs.get_vm_by_moid(moid) - result = vs.download_file_from_guest(vm, 'root', password, - CSE_CLUSTER_KUBECONFIG_PATH) - - if not result: + if not kube_config: msg = "Failed to get cluster kube-config" LOGGER.error(msg) raise exceptions.ClusterOperationError(msg) return self.mqtt_publisher.construct_behavior_payload( - message=result.content.decode(), + message=kube_config, status=BehaviorTaskStatus.SUCCESS.value) def create_cluster(self, entity_id: str, input_native_entity: rde_2_x.NativeEntity): # noqa: E501 @@ -242,7 +231,6 @@ def create_cluster(self, entity_id: str, input_native_entity: rde_2_x.NativeEnti cluster_name: Optional[str] = None org_name: Optional[str] = None ovdc_name: Optional[str] = None - input_native_entity = input_native_entity try: cluster_name = input_native_entity.metadata.name org_name = input_native_entity.metadata.org_name @@ -257,7 +245,7 @@ def create_cluster(self, entity_id: str, input_native_entity: rde_2_x.NativeEnti template_name=server_config['broker']['default_template_name'], # noqa: E501 template_revision=int(server_config['broker']['default_template_revision'])) # noqa: E501 template_name = input_native_entity.spec.distribution.template_name - template_revision = input_native_entity.spec.distribution.template_revision # noqa: E501 + template_revision = 1 # templateRevision for TKGm is always 1 # check that cluster name is syntactically valid if not _is_valid_cluster_name(cluster_name): @@ -278,14 +266,13 @@ def create_cluster(self, entity_id: str, input_native_entity: rde_2_x.NativeEnti f"Cluster '{cluster_name}' already exists.") # check that requested/default template is valid - template = _get_template( - name=template_name, revision=template_revision) + template = _get_tkgm_template(template_name) # TODO(DEF) design and implement telemetry VCDA-1564 defined entity # based clusters k8_distribution = rde_2_x.Distribution( template_name=template_name, - template_revision=template_revision + template_revision=1, ) cloud_properties = rde_2_x.CloudProperties( distribution=k8_distribution, @@ -302,12 +289,11 @@ def create_cluster(self, entity_id: str, input_native_entity: rde_2_x.NativeEnti template[LocalTemplateKey.KUBERNETES], template[LocalTemplateKey.KUBERNETES_VERSION]), cni=_create_k8s_software_string( - template[LocalTemplateKey.CNI], - template[LocalTemplateKey.CNI_VERSION]), - 
docker_version=template[LocalTemplateKey.DOCKER_VERSION], + CNI_NAME, + ANTREA_CNI_VERSION), os=template[LocalTemplateKey.OS], cloud_properties=cloud_properties, - uid=entity_id + uid=entity_id, ) msg = f"Creating cluster '{cluster_name}' " \ @@ -357,7 +343,8 @@ def create_cluster(self, entity_id: str, input_native_entity: rde_2_x.NativeEnti invoke_hooks=False) except Exception: msg = f"Failed to delete defined entity for cluster " \ - f"{cluster_name} ({entity_id})" + f"{cluster_name} ({entity_id}) with state " \ + f"({curr_rde.state})" LOGGER.error(msg, exc_info=True) else: # update status to CREATE:FAILED @@ -398,24 +385,18 @@ def resize_cluster(self, cluster_id: str, curr_native_entity.status, server_utils.get_rde_version_in_use()) curr_worker_count: int = current_spec.topology.workers.count - curr_nfs_count: int = current_spec.topology.nfs.count phase: DefEntityPhase = DefEntityPhase.from_phase( curr_native_entity.status.phase) - # compute the values of workers and nfs to be added or removed by + # compute the values of workers to be added or removed by # comparing the desired and the current state. "num_workers_to_add" # can hold either +ve or -ve value. desired_worker_count: int = input_native_entity.spec.topology.workers.count # noqa: E501 - desired_nfs_count: int = input_native_entity.spec.topology.nfs.count num_workers_to_add: int = desired_worker_count - curr_worker_count - num_nfs_to_add: int = desired_nfs_count - curr_nfs_count if desired_worker_count < 0: raise exceptions.CseServerError( f"Worker count must be >= 0 (received {desired_worker_count})") - if num_nfs_to_add < 0: - raise exceptions.CseServerError( - "Scaling down nfs nodes is not supported") # Check for unexposing the cluster desired_expose_state: bool = \ @@ -423,13 +404,12 @@ def resize_cluster(self, cluster_id: str, is_exposed: bool = current_spec.settings.network.expose unexpose: bool = is_exposed and not desired_expose_state - # Check if the desired worker and nfs count is valid and raise + # Check if the desired worker count is valid and raise # an exception if the cluster does not need to be unexposed - if not unexpose and num_workers_to_add == 0 and num_nfs_to_add == 0: + if not unexpose and num_workers_to_add == 0: raise exceptions.CseServerError( f"Cluster '{cluster_name}' already has {desired_worker_count} " - f"workers and {desired_nfs_count} nfs nodes and is " - f"already not exposed.") + f"workers and is already not exposed.") # check if cluster is in a valid state if state != def_constants.DEF_RESOLVED_STATE or phase.is_entity_busy(): @@ -453,10 +433,11 @@ def resize_cluster(self, cluster_id: str, # update the task and defined entity. msg = f"Resizing the cluster '{cluster_name}' ({cluster_id}) to the " \ - f"desired worker count {desired_worker_count} and " \ - f"nfs count {desired_nfs_count}" + f"desired worker count {desired_worker_count}" + if unexpose: msg += " and unexposing the cluster" + self._update_task(BehaviorTaskStatus.RUNNING, message=msg) # set entity status to busy new_status: rde_2_x.Status = curr_native_entity.status @@ -542,125 +523,15 @@ def delete_cluster(self, cluster_id): message=f"{CLUSTER_DELETE_OPERATION_MESSAGE} ({cluster_id})", status=BehaviorTaskStatus.RUNNING.value, progress=5) - def get_cluster_upgrade_plan(self, cluster_id: str): + def get_cluster_upgrade_plan(self, cluster_id: str) -> List[Dict]: """Get the template names/revisions that the cluster can upgrade to. 
:param str cluster_id: :return: A list of dictionaries with keys defined in LocalTemplateKey - :rtype: List[Dict] + :rtype: List[Dict] (always an empty list, since TKGm upgrades are not supported yet) """ - # TODO: Make use of current entity in the behavior payload - # Get entity required here to retrieve the cluster upgrade plan - curr_rde = self.entity_svc.get_entity(cluster_id) - curr_native_entity: rde_2_x.NativeEntity = curr_rde.entity - telemetry_handler.record_user_action_details( - cse_operation=telemetry_constants.CseOperation.V36_CLUSTER_UPGRADE_PLAN, # noqa: E501 - cse_params={ - CLUSTER_ENTITY: curr_rde, - telemetry_constants.PayloadKey.SOURCE_DESCRIPTION: thread_local_data.get_thread_local_data(ThreadLocalData.USER_AGENT) # noqa: E501 - } - ) - return _get_cluster_upgrade_target_templates( - curr_native_entity.status.cloud_properties.distribution.template_name, # noqa: E501 - str(curr_native_entity.status.cloud_properties.distribution.template_revision)) # noqa: E501 - - def upgrade_cluster(self, cluster_id: str, - input_native_entity: rde_2_x.NativeEntity): - """Start the upgrade cluster operation. - - Upgrading cluster is an asynchronous task, so the returned - `result['task_href']` can be polled to get updates on task progress. - - :param str cluster_id: id of the cluster to be upgraded - :param rde_2_x.NativeEntity input_native_entity: cluster spec with new - kubernetes distribution and revision - - :return: dictionary representing mqtt response published - :rtype: dict - """ - # TODO: Make use of current entity in the behavior payload - curr_rde = self.entity_svc.get_entity(cluster_id) - curr_native_entity: rde_2_x.NativeEntity = curr_rde.entity - cluster_name = curr_native_entity.metadata.name - new_template_name = input_native_entity.spec.distribution.template_name - new_template_revision = input_native_entity.spec.distribution.template_revision # noqa: E501 - - # check if cluster is in a valid state - phase: DefEntityPhase = DefEntityPhase.from_phase( - curr_native_entity.status.phase) - - state = curr_rde.state - if state != def_constants.DEF_RESOLVED_STATE or phase.is_entity_busy(): - raise exceptions.CseServerError( - f"Cluster {cluster_name} with id {cluster_id} is not in a " f"valid state to be upgraded. 
Please contact administrator.") - - # check that the specified template is a valid upgrade target - template = {} - valid_templates = _get_cluster_upgrade_target_templates( - curr_native_entity.status.cloud_properties.distribution.template_name, # noqa: E501 - str(curr_native_entity.status.cloud_properties.distribution.template_revision)) # noqa: E501 - - for t in valid_templates: - if (t[LocalTemplateKey.NAME], str(t[LocalTemplateKey.REVISION])) == (new_template_name, str(new_template_revision)): # noqa: E501 - template = t - break - if not template: - try: - self._fail_operation(cluster_id, DefEntityOperation.UPGRADE) - except Exception: - msg = f"Failed to update defined entity status for cluster {cluster_id}" # noqa: E501 - LOGGER.error(msg, exc_info=True) - # TODO all of these e.CseServerError instances related to request - # should be changed to BadRequestError (400) - raise exceptions.CseServerError( - f"Specified template/revision ({new_template_name} revision " - f"{new_template_revision}) is not a valid upgrade target for " - f"cluster '{cluster_name}'.") - - telemetry_handler.record_user_action_details( - telemetry_constants.CseOperation.V36_CLUSTER_UPGRADE, - cse_params={ - CLUSTER_ENTITY: curr_rde, - telemetry_constants.PayloadKey.SOURCE_DESCRIPTION: thread_local_data.get_thread_local_data(ThreadLocalData.USER_AGENT) # noqa: E501 - } - ) - - msg = f"Upgrading cluster '{cluster_name}' " \ - f"software to match template {new_template_name} (revision " \ - f"{new_template_revision}): Kubernetes: " \ - f"{input_native_entity.status.kubernetes} -> " \ - f"{template[LocalTemplateKey.KUBERNETES_VERSION]}, Docker-CE: " \ - f"{input_native_entity.status.docker_version} -> " \ - f"{template[LocalTemplateKey.DOCKER_VERSION]}, CNI: " \ - f"{input_native_entity.status.cni} -> " \ - f"{template[LocalTemplateKey.CNI_VERSION]}" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - LOGGER.info(f"{msg} ({curr_rde.externalId})") - - new_status: rde_2_x.Status = input_native_entity.status - new_status.phase = str( - DefEntityPhase(DefEntityOperation.UPGRADE, DefEntityOperationStatus.IN_PROGRESS)) # noqa: E501 - new_status.task_href = self.task_href - try: - self._update_cluster_entity(cluster_id, - new_status) - except Exception as err: - self._update_task(BehaviorTaskStatus.ERROR, - message=msg, - error_message=str(err)) - LOGGER.error(str(err), exc_info=True) - raise - - self.context.is_async = True - self._upgrade_cluster_async(cluster_id=cluster_id, - template=template) - # TODO(test-upgrade): Verify if multiple messages are not published - # in update_cluster() - return self.mqtt_publisher.construct_behavior_payload( - message=f"{CLUSTER_UPGRADE_OPERATION_MESSAGE} ({cluster_id})", - status=BehaviorTaskStatus.RUNNING.value, progress=5) + return [] def update_cluster(self, cluster_id: str, input_native_entity: rde_2_x.NativeEntity): # noqa: E501 """Start the update cluster operation (resize or upgrade). 
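With upgrade_cluster removed and get_cluster_upgrade_plan returning an empty list, update_cluster reduces to a single dispatch decision, visible in the next hunk. An illustrative standalone sketch of that decision (not the literal implementation, which raises the exceptions shown in the diff):

```python
def classify_update(current_spec, desired_spec) -> str:
    # A worker-count change is an actionable resize; any change to the
    # template name/revision implies an upgrade, which this version of
    # CSE rejects for TKGm clusters; anything else is unsupported.
    if current_spec.topology.workers.count != desired_spec.topology.workers.count:  # noqa: E501
        return 'resize'
    current_dist = (current_spec.distribution.template_name,
                    current_spec.distribution.template_revision)
    desired_dist = (desired_spec.distribution.template_name,
                    desired_spec.distribution.template_revision)
    if current_dist != desired_dist:
        return 'upgrade'  # rejected for TKGm in this version of CSE
    return 'unsupported'
```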
@@ -670,7 +541,7 @@ def update_cluster(self, cluster_id: str, input_native_entity: rde_2_x.NativeEnt :param str cluster_id: id of the cluster to be updated :param rde_2_x.NativeEntity input_native_entity: cluster spec with new - worker/nfs node count or new kubernetes distribution and revision + worker node count or new kubernetes distribution and revision :return: dictionary representing mqtt response published :rtype: dict @@ -683,11 +554,9 @@ def update_cluster(self, cluster_id: str, input_native_entity: rde_2_x.NativeEnt curr_native_entity.status, server_utils.get_rde_version_in_use()) current_workers_count = current_spec.topology.workers.count - current_nfs_count = current_spec.topology.nfs.count desired_workers_count = input_native_entity.spec.topology.workers.count - desired_nfs_count = input_native_entity.spec.topology.nfs.count - if current_workers_count != desired_workers_count or current_nfs_count != desired_nfs_count: # noqa: E501 + if current_workers_count != desired_workers_count: return self.resize_cluster(cluster_id, input_native_entity) current_template_name = current_spec.distribution.template_name @@ -695,7 +564,7 @@ def update_cluster(self, cluster_id: str, input_native_entity: rde_2_x.NativeEnt desired_template_name = input_native_entity.spec.distribution.template_name # noqa: E501 desired_template_revision = input_native_entity.spec.distribution.template_revision # noqa: E501 if current_template_name != desired_template_name or current_template_revision != desired_template_revision: # noqa: E501 - return self.upgrade_cluster(cluster_id, input_native_entity) + raise Exception("An upgrade appears to be required; upgrades are not supported for TKGm clusters in this version of CSE") raise exceptions.CseServerError("update not supported for the specified input specification") # noqa: E501 def get_cluster_acl_info(self, cluster_id, page: int, page_size: int): @@ -826,7 +695,6 @@ def delete_nodes(self, cluster_id: str, nodes_to_del=None): self.context.is_async = True self._monitor_delete_nodes(cluster_id=cluster_id, nodes_to_del=nodes_to_del) - msg = f"Deleting NFS nodes: {nodes_to_del} for cluster {curr_rde.name} ({cluster_id})" # noqa: E501 return self.mqtt_publisher.construct_behavior_payload( message=msg, status=BehaviorTaskStatus.RUNNING.value, progress=5) @@ -851,9 +719,6 @@ def _create_cluster_async(self, cluster_id: str, worker_sizing_class = input_native_entity.spec.topology.workers.sizing_class # noqa: E501 control_plane_storage_profile = input_native_entity.spec.topology.control_plane.storage_profile # noqa: E501 worker_storage_profile = input_native_entity.spec.topology.workers.storage_profile # noqa: E501 - nfs_count = input_native_entity.spec.topology.nfs.count - nfs_sizing_class = input_native_entity.spec.topology.nfs.sizing_class # noqa: E501 - nfs_storage_profile = input_native_entity.spec.topology.nfs.storage_profile # noqa: E501 network_name = input_native_entity.spec.settings.ovdc_network template_name = input_native_entity.spec.distribution.template_name # noqa: E501 template_revision = input_native_entity.spec.distribution.template_revision # noqa: E501 @@ -881,22 +746,25 @@ def _create_cluster_async(self, cluster_id: str, f"Error while creating vApp: {err}") client_v36.get_task_monitor().wait_for_status(vapp_resource.Tasks.Task[0]) # noqa: E501 - template = _get_template(template_name, template_revision) + template = _get_tkgm_template(template_name) LOGGER.debug(f"Setting metadata on cluster vApp '{cluster_name}'") tags = { ClusterMetadataKey.CLUSTER_ID: 
cluster_id, ClusterMetadataKey.CSE_VERSION: pkg_resources.require('container-service-extension')[0].version, # noqa: E501 ClusterMetadataKey.TEMPLATE_NAME: template[LocalTemplateKey.NAME], # noqa: E501 - ClusterMetadataKey.TEMPLATE_REVISION: template[LocalTemplateKey.REVISION], # noqa: E501 + ClusterMetadataKey.TEMPLATE_REVISION: 1, # templateRevision is hardcoded as 1 for TKGm ClusterMetadataKey.OS: template[LocalTemplateKey.OS], - ClusterMetadataKey.DOCKER_VERSION: template[LocalTemplateKey.DOCKER_VERSION], # noqa: E501 ClusterMetadataKey.KUBERNETES: template[LocalTemplateKey.KUBERNETES], # noqa: E501 ClusterMetadataKey.KUBERNETES_VERSION: template[LocalTemplateKey.KUBERNETES_VERSION], # noqa: E501 - ClusterMetadataKey.CNI: template[LocalTemplateKey.CNI], - ClusterMetadataKey.CNI_VERSION: template[LocalTemplateKey.CNI_VERSION] # noqa: E501 + ClusterMetadataKey.CNI: CNI_NAME, + ClusterMetadataKey.CNI_VERSION: ANTREA_CNI_VERSION # noqa: E501 } - vapp = vcd_vapp.VApp(client_v36, + + sysadmin_client_v36 = self.context.get_sysadmin_client( + api_version=DEFAULT_API_VERSION) + # Extra config elements of VApp are visible only for admin client + vapp = vcd_vapp.VApp(sysadmin_client_v36, href=vapp_resource.get('href')) task = vapp.set_multiple_metadata(tags) client_v36.get_task_monitor().wait_for_status(task) @@ -908,115 +776,69 @@ def _create_cluster_async(self, cluster_id: str, vapp.reload() server_config = server_utils.get_server_runtime_config() catalog_name = server_config['broker']['catalog'] - sysadmin_client_v36 = self.context.get_sysadmin_client( - api_version=DEFAULT_API_VERSION) + + msg = f"Adding control plane node for '{cluster_name}' ({cluster_id})" # noqa: E501 + LOGGER.debug(msg) + self._update_task(BehaviorTaskStatus.RUNNING, message=msg) + vapp.reload() + try: - _add_nodes(sysadmin_client_v36, - num_nodes=1, - node_type=NodeType.CONTROL_PLANE, - org=org, - vdc=vdc, - vapp=vapp, - catalog_name=catalog_name, - template=template, - network_name=network_name, - storage_profile=control_plane_storage_profile, - ssh_key=ssh_key, - sizing_class_name=control_plane_sizing_class) + expose_ip, _ = _add_control_plane_nodes( + sysadmin_client_v36, + num_nodes=1, + org=org, + vdc=vdc, + vapp=vapp, + catalog_name=catalog_name, + template=template, + network_name=network_name, + storage_profile=control_plane_storage_profile, + ssh_key=ssh_key, + sizing_class_name=control_plane_sizing_class, + expose=expose, + cluster_name=cluster_name, + cluster_id=cluster_id + ) except Exception as err: LOGGER.error(err, exc_info=True) raise exceptions.ControlPlaneNodeCreationError( f"Error adding control plane node: {err}") - - msg = f"Initializing cluster '{cluster_name}' ({cluster_id})" - LOGGER.debug(msg) - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) vapp.reload() - control_plane_ip = _get_control_plane_ip( - sysadmin_client_v36, vapp, check_tools=True, + control_plane_join_cmd = _get_join_cmd( + sysadmin_client=sysadmin_client_v36, + vapp=vapp ) - # Handle exposing cluster - if expose: - try: - expose_ip = nw_exp_helper.expose_cluster( - client=self.context.client, - org_name=org_name, - ovdc_name=ovdc_name, - network_name=network_name, - cluster_name=cluster_name, - cluster_id=cluster_id, - internal_ip=control_plane_ip) - if expose_ip: - control_plane_ip = expose_ip - except Exception as err: - LOGGER.error( - f"Exposing cluster failed: {str(err)}", exc_info=True - ) - expose_ip = '' - - _init_cluster(sysadmin_client_v36, - vapp, - native_entity=input_native_entity, - cluster_id=cluster_id, 
- expose_ip=expose_ip) - task = vapp.set_metadata('GENERAL', 'READWRITE', 'cse.master.ip', - control_plane_ip) - client_v36.get_task_monitor().wait_for_status(task) - msg = f"Creating {num_workers} node(s) for cluster " \ f"'{cluster_name}' ({cluster_id})" LOGGER.debug(msg) self._update_task(BehaviorTaskStatus.RUNNING, message=msg) try: - _add_nodes(sysadmin_client_v36, - num_nodes=num_workers, - node_type=NodeType.WORKER, - org=org, - vdc=vdc, - vapp=vapp, - catalog_name=catalog_name, - template=template, - network_name=network_name, - storage_profile=worker_storage_profile, - ssh_key=ssh_key, - sizing_class_name=worker_sizing_class) + _add_worker_nodes( + sysadmin_client_v36, + num_nodes=num_workers, + org=org, + vdc=vdc, + vapp=vapp, + catalog_name=catalog_name, + template=template, + network_name=network_name, + storage_profile=worker_storage_profile, + ssh_key=ssh_key, + sizing_class_name=worker_sizing_class, + control_plane_join_cmd=control_plane_join_cmd + ) except Exception as err: LOGGER.error(err, exc_info=True) raise exceptions.WorkerNodeCreationError( f"Error creating worker node: {err}") - msg = f"Adding {num_workers} node(s) to cluster " \ + msg = f"Added {num_workers} node(s) to cluster " \ f"'{cluster_name}' ({cluster_id})" LOGGER.debug(msg) self._update_task(BehaviorTaskStatus.RUNNING, message=msg) vapp.reload() - _join_cluster(sysadmin_client_v36, vapp) - - if nfs_count > 0: - msg = f"Creating {nfs_count} NFS nodes for cluster " \ - f"'{cluster_name}' ({cluster_id})" - LOGGER.debug(msg) - # TODO should this task be commented out? - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - try: - _add_nodes(sysadmin_client_v36, - num_nodes=nfs_count, - node_type=NodeType.NFS, - org=org, - vdc=vdc, - vapp=vapp, - catalog_name=catalog_name, - template=template, - network_name=network_name, - storage_profile=nfs_storage_profile, - ssh_key=ssh_key, - sizing_class_name=nfs_sizing_class) - except Exception as err: - LOGGER.error(err, exc_info=True) - raise exceptions.NFSNodeCreationError( - f"Error creating NFS node: {err}") # Update defined entity instance with new properties like vapp_id, # control plane_ip and nodes. 
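The control_plane_join_cmd stored in the next hunk as status.private.kube_token is the verbatim one-line output of `kubeadm token create --print-join-command`, surfaced by the control-plane script through guestinfo.postcustomization.kubeadm.token.info. For reference, such a line can be split into the pieces a JoinConfiguration needs (hypothetical helper, not part of this diff):

```python
import re

# Matches e.g. "kubeadm join 10.0.0.1:6443 --token ab12cd.<token suffix>
# --discovery-token-ca-cert-hash sha256:<hex digest>"
_JOIN_CMD = re.compile(
    r'kubeadm join\s+(?P<endpoint>\S+)\s+'
    r'--token\s+(?P<token>\S+)\s+'
    r'--discovery-token-ca-cert-hash\s+(?P<ca_cert_hash>\S+)'
)


def parse_join_command(join_cmd: str) -> dict:
    match = _JOIN_CMD.search(join_cmd.strip())
    if not match:
        raise ValueError(f'unrecognized kubeadm join command: {join_cmd!r}')
    return match.groupdict()
```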
@@ -1026,6 +848,13 @@ def _create_cluster_async(self, cluster_id: str, curr_rde: common_models.DefEntity = self.entity_svc.get_entity(cluster_id) # noqa: E501 curr_native_entity: rde_2_x.NativeEntity = curr_rde.entity new_status: rde_2_x.Status = curr_native_entity.status + new_status.private = rde_2_x.Private( + kube_token=control_plane_join_cmd, + kube_config=_get_kube_config_from_control_plane_vm( + sysadmin_client=sysadmin_client_v36, + vapp=vapp + ) + ) new_status.uid = cluster_id new_status.phase = str( DefEntityPhase( @@ -1035,6 +864,11 @@ def _create_cluster_async(self, cluster_id: str, ) new_status.nodes = _get_nodes_details( sysadmin_client_v36, vapp) + new_status.cni = _create_k8s_software_string( + CNI_NAME, + ANTREA_CNI_VERSION, + ) + new_status.cloud_properties.distribution.template_revision = tags[ClusterMetadataKey.TEMPLATE_REVISION] # Update status with exposed ip if expose_ip: @@ -1054,7 +888,6 @@ def _create_cluster_async(self, cluster_id: str, return msg except (exceptions.ControlPlaneNodeCreationError, exceptions.WorkerNodeCreationError, - exceptions.NFSNodeCreationError, exceptions.ClusterJoiningError, exceptions.ClusterInitializationError, exceptions.ClusterOperationError) as err: @@ -1118,7 +951,8 @@ def _create_cluster_async(self, cluster_id: str, invoke_hooks=False) except Exception: LOGGER.error("Failed to delete the defined entity for " - f"cluster '{cluster_name}'", exc_info=True) + f"cluster '{cluster_name}' with state " + f"'{curr_rde.state}'", exc_info=True) self._update_task(BehaviorTaskStatus.ERROR, message=msg, @@ -1177,19 +1011,14 @@ def _monitor_resize(self, cluster_id: str, input_native_entity: rde_2_x.NativeEn curr_native_entity.status, server_utils.get_rde_version_in_use()) curr_worker_count: int = current_spec.topology.workers.count - curr_nfs_count: int = current_spec.topology.nfs.count template_name = current_spec.distribution.template_name - template_revision = current_spec.distribution.template_revision desired_worker_count: int = \ input_native_entity.spec.topology.workers.count - desired_nfs_count: int = \ - input_native_entity.spec.topology.nfs.count num_workers_to_add: int = desired_worker_count - curr_worker_count - num_nfs_to_add: int = desired_nfs_count - curr_nfs_count - if num_workers_to_add > 0 or num_nfs_to_add > 0: - _get_template(name=template_name, revision=template_revision) + if num_workers_to_add > 0: + _get_tkgm_template(template_name) self._create_nodes_async(input_native_entity) # TODO Below is the temporary fix to avoid parallel Recompose @@ -1235,10 +1064,10 @@ def _monitor_resize(self, cluster_id: str, input_native_entity: rde_2_x.NativeEn control_plane_internal_ip = _get_control_plane_ip( sysadmin_client=self.context.sysadmin_client, vapp=vapp, - check_tools=True) + ) # update kubeconfig with internal ip - self._replace_kubeconfig_expose_ip( + updated_kube_config = self._replace_kubeconfig_expose_ip( internal_ip=control_plane_internal_ip, cluster_id=cluster_id, vapp=vapp) @@ -1262,12 +1091,14 @@ def _monitor_resize(self, cluster_id: str, input_native_entity: rde_2_x.NativeEn curr_native_entity.status.cloud_properties.exposed = False curr_native_entity.status.external_ip = None + curr_native_entity.status.private.kube_config = updated_kube_config # noqa: E501 unexpose_success = True except Exception as err: LOGGER.error( f"Failed to unexpose cluster with error: {str(err)}", exc_info=True ) + raise Exception("Un-exposing an exposed cluster is not a supported operation.") # update the defined entity and the task status. 
Check if one of # the child threads had set the status to ERROR. @@ -1282,8 +1113,7 @@ def _monitor_resize(self, cluster_id: str, input_native_entity: rde_2_x.NativeEn DefEntityOperationStatus.FAILED)) else: msg = f"Resized the cluster '{cluster_name}' ({cluster_id}) " \ - f"to the desired worker count {desired_worker_count} " \ - f"and nfs count {desired_nfs_count}" + f"to the desired worker count {desired_worker_count} " if unexpose_success: msg += " and un-exposed the cluster" elif unexpose and not unexpose_success: @@ -1329,7 +1159,7 @@ def _monitor_resize(self, cluster_id: str, input_native_entity: rde_2_x.NativeEn @thread_utils.run_async def _create_nodes_async(self, input_native_entity: rde_2_x.NativeEntity): - """Create worker and/or nfs nodes in vCD. + """Create worker nodes in vCD. This method is executed by a thread in an asynchronous manner. Do's: @@ -1364,26 +1194,21 @@ def _create_nodes_async(self, input_native_entity: rde_2_x.NativeEntity): org_name = curr_native_entity.metadata.org_name ovdc_name = curr_native_entity.metadata.virtual_data_center_name curr_worker_count: int = current_spec.topology.workers.count - curr_nfs_count: int = current_spec.topology.nfs.count # use the same settings with which cluster was originally created # viz., template, storage_profile, and network among others. worker_storage_profile = input_native_entity.spec.topology.workers.storage_profile # noqa: E501 worker_sizing_class = input_native_entity.spec.topology.workers.sizing_class # noqa: E501 - nfs_storage_profile = input_native_entity.spec.topology.nfs.storage_profile # noqa: E501 - nfs_sizing_class = input_native_entity.spec.topology.nfs.sizing_class # noqa: E501 network_name = input_native_entity.spec.settings.ovdc_network ssh_key = input_native_entity.spec.settings.ssh_key rollback = input_native_entity.spec.settings.rollback_on_failure template_name = input_native_entity.spec.distribution.template_name template_revision = input_native_entity.spec.distribution.template_revision # noqa: E501 - template = _get_template(template_name, template_revision) + template = _get_tkgm_template(template_name) - # compute the values of workers and nfs to be added or removed + # compute the values of workers to be added or removed desired_worker_count: int = input_native_entity.spec.topology.workers.count # noqa: E501 num_workers_to_add = desired_worker_count - curr_worker_count - desired_nfs_count = input_native_entity.spec.topology.nfs.count - num_nfs_to_add = desired_nfs_count - curr_nfs_count server_config = server_utils.get_server_runtime_config() catalog_name = server_config['broker']['catalog'] @@ -1391,7 +1216,8 @@ def _create_nodes_async(self, input_native_entity: rde_2_x.NativeEntity): api_version=DEFAULT_API_VERSION) org = vcd_utils.get_org(client_v36, org_name=org_name) ovdc = vcd_utils.get_vdc(client_v36, vdc_name=ovdc_name, org=org) - vapp = vcd_vapp.VApp(client_v36, href=vapp_href) + # Extra config elements of VApp are visible only for admin client + vapp = vcd_vapp.VApp(sysadmin_client_v36, href=vapp_href) if num_workers_to_add > 0: msg = f"Creating {num_workers_to_add} workers from template" \ @@ -1399,10 +1225,22 @@ def _create_nodes_async(self, input_native_entity: rde_2_x.NativeEntity): f"adding to cluster '{cluster_name}' ({cluster_id})" LOGGER.debug(msg) self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - worker_nodes = _add_nodes( + + msg = f"Adding {num_workers_to_add} node(s) to cluster " \ + f"{cluster_name}({cluster_id})" + self._update_task(BehaviorTaskStatus.RUNNING, 
message=msg) + + # Get join cmd from RDE;fallback to control plane extra config # noqa: E501 + control_plane_join_cmd = '' + if hasattr(curr_native_entity.status, + shared_constants.RDEProperty.PRIVATE.value) \ + and hasattr(curr_native_entity.status.private, + shared_constants.RDEProperty.KUBE_TOKEN.value): # noqa: E501 + control_plane_join_cmd = curr_native_entity.status.private.kube_token # noqa: E501 + + _add_worker_nodes( sysadmin_client_v36, num_nodes=num_workers_to_add, - node_type=NodeType.WORKER, org=org, vdc=ovdc, vapp=vapp, @@ -1411,43 +1249,14 @@ def _create_nodes_async(self, input_native_entity: rde_2_x.NativeEntity): network_name=network_name, storage_profile=worker_storage_profile, ssh_key=ssh_key, - sizing_class_name=worker_sizing_class) - msg = f"Adding {num_workers_to_add} node(s) to cluster " \ - f"{cluster_name}({cluster_id})" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - target_nodes = [] - for spec in worker_nodes['specs']: - target_nodes.append(spec['target_vm_name']) - vapp.reload() - _join_cluster(sysadmin_client_v36, - vapp, - target_nodes=target_nodes) + sizing_class_name=worker_sizing_class, + control_plane_join_cmd=control_plane_join_cmd + ) + msg = f"Added {num_workers_to_add} node(s) to cluster " \ f"{cluster_name}({cluster_id})" self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - if num_nfs_to_add > 0: - msg = f"Creating {num_nfs_to_add} nfs node(s) from template " \ - f"'{template_name}' (revision {template_revision}) " \ - f"for cluster '{cluster_name}' ({cluster_id})" - LOGGER.debug(msg) - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - _add_nodes(sysadmin_client_v36, - num_nodes=num_nfs_to_add, - node_type=NodeType.NFS, - org=org, - vdc=ovdc, - vapp=vapp, - catalog_name=catalog_name, - template=template, - network_name=network_name, - storage_profile=nfs_storage_profile, - ssh_key=ssh_key, - sizing_class_name=nfs_sizing_class) - msg = f"Created {num_nfs_to_add} nfs_node(s) for cluster " \ - f"'{cluster_name}' ({cluster_id})" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - msg = f"Created {num_workers_to_add} workers & {num_nfs_to_add}" \ - f" nfs nodes for '{cluster_name}' ({cluster_id}) " + msg = f"Created {num_workers_to_add} workers for '{cluster_name}' ({cluster_id}) " self._update_task(BehaviorTaskStatus.RUNNING, message=msg) except (exceptions.NodeCreationError, exceptions.ClusterJoiningError) as err: # noqa: E501 msg = f"Error adding nodes to cluster '{cluster_name}'" @@ -1584,209 +1393,6 @@ def _delete_cluster_async(self, # noqa: E501 self.context.end() - @thread_utils.run_async - def _upgrade_cluster_async(self, cluster_id: str, template: Dict): - cluster_name = None - vapp = None - try: - curr_rde: common_models.DefEntity = self.entity_svc.get_entity(cluster_id) # noqa: E501 - curr_native_entity: rde_2_x.NativeEntity = curr_rde.entity - cluster_name = curr_native_entity.metadata.name - vapp_href = curr_rde.externalId - - # TODO use cluster status field to get the control plane and worker nodes # noqa: E501 - client_v36 = self.context.get_client( - api_version=DEFAULT_API_VERSION) - vapp = vcd_vapp.VApp(client_v36, href=vapp_href) - all_node_names = [vm.get('name') for vm in vapp.get_all_vms() if not vm.get('name').startswith(NodeType.NFS)] # noqa: E501 - control_plane_node_names = [curr_native_entity.status.nodes.control_plane.name] # noqa: E501 - worker_node_names = [worker.name for worker in curr_native_entity.status.nodes.workers] # noqa: E501 - - template_name = 
template[LocalTemplateKey.NAME] - template_revision = template[LocalTemplateKey.REVISION] - template_cookbook_version = semver.Version(template[LocalTemplateKey.COOKBOOK_VERSION]) # noqa: E501 - - # semantic version doesn't allow leading zeros - # docker's version format YY.MM.patch allows us to directly use - # lexicographical string comparison - c_docker = curr_native_entity.status.docker_version - t_docker = template[LocalTemplateKey.DOCKER_VERSION] - k8s_details = curr_native_entity.status.kubernetes.split(' ') - c_k8s = semver.Version(k8s_details[-1]) - t_k8s = semver.Version(template[LocalTemplateKey.KUBERNETES_VERSION]) # noqa: E501 - cni_details = curr_native_entity.status.cni.split(' ') - c_cni = semver.Version(cni_details[-1]) - t_cni = semver.Version(template[LocalTemplateKey.CNI_VERSION]) - - upgrade_docker = t_docker > c_docker - upgrade_k8s = t_k8s >= c_k8s - upgrade_cni = t_cni > c_cni or t_k8s.major > c_k8s.major or t_k8s.minor > c_k8s.minor # noqa: E501 - - sysadmin_client_v36 = self.context.get_sysadmin_client( - api_version=DEFAULT_API_VERSION) - - if upgrade_k8s: - msg = f"Draining control plane node {control_plane_node_names}" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - _drain_nodes(sysadmin_client_v36, vapp_href, - control_plane_node_names, cluster_name=cluster_name) # noqa: E501 - - msg = f"Upgrading Kubernetes ({c_k8s} -> {t_k8s}) " \ - f"in control plane node {control_plane_node_names}" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - filepath = ltm.get_script_filepath(template_cookbook_version, - template_name, - template_revision, - TemplateScriptFile.CONTROL_PLANE_K8S_UPGRADE) # noqa: E501 - script = utils.read_data_file(filepath, logger=LOGGER) - _run_script_in_nodes(sysadmin_client_v36, vapp_href, - control_plane_node_names, script) - - msg = f"Uncordoning control plane node {control_plane_node_names}" # noqa: E501 - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - _uncordon_nodes(sysadmin_client_v36, - vapp_href, - control_plane_node_names, - cluster_name=cluster_name) - - filepath = ltm.get_script_filepath(template_cookbook_version, - template_name, - template_revision, - TemplateScriptFile.WORKER_K8S_UPGRADE) # noqa: E501 - script = utils.read_data_file(filepath, logger=LOGGER) - for node in worker_node_names: - msg = f"Draining node {node}" - self._update_task(BehaviorTaskStatus.RUNNING, - message=msg) - _drain_nodes(sysadmin_client_v36, - vapp_href, - [node], - cluster_name=cluster_name) - - msg = f"Upgrading Kubernetes ({c_k8s} " \ - f"-> {t_k8s}) in node {node}" - self._update_task(BehaviorTaskStatus.RUNNING, - message=msg) - _run_script_in_nodes(sysadmin_client_v36, - vapp_href, [node], script) - - msg = f"Uncordoning node {node}" - self._update_task(BehaviorTaskStatus.RUNNING, - message=msg) - _uncordon_nodes(sysadmin_client_v36, - vapp_href, [node], - cluster_name=cluster_name) - - if upgrade_docker or upgrade_cni: - msg = f"Draining all nodes {all_node_names}" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - _drain_nodes(sysadmin_client_v36, - vapp_href, all_node_names, - cluster_name=cluster_name) - - if upgrade_docker: - msg = f"Upgrading Docker-CE ({c_docker} -> {t_docker}) " \ - f"in nodes {all_node_names}" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - filepath = ltm.get_script_filepath( - template_cookbook_version, - template_name, - template_revision, - TemplateScriptFile.DOCKER_UPGRADE) - script = utils.read_data_file(filepath, logger=LOGGER) - 
_run_script_in_nodes(sysadmin_client_v36, vapp_href, - all_node_names, script) - - if upgrade_cni: - msg = "Applying CNI " \ - f"({curr_native_entity.status.cni} " \ - f"-> {t_cni}) in control plane node {control_plane_node_names}" # noqa: E501 - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - filepath = ltm.get_script_filepath(template_cookbook_version, - template_name, - template_revision, - TemplateScriptFile.CONTROL_PLANE_CNI_APPLY) # noqa: E501 - script = utils.read_data_file(filepath, logger=LOGGER) - _run_script_in_nodes(sysadmin_client_v36, vapp_href, - control_plane_node_names, script) - - # uncordon all nodes (sometimes redundant) - msg = f"Uncordoning all nodes {all_node_names}" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - _uncordon_nodes(sysadmin_client_v36, vapp_href, - all_node_names, cluster_name=cluster_name) - - # update cluster metadata - msg = f"Updating metadata for cluster '{cluster_name}'" - self._update_task(BehaviorTaskStatus.RUNNING, message=msg) - metadata = { - ClusterMetadataKey.TEMPLATE_NAME: template[LocalTemplateKey.NAME], # noqa: E501 - ClusterMetadataKey.TEMPLATE_REVISION: template[LocalTemplateKey.REVISION], # noqa: E501 - ClusterMetadataKey.DOCKER_VERSION: template[LocalTemplateKey.DOCKER_VERSION], # noqa: E501 - ClusterMetadataKey.KUBERNETES_VERSION: template[LocalTemplateKey.KUBERNETES_VERSION], # noqa: E501 - ClusterMetadataKey.CNI: template[LocalTemplateKey.CNI], - ClusterMetadataKey.CNI_VERSION: template[LocalTemplateKey.CNI_VERSION] # noqa: E501 - } - - task = vapp.set_multiple_metadata(metadata) - client_v36 = self.context.get_client( - api_version=DEFAULT_API_VERSION) - client_v36.get_task_monitor().wait_for_status(task) - - curr_rde_status: rde_2_x.Status = curr_native_entity.status - # update defined entity of the cluster - curr_rde_status.cloud_properties.distribution = \ - rde_2_x.Distribution(template_name=template[LocalTemplateKey.NAME], # noqa: E501 - template_revision=int(template[LocalTemplateKey.REVISION])) # noqa: E501 - curr_rde_status.cni = \ - _create_k8s_software_string(template[LocalTemplateKey.CNI], - template[LocalTemplateKey.CNI_VERSION]) # noqa: E501 - curr_rde_status.kubernetes = \ - _create_k8s_software_string(template[LocalTemplateKey.KUBERNETES], # noqa: E501 - template[LocalTemplateKey.KUBERNETES_VERSION]) # noqa: E501 - curr_rde_status.docker_version = template[LocalTemplateKey.DOCKER_VERSION] # noqa: E501 - curr_rde_status.os = template[LocalTemplateKey.OS] - curr_rde_status.phase = str( - DefEntityPhase(DefEntityOperation.UPGRADE, - DefEntityOperationStatus.SUCCEEDED)) - self._update_cluster_entity(cluster_id, curr_rde_status) - - msg = f"Successfully upgraded cluster '{cluster_name}' software " \ - f"to match template {template_name} (revision " \ - f"{template_revision}): Kubernetes: {c_k8s} -> {t_k8s}, " \ - f"Docker-CE: {c_docker} -> {t_docker}, " \ - f"CNI: {c_cni} -> {t_cni}" - self._update_task(BehaviorTaskStatus.SUCCESS, message=msg) - LOGGER.info(f"{msg} ({vapp_href})") - except Exception as err: - msg = f"Unexpected error while upgrading cluster " \ - f"'{cluster_name}'" - LOGGER.error(f"{msg}", exc_info=True) - try: - self._fail_operation( - cluster_id, - DefEntityOperation.UPGRADE) - except Exception: - msg = f"Failed to update defined entity status " \ - f" for cluster {cluster_id}" - LOGGER.error(f"{msg}", exc_info=True) - # NOTE: Since the defined entity is assumed to be - # resolved during cluster creation, there is no need - # to resolve the defined entity again - try: - 
self._sync_def_entity(cluster_id, vapp=vapp) - except Exception: - msg = f"Failed to sync defined entity of the cluster {cluster_id}" # noqa: E501 - LOGGER.error(f"{msg}", exc_info=True) - self._update_task(BehaviorTaskStatus.ERROR, - message=msg, error_message=str(err)) - - finally: - # TODO re-organize updating defined entity and task update as per - # https://stackoverflow.com/questions/49099637/how-to-determine-if-an-exception-was-raised-once-youre-in-the-finally-block - # noqa: E501 - self.context.end() - @thread_utils.run_async def _monitor_delete_nodes(self, cluster_id, nodes_to_del): """Triggers and monitors delete thread. @@ -1858,7 +1464,7 @@ def _monitor_delete_nodes(self, cluster_id, nodes_to_del): def _delete_nodes_async(self, cluster_id: str, input_native_entity: rde_2_x.NativeEntity = None, nodes_to_del=None): - """Delete worker and/or nfs nodes in vCD. + """Delete worker nodes in vCD. This method is executed by a thread in an asynchronous manner. Do's: @@ -1913,7 +1519,8 @@ def _delete_nodes_async(self, cluster_id: str, except (exceptions.NodeOperationError, exceptions.ScriptExecutionError) as err: # noqa: E501 LOGGER.warning(f"Failed to drain nodes: {nodes_to_del}" f" in cluster '{cluster_name}'." - f" Continuing node delete...\nError: {err}") + f" Continuing node delete...\nError: {err}", + exc_info=True) msg = f"Deleting {len(nodes_to_del)} node(s) from " \ f"cluster '{cluster_name}': {nodes_to_del}" @@ -1976,7 +1583,7 @@ def _sync_def_entity(self, cluster_id: str, return self._update_cluster_entity(cluster_id, new_status) def _update_cluster_entity(self, cluster_id: str, - native_tkgm_status: rde_2_x.Status, + tkgm_entity_status: rde_2_x.Status, external_id: Optional[str] = None): """Update status part of the cluster rde. @@ -1984,7 +1591,7 @@ def _update_cluster_entity(self, cluster_id: str, optimistic locking. :param str cluster_id: ID of the defined entity. - :param rde_2_x.Status native_tkgm_status: Defined entity status to be + :param rde_2_x.Status tkgm_entity_status: Defined entity status to be updated. :param str external_id: Vapp ID to update the defined entity of the cluster with. 
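The rename above keeps the read-modify-write flow of _update_cluster_entity intact. For readers skimming the hunk, a minimal sketch of the optimistic-locking pattern the docstring refers to could look like the following; ConflictError, get_entity and update_entity are illustrative stand-ins here, not the actual entity-service signatures:

class ConflictError(Exception):
    """Stand-in for the ETag-mismatch/412 error a real entity service raises."""


def update_status_with_retry(entity_svc, cluster_id, new_status, retries=3):
    # Re-read, re-apply, retry: the essence of optimistic locking.
    for attempt in range(retries):
        rde = entity_svc.get_entity(cluster_id)  # latest revision, fresh etag
        rde.entity.status = new_status           # apply our change on top of it
        try:
            return entity_svc.update_entity(cluster_id, rde)
        except ConflictError:                    # a concurrent writer won the race
            if attempt == retries - 1:
                raise

The point is simply that the latest revision is re-fetched on every attempt, so a concurrent writer causes a retry rather than a lost update.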
@@ -2000,7 +1607,7 @@ def _update_cluster_entity(self, cluster_id: str, if external_id is not None: cluster_rde.externalId = external_id # Update entity status with new values - cluster_rde.entity.status = native_tkgm_status + cluster_rde.entity.status = tkgm_entity_status # Update cluster rde return self.sysadmin_entity_svc.update_entity( @@ -2033,49 +1640,26 @@ def _update_task(self, status, message='', error_message='', progress=None): # def _replace_kubeconfig_expose_ip(self, internal_ip: str, cluster_id: str, vapp: vcd_vapp.VApp): # Form kubeconfig with internal ip - kubeconfig_with_exposed_ip = self.get_cluster_config(cluster_id) - script = \ - nw_exp_helper.construct_script_to_update_kubeconfig_with_internal_ip( # noqa: E501 - kubeconfig_with_exposed_ip=kubeconfig_with_exposed_ip, - internal_ip=internal_ip - ) + kubeconfig_with_exposed_ip = self._get_kube_config_from_rde(cluster_id) + if not kubeconfig_with_exposed_ip: + msg = "Failed to get cluster kube-config" + LOGGER.error(msg) + raise exceptions.ClusterOperationError(msg) - node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) - result = _execute_script_in_nodes( - self.context.sysadmin_client, - vapp=vapp, - node_names=node_names, - script=script, - check_tools=True + return nw_exp_helper.get_updated_kubeconfig_with_internal_ip( + kubeconfig_with_exposed_ip=kubeconfig_with_exposed_ip, + internal_ip=internal_ip, ) - errors = _get_script_execution_errors(result) - if errors: - raise exceptions.ScriptExecutionError( - f"Failed to overwrite kubeconfig with internal ip: " - f"{internal_ip}: {errors}" - ) - - -def _get_cluster_upgrade_target_templates( - source_template_name, source_template_revision) -> List[dict]: - """Get list of templates that a given cluster can upgrade to. - - :param str source_template_name: - :param str source_template_revision: - :return: List of dictionary containing templates - :rtype: List[dict] - """ - upgrades = [] - config = server_utils.get_server_runtime_config() - for t in config['broker']['templates']: - if source_template_name in t[LocalTemplateKey.UPGRADE_FROM]: - if t[LocalTemplateKey.NAME] == source_template_name and \ - int(t[LocalTemplateKey.REVISION]) <= int(source_template_revision): # noqa: E501 - continue - upgrades.append(t) - - return upgrades + def _get_kube_config_from_rde(self, cluster_id: str): + rde = self.entity_svc.get_entity(cluster_id) + native_entity: rde_2_x.NativeEntity = rde.entity + if hasattr(native_entity.status, + shared_constants.RDEProperty.PRIVATE.value) and hasattr( + native_entity.status.private, + shared_constants.RDEProperty.KUBE_CONFIG.value): + return native_entity.status.private.kube_config + return None def _get_nodes_details(sysadmin_client, vapp): @@ -2094,7 +1678,6 @@ def _get_nodes_details(sysadmin_client, vapp): try: vms = vapp.get_all_vms() workers = [] - nfs_nodes = [] control_plane = None for vm in vms: vcd_utils.to_dict(vm) @@ -2127,101 +1710,17 @@ def _get_nodes_details(sysadmin_client, vapp): rde_2_x.Node(name=vm_name, ip=ip, sizing_class=sizing_class, storage_profile=storage_profile)) - elif vm_name.startswith(NodeType.NFS): - exports = None - try: - exports = _get_nfs_exports(sysadmin_client, - ip, - vapp, - vm_name) - except Exception: - LOGGER.error(f"Failed to retrieve the NFS exports of " - f"node {vm_name} of cluster {vapp.name} ", - exc_info=True) - nfs_nodes.append(rde_2_x.NfsNode(name=vm_name, ip=ip, - sizing_class=sizing_class, - storage_profile=storage_profile, # noqa: E501 - exports=exports)) - return 
rde_2_x.Nodes(control_plane=control_plane, workers=workers, - nfs=nfs_nodes) + return rde_2_x.Nodes(control_plane=control_plane, workers=workers) except Exception as err: LOGGER.error("Failed to retrieve the status of the nodes of the " f"cluster {vapp.name}: {err}", exc_info=True) -def _get_nfs_exports(sysadmin_client: vcd_client.Client, ip, vapp, vm_name): - """Get the exports from remote NFS server. - - :param pyvcloud.vcd.client.Client sysadmin_client: - :param str ip: IP address of the NFS server - :param pyvcloud.vcd.vapp.vcd_vapp.VApp vapp: - :param str vm_name: - - :return: (List): List of exports - """ - script = f"#!/usr/bin/env bash\nshowmount -e {ip}" - result = _execute_script_in_nodes(sysadmin_client, vapp=vapp, - node_names=[vm_name], script=script, - check_tools=False) - lines = result[0][1].content.decode().split('\n') - exports = [] - for index in range(1, len(lines) - 1): - export = lines[index].strip().split()[0] - exports.append(export) - return exports - - -def _drain_nodes(sysadmin_client: vcd_client.Client, vapp_href, node_names, - cluster_name=''): +def _drain_nodes(_: vcd_client.Client, vapp_href, node_names, cluster_name=''): LOGGER.debug(f"Draining nodes {node_names} in cluster '{cluster_name}' " f"(vapp: {vapp_href})") - script = "#!/usr/bin/env bash\n" - for node_name in node_names: - script += f"kubectl drain {node_name} " \ - f"--force --ignore-daemonsets --timeout=60s --delete-local-data\n" # noqa:E501 - - try: - vapp = vcd_vapp.VApp(sysadmin_client, href=vapp_href) - control_plane_node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) # noqa: E501 - _run_script_in_nodes(sysadmin_client, - vapp_href, - [control_plane_node_names[0]], - script) - except Exception as err: - LOGGER.error(f"Failed to drain nodes {node_names} in cluster " - f"'{cluster_name}' (vapp: {vapp_href}) with " - f"error: {err}", exc_info=True) - raise - - LOGGER.debug(f"Successfully drained nodes {node_names} in cluster " - f"'{cluster_name}' (vapp: {vapp_href})") - - -def _uncordon_nodes(sysadmin_client: vcd_client.Client, vapp_href, node_names, - cluster_name=''): - vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) - - LOGGER.debug(f"Uncordoning nodes {node_names} in cluster '{cluster_name}' " - f"(vapp: {vapp_href})") - script = "#!/usr/bin/env bash\n" - for node_name in node_names: - script += f"kubectl uncordon {node_name}\n" - - try: - vapp = vcd_vapp.VApp(sysadmin_client, href=vapp_href) - control_plane_node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) # noqa: E501 - _run_script_in_nodes(sysadmin_client, - vapp_href, - [control_plane_node_names[0]], - script) - except Exception as err: - LOGGER.error(f"Failed to uncordon nodes {node_names} in cluster " - f"'{cluster_name}' (vapp: {vapp_href}) " - f"with error: {err}", exc_info=True) - raise - - LOGGER.debug(f"Successfully uncordoned nodes {node_names} in cluster " - f"'{cluster_name}' (vapp: {vapp_href})") + LOGGER.info("Draining is not supported since guest script execution is not permitted.") + return def _delete_vapp(client, org_name, ovdc_name, vapp_name): @@ -2250,25 +1749,6 @@ def _delete_nodes(sysadmin_client: vcd_client.Client, vapp_href, node_names, LOGGER.debug(f"Deleting node(s) {node_names} from cluster '{cluster_name}'" f" (vapp: {vapp_href})") - script = "#!/usr/bin/env bash\nkubectl delete node " - are_there_workers_to_del = False - for node_name in node_names: - if node_name.startswith(NodeType.WORKER): - script += f' {node_name}' - are_there_workers_to_del = True - script += '\n' - - 
vapp = vcd_vapp.VApp(sysadmin_client, href=vapp_href) - try: - if are_there_workers_to_del: - control_plane_node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) # noqa: E501 - _run_script_in_nodes(sysadmin_client, vapp_href, - [control_plane_node_names[0]], script) - except Exception as err: - LOGGER.error(f"Failed to delete node(s) {node_names} from cluster " - f"'{cluster_name}' using kubectl " - f"(vapp: {vapp_href}): {err}", exc_info=True) - vapp = vcd_vapp.VApp(sysadmin_client, href=vapp_href) for vm_name in node_names: vm = vcd_vm.VM(sysadmin_client, resource=vapp.get_vm(vm_name)) @@ -2313,285 +1793,365 @@ def _cluster_exists(client, cluster_name, org_name=None, ovdc_name=None): return len(list(result)) != 0 -def _get_template(name=None, revision=None): - if (name is None and revision is not None) or (name is not None and revision is None): # noqa: E501 - raise ValueError("If template revision is specified, then template " - "name must also be specified (and vice versa).") +def _get_tkgm_template(name: str): + if name is None: # noqa: E501 + raise ValueError("Template name should be specified.") server_config = server_utils.get_server_runtime_config() name = name or server_config['broker']['default_template_name'] - revision = revision or server_config['broker']['default_template_revision'] - for template in server_config['broker']['templates']: - if (template[LocalTemplateKey.NAME], str(template[LocalTemplateKey.REVISION])) == (name, str(revision)): # noqa: E501 + for template in server_config['broker']['tkgm_templates']: + if template[LocalTemplateKey.NAME] == name: return template - raise Exception(f"Template '{name}' at revision {revision} not found.") + raise Exception(f"Template '{name}' not found in list [{server_config['broker']['tkgm_templates']}]") + +def _add_control_plane_nodes(sysadmin_client, num_nodes, org, vdc, vapp, + catalog_name, template, network_name, + storage_profile=None, ssh_key=None, + sizing_class_name=None, expose=False, + cluster_name=None, cluster_id=None) -> Tuple[str, List[Dict]]: -def _add_nodes(sysadmin_client, num_nodes, node_type, org, vdc, vapp, - catalog_name, template, network_name, storage_profile=None, - ssh_key=None, sizing_class_name=None): vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) - if num_nodes > 0: - specs = [] - try: - # DEV NOTE: With api v33.0 and onwards, get_catalog operation will fail # noqa: E501 - # for non admin users of an an org which is not hosting the catalog, # noqa: E501 - # even if the catalog is explicitly shared with the org in question. # noqa: E501 - # This happens because for api v 33.0 and onwards, the Org XML no - # longer returns the href to catalogs accessible to the org, and typed # noqa: E501 - # queries hide the catalog link from non admin users. - # As a workaround, we will use a sys admin client to get the href and # noqa: E501 - # pass it forward. Do note that the catalog itself can still be - # accessed by these non admin users, just that they can't find by the # noqa: E501 - # href on their own. 
- - org_name = org.get_name() - org_resource = sysadmin_client.get_org_by_name(org_name) - org_sa = vcd_org.Org(sysadmin_client, resource=org_resource) - catalog_item = org_sa.get_catalog_item( - catalog_name, template[LocalTemplateKey.CATALOG_ITEM_NAME]) - catalog_item_href = catalog_item.Entity.get('href') - - source_vapp = vcd_vapp.VApp(sysadmin_client, href=catalog_item_href) # noqa: E501 - source_vm = source_vapp.get_all_vms()[0].get('name') - if storage_profile is not None: - storage_profile = vdc.get_storage_profile(storage_profile) - - config = server_utils.get_server_runtime_config() - cpm = compute_policy_manager.ComputePolicyManager(sysadmin_client, - log_wire=utils.str_to_bool(config['service']['log_wire'])) # noqa: E501 - sizing_class_href = None - if sizing_class_name: - vdc_resource = vdc.get_resource() - for policy in cpm.list_vdc_sizing_policies_on_vdc(vdc_resource.get('id')): # noqa: E501 - if policy['name'] == sizing_class_name: - if not sizing_class_href: - sizing_class_href = policy['href'] - else: - msg = f"Duplicate sizing policies with the name {sizing_class_name}" # noqa: E501 - LOGGER.error(msg) - raise Exception(msg) - if not sizing_class_href: - msg = f"No sizing policy with the name {sizing_class_name} exists on the VDC" # noqa: E501 - LOGGER.error(msg) - raise Exception(msg) - LOGGER.debug(f"Found sizing policy with name {sizing_class_name} on the VDC {vdc_resource.get('name')}") # noqa: E501 - - cust_script = None - if ssh_key is not None: - cust_script = \ - "#!/usr/bin/env bash\n" \ - "if [ x$1=x\"postcustomization\" ];\n" \ - "then\n" \ - "mkdir -p /root/.ssh\n" \ - f"echo '{ssh_key}' >> /root/.ssh/authorized_keys\n" \ - "chmod -R go-rwx /root/.ssh\n" \ - "fi" + vm_specs = [] + expose_ip = '' + try: + if num_nodes != 1: + raise ValueError(f"Unexpected number of control-plane nodes. Expected 1, obtained [{num_nodes}].") + + templated_script = get_cluster_script_file_contents( + ClusterScriptFile.CONTROL_PLANE, + ClusterScriptFile.VERSION_2_X_TKGM) + # Get template with no expose_ip; expose_ip will be computed + # later when control_plane internal ip is computed below. 
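As the comment above says, the control-plane script is rendered twice: once with an empty endpoint so the VM can be created, and once more after the internal (or exposed) IP is known. A toy illustration of that two-pass rendering, with a made-up single-placeholder template (the real template and its full placeholder set appear in the format calls that follow):

# Toy template; the production one carries several more placeholders.
toy_template = "kubeadm init --control-plane-endpoint='{control_plane_endpoint}'"
first_pass = toy_template.format(control_plane_endpoint='')  # enough to create the VM
final_pass = toy_template.format(control_plane_endpoint='10.1.2.3:6443')  # re-rendered once the ip is known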
+ vm_specs = _get_vm_specifications( + client=sysadmin_client, + num_nodes=num_nodes, + node_type=NodeType.CONTROL_PLANE.value, + org=org, + vdc=vdc, + vapp=vapp, + catalog_name=catalog_name, + template=template, + network_name=network_name, + storage_profile=storage_profile, + sizing_class_name=sizing_class_name, + cust_script=None, + ) + for spec in vm_specs: + spec['cust_script'] = templated_script.format( + vm_host_name=spec['target_vm_name'], + service_cidr=TKGM_DEFAULT_SERVICE_CIDR, + pod_cidr=TKGM_DEFAULT_POD_NETWORK_CIDR, + antrea_cni_version=ANTREA_CNI_VERSION, + ssh_key=ssh_key if ssh_key else '', + control_plane_endpoint='', + ) + + task = vapp.add_vms(vm_specs, power_on=False, deploy=False, all_eulas_accepted=True) + sysadmin_client.get_task_monitor().wait_for_status( + task, + callback=wait_for_adding_control_plane_vm_to_vapp + ) + vapp.reload() + + internal_ip = '' + if len(vm_specs) > 0: + spec = vm_specs[0] + internal_ip = vapp.get_primary_ip(vm_name=spec['target_vm_name']) + + for spec in vm_specs: + vm_name = spec['target_vm_name'] + vm_resource = vapp.get_vm(vm_name) + vm = vcd_vm.VM(sysadmin_client, resource=vm_resource) + # Handle exposing cluster + control_plane_endpoint = internal_ip + if expose: + try: + expose_ip = nw_exp_helper.expose_cluster( + client=sysadmin_client, + org_name=org.get_name(), + ovdc_name=vdc.name, + network_name=network_name, + cluster_name=cluster_name, + cluster_id=cluster_id, + internal_ip=internal_ip) + control_plane_endpoint = expose_ip + except Exception as err: + LOGGER.error(f"Exposing cluster failed: {str(err)}", exc_info=True) # noqa: E501 + + # If expose is set, control_plane_endpoint is exposed ip + # Else control_plane_endpoint is internal_ip + cust_script = templated_script.format( + vm_host_name=spec['target_vm_name'], + service_cidr=TKGM_DEFAULT_SERVICE_CIDR, + pod_cidr=TKGM_DEFAULT_POD_NETWORK_CIDR, + antrea_cni_version=ANTREA_CNI_VERSION, + ssh_key=ssh_key if ssh_key else '', + control_plane_endpoint=f"{control_plane_endpoint}:6443" + ) + task = vm.update_guest_customization_section( + enabled=True, + customization_script=cust_script) + + sysadmin_client.get_task_monitor().wait_for_status( + task, + callback=wait_for_update_customization + ) + vm.reload() vapp.reload() - for n in range(num_nodes): - while True: - name = f"{node_type}-{''.join(random.choices(string.ascii_lowercase + string.digits, k=4))}" # noqa: E501 - try: - vapp.get_vm(name) - except Exception: - break - spec = { - 'source_vm_name': source_vm, - 'vapp': source_vapp.resource, - 'target_vm_name': name, - 'hostname': name, - 'password_auto': True, - 'network': network_name, - 'ip_allocation_mode': 'pool' - } - if sizing_class_href: - spec['sizing_policy_href'] = sizing_class_href - spec['placement_policy_href'] = config['placement_policy_hrefs'][template[LocalTemplateKey.KIND]] # noqa: E501 - if cust_script is not None: - spec['cust_script'] = cust_script - if storage_profile is not None: - spec['storage_profile'] = storage_profile - specs.append(spec) - - task = vapp.add_vms(specs, power_on=False) - sysadmin_client.get_task_monitor().wait_for_status(task) + + task = vm.power_on_and_force_recustomization() + # wait_for_vm_power_on is reused for all vm creation callback + sysadmin_client.get_task_monitor().wait_for_status( + task, + callback=wait_for_vm_power_on + ) vapp.reload() - for spec in specs: - vm_name = spec['target_vm_name'] - vm_resource = vapp.get_vm(vm_name) - vm = vcd_vm.VM(sysadmin_client, resource=vm_resource) - - task = vm.power_on() - 
sysadmin_client.get_task_monitor().wait_for_status(task)
-                vapp.reload()
-
-                if node_type == NodeType.NFS:
-                    LOGGER.debug(f"Enabling NFS server on {vm_name}")
-                    script_filepath = ltm.get_script_filepath(
-                        semver.Version(template[LocalTemplateKey.COOKBOOK_VERSION]),  # noqa: E501
-                        template[LocalTemplateKey.NAME],
-                        template[LocalTemplateKey.REVISION],
-                        TemplateScriptFile.NFSD)
-                    script = utils.read_data_file(script_filepath, logger=LOGGER)  # noqa: E501
-                    exec_results = _execute_script_in_nodes(
-                        sysadmin_client, vapp=vapp, node_names=[vm_name],
-                        script=script)
-                    errors = _get_script_execution_errors(exec_results)
-                    if errors:
-                        raise exceptions.ScriptExecutionError(
-                            f"VM customization script execution failed "
-                            f"on node {vm_name}:{errors}")
-        except Exception as err:
-            LOGGER.error(err, exc_info=True)
-            # TODO: get details of the exception to determine cause of failure,
-            # e.g. not enough resources available.
-            node_list = [entry.get('target_vm_name') for entry in specs]
-            if hasattr(err, 'vcd_error') and err.vcd_error is not None and \
-                    "throwPolicyNotAvailableException" in err.vcd_error.get('stackTrace', ''):  # noqa: E501
-                raise exceptions.NodeCreationError(
-                    node_list,
-                    f"OVDC not enabled for {template[LocalTemplateKey.KIND]}")  # noqa: E501
+            task = vm.power_on_and_force_recustomization()
+            # wait_for_vm_power_on is reused for all vm creation callbacks
+            sysadmin_client.get_task_monitor().wait_for_status(
+                task,
+                callback=wait_for_vm_power_on
+            )
+            vapp.reload()
+
+            # HACK: sometimes the dbus.service is not yet started by the time the ToolsDeployPkg
+            # scripts run. As a result, that script does install the post-customize service, but
+            # does not reboot the machine, which is needed in order to execute the postcustomization
+            # script where the bulk of our functionality lives.
+            # Guest customization works with the following coarse steps:
+            # 1. run script with parameter "precustomization"
+            # 2. set up network
+            # 3. convert script with parameter "precustomization" to service
+            #    a. set the 'guestinfo.gc.status' to 'Successful'.
+            # 4. check if a forked script has completed setting up hostname (this fails sometimes)
+            # 5. check for errors, clean up and report.
+            # In the above steps, steps 4,5 take about 2s. Hence we wait for 3a, then wait for 10s
+            # and then trigger the reboot ourselves.
+            #
+            # However, what happens when the bug is fixed in GOSC?
+            # In that case this hack will reboot the node one more time depending on a race. That is
+            # not detrimental to functionality, but we do have a slight delay due to the second reboot.
+            vcd_utils.wait_for_completion_of_post_customization_procedure(
+                vm,
+                customization_phase=PreCustomizationPhase.POST_BOOT_CUSTOMIZATION_SERVICE_SETUP.value,
+                logger=LOGGER,
+                expected_target_status_list=[cust_status.value for cust_status in ToolsDeployPkgCustomizationStatus],
+            )
+
+            time.sleep(10)
+            task = vm.reboot()
+            sysadmin_client.get_task_monitor().wait_for_status(
+                task,
+                callback=wait_for_vm_power_on
+            )
+            vm.reload()
+
+            # Note that this is an ordered list.
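The ordered list referred to above is the phase loop that follows: each phase blocks until the guest reports completion. As a rough sketch of what such a wait could look like, assuming the guest publishes one status key per phase into the VM's extra config and flips it to 'successful' (the key layout, timeout, and polling cadence here are assumptions for illustration, not the actual contract of wait_for_completion_of_post_customization_procedure; vcd_utils is this module's own import):

import time

def wait_for_phase(vm, phase, timeout=600, interval=5):
    # Poll the VM's extra config until the phase reports success, or time out.
    deadline = time.time() + timeout
    while time.time() < deadline:
        vm.reload()
        status = vcd_utils.get_vm_extra_config_element(
            vm, f"guestinfo.postcustomization.{phase}.status")  # assumed key shape
        if status == 'successful':
            return
        time.sleep(interval)
    raise TimeoutError(f"phase '{phase}' did not report success within {timeout}s")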
+ for customization_phase in [PostCustomizationPhase.HOSTNAME_SETUP, + PostCustomizationPhase.NETWORK_CONFIGURATION, + PostCustomizationPhase.STORE_SSH_KEY, + PostCustomizationPhase.NAMESERVER_SETUP, + PostCustomizationPhase.KUBEADM_INIT, + PostCustomizationPhase.KUBECTL_APPLY_CNI, + PostCustomizationPhase.KUBEADM_TOKEN_GENERATE, + ]: + vcd_utils.wait_for_completion_of_post_customization_procedure( + vm, + customization_phase=customization_phase.value, # noqa: E501 + logger=LOGGER + ) + + except Exception as err: + LOGGER.error(err, exc_info=True) + node_list = [entry.get('target_vm_name') for entry in vm_specs] + if hasattr(err, 'vcd_error') and err.vcd_error is not None and \ + "throwPolicyNotAvailableException" in err.vcd_error.get('stackTrace', ''): # noqa: E501 + raise exceptions.NodeCreationError( + node_list, + f"OVDC not enabled for {template[LocalTemplateKey.KIND]}") # noqa: E501 + + raise exceptions.NodeCreationError(node_list, str(err)) - raise exceptions.NodeCreationError(node_list, str(err)) + vapp.reload() + return expose_ip, vm_specs + + +def _add_worker_nodes(sysadmin_client, num_nodes, org, vdc, vapp, + catalog_name, template, network_name, + storage_profile=None, ssh_key=None, + sizing_class_name=None, + control_plane_join_cmd='') -> List: + vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) + + vm_specs = [] + if num_nodes <= 0: + return vm_specs + + try: + templated_script = get_cluster_script_file_contents( + ClusterScriptFile.NODE, ClusterScriptFile.VERSION_2_X_TKGM) + + parts = control_plane_join_cmd.split() + num_parts = 7 + if len(parts) != num_parts: + raise ValueError(f"Badly formatted join command [{control_plane_join_cmd}]. Expected {num_parts} parts.") + ip_port = parts[2] + token = parts[4] + discovery_token_ca_cert_hash = parts[6] + + # The cust_script needs the vm host name which is computed in the _get_vm_specifications + # function, so the specs are obtained and the cust_script is recomputed and added. + vm_specs = _get_vm_specifications( + client=sysadmin_client, + num_nodes=num_nodes, + node_type=NodeType.WORKER.value, + org=org, + vdc=vdc, + vapp=vapp, + catalog_name=catalog_name, + template=template, + network_name=network_name, + storage_profile=storage_profile, + sizing_class_name=sizing_class_name, + cust_script=None, + ) + for spec in vm_specs: + spec['cust_script'] = templated_script.format( + vm_host_name=spec['target_vm_name'], + ssh_key=ssh_key if ssh_key else '', + ip_port=ip_port, + token=token, + discovery_token_ca_cert_hash=discovery_token_ca_cert_hash, + ) + task = vapp.add_vms(vm_specs, power_on=False, deploy=False, all_eulas_accepted=True) + sysadmin_client.get_task_monitor().wait_for_status( + task, + callback=wait_for_adding_worker_vm_to_vapp + ) vapp.reload() - return {'task': task, 'specs': specs} + + for spec in vm_specs: + vm_name = spec['target_vm_name'] + vm_resource = vapp.get_vm(vm_name) + vm = vcd_vm.VM(sysadmin_client, resource=vm_resource) + + task = vm.power_on_and_force_recustomization() + # wait_for_vm_power_on is reused for all vm creation callback + sysadmin_client.get_task_monitor().wait_for_status( + task, + callback=wait_for_vm_power_on + ) + vapp.reload() + + # HACK: Please read the long comment in the other similar section (look for + # POST_BOOT_CUSTOMIZATION_SERVICE_SETUP) which explains the rationale of the hack. 
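Stepping back to the parsing at the top of _add_worker_nodes: kubeadm prints its join command in a fixed shape, so whitespace-splitting yields exactly seven parts, with positions 2, 4 and 6 carrying the endpoint, the token and the CA cert hash. A worked example (all values below are made up):

cmd = ("kubeadm join 10.0.0.10:6443 --token abcdef.0123456789abcdef "
       "--discovery-token-ca-cert-hash sha256:6c0a1f2e")  # illustrative values
parts = cmd.split()
assert len(parts) == 7
ip_port = parts[2]                       # '10.0.0.10:6443'
token = parts[4]                         # 'abcdef.0123456789abcdef'
discovery_token_ca_cert_hash = parts[6]  # 'sha256:6c0a1f2e'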
+ vcd_utils.wait_for_completion_of_post_customization_procedure( + vm, + customization_phase=PreCustomizationPhase.POST_BOOT_CUSTOMIZATION_SERVICE_SETUP.value, + logger=LOGGER, + expected_target_status_list=[cust_status.value for cust_status in ToolsDeployPkgCustomizationStatus], + ) + + time.sleep(10) + task = vm.reboot() + sysadmin_client.get_task_monitor().wait_for_status( + task, + callback=wait_for_vm_power_on + ) + vm.reload() + + LOGGER.debug(f"worker {vm_name} to join cluster using:{control_plane_join_cmd}") # noqa: E501 + + # Note that this is an ordered list. + for customization_phase in [PostCustomizationPhase.HOSTNAME_SETUP, + PostCustomizationPhase.NETWORK_CONFIGURATION, + PostCustomizationPhase.STORE_SSH_KEY, + PostCustomizationPhase.NAMESERVER_SETUP, + PostCustomizationPhase.KUBEADM_NODE_JOIN, + ]: + vcd_utils.wait_for_completion_of_post_customization_procedure( + vm, + customization_phase=customization_phase.value, # noqa: E501 + logger=LOGGER + ) + + except Exception as err: + LOGGER.error(err, exc_info=True) + # TODO: get details of the exception to determine cause of failure, + # e.g. not enough resources available. + node_list = [entry.get('target_vm_name') for entry in vm_specs] + if hasattr(err, 'vcd_error') and err.vcd_error is not None and \ + "throwPolicyNotAvailableException" in err.vcd_error.get('stackTrace', ''): # noqa: E501 + raise exceptions.NodeCreationError( + node_list, + f"OVDC not enabled for {template[LocalTemplateKey.KIND]}") # noqa: E501 + + raise exceptions.NodeCreationError(node_list, str(err)) + + vapp.reload() + return vm_specs def _get_node_names(vapp, node_type): return [vm.get('name') for vm in vapp.get_all_vms() if vm.get('name').startswith(node_type)] # noqa: E501 -def _get_control_plane_ip(sysadmin_client: vcd_client.Client, vapp, - check_tools=False): +def _get_control_plane_ip(sysadmin_client: vcd_client.Client, vapp): vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) LOGGER.debug(f"Getting control_plane IP for vapp: " f"{vapp.get_resource().get('name')}") - script = "#!/usr/bin/env bash\n" \ - "ip route get 1 | awk '{print $NF;exit}'\n" \ - node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) - result = _execute_script_in_nodes(sysadmin_client, vapp=vapp, - node_names=node_names, script=script, - check_tools=check_tools) - errors = _get_script_execution_errors(result) - if errors: - raise exceptions.ScriptExecutionError( - "Get control plane IP script execution " - "failed on control plane node " - f"{node_names}:{errors}") - control_plane_ip = result[0][1].content.decode().split()[0] + control_plane_ip = vapp.get_primary_ip(node_names[0]) LOGGER.debug(f"Retrieved control plane IP for vapp: " f"{vapp.get_resource().get('name')}, ip: {control_plane_ip}") return control_plane_ip -def _init_cluster(sysadmin_client: vcd_client.Client, vapp, - native_entity: rde_2_x.NativeEntity, cluster_id: str, - expose_ip=None): +def _get_join_cmd(sysadmin_client: vcd_client.Client, vapp): vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) + vapp.reload() + node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) + if not node_names: + raise exceptions.ClusterJoiningError("Join cluster failure: no control plane node found") # noqa: E501 - try: - templated_script = get_cluster_script_file_contents( - ClusterScriptFile.CONTROL_PLANE, - ClusterScriptFile.VERSION_2_X_TKGM) - # TODO: get username/password or replace with token - base64_username = "base64_username" - base64_password = "base64_password" - vip_subnet_cidr_ip 
= DEFAULT_SUBNET_CIDR_IP # TODO: get subnet - vip_subnet_cidr_suffix = DEFAULT_SUBNET_CIDR_SUFFIX - script = templated_script.format( - service_cidr=TKGM_DEFAULT_SERVICE_CIDR, - pod_cidr=TKGM_DEFAULT_POD_NETWORK_CIDR, - base64_username=base64_username, - base64_password=base64_password, - vcd_host=native_entity.metadata.site, - org=native_entity.metadata.org_name, - ovdc=native_entity.metadata.virtual_data_center_name, - ovdc_network=native_entity.spec.settings.ovdc_network, - vip_subnet_cidr_ip=vip_subnet_cidr_ip, - vip_subnet_cidr_suffix=vip_subnet_cidr_suffix, - cluster_id=cluster_id - ) + vm_resource = vapp.get_vm(node_names[0]) + control_plane_vm = vcd_vm.VM(sysadmin_client, resource=vm_resource) + control_plane_join_cmd: str = vcd_utils.get_vm_extra_config_element(control_plane_vm, KUBEADM_TOKEN_INFO) # noqa: E501 + control_plane_vm.reload() + if not control_plane_join_cmd: + raise exceptions.ClusterJoiningError("Join cluster failure: join info not found in control plane node") # noqa: E501 + return control_plane_join_cmd - # Expose cluster if given external ip - if expose_ip: - script = \ - nw_exp_helper.construct_init_cluster_script_with_exposed_ip( - script, expose_ip - ) - node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) - result = _execute_script_in_nodes(sysadmin_client, vapp=vapp, - node_names=node_names, script=script) - errors = _get_script_execution_errors(result) - if errors: - raise exceptions.ScriptExecutionError( - f"Initialize cluster script execution failed on node " - f"{node_names}:{errors}") - if result[0][0] != 0: - raise exceptions.ClusterInitializationError(f"Couldn't initialize cluster:\n{result[0][2].content.decode()}") # noqa: E501 - except Exception as err: - LOGGER.error(err, exc_info=True) - raise exceptions.ClusterInitializationError( - f"Couldn't initialize cluster: {str(err)}") +def _get_kube_config_from_control_plane_vm(sysadmin_client: vcd_client.Client, vapp): # noqa: E501 + vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) + vapp.reload() + node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) + if not node_names: + raise exceptions.KubeconfigNotFound("No control plane node found") # noqa: E501 + vm_resource = vapp.get_vm(node_names[0]) + control_plane_vm = vcd_vm.VM(sysadmin_client, resource=vm_resource) + control_plane_vm.reload() + kube_config: str = vcd_utils.get_vm_extra_config_element(control_plane_vm, KUBE_CONFIG) # noqa: E501 + if not kube_config: + raise exceptions.KubeconfigNotFound("kubeconfig not found in control plane extra configuration") # noqa: E501 + LOGGER.debug(f"Got kubeconfig from control plane:{node_names[0]} successfully") # noqa: E501 + kube_config_in_bytes: bytes = base64.b64decode(kube_config) + return kube_config_in_bytes.decode() -def _join_cluster(sysadmin_client: vcd_client.Client, vapp, target_nodes=None): - vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) - time.sleep(40) # hack for waiting for joining cluster - try: - script = """ - #!/usr/bin/env bash - kubeadm token create --print-join-command - """ - - node_names = _get_node_names(vapp, NodeType.CONTROL_PLANE) - control_plane_result = _execute_script_in_nodes(sysadmin_client, - vapp=vapp, - node_names=node_names, - script=script) - errors = _get_script_execution_errors(control_plane_result) - if errors: - raise exceptions.ClusterJoiningError( - "Join cluster script execution failed on " - f"control plane node {node_names}:{errors}") - # kubeadm join --token --discovery-token-ca-cert-hash # noqa: E501 - join_info = 
control_plane_result[0][1].content.decode().split() +def wait_for_update_customization(task): + LOGGER.debug(f"waiting for updating customization, status: {task.get('status').lower()}") # noqa: E501 - templated_script = get_cluster_script_file_contents( - ClusterScriptFile.NODE, ClusterScriptFile.VERSION_2_X_TKGM) - script = templated_script.format( - ip_port=join_info[2], - token=join_info[4], - discovery_token_ca_cert_hash=join_info[6] - ) - node_names = _get_node_names(vapp, NodeType.WORKER) - if target_nodes is not None: - node_names = [name for name in node_names if name in target_nodes] - - worker_results = _execute_script_in_nodes(sysadmin_client, vapp=vapp, - node_names=node_names, - script=script) - errors = _get_script_execution_errors(worker_results) - if errors: - raise exceptions.ClusterJoiningError( - "Join cluster script execution failed " - f"on worker node {node_names}:{errors}") - for result in worker_results: - if result[0] != 0: - raise exceptions.ClusterJoiningError( - "Couldn't join cluster:\n" - f"{result[2].content.decode()}") - except Exception as err: - LOGGER.error(err, exc_info=True) - raise exceptions.ClusterJoiningError( - f"Couldn't join cluster: {str(err)}") +def wait_for_adding_control_plane_vm_to_vapp(task): + LOGGER.debug(f"waiting for control plane add vm to vapp, status: {task.get('status').lower()}") # noqa: E501 + + +def wait_for_adding_worker_vm_to_vapp(task): + LOGGER.debug(f"waiting for add worker vm to vapp, status: {task.get('status').lower()}") # noqa: E501 + + +def wait_for_vm_power_on(task): + LOGGER.debug(f"waiting for vm power on, status: {task.get('status').lower()}") # noqa: E501 def _wait_for_tools_ready_callback(message, exception=None): @@ -2606,123 +2166,6 @@ def _wait_for_guest_execution_callback(message, exception=None): LOGGER.error(f"exception: {str(exception)}") -def _wait_until_ready_to_exec(vs, vm, password, tries=30): - ready = False - script = "#!/usr/bin/env bash\n" \ - "uname -a\n" - for _ in range(tries): - result = vs.execute_script_in_guest( - vm, 'root', password, script, - target_file=None, - wait_for_completion=True, - wait_time=5, - get_output=True, - delete_script=True, - callback=_wait_for_guest_execution_callback) - if result[0] == 0: - ready = True - break - LOGGER.info(f"Script returned {result[0]}; VM is not " - f"ready to execute scripts, yet") - time.sleep(2) - - if not ready: - raise exceptions.CseServerError('VM is not ready to execute scripts') - - -def _execute_script_in_nodes(sysadmin_client: vcd_client.Client, - vapp, node_names, script, - check_tools=True, wait=True): - vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) - all_results = [] - time.sleep(150) # hack for ubuntu 20 cluster creation - for node_name in node_names: - try: - LOGGER.debug(f"will try to execute script on {node_name}:\n" - f"{script}") - - vs = vs_utils.get_vsphere(sysadmin_client, vapp, vm_name=node_name, - logger=LOGGER) - vs.connect() - moid = vapp.get_vm_moid(node_name) - vm = vs.get_vm_by_moid(moid) - password = vapp.get_admin_password(node_name) - if check_tools: - LOGGER.debug(f"waiting for tools on {node_name}") - vs.wait_until_tools_ready( - vm, - sleep=5, - callback=_wait_for_tools_ready_callback) - _wait_until_ready_to_exec(vs, vm, password) - LOGGER.debug(f"about to execute script on {node_name} " - f"(vm={vm}), wait={wait}") - if wait: - result = vs.execute_script_in_guest( - vm, 'root', password, script, - target_file=None, - wait_for_completion=True, - wait_time=10, - get_output=True, - 
delete_script=True, - callback=_wait_for_guest_execution_callback) - result_stdout = result[1].content.decode() - result_stderr = result[2].content.decode() - else: - result = [ - vs.execute_program_in_guest(vm, 'root', password, script, - wait_for_completion=False, - get_output=False) - ] - result_stdout = '' - result_stderr = '' - LOGGER.debug(result[0]) - LOGGER.debug(result_stderr) - LOGGER.debug(result_stdout) - all_results.append(result) - except Exception as err: - msg = f"Error executing script in node {node_name}: {str(err)}" - LOGGER.error(msg, exc_info=True) - raise exceptions.ScriptExecutionError(msg) # noqa: E501 - - return all_results - - -def _run_script_in_nodes(sysadmin_client: vcd_client.Client, vapp_href, - node_names, script): - """Run script in all specified nodes. - - Wrapper around `execute_script_in_nodes()`. Use when we don't care about - preserving script results - - :param pyvcloud.vcd.client.Client sysadmin_client: - :param str vapp_href: - :param List[str] node_names: - :param str script: - """ - vcd_utils.raise_error_if_user_not_from_system_org(sysadmin_client) - - # when is tools checking necessary? - vapp = vcd_vapp.VApp(sysadmin_client, href=vapp_href) - results = _execute_script_in_nodes(sysadmin_client, - vapp=vapp, - node_names=node_names, - script=script, - check_tools=False) - errors = _get_script_execution_errors(results) - if errors: - raise exceptions.ScriptExecutionError( - "Script execution failed on node " - f"{node_names}\nErrors: {errors}") - if results[0][0] != 0: - raise exceptions.NodeOperationError( - "Error during node operation:\n" - f"{results[0][2].content.decode()}") - - -def _get_script_execution_errors(results): - return [result[2].content.decode() for result in results if result[0] != 0] - - def _create_k8s_software_string(software_name: str, software_version: str) -> str: # noqa: E501 """Generate string containing the software name and version. 
@@ -2734,3 +2177,79 @@ def _create_k8s_software_string(software_name: str, software_version: str) -> st :rtype: str """ return f"{software_name} {software_version}" + + +def _get_vm_specifications( + client, + num_nodes, + node_type, + org, + vdc, + vapp, + catalog_name, + template, + network_name, + storage_profile=None, + sizing_class_name=None, + cust_script=None) -> List[Dict]: + org_name = org.get_name() + org_resource = client.get_org_by_name(org_name) + org_sa = vcd_org.Org(client, resource=org_resource) + catalog_item = org_sa.get_catalog_item( + catalog_name, template[LocalTemplateKey.NAME]) + catalog_item_href = catalog_item.Entity.get('href') + + source_vapp = vcd_vapp.VApp(client, href=catalog_item_href) # noqa: E501 + source_vm = source_vapp.get_all_vms()[0].get('name') + if storage_profile is not None: + storage_profile = vdc.get_storage_profile(storage_profile) + + config = server_utils.get_server_runtime_config() + cpm = compute_policy_manager.ComputePolicyManager( + client, + log_wire=utils.str_to_bool(config['service']['log_wire']) + ) + sizing_class_href = None + if sizing_class_name: + vdc_resource = vdc.get_resource() + for policy in cpm.list_vdc_sizing_policies_on_vdc(vdc_resource.get('id')): # noqa: E501 + if policy['name'] == sizing_class_name: + if not sizing_class_href: + sizing_class_href = policy['href'] + else: + msg = f"Duplicate sizing policies with the name {sizing_class_name}" # noqa: E501 + LOGGER.error(msg) + raise Exception(msg) + if not sizing_class_href: + msg = f"No sizing policy with the name {sizing_class_name} exists on the VDC" # noqa: E501 + LOGGER.error(msg) + raise Exception(msg) + LOGGER.debug(f"Found sizing policy with name {sizing_class_name} on the VDC {vdc_resource.get('name')}") # noqa: E501 + + vapp.reload() + specs = [] + for n in range(num_nodes): + while True: + name = f"{node_type}-{''.join(random.choices(string.ascii_lowercase + string.digits, k=4))}" # noqa: E501 + try: + vapp.get_vm(name) + except Exception: + break + spec = { + 'source_vm_name': source_vm, + 'vapp': source_vapp.resource, + 'target_vm_name': name, + 'hostname': name, + 'password_auto': True, + 'network': network_name, + 'ip_allocation_mode': 'pool', + } + if sizing_class_href: + spec['sizing_policy_href'] = sizing_class_href + spec['placement_policy_href'] = config['placement_policy_hrefs'][template[LocalTemplateKey.KIND]] # noqa: E501 + if cust_script is not None: + spec['cust_script'] = cust_script + if storage_profile is not None: + spec['storage_profile'] = storage_profile + specs.append(spec) + return specs diff --git a/container_service_extension/rde/backend/common/network_expose_helper.py b/container_service_extension/rde/backend/common/network_expose_helper.py index 8f844a21f..9ff9072fe 100644 --- a/container_service_extension/rde/backend/common/network_expose_helper.py +++ b/container_service_extension/rde/backend/common/network_expose_helper.py @@ -130,7 +130,7 @@ def construct_expose_dnat_rule_name(cluster_name: str, cluster_id: str): def construct_script_to_update_kubeconfig_with_internal_ip( - kubeconfig_with_exposed_ip: dict, internal_ip: str): + kubeconfig_with_exposed_ip: str, internal_ip: str): """Construct script to update kubeconfig file with internal ip. 
:param dict kubeconfig_with_exposed_ip: the current kubeconfig @@ -141,18 +141,25 @@ def construct_script_to_update_kubeconfig_with_internal_ip( :rtype: str """ - kubeconfig_with_internal_ip = re.sub( - pattern=IP_PORT_REGEX, - repl=f'{internal_ip}:6443', - string=str(kubeconfig_with_exposed_ip) + kubeconfig_with_internal_ip = get_updated_kubeconfig_with_internal_ip( + kubeconfig_with_exposed_ip, internal_ip ) - script = f"#!/usr/bin/env bash\n" \ f"echo \'{kubeconfig_with_internal_ip}\' > " \ f"{CSE_CLUSTER_KUBECONFIG_PATH}\n" return script +def get_updated_kubeconfig_with_internal_ip( + kubeconfig_with_exposed_ip, + internal_ip: str): + return re.sub( + pattern=IP_PORT_REGEX, + repl=f'{internal_ip}:6443', + string=str(kubeconfig_with_exposed_ip) + ) + + def expose_cluster(client: vcd_client.Client, org_name: str, ovdc_name: str, network_name: str, cluster_name: str, cluster_id: str, internal_ip: str): diff --git a/container_service_extension/rde/models/common_models.py b/container_service_extension/rde/models/common_models.py index 1b5dfa662..955f8e163 100644 --- a/container_service_extension/rde/models/common_models.py +++ b/container_service_extension/rde/models/common_models.py @@ -244,7 +244,8 @@ def __init__(self, name: str, org: Org, entityType: str, entity, entity_dict = entity.to_dict() if not isinstance(entity, dict) else entity # noqa: E501 if entity_dict['kind'] in \ [shared_constants.ClusterEntityKind.NATIVE.value, - shared_constants.ClusterEntityKind.TKG_PLUS.value]: + shared_constants.ClusterEntityKind.TKG_PLUS.value, + shared_constants.ClusterEntityKind.TKG_M.value]: # Get the entity type version from entity type urn entity_type_version = self.entityType.split(":")[-1] # Parse the entity to the right entity class diff --git a/container_service_extension/rde/utils.py b/container_service_extension/rde/utils.py index b3914816b..a91ef6909 100644 --- a/container_service_extension/rde/utils.py +++ b/container_service_extension/rde/utils.py @@ -105,54 +105,72 @@ def construct_2_0_0_cluster_spec_from_entity_status(entity_status: rde_2_0_0.Sta :param rde_2_0_0.Status entity_status: Entity Status as defined in rde_2_0_0 # noqa: E501 :return: Cluster Specification as defined in rde_2_0_0 model """ + # Currently only single control-plane is supported. 
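The rewrite below null-safes the spec reconstruction and keeps one precedence rule: when a sizing class is present, explicit cpu/memory are dropped. A compact restatement of that rule with a hypothetical helper (not part of the module):

def _resolve_resources(sizing_class, cpu, memory):
    # A sizing class, when present, wins over explicit cpu/memory.
    if sizing_class:
        return sizing_class, None, None
    return None, cpu, memory

assert _resolve_resources('Large', 8, 8192) == ('Large', None, None)
assert _resolve_resources(None, 8, 8192) == (None, 8, 8192)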
- if entity_status.nodes.control_plane.sizing_class: - control_plane = rde_2_0_0.ControlPlane( - sizing_class=entity_status.nodes.control_plane.sizing_class, - storage_profile=entity_status.nodes.control_plane.storage_profile, - cpu=None, - memory=None, - count=1) - else: - control_plane = rde_2_0_0.ControlPlane( - sizing_class=None, - storage_profile=entity_status.nodes.control_plane.storage_profile, - cpu=entity_status.nodes.control_plane.cpu, - memory=entity_status.nodes.control_plane.memory, - count=1) - - workers_count = len(entity_status.nodes.workers) - if workers_count == 0: - workers = rde_2_0_0.Workers(sizing_class=None, - cpu=None, - memory=None, - storage_profile=None, - count=0) - else: - if entity_status.nodes.workers[0].sizing_class: - workers = rde_2_0_0.Workers( - sizing_class=entity_status.nodes.workers[0].sizing_class, - cpu=None, - memory=None, - storage_profile=entity_status.nodes.workers[0].storage_profile, - count=workers_count) - else: - workers = rde_2_0_0.Workers( - sizing_class=None, - storage_profile=entity_status.nodes.workers[0].storage_profile, - cpu=entity_status.nodes.workers[0].cpu, - memory=entity_status.nodes.workers[0].memory, - count=workers_count) - - nfs_count = len(entity_status.nodes.nfs) - if nfs_count == 0: - nfs = rde_2_0_0.Nfs(sizing_class=None, storage_profile=None, - count=0) - else: - nfs = rde_2_0_0.Nfs( - sizing_class=entity_status.nodes.nfs[0].sizing_class, # noqa: E501 - storage_profile=entity_status.nodes.nfs[ - 0].storage_profile, count=nfs_count) + control_plane_sizing_class = None + control_plane_storage_profile = None + control_plane_cpu = None + control_plane_memory = None + if ( + entity_status is not None and + entity_status.nodes is not None and + entity_status.nodes.control_plane is not None + ): + control_plane_sizing_class = entity_status.nodes.control_plane.sizing_class + control_plane_storage_profile = entity_status.nodes.control_plane.storage_profile + control_plane_cpu = entity_status.nodes.control_plane.cpu + control_plane_memory = entity_status.nodes.control_plane.memory + if control_plane_sizing_class: + control_plane_cpu = None + control_plane_memory = None + control_plane = rde_2_0_0.ControlPlane( + sizing_class=control_plane_sizing_class, + storage_profile=control_plane_storage_profile, + cpu=control_plane_cpu, + memory=control_plane_memory, + count=1) + + workers_count = 0 + worker_sizing_class = None + worker_storage_profile = None + worker_cpu = None + worker_memory = None + if ( + entity_status is not None and + entity_status.nodes is not None and + entity_status.nodes.workers is not None + ): + workers_count = len(entity_status.nodes.workers) + if workers_count > 0: + worker_sizing_class = entity_status.nodes.workers[0].sizing_class + worker_storage_profile = entity_status.nodes.workers[0].storage_profile + worker_cpu = entity_status.nodes.workers[0].cpu + worker_memory = entity_status.nodes.workers[0].memory + if worker_sizing_class: + worker_cpu = None + worker_memory = None + workers = rde_2_0_0.Workers(sizing_class=worker_sizing_class, + cpu=worker_cpu, + memory=worker_memory, + storage_profile=worker_storage_profile, + count=workers_count) + + nfs_count = 0 + nfs_sizing_class = None + nfs_storage_profile = None + if ( + entity_status is not None and + entity_status.nodes is not None and + entity_status.nodes.nfs is not None + ): + nfs_count = len(entity_status.nodes.nfs) + if nfs_count > 0: + nfs_sizing_class = entity_status.nodes.nfs[0].sizing_class + nfs_storage_profile = 
entity_status.nodes.nfs[0].storage_profile + nfs = rde_2_0_0.Nfs( + sizing_class=nfs_sizing_class, + storage_profile=nfs_storage_profile, + count=nfs_count) k8_distribution = rde_2_0_0.Distribution( template_name=entity_status.cloud_properties.distribution.template_name, # noqa: E501 diff --git a/cse_def_schema/schema_2_0_0.json b/cse_def_schema/schema_2_0_0.json index 9996ab104..060c968e1 100644 --- a/cse_def_schema/schema_2_0_0.json +++ b/cse_def_schema/schema_2_0_0.json @@ -414,4 +414,5 @@ } }, "additionalProperties":true -} \ No newline at end of file +} +
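For reference, the kubeconfig rewrite performed by get_updated_kubeconfig_with_internal_ip (added in the network_expose_helper diff above) boils down to a single regex substitution; IP_PORT_REGEX below is an assumed stand-in for the module's constant:

import re

IP_PORT_REGEX = r'\d{1,3}(?:\.\d{1,3}){3}:\d{1,5}'  # assumed shape of the constant
kubeconfig = 'server: https://203.0.113.7:6443'
print(re.sub(IP_PORT_REGEX, '10.0.0.5:6443', kubeconfig))
# -> server: https://10.0.0.5:6443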