From 04161b8f11ceb2aeb3ce966b36e500a7cde4f885 Mon Sep 17 00:00:00 2001
From: Yury Kulazhenkov
Date: Fri, 31 May 2024 15:13:14 +0300
Subject: [PATCH 1/2] Add subnet generator

Signed-off-by: Yury Kulazhenkov
---
 pkg/ip/cidr.go      | 42 ++++++++++++++++++++++++++++++++++
 pkg/ip/cidr_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/pkg/ip/cidr.go b/pkg/ip/cidr.go
index 156351a..0c66232 100644
--- a/pkg/ip/cidr.go
+++ b/pkg/ip/cidr.go
@@ -146,3 +146,45 @@ func IsBroadcast(ip net.IP, network *net.IPNet) bool {
 		binary.BigEndian.Uint32(network.IP.To4())|^binary.BigEndian.Uint32(net.IP(network.Mask).To4()))
 	return ip.Equal(masked)
 }
+
+// GetSubnetGen returns a generator function that can be called multiple times
+// to generate subnets of the given prefix size within the network.
+// GetSubnetGen always returns a non-nil function.
+// The generator function returns nil if a subnet can't be generated
+// (invalid input args were provided, or no more subnets are available in the network).
+// Example:
+//	_, network, _ := net.ParseCIDR("192.168.0.0/23")
+//	gen := GetSubnetGen(network, 25)
+//	println(gen().String()) // 192.168.0.0/25
+//	println(gen().String()) // 192.168.0.128/25
+//	println(gen().String()) // 192.168.1.0/25
+//	println(gen().String()) // 192.168.1.128/25
+//	println(gen().String()) // - no more ranges available
+func GetSubnetGen(network *net.IPNet, prefixSize uint) func() *net.IPNet {
+	networkOnes, netBitsTotal := network.Mask.Size()
+	if prefixSize < uint(networkOnes) || prefixSize > uint(netBitsTotal) {
+		return func() *net.IPNet { return nil }
+	}
+	isIPv6 := false
+	if network.IP.To4() == nil {
+		isIPv6 = true
+	}
+	networkIPAsInt := ipToInt(network.IP)
+	subnetIPCount := big.NewInt(0).Exp(big.NewInt(2), big.NewInt(int64(netBitsTotal)-int64(prefixSize)), nil)
+	subnetCount := big.NewInt(0).Exp(big.NewInt(2), big.NewInt(int64(prefixSize)-int64(networkOnes)), nil)
+
+	curSubnetIndex := big.NewInt(0)
+
+	return func() *net.IPNet {
+		if curSubnetIndex.Cmp(subnetCount) >= 0 {
+			return nil
+		}
+		subnetIPAsInt := big.NewInt(0).Add(networkIPAsInt, big.NewInt(0).Mul(subnetIPCount, curSubnetIndex))
+		curSubnetIndex.Add(curSubnetIndex, big.NewInt(1))
+		subnetIP := intToIP(subnetIPAsInt, isIPv6)
+		if subnetIP == nil {
+			return nil
+		}
+		return &net.IPNet{IP: subnetIP, Mask: net.CIDRMask(int(prefixSize), netBitsTotal)}
+	}
+}
diff --git a/pkg/ip/cidr_test.go b/pkg/ip/cidr_test.go
index a6fc49b..1f98499 100644
--- a/pkg/ip/cidr_test.go
+++ b/pkg/ip/cidr_test.go
@@ -319,4 +319,59 @@ var _ = Describe("CIDR functions", func() {
 			Expect(result).To(Equal(test.result))
 		}
 	})
+
+	Context("GetSubnetGen", func() {
+		It("Invalid args - prefix is larger than the network", func() {
+			_, net, _ := net.ParseCIDR("192.168.0.0/16")
+			gen := GetSubnetGen(net, 8)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen()).To(BeNil())
+		})
+		It("Invalid args - prefix is too small for IPv4", func() {
+			_, net, _ := net.ParseCIDR("192.168.0.0/16")
+			gen := GetSubnetGen(net, 120)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen()).To(BeNil())
+		})
+		It("Valid - single subnet IPv4", func() {
+			_, net, _ := net.ParseCIDR("192.168.0.0/24")
+			gen := GetSubnetGen(net, 24)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen().String()).To(Equal("192.168.0.0/24"))
+			Expect(gen()).To(BeNil())
+		})
+		It("Valid - single subnet IPv6", func() {
+			_, net, _ := net.ParseCIDR("2002:0:0:1234::/64")
+			gen := GetSubnetGen(net, 64)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen().String()).To(Equal("2002:0:0:1234::/64"))
+			Expect(gen()).To(BeNil())
+		})
+		It("valid - IPv4", func() {
+			_, net, _ := net.ParseCIDR("192.168.4.0/23")
+			gen := GetSubnetGen(net, 25)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen().String()).To(Equal("192.168.4.0/25"))
+			Expect(gen().String()).To(Equal("192.168.4.128/25"))
+			Expect(gen().String()).To(Equal("192.168.5.0/25"))
+			Expect(gen().String()).To(Equal("192.168.5.128/25"))
+			Expect(gen()).To(BeNil())
+		})
+		It("valid - IPv6", func() {
+			_, net, _ := net.ParseCIDR("2002:0:0:1234::/64")
+			gen := GetSubnetGen(net, 124)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen().String()).To(Equal("2002:0:0:1234::/124"))
+			Expect(gen().String()).To(Equal("2002:0:0:1234::10/124"))
+			Expect(gen().String()).To(Equal("2002:0:0:1234::20/124"))
+		})
+		It("valid - large IPv6 subnet (overflow test)", func() {
+			_, net, _ := net.ParseCIDR("::/0")
+			gen := GetSubnetGen(net, 127)
+			Expect(gen).NotTo(BeNil())
+			Expect(gen().String()).To(Equal("::/127"))
+			Expect(gen().String()).To(Equal("::2/127"))
+			Expect(gen().String()).To(Equal("::4/127"))
+		})
+	})
 })

From 8324b83cbcd5b00038b7bca9c7c91cc68f144248 Mon Sep 17 00:00:00 2001
From: Yury Kulazhenkov
Date: Fri, 31 May 2024 17:21:39 +0300
Subject: [PATCH 2/2] Controller implementation for CIDRPool

Signed-off-by: Yury Kulazhenkov
---
 api/v1alpha1/cidrpool_webhook.go              |  61 ++
 cmd/ipam-controller/app/app.go                |  36 +-
 cmd/ipam-controller/app/app_suite_test.go     |   1 -
 cmd/ipam-controller/app/app_test.go           | 596 ++++++++++++------
 deploy/manifests/controller/role.yaml         |   2 +
 deploy/manifests/node/role.yaml               |   1 +
 deploy/manifests/webhook/manifests.yaml       |  20 +
 .../controllers/cidrpool/cidrpool.go          | 293 +++++++++
 pkg/ipam-controller/controllers/node/node.go  |  25 +-
 9 files changed, 808 insertions(+), 227 deletions(-)
 create mode 100644 api/v1alpha1/cidrpool_webhook.go
 create mode 100644 pkg/ipam-controller/controllers/cidrpool/cidrpool.go

diff --git a/api/v1alpha1/cidrpool_webhook.go b/api/v1alpha1/cidrpool_webhook.go
new file mode 100644
index 0000000..d45a2e0
--- /dev/null
+++ b/api/v1alpha1/cidrpool_webhook.go
@@ -0,0 +1,61 @@
+/*
+ Copyright 2024, NVIDIA CORPORATION & AFFILIATES
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+	"k8s.io/apimachinery/pkg/runtime"
+	ctrl "sigs.k8s.io/controller-runtime"
+	logPkg "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/webhook"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+)
+
+var cidrPoolLogger = logPkg.Log.WithName("CIDRPool-validator")
+
+// SetupWebhookWithManager registers webhook handler in the manager
+func (r *CIDRPool) SetupWebhookWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewWebhookManagedBy(mgr).
+		For(r).
+ Complete() +} + +var _ webhook.Validator = &CIDRPool{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (r *CIDRPool) ValidateCreate() (admission.Warnings, error) { + cidrPoolLogger.V(1).Info("validate create", "name", r.Name) + return r.validate() +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (r *CIDRPool) ValidateUpdate(_ runtime.Object) (admission.Warnings, error) { + cidrPoolLogger.V(1).Info("validate update", "name", r.Name) + return r.validate() +} + +func (r *CIDRPool) validate() (admission.Warnings, error) { + errList := r.Validate() + if len(errList) == 0 { + cidrPoolLogger.V(1).Info("validation succeed") + return nil, nil + } + err := errList.ToAggregate() + cidrPoolLogger.V(1).Info("validation failed", "reason", err.Error()) + return nil, err +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (r *CIDRPool) ValidateDelete() (admission.Warnings, error) { + return nil, nil +} diff --git a/cmd/ipam-controller/app/app.go b/cmd/ipam-controller/app/app.go index 502ab7c..7c09178 100644 --- a/cmd/ipam-controller/app/app.go +++ b/cmd/ipam-controller/app/app.go @@ -44,7 +44,8 @@ import ( ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1" "github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-controller/app/options" "github.com/Mellanox/nvidia-k8s-ipam/pkg/common" - poolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/controllers/ippool" + cidrpoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/controllers/cidrpool" + ippoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/controllers/ippool" nodectrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/controllers/node" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/migrator" "github.com/Mellanox/nvidia-k8s-ipam/pkg/version" @@ -150,14 +151,16 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio os.Exit(1) } - nodeEventCH := make(chan event.GenericEvent, 1) + ipPoolNodeEventCH := make(chan event.GenericEvent, 1) + cidrPoolNodeEventCH := make(chan event.GenericEvent, 1) if err = (&nodectrl.NodeReconciler{ - NodeEventCh: nodeEventCH, - MigrationCh: migrationChan, - PoolsNamespace: opts.IPPoolsNamespace, - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + IPPoolNodeEventCH: ipPoolNodeEventCH, + CIDRPoolNodeEventCH: cidrPoolNodeEventCH, + MigrationCh: migrationChan, + PoolsNamespace: opts.IPPoolsNamespace, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { logger.Error(err, "unable to create controller", "controller", "Node") return err @@ -168,10 +171,13 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio logger.Error(err, "unable to create webhook", "webhook", "IPPool") os.Exit(1) } + if err = (&ipamv1alpha1.CIDRPool{}).SetupWebhookWithManager(mgr); err != nil { + logger.Error(err, "unable to create webhook", "webhook", "CIDRPool") + os.Exit(1) + } } - - if err = (&poolctrl.IPPoolReconciler{ - NodeEventCh: nodeEventCH, + if err = (&ippoolctrl.IPPoolReconciler{ + NodeEventCh: ipPoolNodeEventCH, PoolsNamespace: opts.IPPoolsNamespace, Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -181,6 +187,16 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio return err } + if err = (&cidrpoolctrl.CIDRPoolReconciler{ + NodeEventCh: cidrPoolNodeEventCH, + PoolsNamespace: opts.IPPoolsNamespace, + 
Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + logger.Error(err, "unable to create controller", "controller", "CIDRPool") + return err + } + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { logger.Error(err, "unable to set up health check") return err diff --git a/cmd/ipam-controller/app/app_suite_test.go b/cmd/ipam-controller/app/app_suite_test.go index 1cb15f5..4fee520 100644 --- a/cmd/ipam-controller/app/app_suite_test.go +++ b/cmd/ipam-controller/app/app_suite_test.go @@ -31,7 +31,6 @@ import ( const ( TestNamespace = "test-ns" - TestConfigMapName = "test-config" ) var ( diff --git a/cmd/ipam-controller/app/app_test.go b/cmd/ipam-controller/app/app_test.go index b52aab7..6237cf9 100644 --- a/cmd/ipam-controller/app/app_test.go +++ b/cmd/ipam-controller/app/app_test.go @@ -15,11 +15,12 @@ package app_test import ( "context" - "reflect" + "fmt" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -36,28 +37,34 @@ const ( pool1Name = "pool1" pool2Name = "pool2" pool3Name = "pool3" + testNode1 = "node1" + testNode2 = "node2" + testNode3 = "node3" ) -func createNode(name string) *corev1.Node { - node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}} - Expect(k8sClient.Create(ctx, node)).NotTo(HaveOccurred()) - return node -} - -func getNode(name string) *corev1.Node { - node := &corev1.Node{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: name}, node)).NotTo(HaveOccurred()) - return node -} - -func updateNode(node *corev1.Node) *corev1.Node { - Expect(k8sClient.Update(ctx, node)).NotTo(HaveOccurred()) - return node +func patchNodeLabel(nodeName, key, value string) { + patchString := []byte(fmt.Sprintf(`{"metadata":{"labels":{%q: %q}}}`, key, value)) + if value == "null" { + patchString = []byte(fmt.Sprintf(`{"metadata":{"labels":{%q: null}}}`, key)) + } + patch := client.RawPatch(types.MergePatchType, patchString) + err := k8sClient.Patch(ctx, &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}, patch) + Expect(err).NotTo(HaveOccurred()) } -func getRangeForNode(nodeName string, poolName string) *ipamv1alpha1.Allocation { +func getIPPoolRangeForNode(nodeName string, poolName string) *ipamv1alpha1.Allocation { allocations := getAllocationsFromIPPools(poolName) + for _, a := range allocations { + alloc := a + if a.NodeName == nodeName { + return &alloc + } + } + return nil +} +func getCIDRPoolAllocationForNode(nodeName string, poolName string) *ipamv1alpha1.CIDRPoolAllocation { + allocations := getAllocationsFromCIDRPools(poolName) for _, a := range allocations { alloc := a if a.NodeName == nodeName { @@ -73,16 +80,13 @@ func getAllocationsFromIPPools(poolName string) []ipamv1alpha1.Allocation { return pool.Status.Allocations } -func checkAllocationExists(allocations []ipamv1alpha1.Allocation, allocation ipamv1alpha1.Allocation) bool { - for _, a := range allocations { - if reflect.DeepEqual(a, allocation) { - return true - } - } - return false +func getAllocationsFromCIDRPools(poolName string) []ipamv1alpha1.CIDRPoolAllocation { + pool := &ipamv1alpha1.CIDRPool{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: poolName}, pool)).NotTo(HaveOccurred()) + return pool.Status.Allocations } -func updatePoolSpec(poolName string, spec ipamv1alpha1.IPPoolSpec) { +func updateIPPoolSpec(poolName string, spec ipamv1alpha1.IPPoolSpec) { pool := &ipamv1alpha1.IPPool{} 
Expect(k8sClient.Get( ctx, types.NamespacedName{Name: poolName, Namespace: TestNamespace}, pool)).NotTo(HaveOccurred()) @@ -90,6 +94,14 @@ func updatePoolSpec(poolName string, spec ipamv1alpha1.IPPoolSpec) { Expect(k8sClient.Update(ctx, pool)).NotTo(HaveOccurred()) } +func updateCIDRPoolSpec(poolName string, spec ipamv1alpha1.CIDRPoolSpec) { + pool := &ipamv1alpha1.CIDRPool{} + Expect(k8sClient.Get( + ctx, types.NamespacedName{Name: poolName, Namespace: TestNamespace}, pool)).NotTo(HaveOccurred()) + pool.Spec = spec + Expect(k8sClient.Update(ctx, pool)).NotTo(HaveOccurred()) +} + // WaitAndCheckForStability wait for condition and then check it is stable for 1 second func WaitAndCheckForStability(check func(g Gomega), wait interface{}, stability interface{}) { Eventually(func(g Gomega) { check(g) }, wait).Should(Succeed()) @@ -97,209 +109,375 @@ func WaitAndCheckForStability(check func(g Gomega), wait interface{}, stability } var _ = Describe("App", func() { - It("Basic tests", func() { - done := make(chan interface{}) - go func() { - testNode1 := "node1" - testNode2 := "node2" - testNode3 := "node3" - - cfg1pools := []string{pool1Name, pool2Name} - - By("Create valid pools") - pool1 := &ipamv1alpha1.IPPool{ - ObjectMeta: metav1.ObjectMeta{ - Name: pool1Name, - Namespace: TestNamespace, - }, - Spec: ipamv1alpha1.IPPoolSpec{ - Subnet: "192.168.0.0/16", - Gateway: "192.168.0.1", - PerNodeBlockSize: 10, - }, + BeforeEach(func() { + for _, nodeName := range []string{testNode1, testNode2, testNode3} { + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + Expect(k8sClient.Create(ctx, node)).NotTo(HaveOccurred()) + } + }) + AfterEach(func() { + Eventually(func(g Gomega) { + for _, nodeName := range []string{testNode1, testNode2, testNode3} { + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + g.Expect(k8sClient.Delete(ctx, node)).NotTo(HaveOccurred()) } - Expect(k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred()) + nodeList := &corev1.NodeList{} + g.Expect(k8sClient.List(ctx, nodeList)).NotTo(HaveOccurred()) + g.Expect(nodeList.Items).To(BeEmpty()) + }, time.Second*15).Should(Succeed()) - pool2 := &ipamv1alpha1.IPPool{ - ObjectMeta: metav1.ObjectMeta{ - Name: pool2Name, - Namespace: TestNamespace, - }, - Spec: ipamv1alpha1.IPPoolSpec{ - Subnet: "172.16.0.0/16", - Gateway: "172.16.0.1", - PerNodeBlockSize: 50, + }) + It("IPPool Basic tests", func(ctx SpecContext) { + cfg1pools := []string{pool1Name, pool2Name} + + By("Create valid pools") + pool1 := &ipamv1alpha1.IPPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: pool1Name, + Namespace: TestNamespace, + }, + Spec: ipamv1alpha1.IPPoolSpec{ + Subnet: "192.168.0.0/16", + Gateway: "192.168.0.1", + PerNodeBlockSize: 10, + }, + } + Expect(k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred()) + + pool2 := &ipamv1alpha1.IPPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: pool2Name, + Namespace: TestNamespace, + }, + Spec: ipamv1alpha1.IPPoolSpec{ + Subnet: "172.16.0.0/16", + Gateway: "172.16.0.1", + PerNodeBlockSize: 50, + }, + } + Expect(k8sClient.Create(ctx, pool2)).NotTo(HaveOccurred()) + + By("Update Pools Status with valid ranges for node1 and invalid for node2 (wrong IP count)") + node1InitialRanges := map[string]ipamv1alpha1.Allocation{pool1Name: { + NodeName: testNode1, + StartIP: "192.168.0.11", + EndIP: "192.168.0.20", + }, pool2Name: { + NodeName: testNode1, + StartIP: "172.16.0.1", + EndIP: "172.16.0.50", + }} + pool1.Status = ipamv1alpha1.IPPoolStatus{ + Allocations: []ipamv1alpha1.Allocation{ + 
node1InitialRanges[pool1Name], + { + NodeName: testNode2, + StartIP: "192.168.0.11", + EndIP: "192.168.0.14", }, + }, + } + pool2.Status = ipamv1alpha1.IPPoolStatus{ + Allocations: []ipamv1alpha1.Allocation{ + node1InitialRanges[pool2Name], + }, + } + Expect(k8sClient.Status().Update(ctx, pool1)).NotTo(HaveOccurred()) + Expect(k8sClient.Status().Update(ctx, pool2)).NotTo(HaveOccurred()) + + By("Start controller") + + ctrlCtx, ctrlStopFunc := context.WithCancel(ctx) + defer ctrlStopFunc() + controllerStopped := make(chan struct{}) + + go func() { + Expect(app.RunController(logr.NewContext(ctrlCtx, klog.NewKlogr()), cfg, &options.Options{ + MetricsAddr: "0", // disable + ProbeAddr: "0", // disable + IPPoolsNamespace: TestNamespace, + EnableLeaderElection: true, + LeaderElectionNamespace: TestNamespace, + })).NotTo(HaveOccurred()) + close(controllerStopped) + }() + + By("Check how controller handled the state") + WaitAndCheckForStability(func(g Gomega) { + uniqStartEndIPs := map[string]struct{}{} + pool1Allocations := getAllocationsFromIPPools(pool1Name) + pool2Allocations := getAllocationsFromIPPools(pool2Name) + for _, r := range [][]ipamv1alpha1.Allocation{pool1Allocations, pool2Allocations} { + g.Expect(r).To(HaveLen(3)) // 3 allocations, 1 for each node + for _, a := range r { + uniqStartEndIPs[a.StartIP] = struct{}{} + uniqStartEndIPs[a.EndIP] = struct{}{} + } } - Expect(k8sClient.Create(ctx, pool2)).NotTo(HaveOccurred()) - - By("Update Pools Status with valid ranges for node1 and invalid for node2 (wrong IP count)") - createNode(testNode1) - createNode(testNode2) - node1InitialRanges := map[string]ipamv1alpha1.Allocation{pool1Name: { - NodeName: testNode1, - StartIP: "192.168.0.11", - EndIP: "192.168.0.20", - }, pool2Name: { - NodeName: testNode1, - StartIP: "172.16.0.1", - EndIP: "172.16.0.50", - }} - pool1.Status = ipamv1alpha1.IPPoolStatus{ - Allocations: []ipamv1alpha1.Allocation{ - node1InitialRanges[pool1Name], + // we should have unique start/end IPs for each node for each pool + g.Expect(uniqStartEndIPs).To(HaveLen(len(cfg1pools) * 3 * 2)) + // node1 should have restored ranges + g.Expect(pool1Allocations).To(ContainElement(Equal(node1InitialRanges[pool1Name]))) + g.Expect(pool2Allocations).To(ContainElement(Equal(node1InitialRanges[pool2Name]))) + + }, 15, 2) + + By("Set invalid config for pool1") + invalidPool1Spec := ipamv1alpha1.IPPoolSpec{ + Subnet: "192.168.0.0/16", + Gateway: "10.10.0.1", + PerNodeBlockSize: 10, + } + updateIPPoolSpec(pool1Name, invalidPool1Spec) + Eventually(func(g Gomega) { + g.Expect(getAllocationsFromIPPools(pool1Name)).To(BeEmpty()) + }, 15, 1).Should(Succeed()) + + By("Create Pool3, with selector which ignores all nodes") + // ranges for two nodes only can be allocated + pool3Spec := ipamv1alpha1.IPPoolSpec{ + Subnet: "172.17.0.0/24", + Gateway: "172.17.0.1", + PerNodeBlockSize: 100, + NodeSelector: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ { - NodeName: testNode2, - StartIP: "192.168.0.11", - EndIP: "192.168.0.14", + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "ippool", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + MatchFields: nil, }, }, - } - pool2.Status = ipamv1alpha1.IPPoolStatus{ - Allocations: []ipamv1alpha1.Allocation{ - node1InitialRanges[pool2Name], + }, + } + pool3 := &ipamv1alpha1.IPPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: pool3Name, + Namespace: TestNamespace, + }, + Spec: pool3Spec, + } + Expect(k8sClient.Create(ctx, pool3)).NotTo(HaveOccurred()) + + 
Consistently(func(g Gomega) { + g.Expect(getAllocationsFromIPPools(pool3Name)).To(BeEmpty()) + }, 10, 1).Should(Succeed()) + + By("Update nodes to match selector in pool3") + + // node1 should have range + patchNodeLabel(testNode1, "ippool", "true") + WaitAndCheckForStability(func(g Gomega) { + g.Expect(getIPPoolRangeForNode(testNode1, pool3Name)).NotTo(BeNil()) + }, 15, 2) + + // node2 should have range + patchNodeLabel(testNode2, "ippool", "true") + WaitAndCheckForStability(func(g Gomega) { + g.Expect(getIPPoolRangeForNode(testNode2, pool3Name)).NotTo(BeNil()) + }, 15, 2) + + // node3 should have no range, because no free ranges available + patchNodeLabel(testNode3, "ippool", "true") + WaitAndCheckForStability(func(g Gomega) { + g.Expect(getIPPoolRangeForNode(testNode3, pool3Name)).To(BeNil()) + }, 15, 5) + + node2Range := getIPPoolRangeForNode(testNode2, pool3Name) + // remove label from node2, node3 should have a range now + patchNodeLabel(testNode2, "ippool", "null") + WaitAndCheckForStability(func(g Gomega) { + node3Range := getIPPoolRangeForNode(testNode3, pool3Name) + g.Expect(node3Range).NotTo(BeNil()) + // should reuse ranges from node2 + matchRange := node3Range.StartIP == node2Range.StartIP && + node3Range.EndIP == node2Range.EndIP + g.Expect(matchRange).To(BeTrue()) + }, 15, 2) + + By("Stop controller") + ctrlStopFunc() + Eventually(controllerStopped, 30*time.Second).Should(BeClosed()) + }, SpecTimeout(time.Minute*5)) + + It("CIDRPool Basic tests", func(ctx SpecContext) { + By("Create valid pools") + pool1gatewayIndex := uint(1) + pool1 := &ipamv1alpha1.CIDRPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: pool1Name, + Namespace: TestNamespace, + }, + Spec: ipamv1alpha1.CIDRPoolSpec{ + CIDR: "10.10.0.0/16", + GatewayIndex: &pool1gatewayIndex, + PerNodeNetworkPrefix: 24, + Exclusions: []ipamv1alpha1.ExcludeRange{ + {StartIP: "10.10.1.22", EndIP: "10.10.1.30"}, }, - } - Expect(k8sClient.Status().Update(ctx, pool1)).NotTo(HaveOccurred()) - Expect(k8sClient.Status().Update(ctx, pool2)).NotTo(HaveOccurred()) - - By("Start controller") - - ctrlCtx, ctrlStopFunc := context.WithCancel(ctx) - defer ctrlStopFunc() - controllerStopped := make(chan struct{}) - - go func() { - Expect(app.RunController(logr.NewContext(ctrlCtx, klog.NewKlogr()), cfg, &options.Options{ - MetricsAddr: "0", // disable - ProbeAddr: "0", // disable - IPPoolsNamespace: TestNamespace, - EnableLeaderElection: true, - LeaderElectionNamespace: TestNamespace, - })).NotTo(HaveOccurred()) - close(controllerStopped) - }() - - By("Create node3") - createNode(testNode3) - - By("Check how controller handled the state") - WaitAndCheckForStability(func(g Gomega) { - uniqStartEndIPs := map[string]struct{}{} - pool1Allocations := getAllocationsFromIPPools(pool1Name) - pool2Allocations := getAllocationsFromIPPools(pool2Name) - for _, r := range [][]ipamv1alpha1.Allocation{pool1Allocations, pool2Allocations} { - g.Expect(r).To(HaveLen(3)) // 3 allocations, 1 for each node - for _, a := range r { - uniqStartEndIPs[a.StartIP] = struct{}{} - uniqStartEndIPs[a.EndIP] = struct{}{} - } - } - // we should have unique start/end IPs for each node for each pool - g.Expect(uniqStartEndIPs).To(HaveLen(len(cfg1pools) * 3 * 2)) - // node1 should have restored ranges - g.Expect(checkAllocationExists(pool1Allocations, node1InitialRanges[pool1Name])).To(BeTrue()) - g.Expect(checkAllocationExists(pool2Allocations, node1InitialRanges[pool2Name])).To(BeTrue()) + StaticAllocations: []ipamv1alpha1.CIDRPoolStaticAllocation{{ + NodeName: testNode3, + 
Prefix: "10.10.100.0/24", + Gateway: "10.10.100.10", + }, {Prefix: "10.10.1.0/24"}, + }, + }, + } + Expect(k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred()) + + pool2gatewayIndex := uint(1) + pool2 := &ipamv1alpha1.CIDRPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: pool2Name, + Namespace: TestNamespace, + }, + Spec: ipamv1alpha1.CIDRPoolSpec{ + CIDR: "192.168.0.0/16", + GatewayIndex: &pool2gatewayIndex, + PerNodeNetworkPrefix: 31, + }, + } + Expect(k8sClient.Create(ctx, pool2)).NotTo(HaveOccurred()) + + By("Update Pools Status with valid allocation for node1 and invalid for node2") + node1InitialAllocations := map[string]ipamv1alpha1.CIDRPoolAllocation{pool1Name: { + NodeName: testNode1, + Prefix: "10.10.0.0/24", + Gateway: "10.10.0.1", + }, pool2Name: { + NodeName: testNode1, + Prefix: "192.168.100.0/31", + Gateway: "192.168.100.1", + }} + pool1.Status = ipamv1alpha1.CIDRPoolStatus{ + Allocations: []ipamv1alpha1.CIDRPoolAllocation{ + node1InitialAllocations[pool1Name], + { + NodeName: testNode2, + Prefix: "10.33.33.0/24", + Gateway: "10.33.33.1", + }, + }, + } + pool2.Status = ipamv1alpha1.CIDRPoolStatus{ + Allocations: []ipamv1alpha1.CIDRPoolAllocation{ + node1InitialAllocations[pool2Name], + }, + } + Expect(k8sClient.Status().Update(ctx, pool1)).NotTo(HaveOccurred()) + Expect(k8sClient.Status().Update(ctx, pool2)).NotTo(HaveOccurred()) - }, 15, 2) + By("Start controller") - By("Set invalid config for pool1") - invalidPool1Spec := ipamv1alpha1.IPPoolSpec{ - Subnet: "192.168.0.0/16", - Gateway: "10.10.0.1", - PerNodeBlockSize: 10, - } - updatePoolSpec(pool1Name, invalidPool1Spec) - Eventually(func(g Gomega) bool { - pool := &ipamv1alpha1.IPPool{} - g.Expect(k8sClient.Get( - ctx, types.NamespacedName{Name: pool1Name, Namespace: TestNamespace}, pool)).NotTo(HaveOccurred()) - return len(pool.Status.Allocations) == 0 - }, 30, 5).Should(BeTrue()) - - By("Create Pool3, with selector which ignores all nodes") - // ranges for two nodes only can be allocated - pool3Spec := ipamv1alpha1.IPPoolSpec{ - Subnet: "172.17.0.0/24", - Gateway: "172.17.0.1", - PerNodeBlockSize: 100, + ctrlCtx, ctrlStopFunc := context.WithCancel(ctx) + defer ctrlStopFunc() + controllerStopped := make(chan struct{}) + + go func() { + Expect(app.RunController(logr.NewContext(ctrlCtx, klog.NewKlogr()), cfg, &options.Options{ + MetricsAddr: "0", // disable + ProbeAddr: "0", // disable + IPPoolsNamespace: TestNamespace, + EnableLeaderElection: true, + LeaderElectionNamespace: TestNamespace, + })).NotTo(HaveOccurred()) + close(controllerStopped) + }() + + By("Check how controller handled the state") + WaitAndCheckForStability(func(g Gomega) { + pool1Allocations := getAllocationsFromCIDRPools(pool1Name) + pool2Allocations := getAllocationsFromCIDRPools(pool2Name) + g.Expect(pool1Allocations).To(HaveLen(3)) + g.Expect(pool2Allocations).To(HaveLen(3)) + g.Expect(pool1Allocations).To(ContainElements(Equal( + ipamv1alpha1.CIDRPoolAllocation{ + NodeName: testNode3, // has static allocation + Prefix: "10.10.100.0/24", + Gateway: "10.10.100.10", + }), Equal(node1InitialAllocations[pool1Name]))) + g.Expect(pool2Allocations).To(ContainElements(Equal(node1InitialAllocations[pool2Name]))) + }, 15, 2) + + By("Set invalid config for pool1") + invalidPool1Spec := ipamv1alpha1.CIDRPoolSpec{ + CIDR: "192.168.0.0/16", + PerNodeNetworkPrefix: 99, + } + updateCIDRPoolSpec(pool1Name, invalidPool1Spec) + Eventually(func(g Gomega) { + g.Expect(getAllocationsFromCIDRPools(pool1Name)).To(BeEmpty()) + }, 15, 1).Should(Succeed()) + + By("Create Pool3, 
with selector which ignores all nodes") + // ranges for two nodes only can be allocated + pool3gatewayIndex := uint(1) + pool3 := &ipamv1alpha1.CIDRPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: pool3Name, + Namespace: TestNamespace, + }, + Spec: ipamv1alpha1.CIDRPoolSpec{ + CIDR: "10.10.0.0/24", + GatewayIndex: &pool3gatewayIndex, + PerNodeNetworkPrefix: 25, NodeSelector: &corev1.NodeSelector{ NodeSelectorTerms: []corev1.NodeSelectorTerm{ { MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "foo", + Key: "cidrpool", Operator: corev1.NodeSelectorOpIn, - Values: []string{"bar"}, + Values: []string{"true"}, }, }, - MatchFields: nil, }, }, }, - } - pool3 := &ipamv1alpha1.IPPool{ - ObjectMeta: metav1.ObjectMeta{ - Name: pool3Name, - Namespace: TestNamespace, - }, - Spec: pool3Spec, - } - Expect(k8sClient.Create(ctx, pool3)).NotTo(HaveOccurred()) - - Consistently(func(g Gomega) bool { - pool := &ipamv1alpha1.IPPool{} - g.Expect(k8sClient.Get( - ctx, types.NamespacedName{Name: pool3Name, Namespace: TestNamespace}, pool)).NotTo(HaveOccurred()) - return len(pool.Status.Allocations) == 0 - }, 30, 5).Should(BeTrue()) - - By("Update nodes to match selector in pool3") - - // node1 should have range - node1 := getNode(testNode1) - node1.Labels = map[string]string{"foo": "bar"} - updateNode(node1) - WaitAndCheckForStability(func(g Gomega) { - g.Expect(getRangeForNode(testNode1, pool3Name)).NotTo(BeNil()) - }, 15, 2) - - // node2 should have range - node2 := getNode(testNode2) - node2.Labels = map[string]string{"foo": "bar"} - updateNode(node2) - WaitAndCheckForStability(func(g Gomega) { - g.Expect(getRangeForNode(testNode2, pool3Name)).NotTo(BeNil()) - }, 15, 2) - - // node3 should have no range, because no free ranges available - node3 := getNode(testNode3) - node3.Labels = map[string]string{"foo": "bar"} - updateNode(node3) - WaitAndCheckForStability(func(g Gomega) { - g.Expect(getRangeForNode(testNode3, pool3Name)).To(BeNil()) - }, 15, 5) - - node2Range := getRangeForNode(testNode2, pool3Name) - // remove label from node2, node3 should have a range now - node2 = getNode(testNode2) - node2.Labels = nil - updateNode(node2) - WaitAndCheckForStability(func(g Gomega) { - node3Range := getRangeForNode(testNode3, pool3Name) - g.Expect(node3Range).NotTo(BeNil()) - // should reuse ranges from node2 - matchRange := node3Range.StartIP == node2Range.StartIP && - node3Range.EndIP == node2Range.EndIP - g.Expect(matchRange).To(BeTrue()) - }, 15, 2) - - By("Stop controller") - ctrlStopFunc() - Eventually(controllerStopped, 30*time.Second).Should(BeClosed()) - - close(done) - }() - Eventually(done, 5*time.Minute).Should(BeClosed()) - }) + }, + } + Expect(k8sClient.Create(ctx, pool3)).NotTo(HaveOccurred()) + + Consistently(func(g Gomega) { + g.Expect(getAllocationsFromCIDRPools(pool3Name)).To(BeEmpty()) + }, 10, 1).Should(Succeed()) + + By("Update nodes to match selector in pool3") + + // node1 should have range + patchNodeLabel(testNode1, "cidrpool", "true") + WaitAndCheckForStability(func(g Gomega) { + g.Expect(getCIDRPoolAllocationForNode(testNode1, pool3Name)).NotTo(BeNil()) + }, 15, 2) + + // node2 should have range + patchNodeLabel(testNode2, "cidrpool", "true") + WaitAndCheckForStability(func(g Gomega) { + g.Expect(getCIDRPoolAllocationForNode(testNode2, pool3Name)).NotTo(BeNil()) + }, 15, 2) + + // node3 should have no range, because no free ranges available + patchNodeLabel(testNode3, "cidrpool", "true") + WaitAndCheckForStability(func(g Gomega) { + g.Expect(getCIDRPoolAllocationForNode(testNode3, 
pool3Name)).To(BeNil()) + }, 15, 5) + + node2Alloc := getCIDRPoolAllocationForNode(testNode2, pool3Name) + // remove label from node2, node3 should have a range now + patchNodeLabel(testNode2, "cidrpool", "null") + WaitAndCheckForStability(func(g Gomega) { + node3Alloc := getCIDRPoolAllocationForNode(testNode3, pool3Name) + g.Expect(node3Alloc).NotTo(BeNil()) + // should reuse ranges from node2 + g.Expect(node3Alloc.Prefix).To(Equal(node2Alloc.Prefix)) + }, 15, 2) + + By("Stop controller") + ctrlStopFunc() + Eventually(controllerStopped, 30*time.Second).Should(BeClosed()) + }, SpecTimeout(time.Minute*5)) }) diff --git a/deploy/manifests/controller/role.yaml b/deploy/manifests/controller/role.yaml index 13a9b6b..77d5ffe 100644 --- a/deploy/manifests/controller/role.yaml +++ b/deploy/manifests/controller/role.yaml @@ -26,6 +26,7 @@ rules: - nv-ipam.nvidia.com resources: - ippools + - cidrpools verbs: - get - list @@ -35,6 +36,7 @@ rules: - nv-ipam.nvidia.com resources: - ippools/status + - cidrpools/status verbs: - get - update diff --git a/deploy/manifests/node/role.yaml b/deploy/manifests/node/role.yaml index 58519d2..a09f6fd 100644 --- a/deploy/manifests/node/role.yaml +++ b/deploy/manifests/node/role.yaml @@ -15,6 +15,7 @@ rules: - nv-ipam.nvidia.com resources: - ippools + - cidrpools verbs: - get - list diff --git a/deploy/manifests/webhook/manifests.yaml b/deploy/manifests/webhook/manifests.yaml index d06f2b7..1f3d4cf 100644 --- a/deploy/manifests/webhook/manifests.yaml +++ b/deploy/manifests/webhook/manifests.yaml @@ -23,3 +23,23 @@ webhooks: resources: - ippools sideEffects: None + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-nv-ipam-nvidia-com-v1alpha1-cidrpool + failurePolicy: Fail + name: validate-cidrpool.nv-ipam.nvidia.com + rules: + - apiGroups: + - nv-ipam.nvidia.com + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - cidrpools + sideEffects: None diff --git a/pkg/ipam-controller/controllers/cidrpool/cidrpool.go b/pkg/ipam-controller/controllers/cidrpool/cidrpool.go new file mode 100644 index 0000000..d9d9906 --- /dev/null +++ b/pkg/ipam-controller/controllers/cidrpool/cidrpool.go @@ -0,0 +1,293 @@ +/* + Copyright 2024, NVIDIA CORPORATION & AFFILIATES + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"reflect"
+	"sort"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+
+	corev1 "k8s.io/api/core/v1"
+	apiErrors "k8s.io/apimachinery/pkg/api/errors"
+	v1helper "k8s.io/component-helpers/scheduling/corev1"
+
+	ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
+	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ip"
+)
+
+// CIDRPoolReconciler reconciles CIDRPool objects
+type CIDRPoolReconciler struct {
+	PoolsNamespace string
+	NodeEventCh    chan event.GenericEvent
+	client.Client
+	Scheme   *runtime.Scheme
+	recorder record.EventRecorder
+}
+
+// Reconcile contains logic to sync CIDRPool objects
+func (r *CIDRPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	reqLog := log.FromContext(ctx)
+	if req.Namespace != r.PoolsNamespace {
+		// this should never happen because of the watcher configuration of the manager from controller-runtime pkg
+		reqLog.Info("Ignoring notification on CIDRPool from different namespace", "ns", req.Namespace)
+		return ctrl.Result{}, nil
+	}
+
+	pool := &ipamv1alpha1.CIDRPool{}
+	err := r.Client.Get(ctx, req.NamespacedName, pool)
+	if err != nil {
+		if apiErrors.IsNotFound(err) {
+			reqLog.Info("CIDRPool not found")
+			return ctrl.Result{}, nil
+		}
+		reqLog.Error(err, "failed to get CIDRPool object from the cache")
+		return ctrl.Result{}, err
+	}
+	reqLog.Info("Notification on CIDRPool", "name", pool.Name)
+
+	errList := pool.Validate()
+	if len(errList) != 0 {
+		return r.handleInvalidSpec(ctx, errList.ToAggregate(), pool)
+	}
+
+	_, cidrNetwork, _ := net.ParseCIDR(pool.Spec.CIDR)
+
+	nodes, err := r.getMatchingNodes(ctx, pool.Spec.NodeSelector)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
+	allocationsByNodeName, allocationsByPrefix := r.getValidExistingAllocationsMaps(ctx, pool, nodes)
+	staticAllocationsByNodeName, staticAllocationsByPrefix := r.getStaticAllocationsMaps(pool)
+
+	subnetGenFunc := ip.GetSubnetGen(cidrNetwork, pool.Spec.PerNodeNetworkPrefix)
+	for _, node := range nodes {
+		if _, allocationExist := allocationsByNodeName[node]; allocationExist {
+			continue
+		}
+		nodeAlloc := r.getStaticAllocationForNodeIfExist(node, pool, staticAllocationsByNodeName)
+		if nodeAlloc == nil {
+			nodeAlloc = r.getNewAllocationForNode(subnetGenFunc, node, pool, allocationsByPrefix, staticAllocationsByPrefix)
+		}
+		if nodeAlloc == nil {
+			msg := fmt.Sprintf("failed to allocate prefix for Node: %s, no free prefixes", node)
+			reqLog.Error(err, msg)
+			r.recorder.Event(pool, "Warning", "NoFreePrefixes", msg)
+			continue
+		}
+		allocationsByNodeName[node] = nodeAlloc
+		allocationsByPrefix[nodeAlloc.Prefix] = nodeAlloc
+	}
+	// prepare new status entry, keep the list sorted by the node name to avoid unnecessary updates caused by
+	// a different element order
+	allocations := make([]ipamv1alpha1.CIDRPoolAllocation, 0, len(allocationsByNodeName))
+	for _, a := range allocationsByNodeName {
+		reqLog.Info("prefix allocated", "node", a.NodeName, "prefix", a.Prefix, "gateway", a.Gateway)
+		allocations = append(allocations, *a)
+	}
+	sort.Slice(allocations, func(i, j int) bool {
+		return allocations[i].NodeName < allocations[j].NodeName
+	})
+
+	if !reflect.DeepEqual(pool.Status.Allocations, allocations) {
+		pool.Status.Allocations = allocations
+		if err := r.Status().Update(ctx, pool); err != nil {
+			return ctrl.Result{}, err
+		}
+		reqLog.Info("pool status updated")
+	}
+	return ctrl.Result{}, nil
+}
+
+// creates a new allocation for the node; returns nil if a subnet can't be generated because no free prefixes are left
+func (r *CIDRPoolReconciler) getNewAllocationForNode(subnetGen func() *net.IPNet, node string,
+	pool *ipamv1alpha1.CIDRPool,
+	allocByPrefix map[string]*ipamv1alpha1.CIDRPoolAllocation,
+	staticAllocByPrefix map[string]*ipamv1alpha1.CIDRPoolStaticAllocation) *ipamv1alpha1.CIDRPoolAllocation {
+	var nodePrefix *net.IPNet
+	for {
+		nodePrefix = subnetGen()
+		if nodePrefix == nil {
+			break
+		}
+		if _, alreadyAllocated := allocByPrefix[nodePrefix.String()]; alreadyAllocated {
+			// subnet is already allocated, try next subnet
+			continue
+		}
+		if _, alreadyAllocated := staticAllocByPrefix[nodePrefix.String()]; alreadyAllocated {
+			// static allocation for the subnet exists (not for the current node), try next subnet
+			continue
+		}
+		break
+	}
+	if nodePrefix == nil {
+		return nil
+	}
+	gateway := ""
+	if pool.Spec.GatewayIndex != nil {
+		gateway = ipamv1alpha1.GetGatewayForSubnet(nodePrefix, *pool.Spec.GatewayIndex)
+	}
+	return &ipamv1alpha1.CIDRPoolAllocation{
+		NodeName: node,
+		Prefix:   nodePrefix.String(),
+		Gateway:  gateway,
+	}
+}
+
+// builds CIDRPoolAllocation from the CIDRPoolStaticAllocation if it exists for the node
+func (r *CIDRPoolReconciler) getStaticAllocationForNodeIfExist(node string, pool *ipamv1alpha1.CIDRPool,
+	staticAllocationsByNodeName map[string]*ipamv1alpha1.CIDRPoolStaticAllocation) *ipamv1alpha1.CIDRPoolAllocation {
+	staticAlloc, staticAllocationFound := staticAllocationsByNodeName[node]
+	if !staticAllocationFound {
+		return nil
+	}
+	gateway := staticAlloc.Gateway
+	if gateway == "" && pool.Spec.GatewayIndex != nil {
+		// prefix is already validated
+		_, nodeSubnet, _ := net.ParseCIDR(staticAlloc.Prefix)
+		gateway = ipamv1alpha1.GetGatewayForSubnet(nodeSubnet, *pool.Spec.GatewayIndex)
+	}
+	return &ipamv1alpha1.CIDRPoolAllocation{
+		NodeName: staticAlloc.NodeName,
+		Prefix:   staticAlloc.Prefix,
+		Gateway:  gateway,
+	}
+}
+
+// returns two maps with valid existing allocations,
+// the first map contains nodeName to allocation mapping,
+// the second map contains prefix to allocation mapping
+func (r *CIDRPoolReconciler) getValidExistingAllocationsMaps(ctx context.Context,
+	pool *ipamv1alpha1.CIDRPool, nodes []string) (
+	map[string]*ipamv1alpha1.CIDRPoolAllocation, map[string]*ipamv1alpha1.CIDRPoolAllocation) {
+	reqLog := log.FromContext(ctx)
+	nodeMap := make(map[string]*ipamv1alpha1.CIDRPoolAllocation, len(pool.Status.Allocations))
+	prefixMap := make(map[string]*ipamv1alpha1.CIDRPoolAllocation, len(pool.Status.Allocations))
+
+	matchingNodes := make(map[string]struct{}, len(nodes))
+	for _, n := range nodes {
+		matchingNodes[n] = struct{}{}
+	}
+
+	for i := range pool.Status.Allocations {
+		alloc := &pool.Status.Allocations[i]
+		if _, nodeExist := matchingNodes[alloc.NodeName]; !nodeExist {
+			reqLog.Info("node doesn't match the pool, discard the allocation", "node",
+				alloc.NodeName, "prefix", alloc.Prefix, "gateway", alloc.Gateway)
+			continue
+		}
+		errList := alloc.Validate(pool)
+		if len(errList) > 0 {
+			reqLog.Info("discard invalid allocation for the node", "node",
+				alloc.NodeName, "prefix", alloc.Prefix, "gateway", alloc.Gateway, "reason", errList.ToAggregate().Error())
+			continue
+		}
+		nodeMap[alloc.NodeName] = alloc
+		prefixMap[alloc.Prefix] = alloc
+
+		reqLog.Info("keep allocation for node", "node", alloc.NodeName, "prefix", alloc.Prefix, "gateway", alloc.Gateway)
+	}
+	return nodeMap, prefixMap
+}
+
+// returns two maps with static allocations,
+// the first map contains nodeName to static allocation mapping,
+// the second map contains prefix to static allocation mapping
+func (r *CIDRPoolReconciler) getStaticAllocationsMaps(pool *ipamv1alpha1.CIDRPool) (
+	map[string]*ipamv1alpha1.CIDRPoolStaticAllocation, map[string]*ipamv1alpha1.CIDRPoolStaticAllocation) {
+	nodeMap := make(map[string]*ipamv1alpha1.CIDRPoolStaticAllocation, len(pool.Status.Allocations))
+	prefixMap := make(map[string]*ipamv1alpha1.CIDRPoolStaticAllocation, len(pool.Status.Allocations))
+	// static allocations are already validated by pool.Validate() call
+	for i := range pool.Spec.StaticAllocations {
+		alloc := &pool.Spec.StaticAllocations[i]
+		nodeMap[alloc.NodeName] = alloc
+		prefixMap[alloc.Prefix] = alloc
+	}
+	return nodeMap, prefixMap
+}
+
+// returns a list with the names of the nodes that match the pool selector
+func (r *CIDRPoolReconciler) getMatchingNodes(
+	ctx context.Context, nodeSelector *corev1.NodeSelector) ([]string, error) {
+	reqLog := log.FromContext(ctx)
+	nodeList := &corev1.NodeList{}
+	if err := r.Client.List(ctx, nodeList); err != nil {
+		reqLog.Error(err, "failed to list Nodes")
+		return nil, err
+	}
+	nodeNames := make([]string, 0)
+	for i := range nodeList.Items {
+		node := nodeList.Items[i]
+		if nodeSelector == nil {
+			nodeNames = append(nodeNames, node.Name)
+			continue
+		}
+		match, err := v1helper.MatchNodeSelectorTerms(&node, nodeSelector)
+		if err != nil {
+			reqLog.Error(err, "failed to match Node", "node", node.Name)
+			continue
+		}
+		if match {
+			nodeNames = append(nodeNames, node.Name)
+		}
+	}
+
+	sort.Slice(nodeNames, func(i, j int) bool {
+		return nodeNames[i] < nodeNames[j]
+	})
+	return nodeNames, nil
+}
+
+// reset all allocations if the pool has invalid configuration
+func (r *CIDRPoolReconciler) handleInvalidSpec(ctx context.Context,
+	err error, pool *ipamv1alpha1.CIDRPool) (reconcile.Result, error) {
+	reqLog := log.FromContext(ctx)
+	reqLog.Error(err, "invalid CIDRPool Spec, clearing Status")
+	pool.Status.Allocations = make([]ipamv1alpha1.CIDRPoolAllocation, 0)
+	if err2 := r.Status().Update(ctx, pool); err2 != nil {
+		return ctrl.Result{}, err2
+	}
+	r.recorder.Event(pool, "Warning", "InvalidSpec", err.Error())
+	return ctrl.Result{Requeue: false}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *CIDRPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	r.recorder = mgr.GetEventRecorderFor("CIDRPoolController")
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&ipamv1alpha1.CIDRPool{}).
+		// catch notifications received through chan from Node controller
+		WatchesRawSource(&source.Channel{Source: r.NodeEventCh}, handler.Funcs{
+			GenericFunc: func(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
+				q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
+					Namespace: e.Object.GetNamespace(),
+					Name:      e.Object.GetName(),
+				}})
+			}}).
+		Complete(r)
+}
diff --git a/pkg/ipam-controller/controllers/node/node.go b/pkg/ipam-controller/controllers/node/node.go
index ade513f..7cb4fa3 100644
--- a/pkg/ipam-controller/controllers/node/node.go
+++ b/pkg/ipam-controller/controllers/node/node.go
@@ -32,9 +32,10 @@ import (
 // NodeReconciler reconciles Node objects
 type NodeReconciler struct {
-	PoolsNamespace string
-	NodeEventCh    chan event.GenericEvent
-	MigrationCh    chan struct{}
+	PoolsNamespace      string
+	IPPoolNodeEventCH   chan event.GenericEvent
+	CIDRPoolNodeEventCH chan event.GenericEvent
+	MigrationCh         chan struct{}
 	client.Client
 	Scheme *runtime.Scheme
 }
@@ -57,16 +58,26 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
 		reqLog.Info("node object removed")
 	}
 	// node updated, trigger sync for all pools
-	poolList := &ipamv1alpha1.IPPoolList{}
-	if err := r.Client.List(ctx, poolList, client.InNamespace(r.PoolsNamespace)); err != nil {
+	ipPoolList := &ipamv1alpha1.IPPoolList{}
+	if err := r.Client.List(ctx, ipPoolList, client.InNamespace(r.PoolsNamespace)); err != nil {
 		return ctrl.Result{}, err
 	}
-	for _, p := range poolList.Items {
-		r.NodeEventCh <- event.GenericEvent{
+	for _, p := range ipPoolList.Items {
+		r.IPPoolNodeEventCH <- event.GenericEvent{
 			Object: &ipamv1alpha1.IPPool{
 				ObjectMeta: metav1.ObjectMeta{Namespace: r.PoolsNamespace, Name: p.Name},
 			}}
 	}
+	cidrPoolList := &ipamv1alpha1.CIDRPoolList{}
+	if err := r.Client.List(ctx, cidrPoolList, client.InNamespace(r.PoolsNamespace)); err != nil {
+		return ctrl.Result{}, err
+	}
+	for _, p := range cidrPoolList.Items {
+		r.CIDRPoolNodeEventCH <- event.GenericEvent{
+			Object: &ipamv1alpha1.CIDRPool{
+				ObjectMeta: metav1.ObjectMeta{Namespace: r.PoolsNamespace, Name: p.Name},
+			}}
+	}
 	return ctrl.Result{}, nil
 }
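
For reviewers, a minimal sketch of how the subnet generator added in the first commit drives the per-node prefix allocation performed by the CIDRPool controller in the second commit. The pool CIDR, prefix length, and node names below are illustrative values only, not taken from the patches, and the real controller additionally skips prefixes that collide with existing or static allocations.

package main

import (
	"fmt"
	"net"

	"github.com/Mellanox/nvidia-k8s-ipam/pkg/ip"
)

func main() {
	// Example pool: 10.10.0.0/16 split into one /24 per node (illustrative values).
	_, poolCIDR, err := net.ParseCIDR("10.10.0.0/16")
	if err != nil {
		panic(err)
	}
	gen := ip.GetSubnetGen(poolCIDR, 24)
	for _, node := range []string{"node1", "node2", "node3"} {
		prefix := gen() // returns nil when no more subnets are available
		if prefix == nil {
			fmt.Printf("no free prefixes left for %s\n", node)
			continue
		}
		fmt.Printf("%s -> %s\n", node, prefix) // e.g. node1 -> 10.10.0.0/24
	}
}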