Skip to content

Commit

Permalink
Add node controller for CIDRPool CR
Browse files Browse the repository at this point in the history
Signed-off-by: Yury Kulazhenkov <ykulazhenkov@nvidia.com>
  • Loading branch information
ykulazhenkov committed Jun 12, 2024
1 parent cf4997a commit cdcd33f
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 92 deletions.
11 changes: 11 additions & 0 deletions cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/allocator"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner"
cidrpoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/cidrpool"
ippoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/ippool"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/grpc/middleware"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/handlers"
Expand Down Expand Up @@ -157,6 +158,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
k8sClient, err := client.New(config, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
logger.Error(err, "unable to direct k8s client")
return err
}

if err = (&ippoolctrl.IPPoolReconciler{
Expand All @@ -168,6 +170,15 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
logger.Error(err, "unable to create controller", "controller", "IPPool")
return err
}
if err = (&cidrpoolctrl.CIDRPoolReconciler{
PoolManager: poolManager,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
NodeName: opts.NodeName,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "CIDRPool")
return err
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
logger.Error(err, "unable to set up health check")
Expand Down
166 changes: 95 additions & 71 deletions cmd/ipam-node/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1"
Expand All @@ -44,7 +43,7 @@ const (
testNamespace = "default"
)

func createTestPools() {
func createTestIPPools() {
pool1 := &ipamv1alpha1.IPPool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName1, Namespace: testNamespace},
Spec: ipamv1alpha1.IPPoolSpec{
Expand All @@ -54,6 +53,15 @@ func createTestPools() {
},
}
ExpectWithOffset(1, k8sClient.Create(ctx, pool1))
pool1.Status = ipamv1alpha1.IPPoolStatus{
Allocations: []ipamv1alpha1.Allocation{
{
NodeName: testNodeName,
StartIP: "192.168.0.2",
EndIP: "192.168.0.254",
},
}}
ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool1))

pool2 := &ipamv1alpha1.IPPool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName2, Namespace: testNamespace},
Expand All @@ -64,47 +72,57 @@ func createTestPools() {
},
}
ExpectWithOffset(1, k8sClient.Create(ctx, pool2))

// Update statuses with range allocation
Eventually(func(g Gomega) error {
status := ipamv1alpha1.IPPoolStatus{
Allocations: []ipamv1alpha1.Allocation{
{
NodeName: testNodeName,
StartIP: "192.168.0.2",
EndIP: "192.168.0.254",
},
},
}
return updatePoolStatus(testPoolName1, status)
}, 30, 5).Should(Not(HaveOccurred()))

Eventually(func(g Gomega) error {
status := ipamv1alpha1.IPPoolStatus{
Allocations: []ipamv1alpha1.Allocation{
{
NodeName: testNodeName,
StartIP: "10.100.0.2",
EndIP: "10.100.0.254",
},
pool2.Status = ipamv1alpha1.IPPoolStatus{
Allocations: []ipamv1alpha1.Allocation{
{
NodeName: testNodeName,
StartIP: "10.100.0.2",
EndIP: "10.100.0.254",
},
}
return updatePoolStatus(testPoolName2, status)
}, 30, 5).Should(Not(HaveOccurred()))
}}
ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool2))
}

func updatePoolStatus(poolName string, status ipamv1alpha1.IPPoolStatus) error {
pool := &ipamv1alpha1.IPPool{}
err := k8sClient.Get(ctx, types.NamespacedName{Name: poolName, Namespace: testNamespace}, pool)
if err != nil {
return err
func createTestCIDRPools() {
pool1GatewayIndex := uint(1)
pool1 := &ipamv1alpha1.CIDRPool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName1, Namespace: testNamespace},
Spec: ipamv1alpha1.CIDRPoolSpec{
CIDR: "192.100.0.0/16",
GatewayIndex: &pool1GatewayIndex,
PerNodeNetworkPrefix: 24,
Exclusions: []ipamv1alpha1.ExcludeRange{
{StartIP: "192.100.0.1", EndIP: "192.100.0.10"},
},
},
}
pool.Status = status
err = k8sClient.Status().Update(ctx, pool)
if err != nil {
return err
ExpectWithOffset(1, k8sClient.Create(ctx, pool1))
pool1.Status = ipamv1alpha1.CIDRPoolStatus{
Allocations: []ipamv1alpha1.CIDRPoolAllocation{
{
NodeName: testNodeName,
Prefix: "192.100.0.0/24",
Gateway: "192.100.0.1",
},
}}
ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool1))

