diff --git a/build/yaml/samples/nsx_v1alpha1_vpcs.yaml b/build/yaml/samples/nsx_v1alpha1_vpcs.yaml new file mode 100644 index 000000000..99b9afc14 --- /dev/null +++ b/build/yaml/samples/nsx_v1alpha1_vpcs.yaml @@ -0,0 +1,6 @@ +apiVersion: nsx.vmware.com/v1alpha1 +kind: VPC +metadata: + name: vpc-default +spec: + diff --git a/cmd/main.go b/cmd/main.go index bd1c20837..8e028250c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,18 +17,21 @@ import ( "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" "github.com/vmware-tanzu/nsx-operator/pkg/config" commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common" + namespacecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/namespace" nsxserviceaccountcontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/nsxserviceaccount" securitypolicycontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/securitypolicy" staticroutecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/staticroute" "github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnet" "github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetport" "github.com/vmware-tanzu/nsx-operator/pkg/controllers/subnetset" + vpccontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/vpc" "github.com/vmware-tanzu/nsx-operator/pkg/logger" "github.com/vmware-tanzu/nsx-operator/pkg/metrics" "github.com/vmware-tanzu/nsx-operator/pkg/nsx" "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/nsxserviceaccount" "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/securitypolicy" + "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc" ) var ( @@ -102,6 +105,37 @@ func StartNSXServiceAccountController(mgr ctrl.Manager, commonService common.Ser } } +func StartVPCController(mgr ctrl.Manager, commonService common.Service) { + vpcReconciler := &vpccontroller.VPCReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + } + if vpcService, err := vpc.InitializeVPC(commonService); err != nil { + log.Error(err, "failed to initialize vpc commonService", "controller", "VPC") + os.Exit(1) + } else { + vpcReconciler.Service = vpcService + commonctl.ServiceMediator.VPCService = vpcService + } + if err := vpcReconciler.Start(mgr); err != nil { + log.Error(err, "failed to create vpc controller", "controller", "VPC") + os.Exit(1) + } +} + +func StartNamespaceController(mgr ctrl.Manager, commonService common.Service) { + nsReconciler := &namespacecontroller.NamespaceReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + NSXConfig: commonService.NSXConfig, + } + + if err := nsReconciler.Start(mgr); err != nil { + log.Error(err, "failed to create namespace controller", "controller", "Namespace") + os.Exit(1) + } +} + func main() { log.Info("starting NSX Operator") @@ -145,9 +179,13 @@ func main() { staticroutecontroller.StartStaticRouteController(mgr, commonService) subnetport.StartSubnetPortController(mgr, commonService) + + StartNamespaceController(mgr, commonService) + StartVPCController(mgr, commonService) } // Start the security policy controller, it supports VPC and non VPC mode StartSecurityPolicyController(mgr, commonService) + // Start the NSXServiceAccount controller. if cf.EnableAntreaNSXInterworking { StartNSXServiceAccountController(mgr, commonService) diff --git a/go.mod b/go.mod index c315a0947..88f9cde6f 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/stretchr/objx v0.4.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/mod v0.6.0 // indirect + golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.5.0 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect golang.org/x/sys v0.4.0 // indirect diff --git a/go.sum b/go.sum index b2ed9a7b0..712f6b464 100644 --- a/go.sum +++ b/go.sum @@ -371,8 +371,8 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= -golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/pkg/controllers/common/types.go b/pkg/controllers/common/types.go index 61decd0e4..d7c9b7f2f 100644 --- a/pkg/controllers/common/types.go +++ b/pkg/controllers/common/types.go @@ -15,11 +15,16 @@ const ( MetricResTypeStaticRoute = "staticroute" MetricResTypeSubnet = "subnet" MetricResTypeSubnetSet = "subnetset" + MetricResTypeVPC = "vpc" + MetricResTypeNamespace = "namespace" ) var ( - ResultNormal = ctrl.Result{} - ResultRequeue = ctrl.Result{Requeue: true} + ResultNormal = ctrl.Result{} + ResultRequeue = ctrl.Result{Requeue: true} + // for k8s events that need to retry in short loop, eg: namespace creation + ResultRequeueAfter10sec = ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second} + // for unstable event, eg: failed to k8s resources when reconciling, may due to k8s unstable ResultRequeueAfter5mins = ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Minute} ServiceMediator = mediator.ServiceMediator{} diff --git a/pkg/controllers/namespace/builder.go b/pkg/controllers/namespace/builder.go new file mode 100644 index 000000000..f13edf029 --- /dev/null +++ b/pkg/controllers/namespace/builder.go @@ -0,0 +1,19 @@ +package namespace + +import ( + "github.com/google/uuid" + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" +) + +func BuildVPCCR(ns string, ncName string, vpcName *string) *v1alpha1.VPC { + log.V(2).Info("building vpc", "ns", ns, "nc", ncName, "VPC", vpcName) + vpc := &v1alpha1.VPC{} + if vpcName == nil { + vpc.Name = "vpc-" + uuid.New().String() + } else { + vpc.Name = *vpcName + } + + vpc.Namespace = ns + return vpc +} diff --git a/pkg/controllers/namespace/namespace_controller.go b/pkg/controllers/namespace/namespace_controller.go new file mode 100644 index 000000000..fb6abe99a --- /dev/null +++ b/pkg/controllers/namespace/namespace_controller.go @@ -0,0 +1,197 @@ +/* Copyright © 2023 VMware, Inc. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 */ + +package namespace + +import ( + "context" + "errors" + "fmt" + "runtime" + "strings" + + v1 "k8s.io/api/core/v1" + apimachineryruntime "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" + "github.com/vmware-tanzu/nsx-operator/pkg/config" + "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common" + "github.com/vmware-tanzu/nsx-operator/pkg/logger" + "github.com/vmware-tanzu/nsx-operator/pkg/metrics" + _ "github.com/vmware-tanzu/nsx-operator/pkg/nsx/ratelimiter" + types "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" + "github.com/vmware-tanzu/nsx-operator/pkg/util" +) + +var ( + AnnotationNamespaceVPCError = " nsx.vmware.com/vpc_error" + + log = logger.Log +) + +// NamespaceReconciler process namespace create/delete event +type NamespaceReconciler struct { + Client client.Client + Scheme *apimachineryruntime.Scheme + NSXConfig *config.NSXOperatorConfig +} + +func (r *NamespaceReconciler) createVPCCR(ctx *context.Context, obj client.Object, ns string, ncName string, vpcName *string) (*v1alpha1.VPC, error) { + // check if vpc cr already exist under this namespace + vpcs := &v1alpha1.VPCList{} + r.Client.List(*ctx, vpcs, client.InNamespace(ns)) + if len(vpcs.Items) > 0 { + // if there is already one vpc exist under this ns, return this vpc. + log.Info("vpc cr already exist, skip creating", "VPC", vpcs.Items[0].Name) + return &vpcs.Items[0], nil + } + nc, ncExist := common.ServiceMediator.GetVPCNetworkConfig(ncName) + if !ncExist { + message := fmt.Sprintf("missing network config %s for namespace %s", ncName, ns) + r.namespaceError(ctx, obj, message, nil) + return nil, errors.New(message) + } + + //TODO: in next patch, remove this validation. If user did not provide private cidr + // use a hardcoded cidr to create a private ip block. + if !common.ServiceMediator.ValidateNetworkConfig(nc) { + // if netwrok config is not valid, no need to retry, skip processing + message := fmt.Sprintf("invalid network config %s for namespace %s, missing private cidr", ncName, ns) + r.namespaceError(ctx, obj, message, nil) + return nil, errors.New(message) + } + + // create vpc cr with exisitng vpc network config + vpcCR := BuildVPCCR(ns, ncName, vpcName) + err := r.Client.Create(*ctx, vpcCR) + if err != nil { + message := "failed to create VPC CR" + r.namespaceError(ctx, obj, message, err) + // If create VPC CR failed, put ns create event back to queue. + return nil, err + } + + changes := map[string]string{ + AnnotationNamespaceVPCError: "", + } + util.UpdateK8sResourceAnnotation(r.Client, ctx, obj, changes) + log.Info("create vpc CR", "VPC", vpcCR.Name, "Namespace", vpcCR.Namespace) + return vpcCR, nil +} + +func (r *NamespaceReconciler) namespaceError(ctx *context.Context, k8sObj client.Object, msg string, err error) { + logErr := util.If(err == nil, errors.New(msg), err).(error) + log.Error(logErr, msg) + changes := map[string]string{AnnotationNamespaceVPCError: msg} + util.UpdateK8sResourceAnnotation(r.Client, ctx, k8sObj, changes) +} + +func (r *NamespaceReconciler) insertNamespaceNetworkconfigBinding(ns string, anno map[string]string) { + if anno == nil { + log.Info("empty annotation for namespace", "Namespace", ns) + return + } + + ncName, ncExist := anno[types.AnnotationVPCNetworkConfig] + if !ncExist { + ncName = types.DefaultNetworkConfigName + } + + log.V(2).Info("insert namespace and network config mapping", "Namespace", ns, "Networkconfig", ncName) + common.ServiceMediator.RegisterNamespaceNetworkconfigBinding(ns, ncName) +} + +/* + VPC creation strategy: + +We suppose namespace should have following annotations: + - "nsx.vmware.com/vpc_name": "/" + If the ns contains this annotation, first check if the namespace in annotation is the same as + the one in ns event, if yes, create an infra VPC for it. if not, skip the whole ns event as the infra + VPC will be created its corresponding ns creation event. + - "nsx.vmware.com/vpc_network_config":"" + If ns do not contains "nsx.vmware.com/vpc_name" annotation. Use this annotation to handle VPC creation. + VPC will locate the network config with the CR name, and create VPC using its config. + - If the ns do not have either of the annotation above, then we believe it is using default VPC, try to search + default VPC in network config CR store. The default VPC network config CR's name is "default". +*/ +func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + + obj := &v1.Namespace{} + log.Info("reconciling K8s namespace", "namespace", req.NamespacedName) + metrics.CounterInc(r.NSXConfig, metrics.ControllerSyncTotal, common.MetricResTypeNamespace) + + if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { + log.Error(err, "unable to fetch namespace", "req", req.NamespacedName) + return common.ResultRequeueAfter5mins, err + } + + // processing create/update event + ns := obj.GetName() + if obj.ObjectMeta.DeletionTimestamp.IsZero() { + metrics.CounterInc(r.NSXConfig, metrics.ControllerUpdateTotal, common.MetricResTypeNamespace) + log.Info("start processing namespace create/update event", "namespace", ns) + ctx := context.Background() + annotations := obj.GetAnnotations() + r.insertNamespaceNetworkconfigBinding(ns, annotations) + // read anno "nsx.vmware.com/vpc_name", if ns contains this annotation, it means it will share + // infra VPC, if the ns in the annotation is the same as ns event, create infra VPC, if not, + // skip the event. + ncName, ncExist := annotations[types.AnnotationVPCNetworkConfig] + vpcName, nameExist := annotations[types.AnnotationVPCName] + var create_vpc_name *string + if nameExist { + log.Info("read ns annotation vpcName", "VPCNAME", vpcName) + res := strings.Split(vpcName, "/") + // The format should be namespace/vpc_name + if len(res) != 2 { + message := fmt.Sprintf("incorrect vpcName annotation %s for namespace %s", vpcName, ns) + r.namespaceError(&ctx, obj, message, nil) + // If illegal format, skip handling this event? + return common.ResultNormal, nil + } + log.Info("start to handle vpcName anno", "VPCNS", res[1], "NS", ns) + + if ns != res[0] { + log.Info("name space is using shared vpc, with vpc name anno", "VPCNAME", vpcName, "Namespace", ns) + return common.ResultNormal, nil + } + create_vpc_name = &res[1] + log.Info("creating vpc using customer defined vpc name", "VPCName", res[1]) + } + + // If ns do not have network config name tag, then use default vpc network config name + if !ncExist { + log.Info("network config name not found on ns, using default network config", "Namespace", ns) + ncName = types.DefaultNetworkConfigName + } + + if _, err := r.createVPCCR(&ctx, obj, ns, ncName, create_vpc_name); err != nil { + return common.ResultRequeueAfter10sec, nil + } + return common.ResultNormal, nil + } else { + log.Info("skip ns deletion event for ns", "Namespace", ns) + metrics.CounterInc(r.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeNamespace) + common.ServiceMediator.UnRegisterNamespaceNetworkconfigBinding(obj.GetNamespace()) + return common.ResultNormal, nil + } +} + +func (r *NamespaceReconciler) setupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1.Namespace{}). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: runtime.NumCPU(), + }). + Complete(r) +} + +// Start setup manager and launch GC +func (r *NamespaceReconciler) Start(mgr ctrl.Manager) error { + return r.setupWithManager(mgr) +} diff --git a/pkg/controllers/vpc/VPCNetworkConfiguration_handler.go b/pkg/controllers/vpc/VPCNetworkConfiguration_handler.go new file mode 100644 index 000000000..2ad168fbd --- /dev/null +++ b/pkg/controllers/vpc/VPCNetworkConfiguration_handler.go @@ -0,0 +1,89 @@ +package vpc + +import ( + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" + "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" + "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc" + vpcservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +// VPCNetworkConfigurationHandler handles VPC NetworkConfiguration event: +// - VPC Network Configuration creation: Add VPC Network Configuration into cache. +// - VPC Network Configuration deletion: Delete VPC Network Configuration from cache. + +type VPCNetworkConfigurationHandler struct { + Client client.Client + vpcService *vpc.VPCService +} + +func (h *VPCNetworkConfigurationHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) { + vpcConfigCR := e.Object.(*v1alpha1.VPCNetworkConfiguration) + vname := vpcConfigCR.GetName() + ninfo := &vpc.VPCNetworkConfigInfo{ + Name: vname, + DefaultGatewayPath: vpcConfigCR.Spec.DefaultGatewayPath, + EdgeClusterPath: vpcConfigCR.Spec.EdgeClusterPath, + NsxtProject: vpcConfigCR.Spec.NSXTProject, + ExternalIPv4Blocks: vpcConfigCR.Spec.ExternalIPv4Blocks, + PrivateIPv4CIDRs: vpcConfigCR.Spec.PrivateIPv4CIDRs, + DefaultIPv4SubnetSize: vpcConfigCR.Spec.DefaultIPv4SubnetSize, + DefaultSubnetAccessMode: vpcConfigCR.Spec.DefaultSubnetAccessMode, + } + h.vpcService.RegisterVPCNetworkConfig(vname, *ninfo) + log.Info("create vpc network config object and insert to storage", "vpc", ninfo) +} + +func (h *VPCNetworkConfigurationHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) { + // Currently we do not support deleting networkconfig + log.V(1).Info("do not support vpc network config deletion") +} + +func (h *VPCNetworkConfigurationHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) { + log.V(1).Info("VPCNetworkConfiguration generic event, do nothing") +} + +func (h *VPCNetworkConfigurationHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) { + log.V(1).Info("start processing vpc network config update event") + oldNc := e.ObjectOld.(*v1alpha1.VPCNetworkConfiguration) + newNc := e.ObjectNew.(*v1alpha1.VPCNetworkConfiguration) + if len(oldNc.Spec.ExternalIPv4Blocks) == len(newNc.Spec.ExternalIPv4Blocks) && + len(oldNc.Spec.PrivateIPv4CIDRs) == len(newNc.Spec.PrivateIPv4CIDRs) { + log.V(1).Info("only support updating external/private ipv4 cidr, no change") + return + } + + //TODO: known defect, this handling need to be moving to vpc reconciler by trigger an vpc cr event. + //if the appended cidr is private, no ip block created. + vpc := h.vpcService.GetVPCsByNamespace(newNc.GetNamespace())[0] + vpc.ExternalIpv4Blocks = vpcservice.TransferIpblockIDstoPaths(newNc.Spec.ExternalIPv4Blocks) + vpc.PrivateIpv4Blocks = newNc.Spec.PrivateIPv4CIDRs + path, err := common.ParseVPCResourcePath(*vpc.Path) + if err != nil { + log.Error(err, "failed to parse nsx path", "PATH", vpc.Path) + } + err = h.vpcService.NSXClient.VPCClient.Patch(path.OrgID, path.ProjectID, path.ID, vpc) + if err != nil { + log.Error(err, "Failed to update vpc model in nsx", "vpc") + } + h.vpcService.VpcStore.Update(vpc) +} + +var VPCNetworkConfigurationPredicate = predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, +} diff --git a/pkg/controllers/vpc/vpc_controller.go b/pkg/controllers/vpc/vpc_controller.go new file mode 100644 index 000000000..25943193d --- /dev/null +++ b/pkg/controllers/vpc/vpc_controller.go @@ -0,0 +1,206 @@ +/* Copyright © 2023 VMware, Inc. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 */ + +package vpc + +import ( + "context" + "errors" + "os" + "runtime" + "time" + + apimachineryruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" + "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common" + "github.com/vmware-tanzu/nsx-operator/pkg/logger" + "github.com/vmware-tanzu/nsx-operator/pkg/metrics" + _ "github.com/vmware-tanzu/nsx-operator/pkg/nsx/ratelimiter" + commonservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" + "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc" +) + +var ( + log = logger.Log + ResultNormal = common.ResultNormal + ResultRequeue = common.ResultRequeue + ResultRequeueAfter5mins = common.ResultRequeueAfter5mins + MetricResType = common.MetricResTypeVPC +) + +// VPCReconciler VPCReconcile reconciles a VPC object +type VPCReconciler struct { + Client client.Client + Scheme *apimachineryruntime.Scheme + Service *vpc.VPCService +} + +func (r *VPCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + obj := &v1alpha1.VPC{} + log.Info("reconciling VPC CR", "vpc", req.NamespacedName) + metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, common.MetricResTypeVPC) + + if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil { + log.Error(err, "unable to fetch VPC CR", "req", req.NamespacedName) + return common.ResultNormal, client.IgnoreNotFound(err) + } + + if obj.ObjectMeta.DeletionTimestamp.IsZero() { + metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateTotal, common.MetricResTypeVPC) + if !controllerutil.ContainsFinalizer(obj, commonservice.VPCFinalizerName) { + controllerutil.AddFinalizer(obj, commonservice.VPCFinalizerName) + if err := r.Client.Update(ctx, obj); err != nil { + log.Error(err, "add finalizer", "vpc", req.NamespacedName) + updateFail(r.Service.NSXConfig, &ctx, obj, &err, r.Client) + return common.ResultRequeue, err + } + log.V(1).Info("added finalizer on VPC CR", "vpc", req.NamespacedName) + } + + cVpc, err := r.Service.CreateorUpdateVPC(obj) + if err != nil { + log.Error(err, "operate failed, would retry exponentially", "vpc", req.NamespacedName) + updateFail(r.Service.NSXConfig, &ctx, obj, &err, r.Client) + return common.ResultRequeue, err + } + updateSuccess(r.Service.NSXConfig, &ctx, obj, r.Client, *cVpc.Path, "") + } else { + if controllerutil.ContainsFinalizer(obj, commonservice.VPCFinalizerName) { + metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeVPC) + vpcs := r.Service.GetVPCsByNamespace(obj.GetNamespace()) + if len(vpcs) == 0 { + // when nsx vpc not found in vpc store, do not retry + notFound := errors.New("can not find vpc from vpc stroe") + log.Error(notFound, "can not find vpc from vpc store. put event back to queue") + return common.ResultNormal, nil + } + vpc := vpcs[0] + if err := r.Service.DeleteVPC(*vpc.Path); err != nil { + log.Error(err, "delete failed, would retry exponentially", "vpc", req.NamespacedName) + deleteFail(r.Service.NSXConfig, &ctx, obj, &err, r.Client) + return common.ResultRequeueAfter10sec, err + } + + if err := r.Service.DeleteIPBlock(vpc); err != nil { + log.Error(err, "failed to delete private ip blocks for vpc", "VPC", vpc.DisplayName) + } + + controllerutil.RemoveFinalizer(obj, commonservice.VPCFinalizerName) + if err := r.Client.Update(ctx, obj); err != nil { + deleteFail(r.Service.NSXConfig, &ctx, obj, &err, r.Client) + return common.ResultRequeue, err + } + log.V(1).Info("removed finalizer", "vpc", req.NamespacedName) + deleteSuccess(r.Service.NSXConfig, &ctx, obj) + } else { + // only print a message because it's not a normal case + log.Info("finalizers cannot be recognized", "vpc", req.NamespacedName) + } + } + return common.ResultNormal, nil +} + +func (r *VPCReconciler) setupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.VPC{}). + WithOptions( + controller.Options{ + MaxConcurrentReconciles: runtime.NumCPU(), + }). + Watches( + // For created/removed network config, add/remove from vpc network config cache. + // For modified network config, currently only support appending ips to public ip blocks, + // update network config in cache and update nsx vpc object. + &source.Kind{Type: &v1alpha1.VPCNetworkConfiguration{}}, + &VPCNetworkConfigurationHandler{ + Client: mgr.GetClient(), + vpcService: r.Service, + }, + builder.WithPredicates(VPCNetworkConfigurationPredicate)). + Complete(r) +} + +// Start setup manager and launch GC +func (r *VPCReconciler) Start(mgr ctrl.Manager) error { + err := r.setupWithManager(mgr) + if err != nil { + return err + } + + go r.GarbageCollector(make(chan bool), commonservice.GCInterval) + return nil +} + +// GarbageCollector collect vpc which has been removed from crd. +// cancel is used to break the loop during UT +func (r *VPCReconciler) GarbageCollector(cancel chan bool, timeout time.Duration) { + ctx := context.Background() + log.Info("VPC garbage collector started") + for { + select { + case <-cancel: + return + case <-time.After(timeout): + } + nsxVPCList := r.Service.ListVPC() + if len(nsxVPCList) == 0 { + continue + } + + crdVPCList := &v1alpha1.VPCList{} + err := r.Client.List(ctx, crdVPCList) + if err != nil { + log.Error(err, "failed to list VPC CR") + continue + } + + crdVPCSet := sets.NewString() + for _, vc := range crdVPCList.Items { + crdVPCSet.Insert(string(vc.UID)) + } + + for _, elem := range nsxVPCList { + if crdVPCSet.Has(*elem.Id) { + continue + } + + log.V(1).Info("GC collected nsx VPC object", "ID", elem.Id) + metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeVPC) + err = r.Service.DeleteVPC(*elem.Path) + if err != nil { + metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, common.MetricResTypeVPC) + } else { + metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, common.MetricResTypeVPC) + if err := r.Service.DeleteIPBlock(elem); err != nil { + log.Error(err, "failed to delete private ip blocks for vpc", "VPC", *elem.DisplayName) + } + log.Info("deleted private ip blocks for vpc", "VPC", *elem.DisplayName) + } + } + } +} + +func StartVPCController(mgr ctrl.Manager, commonService commonservice.Service) { + vpcReconcile := VPCReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + } + if vpcService, err := vpc.InitializeVPC(commonService); err != nil { + log.Error(err, "failed to initialize vpc commonService") + os.Exit(1) + } else { + vpcReconcile.Service = vpcService + } + if err := vpcReconcile.Start(mgr); err != nil { + log.Error(err, "failed to create vpc controller") + os.Exit(1) + } +} diff --git a/pkg/controllers/vpc/vpc_utils.go b/pkg/controllers/vpc/vpc_utils.go new file mode 100644 index 000000000..866212587 --- /dev/null +++ b/pkg/controllers/vpc/vpc_utils.go @@ -0,0 +1,105 @@ +package vpc + +import ( + "context" + "fmt" + "reflect" + + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" + "github.com/vmware-tanzu/nsx-operator/pkg/config" + "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common" + "github.com/vmware-tanzu/nsx-operator/pkg/metrics" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func setVPCReadyStatusFalse(ctx *context.Context, vpc *v1alpha1.VPC, err *error, client client.Client) { + newConditions := []v1alpha1.Condition{ + { + Type: v1alpha1.Ready, + Status: v1.ConditionFalse, + Message: "NSX VPC could not be created/updated", + Reason: fmt.Sprintf("Error occurred while processing the VPC CR. Please check the config and try again. Error: %v", *err), + }, + } + updateVPCStatusConditions(ctx, vpc, newConditions, client, "", "") +} + +func updateVPCStatusConditions(ctx *context.Context, vpc *v1alpha1.VPC, newConditions []v1alpha1.Condition, client client.Client, path string, snatIP string) { + conditionsUpdated := false + statusUpdated := false + for i := range newConditions { + if mergeVPCStatusCondition(ctx, vpc, &newConditions[i]) { + conditionsUpdated = true + } + } + if vpc.Status.NSXResourcePath != path || vpc.Status.DefaultSNATIP != snatIP { + vpc.Status.NSXResourcePath = path + vpc.Status.DefaultSNATIP = snatIP + statusUpdated = true + } + + if conditionsUpdated || statusUpdated { + + client.Status().Update(*ctx, vpc) + log.V(1).Info("Updated VPC CRD", "Name", vpc.Name, "Namespace", vpc.Namespace, "New Conditions", newConditions) + } +} + +func deleteFail(nsxConfig *config.NSXOperatorConfig, c *context.Context, o *v1alpha1.VPC, e *error, client client.Client) { + setVPCReadyStatusFalse(c, o, e, client) + metrics.CounterInc(nsxConfig, metrics.ControllerDeleteFailTotal, common.MetricResTypeVPC) +} + +func updateFail(nsxConfig *config.NSXOperatorConfig, c *context.Context, o *v1alpha1.VPC, e *error, client client.Client) { + setVPCReadyStatusFalse(c, o, e, client) + metrics.CounterInc(nsxConfig, metrics.ControllerUpdateFailTotal, MetricResType) +} + +func updateSuccess(nsxConfig *config.NSXOperatorConfig, c *context.Context, o *v1alpha1.VPC, client client.Client, path string, snatIP string) { + setVPCReadyStatusTrue(c, o, client, path, snatIP) + metrics.CounterInc(nsxConfig, metrics.ControllerUpdateSuccessTotal, common.MetricResTypeVPC) +} + +func deleteSuccess(nsxConfig *config.NSXOperatorConfig, _ *context.Context, _ *v1alpha1.VPC) { + metrics.CounterInc(nsxConfig, metrics.ControllerDeleteSuccessTotal, common.MetricResTypeVPC) +} + +func setVPCReadyStatusTrue(ctx *context.Context, vpc *v1alpha1.VPC, client client.Client, path string, snatIP string) { + newConditions := []v1alpha1.Condition{ + { + Type: v1alpha1.Ready, + Status: v1.ConditionTrue, + Message: "NSX VPC has been successfully created/updated", + Reason: "NSX API returned 200 response code for PATCH", + }, + } + updateVPCStatusConditions(ctx, vpc, newConditions, client, path, snatIP) +} + +func mergeVPCStatusCondition(ctx *context.Context, vpc *v1alpha1.VPC, newCondition *v1alpha1.Condition) bool { + matchedCondition := getExistingConditionOfType(newCondition.Type, vpc.Status.Conditions) + + if reflect.DeepEqual(matchedCondition, newCondition) { + log.V(2).Info("Conditions already match", "New Condition", newCondition, "Existing Condition", matchedCondition) + return false + } + + if matchedCondition != nil { + matchedCondition.Reason = newCondition.Reason + matchedCondition.Message = newCondition.Message + matchedCondition.Status = newCondition.Status + } else { + vpc.Status.Conditions = append(vpc.Status.Conditions, *newCondition) + } + return true +} + +func getExistingConditionOfType(conditionType v1alpha1.ConditionType, existingConditions []v1alpha1.Condition) *v1alpha1.Condition { + for i := range existingConditions { + if existingConditions[i].Type == v1alpha1.ConditionType(conditionType) { + return &existingConditions[i] + } + } + return nil +} diff --git a/pkg/nsx/client.go b/pkg/nsx/client.go index b9523eaef..bf026af9c 100644 --- a/pkg/nsx/client.go +++ b/pkg/nsx/client.go @@ -9,6 +9,8 @@ import ( "strings" "github.com/sirupsen/logrus" + "github.com/vmware-tanzu/nsx-operator/pkg/config" + "github.com/vmware-tanzu/nsx-operator/pkg/nsx/ratelimiter" vspherelog "github.com/vmware/vsphere-automation-sdk-go/runtime/log" "github.com/vmware/vsphere-automation-sdk-go/runtime/protocol/client" nsx_policy "github.com/vmware/vsphere-automation-sdk-go/services/nsxt" @@ -18,15 +20,14 @@ import ( "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/infra/domains" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/infra/domains/security_policies" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/infra/sites/enforcement_points" + projects "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects" + infra "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects/infra" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects/infra/realized_state" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects/vpcs" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects/vpcs/subnets" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects/vpcs/subnets/ip_pools" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/orgs/projects/vpcs/subnets/ports" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/search" - - "github.com/vmware-tanzu/nsx-operator/pkg/config" - "github.com/vmware-tanzu/nsx-operator/pkg/nsx/ratelimiter" ) const ( @@ -51,6 +52,8 @@ type Client struct { RuleClient security_policies.RulesClient InfraClient nsx_policy.InfraClient StaticRouteClient vpcs.StaticRoutesClient + VPCClient projects.VpcsClient + IPBlockClient infra.IpBlocksClient ClusterControlPlanesClient enforcement_points.ClusterControlPlanesClient SubnetStatusClient subnets.StatusClient RealizedEntitiesClient realized_state.RealizedEntitiesClient @@ -118,12 +121,11 @@ func GetClient(cf *config.NSXOperatorConfig) *Client { securityClient := domains.NewSecurityPoliciesClient(restConnector(cluster)) ruleClient := security_policies.NewRulesClient(restConnector(cluster)) infraClient := nsx_policy.NewInfraClient(restConnector(cluster)) - staticRouteClient := vpcs.NewStaticRoutesClient(restConnector(cluster)) - + vpcClient := projects.NewVpcsClient(restConnector(cluster)) + ipBlockClient := infra.NewIpBlocksClient(restConnector(cluster)) clusterControlPlanesClient := enforcement_points.NewClusterControlPlanesClient(restConnector(cluster)) realizedEntitiesClient := realized_state.NewRealizedEntitiesClient(restConnector(cluster)) - mpQueryClient := mpsearch.NewQueryClient(restConnector(cluster)) certificatesClient := trust_management.NewCertificatesClient(restConnector(cluster)) principalIdentitiesClient := trust_management.NewPrincipalIdentitiesClient(restConnector(cluster)) @@ -155,6 +157,8 @@ func GetClient(cf *config.NSXOperatorConfig) *Client { RuleClient: ruleClient, InfraClient: infraClient, StaticRouteClient: staticRouteClient, + VPCClient: vpcClient, + IPBlockClient: ipBlockClient, ClusterControlPlanesClient: clusterControlPlanesClient, RealizedEntitiesClient: realizedEntitiesClient, diff --git a/pkg/nsx/services/common/types.go b/pkg/nsx/services/common/types.go index 0db42a70e..5b9a4bfac 100644 --- a/pkg/nsx/services/common/types.go +++ b/pkg/nsx/services/common/types.go @@ -40,21 +40,24 @@ const ( TagScopeSubnetPortCRName string = "nsx-op/subnetport_cr_name" TagScopeSubnetPortCRUID string = "nsx-op/subnetport_cr_uid" LabelDefaultSubnetSet string = "nsxoperator.vmware.com/default-subnetset-for" - // TagScopeSubnetCRType indicates that NSX Subnet is linked to Subnet CR or SubnetSet CR. - TagScopeSubnetCRType string = "nsx-op/subnet_cr_type" - TagScopeSubnetCRUID string = "nsx-op/subnet_cr_uid" - TagScopeSubnetCRName string = "nsx-op/subnet_cr_name" - TagScopeSubnetSetCRName string = "nsx-op/subnetset_cr_name" + LabelDefaultVMSubnet string = "VirtualMachine" + LabelDefaultPodSubnetSet string = "Pod" + TagScopeSubnetCRType string = "nsx-op/subnet_cr_type" + TagScopeSubnetCRUID string = "nsx-op/subnet_cr_uid" + TagScopeSubnetCRName string = "nsx-op/subnet_cr_name" + TagScopeSubnetSetCRName string = "nsx-op/subnetset_cr_name" + AnnotationVPCNetworkConfig string = "nsx.vmware.com/vpc_network_config" + AnnotationVPCName string = "nsx.vmware.com/vpc_name" + DefaultNetworkConfigName string = "default" - GCInterval = 60 * time.Second - FinalizerName = "securitypolicy.nsx.vmware.com/finalizer" - StaticRouteFinalizerName = "staticroute.nsx.vmware.com/finalizer" - LabelDefaultVMSubnet string = "VirtualMachine" - LabelDefaultPodSubnetSet string = "Pod" + GCInterval = 60 * time.Second + RealizeTimeout = 2 * time.Minute + FinalizerName = "securitypolicy.nsx.vmware.com/finalizer" + StaticRouteFinalizerName = "staticroute.nsx.vmware.com/finalizer" NSXServiceAccountFinalizerName = "nsxserviceaccount.nsx.vmware.com/finalizer" SubnetPortFinalizerName = "subnetport.nsx.vmware.com/finalizer" - RealizeTimeout = 2 * time.Minute + VPCFinalizerName = "vpc.nsx.vmware.com/finalizer" ) var ( @@ -63,6 +66,7 @@ var ( ResourceTypeGroup = "Group" ResourceTypeRule = "Rule" ResourceTypeVpc = "Vpc" + ResourceTypeIPBlock = "IpAddressBlock" ResourceTypeSubnetPort = "VpcSubnetPort" ResourceTypeVirtualMachine = "VirtualMachine" // ResourceTypeClusterControlPlane is used by NSXServiceAccountController diff --git a/pkg/nsx/services/mediator/mediator.go b/pkg/nsx/services/mediator/mediator.go index 3aff79a80..7af5a39b4 100644 --- a/pkg/nsx/services/mediator/mediator.go +++ b/pkg/nsx/services/mediator/mediator.go @@ -40,3 +40,9 @@ func (serviceMediator *ServiceMediator) ListVPCInfo(ns string) []common.VPCResou } return VPCInfoList } + +// This method is used for subnet service since vpc network config contains default subnet size +// and default subnet access mode. +func (m *ServiceMediator) GetVPCNetworkConfigByNamespace(ns string) *vpc.VPCNetworkConfigInfo { + return m.VPCService.GetVPCNetworkConfigByNamespace(ns) +} diff --git a/pkg/nsx/services/vpc/builder.go b/pkg/nsx/services/vpc/builder.go new file mode 100644 index 000000000..f48729915 --- /dev/null +++ b/pkg/nsx/services/vpc/builder.go @@ -0,0 +1,121 @@ +package vpc + +import ( + "github.com/google/uuid" + "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" + + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" + "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" + "github.com/vmware-tanzu/nsx-operator/pkg/util" +) + +var ( + DefaultVPCIPAddressType = "IPV4" + VPCLBEndpointEnabled = true +) + +// private ip block cidr is not unique, there maybe different ip blocks using same cidr, but for different vpc cr +// using cidr_vpccruid as key so that it could quickly check if ipblocks already created. +func generateIPBlockKey(block model.IpAddressBlock) string { + cidr := block.Cidr + vpc_uid := "" + for _, tag := range block.Tags { + if *tag.Scope == common.TagScopeVPCCRUID { + vpc_uid = *tag.Tag + } + } + return *cidr + "_" + vpc_uid +} + +func generateIPBlockSearchKey(cidr string, vpcCRUID string) string { + return cidr + "_" + vpcCRUID +} + +func TransferIpblockIDstoPaths(ids []string) []string { + paths := []string{} + if ids == nil { + return paths + } + + for _, id := range ids { + path := VPCIPBlockPathPrefix + id + paths = append(paths, path) + } + + return paths +} + +func buildNSXVPC(obj *v1alpha1.VPC, nc VPCNetworkConfigInfo, cluster string, pathMap map[string]string, nsxVPC *model.Vpc) (*model.Vpc, error) { + vpc := &model.Vpc{} + if nsxVPC != nil { + // for upgrade case, only check public/private ip block size changing + if !IsVPCChanged(nc, nsxVPC) { + log.Info("no changes on current nsx vpc, skip updating", "VPC", nsxVPC.Id) + return nil, nil + } + // for updating vpc case, use current vpc id, name + vpc = nsxVPC + } else { + // for creating vpc case, fill in vpc properties based on networkconfig + vpcName := "VPC_" + obj.GetNamespace() + "_" + uuid.NewString() + vpc.DisplayName = &vpcName + vpc.Id = (*string)(&obj.UID) + vpc.DefaultGatewayPath = &nc.DefaultGatewayPath + vpc.IpAddressType = &DefaultVPCIPAddressType + + siteInfos := []model.SiteInfo{ + { + EdgeClusterPaths: []string{nc.EdgeClusterPath}, + }, + } + vpc.SiteInfos = siteInfos + vpc.LoadBalancerVpcEndpoint = &model.LoadBalancerVPCEndpoint{Enabled: &VPCLBEndpointEnabled} + vpc.Tags = buildVPCTags(obj, cluster) + } + + // update private/public blocks + vpc.ExternalIpv4Blocks = TransferIpblockIDstoPaths(nc.ExternalIPv4Blocks) + vpc.PrivateIpv4Blocks = util.GetMapValues(pathMap) + + return vpc, nil +} + +func buildPrivateIPBlockTags(cluster string, project string, ns string, vpcUid string) []model.Tag { + tags := []model.Tag{ + { + Scope: common.String(common.TagScopeCluster), + Tag: common.String(cluster), + }, + { + Scope: common.String(common.TagScopeNamespace), + Tag: common.String(ns), + }, + { + Scope: common.String(common.TagScopeVPCCRUID), + Tag: common.String(vpcUid), + }, + } + return tags +} + +func buildVPCTags(obj *v1alpha1.VPC, cluster string) []model.Tag { + tags := []model.Tag{ + { + Scope: common.String(common.TagScopeCluster), + Tag: common.String(cluster), + }, + { + Scope: common.String(common.TagScopeNamespace), + Tag: common.String(obj.GetNamespace()), + }, + { + Scope: common.String(common.TagScopeVPCCRName), + Tag: common.String(obj.GetName()), + }, + { + Scope: common.String(common.TagScopeVPCCRUID), + Tag: common.String(string(obj.UID)), + }, + } + return tags +} diff --git a/pkg/nsx/services/vpc/compare.go b/pkg/nsx/services/vpc/compare.go new file mode 100644 index 000000000..aacbccea7 --- /dev/null +++ b/pkg/nsx/services/vpc/compare.go @@ -0,0 +1,19 @@ +package vpc + +import ( + "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" +) + +// currently we only support appending public/private cidrs +// so only comparing list size is enough to identify if vcp changed +func IsVPCChanged(nc VPCNetworkConfigInfo, vpc *model.Vpc) bool { + if len(nc.ExternalIPv4Blocks) != len(vpc.ExternalIpv4Blocks) { + return true + } + + if len(nc.PrivateIPv4CIDRs) != len(vpc.PrivateIpv4Blocks) { + return true + } + + return false +} diff --git a/pkg/nsx/services/vpc/store.go b/pkg/nsx/services/vpc/store.go index b5b96e58c..552b15d60 100644 --- a/pkg/nsx/services/vpc/store.go +++ b/pkg/nsx/services/vpc/store.go @@ -13,6 +13,8 @@ func keyFunc(obj interface{}) (string, error) { switch v := obj.(type) { case model.Vpc: return *v.Id, nil + case model.IpAddressBlock: + return generateIPBlockKey(obj.(model.IpAddressBlock)), nil default: return "", errors.New("keyFunc doesn't support unknown type") } @@ -25,6 +27,8 @@ func indexFunc(obj interface{}) ([]string, error) { switch o := obj.(type) { case model.Vpc: return filterTag(o.Tags), nil + case model.IpAddressBlock: + return filterTag(o.Tags), nil default: return res, errors.New("indexFunc doesn't support unknown type") } @@ -40,6 +44,32 @@ var filterTag = func(v []model.Tag) []string { return res } +// IPBlockStore is a store for private ip blocks +type IPBlockStore struct { + common.ResourceStore +} + +func (is *IPBlockStore) Operate(i interface{}) error { + if i == nil { + return nil + } + ipblock := i.(*model.IpAddressBlock) + if ipblock.MarkedForDelete != nil && *ipblock.MarkedForDelete { + err := is.Delete(*ipblock) + log.V(1).Info("delete ipblock from store", "IPBlock", ipblock) + if err != nil { + return err + } + } else { + err := is.Add(*ipblock) + log.V(1).Info("add IPBlock to store", "IPBlock", ipblock) + if err != nil { + return err + } + } + return nil +} + // VPCStore is a store for VPCs type VPCStore struct { common.ResourceStore @@ -85,3 +115,12 @@ func (vs *VPCStore) GetVPCsByNamespace(ns string) []model.Vpc { } return ret } + +func (vs *VPCStore) GetByKey(key string) *model.Vpc { + obj := vs.ResourceStore.GetByKey(key) + if obj != nil { + vpc := obj.(model.Vpc) + return &vpc + } + return nil +} diff --git a/pkg/nsx/services/vpc/types.go b/pkg/nsx/services/vpc/types.go new file mode 100644 index 000000000..fa3a421b4 --- /dev/null +++ b/pkg/nsx/services/vpc/types.go @@ -0,0 +1,12 @@ +package vpc + +type VPCNetworkConfigInfo struct { + Name string + DefaultGatewayPath string + EdgeClusterPath string + NsxtProject string + ExternalIPv4Blocks []string + PrivateIPv4CIDRs []string + DefaultIPv4SubnetSize int + DefaultSubnetAccessMode string +} diff --git a/pkg/nsx/services/vpc/vpc.go b/pkg/nsx/services/vpc/vpc.go index 0a9174fd7..2c60d4777 100644 --- a/pkg/nsx/services/vpc/vpc.go +++ b/pkg/nsx/services/vpc/vpc.go @@ -1,25 +1,94 @@ package vpc import ( + "context" + "errors" + "fmt" + "net" + "net/netip" + "strings" "sync" "github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + "github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1" "github.com/vmware-tanzu/nsx-operator/pkg/logger" "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common" + "github.com/vmware-tanzu/nsx-operator/pkg/util" ) var ( - log = logger.Log - NewConverter = common.NewConverter + log = logger.Log + ctx = context.Background() + ResourceTypeVPC = common.ResourceTypeVpc + NewConverter = common.NewConverter + // The following variables are defined as interface, they should be initialized as concrete type - vpcStore common.Store + vpcStore common.Store + ipblockStore common.Store + + // this store contains mapping relation of network config name and network config entity + VPCNetworkConfigMap = map[string]VPCNetworkConfigInfo{} + + // this map contains mapping relation between namespace and the network config it uses. + VPCNSNetworkconfigMap = map[string]string{} + + VPCDefaultOrg = "default" + VPCIPBlockPathPrefix = "/infra/ip-blocks/" + resourceType = "resource_type" + EnforceRevisionCheckParam = false + MarkedForDelete = true ) type VPCService struct { common.Service - vpcStore *VPCStore + VpcStore *VPCStore + IpblockStore *IPBlockStore +} + +func (s *VPCService) RegisterVPCNetworkConfig(ncCRName string, info VPCNetworkConfigInfo) { + VPCNetworkConfigMap[ncCRName] = info +} + +func (s *VPCService) UnregisterVPCNetworkConfig(ncCRName string) { + delete(VPCNetworkConfigMap, ncCRName) +} + +func (s *VPCService) GetVPCNetworkConfig(ncCRName string) (VPCNetworkConfigInfo, bool) { + nc, exist := VPCNetworkConfigMap[ncCRName] + return nc, exist +} + +func (s *VPCService) RegisterNamespaceNetworkconfigBinding(ns string, ncCRName string) { + VPCNSNetworkconfigMap[ns] = ncCRName +} + +func (s *VPCService) UnRegisterNamespaceNetworkconfigBinding(ns string) { + delete(VPCNSNetworkconfigMap, ns) +} + +func (s *VPCService) GetVPCNetworkConfigByNamespace(ns string) *VPCNetworkConfigInfo { + ncName, nameExist := VPCNSNetworkconfigMap[ns] + if !nameExist { + log.Info("failed to get network config name for namespace", "Namespace", ns) + return nil + } + + nc, ncExist := s.GetVPCNetworkConfig(ncName) + if !ncExist { + log.Info("failed to get network config info using network config name", "Name", ncName) + return nil + } + return &nc +} + +// TBD: for now, if network config info do not contains private cidr, we consider this is +// incorrect configuration, and skip creating this VPC CR +func (s *VPCService) ValidateNetworkConfig(nc VPCNetworkConfigInfo) bool { + return nc.PrivateIPv4CIDRs != nil && len(nc.PrivateIPv4CIDRs) != 0 } // InitializeVPC sync NSX resources @@ -28,16 +97,23 @@ func InitializeVPC(service common.Service) (*VPCService, error) { wgDone := make(chan bool) fatalErrors := make(chan error) - wg.Add(1) + wg.Add(2) VPCService := &VPCService{Service: service} - VPCService.vpcStore = &VPCStore{ResourceStore: common.ResourceStore{ + VPCService.VpcStore = &VPCStore{ResourceStore: common.ResourceStore{ Indexer: cache.NewIndexer(keyFunc, cache.Indexers{common.TagScopeVPCCRUID: indexFunc}), BindingType: model.VpcBindingType(), }} - go VPCService.InitializeResourceStore(&wg, fatalErrors, common.ResourceTypeVpc, nil, vpcStore) + VPCService.IpblockStore = &IPBlockStore{ResourceStore: common.ResourceStore{ + Indexer: cache.NewIndexer(keyFunc, cache.Indexers{common.TagScopeVPCCRUID: indexFunc}), + BindingType: model.IpAddressBlockBindingType(), + }} + + //initialize vpc store and ip blocks store + go VPCService.InitializeResourceStore(&wg, fatalErrors, common.ResourceTypeVpc, nil, VPCService.VpcStore) + go VPCService.InitializeResourceStore(&wg, fatalErrors, common.ResourceTypeIPBlock, nil, VPCService.IpblockStore) go func() { wg.Wait() @@ -56,5 +132,239 @@ func InitializeVPC(service common.Service) (*VPCService, error) { } func (s *VPCService) GetVPCsByNamespace(namespace string) []model.Vpc { - return s.vpcStore.GetVPCsByNamespace(namespace) + return s.VpcStore.GetVPCsByNamespace(namespace) +} + +func (service *VPCService) ListVPC() []model.Vpc { + vpcs := service.VpcStore.List() + vpcSet := []model.Vpc{} + for _, vpc := range vpcs { + vpcSet = append(vpcSet, vpc.(model.Vpc)) + } + return vpcSet +} + +func (service *VPCService) DeleteVPC(path string) error { + pathInfo, err := common.ParseVPCResourcePath(path) + if err != nil { + return err + } + vpcClient := service.NSXClient.VPCClient + vpc := service.VpcStore.GetByKey(pathInfo.VPCID) + if vpc == nil { + return nil + } + + if err := vpcClient.Delete(pathInfo.OrgID, pathInfo.ProjectID, pathInfo.VPCID); err != nil { + return err + } + vpc.MarkedForDelete = &MarkedForDelete + if err := service.VpcStore.Operate(vpc); err != nil { + return err + } + + log.Info("successfully deleted NSX VPC", "nsxVPC", pathInfo.VPCID) + return nil +} + +func (service *VPCService) DeleteIPBlock(vpc model.Vpc) error { + blocks := vpc.PrivateIpv4Blocks + if blocks == nil || len(blocks) == 0 { + log.Info("no private cidr list, skip deleting private ip blocks") + return nil + } + + ipblockClient := service.NSXClient.IPBlockClient + for _, block := range blocks { + parts := strings.Split(block, "/") + log.Info("deleting private ip block", "ORG", parts[2], "Project", parts[4], "ID", parts[7]) + if err := ipblockClient.Delete(parts[2], parts[4], parts[7]); err != nil { + log.Error(err, "failed to delete ip block", "PATH", block) + return err + } + vpcCRUid := "" + for _, tag := range vpc.Tags { + if *tag.Scope == common.TagScopeVPCCRUID { + vpcCRUid = *tag.Tag + } + } + log.Info("search ip block from store using index", "index", common.TagScopeVPCCRUID, "value", vpcCRUid) + // TODO: bugfix, seems using vpc cr uid can not get the ip blocks from cache, need further checking + ipblocks := service.IpblockStore.GetByIndex(common.TagScopeVPCCRUID, vpcCRUid) + if ipblocks != nil && len(ipblocks) != 0 { + log.Info("deleting ip blocks", "IPBlock", ipblocks[0]) + b := ipblocks[0].(model.IpAddressBlock) + b.MarkedForDelete = &MarkedForDelete + service.IpblockStore.Operate(&ipblocks[0]) + } + } + log.Info("successfully deleted all ip blocks") + return nil +} + +func (service *VPCService) CreatOrUpdatePrivateIPBlock(obj *v1alpha1.VPC, nc VPCNetworkConfigInfo) (map[string]string, error) { + // if network config contains PrivateIPV4CIDRs section, create private ip block for each cidr + path := map[string]string{} + if nc.PrivateIPv4CIDRs != nil { + for _, pCidr := range nc.PrivateIPv4CIDRs { + log.Info("start processing private cidr", "cidr", pCidr) + // if parse success, then check if private cidr exist, here we suppose it must be a cidr format string + ip, _, err := net.ParseCIDR(pCidr) + if err != nil { + message := fmt.Sprintf("invalid cidr %s for vpc %s", pCidr, obj.Name) + fmtError := errors.New(message) + log.Error(fmtError, message) + return nil, fmtError + } + // check if private ip block already exist + // use cidr_project_ns as search key + key := generateIPBlockSearchKey(pCidr, string(obj.UID)) + log.Info("using key to search from ipblock store", "key", key) + block := service.IpblockStore.GetByKey(key) + if block == nil { + log.Info("no ip block found in stroe for cidr", "cidr", pCidr) + blockId := nc.NsxtProject + "_" + ip.String() + "_" + obj.Namespace + addr, _ := netip.ParseAddr(ip.String()) + ipType := util.If(addr.Is4(), model.IpAddressBlock_IP_ADDRESS_TYPE_IPV4, model.IpAddressBlock_IP_ADDRESS_TYPE_IPV6).(string) + blockType := model.IpAddressBlock_VISIBILITY_PRIVATE + block := model.IpAddressBlock{ + DisplayName: &blockId, + Id: &blockId, + Tags: buildPrivateIPBlockTags(service.NSXConfig.Cluster, nc.NsxtProject, obj.Namespace, string(obj.UID)), + Cidr: &pCidr, + IpAddressType: &ipType, + Visibility: &blockType, + } + log.Info("creating ip block", "IPBlock", blockId, "VPC", obj.Name) + // can not find private ip block from store, create one + _err := service.NSXClient.IPBlockClient.Patch(VPCDefaultOrg, nc.NsxtProject, blockId, block) + if _err != nil { + message := fmt.Sprintf("failed to create private ip block for cidr %s for vpc %s", pCidr, obj.Name) + ipblockError := errors.New(message) + log.Error(ipblockError, message) + return nil, ipblockError + } + createdBlock, err := service.NSXClient.IPBlockClient.Get(VPCDefaultOrg, nc.NsxtProject, blockId) + if err != nil { + // created by can not get, ignore this error + log.Info("failed to read ip blocks from nsxt", "Project", nc.NsxtProject, "IPBlock", blockId) + continue + } + // update ip block store + service.IpblockStore.Add(createdBlock) + path[pCidr] = *createdBlock.Path + } else { + eBlock := block.(model.IpAddressBlock) + path[pCidr] = *eBlock.Path + log.Info("ip block found in stroe for cidr using key", "cidr", pCidr, "key", key) + } + } + } + return path, nil +} + +func (s *VPCService) getNetworkconfigNameFromNS(ns string) (string, error) { + obj := &v1.Namespace{} + if err := s.Client.Get(ctx, types.NamespacedName{ + Name: ns, + Namespace: ns, + }, obj); err != nil { + log.Error(err, "failed to fetch namespace", "Namespace", ns) + return "", err + } + + annos := obj.Annotations + if annos == nil || len(annos) == 0 { + return common.DefaultNetworkConfigName, nil + } + + ncName, exist := annos[common.AnnotationVPCNetworkConfig] + if !exist { + return common.DefaultNetworkConfigName, nil + } + return ncName, nil +} + +func (service *VPCService) CreateorUpdateVPC(obj *v1alpha1.VPC) (*model.Vpc, error) { + // check from VPC store if vpc already exist + updateVpc := false + existingVPC := service.VpcStore.GetVPCsByNamespace(obj.Namespace) + if existingVPC != nil && len(existingVPC) != 0 { // We now consider only one VPC for one namespace + updateVpc = true + log.Info("VPC already exist, updating vpc object") + } + + // read corresponding vpc network config from store + nc_name, err := service.getNetworkconfigNameFromNS(obj.Namespace) + if err != nil { + log.Error(err, "failed to get network config name for vpc", "VPC", obj.Name) + return nil, err + } + nc, _exist := service.GetVPCNetworkConfig(nc_name) + if !_exist { + message := fmt.Sprintf("network config %s not found", nc_name) + log.Info(message) + return nil, errors.New(message) + } + + log.Info("read network config from store", "NetworkConfig", nc_name) + + paths, err := service.CreatOrUpdatePrivateIPBlock(obj, nc) + if err != nil { + log.Error(err, "failed to process private ip blocks, push event back to queue") + return nil, err + } + + // if all private ip blocks are created, then create nsx vpc resource. + nsxVPC := &model.Vpc{} + if updateVpc { + log.Info("vpc resource already exist on nsx, updating vpc", "VPC", existingVPC[0].DisplayName) + nsxVPC = &existingVPC[0] + } else { + log.Info("vpc does not exist on nsx, creating vpc", "VPC", obj.Name) + nsxVPC = nil + } + + createdVpc, err := buildNSXVPC(obj, nc, service.NSXConfig.Cluster, paths, nsxVPC) + if err != nil { + log.Error(err, "failed to build nsx vpc object") + return nil, err + } + + // if there is not change in public cidr and private cidr, build partial vpc will return nil + if createdVpc == nil { + log.Info("no vpc changed, skip create or update process") + return &existingVPC[0], nil + } + + log.Info("creating nsx vpc resource", "VPC", *createdVpc.Id) + err = service.NSXClient.VPCClient.Patch(VPCDefaultOrg, nc.NsxtProject, *createdVpc.Id, *createdVpc) + if err != nil { + log.Error(err, "failed to create vpc", "Project", nc.NsxtProject, "Namespace", obj.Namespace) + // TODO: this seems to be a nsx bug, in some case, even if nsx returns failed but the object is still created. + // in this condition, we still need to read the object and update it into store, or else operator will create multiple + // vpcs for this namespace. + log.Info("try to read vpc object although vpc creation failed", "VPC", *createdVpc.Id) + failedVpc, rErr := service.NSXClient.VPCClient.Get(VPCDefaultOrg, nc.NsxtProject, *createdVpc.Id) + if rErr != nil { + // failed to read, but already created, we consider this scenario as success, but store may not sync with nsx + log.Info("confirmed vpc is not created", "VPC", createdVpc.Id) + return nil, err + } else { + // vpc created anyway, update store, and in this scenario, we condsider creating successfully + log.Info("read vpcs from nsx after creation failed, still update vpc store", "VPC", *createdVpc.Id) + service.VpcStore.Add(failedVpc) + return &failedVpc, nil + } + } + + newVpc, err := service.NSXClient.VPCClient.Get(VPCDefaultOrg, nc.NsxtProject, *createdVpc.Id) + if err != nil { + // failed to read, but already created, we consider this scenario as success, but store may not sync with nsx + log.Error(err, "failed to read vpc object after creating", "VPC", createdVpc.Id) + return &newVpc, nil + } + + service.VpcStore.Add(newVpc) + return &newVpc, nil } diff --git a/pkg/nsx/services/vpc/vpc_test.go b/pkg/nsx/services/vpc/vpc_test.go index b457f7d4d..d5af3b877 100644 --- a/pkg/nsx/services/vpc/vpc_test.go +++ b/pkg/nsx/services/vpc/vpc_test.go @@ -55,7 +55,7 @@ func TestVPC_GetVPCsByNamespace(t *testing.T) { service := &VPCService{ Service: common.Service{NSXClient: nil}, } - service.vpcStore = vpcStore + service.VpcStore = vpcStore type args struct { i interface{} j interface{} diff --git a/pkg/util/utils.go b/pkg/util/utils.go index 20baa3252..888c9452a 100644 --- a/pkg/util/utils.go +++ b/pkg/util/utils.go @@ -178,3 +178,64 @@ func CalculateIPFromCIDRs(IPAddresses []string) (int, error) { } return total, nil } + +func If(condition bool, trueVal, falseVal interface{}) interface{} { + if condition { + return trueVal + } else { + return falseVal + } +} + +func GetMapValues(in interface{}) []string { + if in == nil { + return make([]string, 0) + } + switch in.(type) { + case map[string]string: + ssMap := in.(map[string]string) + values := make([]string, 0, len(ssMap)) + for _, v := range ssMap { + values = append(values, v) + } + return values + default: + log.Info("Unsupported map format") + return nil + } +} + +// the changes map contains key/value map that you want to change. +// if giving empty value for a key in changes map like: "mykey":"", that means removing this annotation from k8s resource +func UpdateK8sResourceAnnotation(client client.Client, ctx *context.Context, k8sObj client.Object, changes map[string]string) error { + needUpdate := false + anno := k8sObj.GetAnnotations() // here it may return a nil because ns do not have annotations. + newAnno := If(anno == nil, map[string]string{}, anno).(map[string]string) + for key, value := range changes { + // if value is not none, it means this key/value need to add/update + if value != "" { + needUpdate = true + newAnno[key] = value + } else { // if value is empty, then this key/value need to be removed from map + _, exist := newAnno[key] + if exist { + delete(newAnno, key) + needUpdate = true + } else { + log.Info("No need to change ns annotation") + needUpdate = false + } + } + } + // update k8s object + k8sObj.SetAnnotations(newAnno) + + // only send update request when it is needed + if needUpdate { + err := client.Update(*ctx, k8sObj) + if err != nil { + return err + } + } + return nil +}