From 9b0b6f6eb03412876d769ff9ebd0c8054afd5da7 Mon Sep 17 00:00:00 2001
From: Yury Kulazhenkov
Date: Tue, 4 Jun 2024 16:51:35 +0300
Subject: [PATCH] Add node controller for CIDRPool CR

Signed-off-by: Yury Kulazhenkov
---
 cmd/ipam-node/app/app.go                 |  11 ++
 cmd/ipam-node/app/app_test.go            | 166 ++++++++++--------
 pkg/ip/cidr.go                           |  18 ++
 pkg/ip/cidr_test.go                      |  32 ++++
 .../controllers/cidrpool/cidrpool.go     | 108 ++++++++++++
 5 files changed, 264 insertions(+), 71 deletions(-)
 create mode 100644 pkg/ipam-node/controllers/cidrpool/cidrpool.go

diff --git a/cmd/ipam-node/app/app.go b/cmd/ipam-node/app/app.go
index a61071e..bc2dfdc 100644
--- a/cmd/ipam-node/app/app.go
+++ b/cmd/ipam-node/app/app.go
@@ -58,6 +58,7 @@ import (
 	"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
 	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/allocator"
 	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner"
+	cidrpoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/cidrpool"
 	ippoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/ippool"
 	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/grpc/middleware"
 	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/handlers"
@@ -157,6 +158,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
 	k8sClient, err := client.New(config, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
 	if err != nil {
 		logger.Error(err, "unable to create direct k8s client")
+		return err
 	}
 
 	if err = (&ippoolctrl.IPPoolReconciler{
@@ -168,6 +170,15 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
 		logger.Error(err, "unable to create controller", "controller", "IPPool")
 		return err
 	}
+	if err = (&cidrpoolctrl.CIDRPoolReconciler{
+		PoolManager: poolManager,
+		Client:      mgr.GetClient(),
+		Scheme:      mgr.GetScheme(),
+		NodeName:    opts.NodeName,
+	}).SetupWithManager(mgr); err != nil {
+		logger.Error(err, "unable to create controller", "controller", "CIDRPool")
+		return err
+	}
 
 	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
 		logger.Error(err, "unable to set up health check")
diff --git a/cmd/ipam-node/app/app_test.go b/cmd/ipam-node/app/app_test.go
index cab85f5..50ecadc 100644
--- a/cmd/ipam-node/app/app_test.go
+++ b/cmd/ipam-node/app/app_test.go
@@ -27,7 +27,6 @@ import (
 	"google.golang.org/grpc/status"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
 
 	nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1"
@@ -44,7 +43,7 @@ const (
 	testNamespace = "default"
 )
 
-func createTestPools() {
+func createTestIPPools() {
 	pool1 := &ipamv1alpha1.IPPool{
 		ObjectMeta: metav1.ObjectMeta{Name: testPoolName1, Namespace: testNamespace},
 		Spec: ipamv1alpha1.IPPoolSpec{
 			Subnet:           "192.168.0.0/16",
 			PerNodeBlockSize: 252,
 			Gateway:          "192.168.0.1",
 		},
 	}
 	ExpectWithOffset(1, k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred())
+	pool1.Status = ipamv1alpha1.IPPoolStatus{
+		Allocations: []ipamv1alpha1.Allocation{
+			{
+				NodeName: testNodeName,
+				StartIP:  "192.168.0.2",
+				EndIP:    "192.168.0.254",
+			},
+		}}
+	ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool1)).NotTo(HaveOccurred())
 
@@ -64,47 +72,57 @@ func createTestPools() {
 	pool2 := &ipamv1alpha1.IPPool{
 		ObjectMeta: metav1.ObjectMeta{Name: testPoolName2, Namespace: testNamespace},
 		Spec: ipamv1alpha1.IPPoolSpec{
 			Subnet:           "10.100.0.0/16",
 			PerNodeBlockSize: 252,
 			Gateway:          "10.100.0.1",
 		},
 	}
 	ExpectWithOffset(1, k8sClient.Create(ctx, pool2)).NotTo(HaveOccurred())
-
-	// Update statuses with range allocation
-	Eventually(func(g Gomega) error {
-		status := ipamv1alpha1.IPPoolStatus{
-			Allocations: []ipamv1alpha1.Allocation{
-				{
-					NodeName: testNodeName,
-					StartIP:  "192.168.0.2",
"192.168.0.2", - EndIP: "192.168.0.254", - }, - }, - } - return updatePoolStatus(testPoolName1, status) - }, 30, 5).Should(Not(HaveOccurred())) - - Eventually(func(g Gomega) error { - status := ipamv1alpha1.IPPoolStatus{ - Allocations: []ipamv1alpha1.Allocation{ - { - NodeName: testNodeName, - StartIP: "10.100.0.2", - EndIP: "10.100.0.254", - }, + pool2.Status = ipamv1alpha1.IPPoolStatus{ + Allocations: []ipamv1alpha1.Allocation{ + { + NodeName: testNodeName, + StartIP: "10.100.0.2", + EndIP: "10.100.0.254", }, - } - return updatePoolStatus(testPoolName2, status) - }, 30, 5).Should(Not(HaveOccurred())) + }} + ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool2)) } -func updatePoolStatus(poolName string, status ipamv1alpha1.IPPoolStatus) error { - pool := &ipamv1alpha1.IPPool{} - err := k8sClient.Get(ctx, types.NamespacedName{Name: poolName, Namespace: testNamespace}, pool) - if err != nil { - return err +func createTestCIDRPools() { + pool1GatewayIndex := uint(1) + pool1 := &ipamv1alpha1.CIDRPool{ + ObjectMeta: metav1.ObjectMeta{Name: testPoolName1, Namespace: testNamespace}, + Spec: ipamv1alpha1.CIDRPoolSpec{ + CIDR: "192.100.0.0/16", + GatewayIndex: &pool1GatewayIndex, + PerNodeNetworkPrefix: 24, + Exclusions: []ipamv1alpha1.ExcludeRange{ + {StartIP: "192.100.0.1", EndIP: "192.100.0.10"}, + }, + }, } - pool.Status = status - err = k8sClient.Status().Update(ctx, pool) - if err != nil { - return err + ExpectWithOffset(1, k8sClient.Create(ctx, pool1)) + pool1.Status = ipamv1alpha1.CIDRPoolStatus{ + Allocations: []ipamv1alpha1.CIDRPoolAllocation{ + { + NodeName: testNodeName, + Prefix: "192.100.0.0/24", + Gateway: "192.100.0.1", + }, + }} + ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool1)) + + pool2 := &ipamv1alpha1.CIDRPool{ + ObjectMeta: metav1.ObjectMeta{Name: testPoolName2, Namespace: testNamespace}, + Spec: ipamv1alpha1.CIDRPoolSpec{ + CIDR: "10.200.0.0/24", + PerNodeNetworkPrefix: 31, + }, } - return nil + ExpectWithOffset(1, k8sClient.Create(ctx, pool2)) + pool2.Status = ipamv1alpha1.CIDRPoolStatus{ + Allocations: []ipamv1alpha1.CIDRPoolAllocation{ + { + NodeName: testNodeName, + Prefix: "10.200.0.0/31", + }, + }} + ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool2)) } func createTestPod() *corev1.Pod { @@ -161,10 +179,13 @@ var _ = Describe("IPAM Node daemon", func() { It("Validate main flows", func() { done := make(chan interface{}) go func() { + defer GinkgoRecover() + defer close(done) testDir := GinkgoT().TempDir() opts := getOptions(testDir) - createTestPools() + createTestIPPools() + createTestCIDRPools() pod := createTestPod() ctx = logr.NewContext(ctx, klog.NewKlogr()) @@ -180,38 +201,41 @@ var _ = Describe("IPAM Node daemon", func() { grpcClient := nodev1.NewIPAMServiceClient(conn) - params := getValidReqParams(string(pod.UID), pod.Name, pod.Namespace) - - // no allocation yet - _, err = grpcClient.IsAllocated(ctx, - &nodev1.IsAllocatedRequest{Parameters: params}) - Expect(status.Code(err) == codes.NotFound).To(BeTrue()) - - // allocate - resp, err := grpcClient.Allocate(ctx, &nodev1.AllocateRequest{Parameters: params}) - Expect(err).NotTo(HaveOccurred()) - Expect(resp.Allocations).To(HaveLen(2)) - Expect(resp.Allocations[0].Pool).NotTo(BeEmpty()) - Expect(resp.Allocations[0].Gateway).NotTo(BeEmpty()) - Expect(resp.Allocations[0].Ip).NotTo(BeEmpty()) - - _, err = grpcClient.IsAllocated(ctx, - &nodev1.IsAllocatedRequest{Parameters: params}) - Expect(err).NotTo(HaveOccurred()) - - // deallocate - _, err = grpcClient.Deallocate(ctx, 
-			Expect(err).NotTo(HaveOccurred())
-
-			// deallocate should be idempotent
-			_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
-			Expect(err).NotTo(HaveOccurred())
-
-			// check should fail
-			_, err = grpcClient.IsAllocated(ctx,
-				&nodev1.IsAllocatedRequest{Parameters: params})
-			Expect(status.Code(err) == codes.NotFound).To(BeTrue())
-			close(done)
+			cidrPoolParams := getValidReqParams(string(pod.UID), pod.Name, pod.Namespace)
+			cidrPoolParams.PoolType = nodev1.PoolType_POOL_TYPE_CIDRPOOL
+			ipPoolParams := getValidReqParams(string(pod.UID), pod.Name, pod.Namespace)
+
+			for _, params := range []*nodev1.IPAMParameters{ipPoolParams, cidrPoolParams} {
+				// no allocation yet
+				_, err = grpcClient.IsAllocated(ctx,
+					&nodev1.IsAllocatedRequest{Parameters: params})
+				Expect(status.Code(err) == codes.NotFound).To(BeTrue())
+
+				// allocate
+				resp, err := grpcClient.Allocate(ctx, &nodev1.AllocateRequest{Parameters: params})
+				Expect(err).NotTo(HaveOccurred())
+				Expect(resp.Allocations).To(HaveLen(2))
+				Expect(resp.Allocations[0].Pool).NotTo(BeEmpty())
+				Expect(resp.Allocations[0].Gateway).NotTo(BeEmpty())
+				Expect(resp.Allocations[0].Ip).NotTo(BeEmpty())
+
+				_, err = grpcClient.IsAllocated(ctx,
+					&nodev1.IsAllocatedRequest{Parameters: params})
+				Expect(err).NotTo(HaveOccurred())
+
+				// deallocate
+				_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
+				Expect(err).NotTo(HaveOccurred())
+
+				// deallocate should be idempotent
+				_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
+				Expect(err).NotTo(HaveOccurred())
+
+				// check should fail
+				_, err = grpcClient.IsAllocated(ctx,
+					&nodev1.IsAllocatedRequest{Parameters: params})
+				Expect(status.Code(err) == codes.NotFound).To(BeTrue())
+			}
 		}()
 		Eventually(done, 5*time.Minute).Should(BeClosed())
 	})
diff --git a/pkg/ip/cidr.go b/pkg/ip/cidr.go
index 0c66232..8bbbf59 100644
--- a/pkg/ip/cidr.go
+++ b/pkg/ip/cidr.go
@@ -147,6 +147,24 @@ func IsBroadcast(ip net.IP, network *net.IPNet) bool {
 	return ip.Equal(masked)
 }
 
+// IsPointToPointSubnet returns true if the network is point-to-point (/31 or /127)
+func IsPointToPointSubnet(network *net.IPNet) bool {
+	ones, maskLen := network.Mask.Size()
+	return ones == maskLen-1
+}
+
+// LastIP returns the last IP of a subnet, excluding the IPv4 broadcast address
+// (a /31 subnet has no broadcast address, so its last IP is returned as is)
+func LastIP(network *net.IPNet) net.IP {
+	var end net.IP
+	for i := 0; i < len(network.IP); i++ {
+		end = append(end, network.IP[i]|^network.Mask[i])
+	}
+	if network.IP.To4() != nil && !IsPointToPointSubnet(network) {
+		end[3]--
+	}
+	return end
+}
+
 // GetSubnetGen returns generator function that can be called multiple times
 // to generate subnet for the network with the prefix size.
 // The function always returns a non-nil function.
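Reviewer note (not part of the patch): the two helpers added above are easy to sanity-check in isolation. A minimal sketch, assuming this module's pkg/ip import path is resolvable; the printed values match the unit tests added to pkg/ip/cidr_test.go below.

package main

import (
	"fmt"
	"net"

	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ip"
)

func main() {
	_, p2p, _ := net.ParseCIDR("192.168.1.0/31")
	_, regular, _ := net.ParseCIDR("192.168.1.0/24")

	// A /31 (or /127 for IPv6) is point-to-point: both addresses are
	// usable and there is no network/broadcast pair (RFC 3021).
	fmt.Println(ip.IsPointToPointSubnet(p2p))     // true
	fmt.Println(ip.IsPointToPointSubnet(regular)) // false

	// LastIP returns .1 for the /31 but .254 for the /24, because the
	// /24 broadcast address .255 is excluded.
	fmt.Println(ip.LastIP(p2p))     // 192.168.1.1
	fmt.Println(ip.LastIP(regular)) // 192.168.1.254
}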
diff --git a/pkg/ip/cidr_test.go b/pkg/ip/cidr_test.go index 1f98499..f45f65d 100644 --- a/pkg/ip/cidr_test.go +++ b/pkg/ip/cidr_test.go @@ -374,4 +374,36 @@ var _ = Describe("CIDR functions", func() { Expect(gen().String()).To(Equal("::4/127")) }) }) + Context("IsPointToPointSubnet", func() { + It("/31", func() { + _, network, _ := net.ParseCIDR("192.168.1.0/31") + Expect(IsPointToPointSubnet(network)).To(BeTrue()) + }) + It("/127", func() { + _, network, _ := net.ParseCIDR("2002:0:0:1234::1/127") + Expect(IsPointToPointSubnet(network)).To(BeTrue()) + }) + It("/24", func() { + _, network, _ := net.ParseCIDR("192.168.1.0/24") + Expect(IsPointToPointSubnet(network)).To(BeFalse()) + }) + }) + Context("LastIP", func() { + It("/31", func() { + _, network, _ := net.ParseCIDR("192.168.1.0/31") + Expect(LastIP(network).String()).To(Equal("192.168.1.1")) + }) + It("/127", func() { + _, network, _ := net.ParseCIDR("2002:0:0:1234::0/127") + Expect(LastIP(network).String()).To(Equal("2002:0:0:1234::1")) + }) + It("/24", func() { + _, network, _ := net.ParseCIDR("192.168.1.0/24") + Expect(LastIP(network).String()).To(Equal("192.168.1.254")) + }) + It("/64", func() { + _, network, _ := net.ParseCIDR("2002:0:0:1234::0/64") + Expect(LastIP(network).String()).To(Equal("2002::1234:ffff:ffff:ffff:ffff")) + }) + }) }) diff --git a/pkg/ipam-node/controllers/cidrpool/cidrpool.go b/pkg/ipam-node/controllers/cidrpool/cidrpool.go new file mode 100644 index 0000000..f83b63a --- /dev/null +++ b/pkg/ipam-node/controllers/cidrpool/cidrpool.go @@ -0,0 +1,108 @@ +/* + Copyright 2023, NVIDIA CORPORATION & AFFILIATES + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/
+
+package controllers
+
+import (
+	"context"
+	"net"
+
+	apiErrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
+	"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
+	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ip"
+	"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
+)
+
+// CIDRPoolReconciler reconciles CIDRPool objects
+type CIDRPoolReconciler struct {
+	PoolManager pool.Manager
+	client.Client
+	Scheme   *runtime.Scheme
+	NodeName string
+}
+
+// Reconcile contains logic to sync CIDRPool objects
+func (r *CIDRPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	reqLog := log.FromContext(ctx)
+	cidrPool := &ipamv1alpha1.CIDRPool{}
+	poolKey := common.GetPoolKey(req.Name, common.PoolTypeCIDRPool)
+	err := r.Client.Get(ctx, req.NamespacedName, cidrPool)
+	if err != nil {
+		if apiErrors.IsNotFound(err) {
+			reqLog.Info("CIDRPool not found, removing from PoolManager")
+			r.PoolManager.RemovePool(poolKey)
+			return ctrl.Result{}, nil
+		}
+		reqLog.Error(err, "failed to get CIDRPool object from the cache")
+		return ctrl.Result{}, err
+	}
+	reqLog.Info("Notification on CIDRPool", "name", cidrPool.Name)
+	found := false
+
+	if errList := cidrPool.Validate(); len(errList) > 0 {
+		reqLog.Info("CIDRPool has invalid config, ignore the pool", "name",
+			cidrPool.Name, "reason", errList.ToAggregate().Error())
+		r.PoolManager.RemovePool(poolKey)
+		return ctrl.Result{}, nil
+	}
+
+	for _, alloc := range cidrPool.Status.Allocations {
+		if alloc.NodeName == r.NodeName {
+			if errList := alloc.Validate(cidrPool); len(errList) > 0 {
+				reqLog.Info("CIDRPool has invalid allocation for the node, ignore the pool",
+					"name", cidrPool.Name, "reason", errList.ToAggregate().Error())
+				r.PoolManager.RemovePool(poolKey)
+				return ctrl.Result{}, nil
+			}
+			_, nodeSubnet, _ := net.ParseCIDR(alloc.Prefix)
+			// the range normally starts right after the network address;
+			// for point-to-point prefixes (/31, /127) the network address
+			// itself is usable and becomes the start IP
+			startIP := ip.NextIP(nodeSubnet.IP)
+			if ip.IsPointToPointSubnet(nodeSubnet) {
+				startIP = nodeSubnet.IP
+			}
+			exclusions := make([]pool.ExclusionRange, 0, len(cidrPool.Spec.Exclusions))
+			for _, e := range cidrPool.Spec.Exclusions {
+				exclusions = append(exclusions, pool.ExclusionRange{StartIP: e.StartIP, EndIP: e.EndIP})
+			}
+			nodePool := &pool.Pool{
+				Name:       cidrPool.Name,
+				Subnet:     alloc.Prefix,
+				Gateway:    alloc.Gateway,
+				StartIP:    startIP.String(),
+				EndIP:      ip.LastIP(nodeSubnet).String(),
+				Exclusions: exclusions,
+			}
+			reqLog.Info("CIDRPool config updated", "name", cidrPool.Name)
+			r.PoolManager.UpdatePool(poolKey, nodePool)
+			found = true
+			break
+		}
+	}
+	if !found {
+		reqLog.Info("CIDRPool config removed", "name", cidrPool.Name, "reason", "allocation not found")
+		r.PoolManager.RemovePool(poolKey)
+	}
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *CIDRPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&ipamv1alpha1.CIDRPool{}).
+		Complete(r)
+}
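Reviewer note (not part of the patch): the core of the reconciler is how it turns a node's allocated prefix into a start/end range for the pool manager. A minimal standalone sketch of that derivation, assuming ip.NextIP from the same pkg/ip package used in Reconcile above; the prefixes are the per-node allocations from the tests in this patch.

package main

import (
	"fmt"
	"net"

	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ip"
)

// nodeRange mirrors the StartIP/EndIP derivation in CIDRPoolReconciler.Reconcile:
// skip the network address unless the prefix is point-to-point, end at LastIP.
func nodeRange(prefix string) (string, string) {
	_, nodeSubnet, _ := net.ParseCIDR(prefix)
	startIP := ip.NextIP(nodeSubnet.IP)
	if ip.IsPointToPointSubnet(nodeSubnet) {
		startIP = nodeSubnet.IP
	}
	return startIP.String(), ip.LastIP(nodeSubnet).String()
}

func main() {
	// per-node /24 allocation from the 192.100.0.0/16 CIDRPool
	fmt.Println(nodeRange("192.100.0.0/24")) // 192.100.0.1 192.100.0.254
	// per-node /31 allocation from the 10.200.0.0/24 CIDRPool
	fmt.Println(nodeRange("10.200.0.0/31")) // 10.200.0.0 10.200.0.1
}

For the /31 case both addresses are handed out, which is why the gateway is optional there and LastIP does not step back from a broadcast address.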