pool2 := &ipamv1alpha1.CIDRPool{
ObjectMeta: metav1.ObjectMeta{Name: testPoolName2, Namespace: testNamespace},
Spec: ipamv1alpha1.CIDRPoolSpec{
CIDR: "10.200.0.0/24",
PerNodeNetworkPrefix: 31,
},
}
return nil
ExpectWithOffset(1, k8sClient.Create(ctx, pool2))
pool2.Status = ipamv1alpha1.CIDRPoolStatus{
Allocations: []ipamv1alpha1.CIDRPoolAllocation{
{
NodeName: testNodeName,
Prefix: "10.200.0.0/31",
},
}}
ExpectWithOffset(1, k8sClient.Status().Update(ctx, pool2))
}

func createTestPod() *corev1.Pod {
Expand Down Expand Up @@ -161,10 +179,13 @@ var _ = Describe("IPAM Node daemon", func() {
It("Validate main flows", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
defer close(done)
testDir := GinkgoT().TempDir()
opts := getOptions(testDir)

createTestPools()
createTestIPPools()
createTestCIDRPools()
pod := createTestPod()

ctx = logr.NewContext(ctx, klog.NewKlogr())
Expand All @@ -180,38 +201,41 @@ var _ = Describe("IPAM Node daemon", func() {

grpcClient := nodev1.NewIPAMServiceClient(conn)

params := getValidReqParams(string(pod.UID), pod.Name, pod.Namespace)

// no allocation yet
_, err = grpcClient.IsAllocated(ctx,
&nodev1.IsAllocatedRequest{Parameters: params})
Expect(status.Code(err) == codes.NotFound).To(BeTrue())

// allocate
resp, err := grpcClient.Allocate(ctx, &nodev1.AllocateRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())
Expect(resp.Allocations).To(HaveLen(2))
Expect(resp.Allocations[0].Pool).NotTo(BeEmpty())
Expect(resp.Allocations[0].Gateway).NotTo(BeEmpty())
Expect(resp.Allocations[0].Ip).NotTo(BeEmpty())

_, err = grpcClient.IsAllocated(ctx,
&nodev1.IsAllocatedRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())

// deallocate
_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())

// deallocate should be idempotent
_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())

// check should fail
_, err = grpcClient.IsAllocated(ctx,
&nodev1.IsAllocatedRequest{Parameters: params})
Expect(status.Code(err) == codes.NotFound).To(BeTrue())
close(done)
cidrPoolParams := getValidReqParams(string(pod.UID), pod.Name, pod.Namespace)
cidrPoolParams.PoolType = nodev1.PoolType_POOL_TYPE_CIDRPOOL
ipPoolParams := getValidReqParams(string(pod.UID), pod.Name, pod.Namespace)

for _, params := range []*nodev1.IPAMParameters{ipPoolParams, cidrPoolParams} {
// no allocation yet
_, err = grpcClient.IsAllocated(ctx,
&nodev1.IsAllocatedRequest{Parameters: params})
Expect(status.Code(err) == codes.NotFound).To(BeTrue())

// allocate
resp, err := grpcClient.Allocate(ctx, &nodev1.AllocateRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())
Expect(resp.Allocations).To(HaveLen(2))
Expect(resp.Allocations[0].Pool).NotTo(BeEmpty())
Expect(resp.Allocations[0].Gateway).NotTo(BeEmpty())
Expect(resp.Allocations[0].Ip).NotTo(BeEmpty())

_, err = grpcClient.IsAllocated(ctx,
&nodev1.IsAllocatedRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())

// deallocate
_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())

// deallocate should be idempotent
_, err = grpcClient.Deallocate(ctx, &nodev1.DeallocateRequest{Parameters: params})
Expect(err).NotTo(HaveOccurred())

// check should fail
_, err = grpcClient.IsAllocated(ctx,
&nodev1.IsAllocatedRequest{Parameters: params})
Expect(status.Code(err) == codes.NotFound).To(BeTrue())
}
}()
Eventually(done, 5*time.Minute).Should(BeClosed())
})
Expand Down
18 changes: 18 additions & 0 deletions pkg/ip/cidr.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,24 @@ func IsBroadcast(ip net.IP, network *net.IPNet) bool {
return ip.Equal(masked)
}

// IsPointToPointSubnet returns true if the network is point to point (/31 or /127)
func IsPointToPointSubnet(network *net.IPNet) bool {
ones, maskLen := network.Mask.Size()
return ones == maskLen-1
}

// LastIP returns the last IP of a subnet, excluding the broadcast if IPv4 (if not /31 net)
func LastIP(network *net.IPNet) net.IP {
var end net.IP
for i := 0; i < len(network.IP); i++ {
end = append(end, network.IP[i]|^network.Mask[i])
}
if network.IP.To4() != nil && !IsPointToPointSubnet(network) {
end[3]--
}
return end
}

// GetSubnetGen returns generator function that can be called multiple times
// to generate subnet for the network with the prefix size.
// The function always returns non-nil function.
Expand Down
32 changes: 32 additions & 0 deletions pkg/ip/cidr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,36 @@ var _ = Describe("CIDR functions", func() {
Expect(gen().String()).To(Equal("::4/127"))
})
})
// Specs for IsPointToPointSubnet: /31 (IPv4) and /127 (IPv6) are the two
// point-to-point prefix lengths; any wider prefix must report false.
Context("IsPointToPointSubnet", func() {
It("/31", func() {
_, network, _ := net.ParseCIDR("192.168.1.0/31")
Expect(IsPointToPointSubnet(network)).To(BeTrue())
})
It("/127", func() {
_, network, _ := net.ParseCIDR("2002:0:0:1234::1/127")
Expect(IsPointToPointSubnet(network)).To(BeTrue())
})
It("/24", func() {
_, network, _ := net.ParseCIDR("192.168.1.0/24")
Expect(IsPointToPointSubnet(network)).To(BeFalse())
})
})
// Specs for LastIP: for point-to-point subnets (/31, /127) the last address
// is usable; for a wider IPv4 subnet (/24) the IPv4 broadcast address is
// excluded (.254, not .255); for IPv6 (/64) the highest address is returned.
Context("LastIP", func() {
It("/31", func() {
_, network, _ := net.ParseCIDR("192.168.1.0/31")
Expect(LastIP(network).String()).To(Equal("192.168.1.1"))
})
It("/127", func() {
_, network, _ := net.ParseCIDR("2002:0:0:1234::0/127")
Expect(LastIP(network).String()).To(Equal("2002:0:0:1234::1"))
})
It("/24", func() {
_, network, _ := net.ParseCIDR("192.168.1.0/24")
Expect(LastIP(network).String()).To(Equal("192.168.1.254"))
})
It("/64", func() {
_, network, _ := net.ParseCIDR("2002:0:0:1234::0/64")
Expect(LastIP(network).String()).To(Equal("2002::1234:ffff:ffff:ffff:ffff"))
})
})
})
23 changes: 2 additions & 21 deletions pkg/ipam-node/allocator/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *Range) Canonicalize() error {
return fmt.Errorf("RangeStart %s not in network %s", r.RangeStart.String(), (*net.IPNet)(&r.Subnet).String())
}
} else {
if isPointToPoint(r.Subnet) {
if ip.IsPointToPointSubnet((*net.IPNet)(&r.Subnet)) {
r.RangeStart = r.Subnet.IP
} else {
r.RangeStart = ip.NextIP(r.Subnet.IP)
Expand All @@ -95,7 +95,7 @@ func (r *Range) Canonicalize() error {
return fmt.Errorf("RangeEnd %s not in network %s", r.RangeEnd.String(), (*net.IPNet)(&r.Subnet).String())
}
} else {
r.RangeEnd = lastIP(r.Subnet)
r.RangeEnd = ip.LastIP((*net.IPNet)(&r.Subnet))
}

return nil
Expand Down Expand Up @@ -162,22 +162,3 @@ func CanonicalizeIP(addr *net.IP) error {
*addr = normalizedIP
return nil
}

// Returns true if IPNet is point to point (/31 or /127)
func isPointToPoint(subnet types.IPNet) bool {
ones, maskLen := subnet.Mask.Size()
return ones == maskLen-1
}

// Determine the last IP of a subnet, excluding the broadcast if IPv4 (if not /31 net)
func lastIP(subnet types.IPNet) net.IP {
var end net.IP
for i := 0; i < len(subnet.IP); i++ {
end = append(end, subnet.IP[i]|^subnet.Mask[i])
}
if subnet.IP.To4() != nil && !isPointToPoint(subnet) {
end[3]--
}

return end
}
Loading

0 comments on commit cdcd33f

Please sign in to comment.