Skip to content

Commit

Permalink
Generalize code to support both pool types in ipam-node
Browse files Browse the repository at this point in the history
Signed-off-by: Yury Kulazhenkov <ykulazhenkov@nvidia.com>
  • Loading branch information
ykulazhenkov committed Jun 11, 2024
1 parent 653ab95 commit cf4997a
Show file tree
Hide file tree
Showing 24 changed files with 340 additions and 288 deletions.
23 changes: 23 additions & 0 deletions pkg/common/pool_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2024, NVIDIA CORPORATION & AFFILIATES
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

// GetPoolKey returns the unique key that identifies a pool, derived from
// poolName and poolType.
//
// When poolType is empty or equals PoolTypeIPPool the key is the bare
// poolName; for any other type the key has the form "<poolType>/<poolName>".
func GetPoolKey(poolName, poolType string) string {
	if poolType != "" && poolType != PoolTypeIPPool {
		return poolType + "/" + poolName
	}
	// The IPPool key stays un-prefixed to avoid migration of the store,
	// and to support downgrade.
	return poolName
}
3 changes: 2 additions & 1 deletion pkg/ipam-controller/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/config"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
)
Expand Down Expand Up @@ -196,7 +197,7 @@ func updateAllocations(ctx context.Context, c client.Client,
continue
}
nodesToClearAnnotation.Insert(node.Name)
nodeIPPoolConfig := poolCfg.GetPoolByName(poolName)
nodeIPPoolConfig := poolCfg.GetPoolByKey(common.GetPoolKey(poolName, common.PoolTypeIPPool))
if nodeIPPoolConfig == nil {
nodeLog.Info("skip loading data for pool from the node, pool not configured", "node", node.Name, "pool", poolName)
continue
Expand Down
6 changes: 3 additions & 3 deletions pkg/ipam-controller/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func updateNode(node *corev1.Node) *corev1.Node {
return node
}

func getRangeFromNode(nodeName string) map[string]*pool.IPPool {
func getRangeFromNode(nodeName string) map[string]*pool.Pool {
node := getNode(nodeName)
poolCfg, err := pool.NewConfigReader(node)
if err != nil {
Expand All @@ -111,7 +111,7 @@ var _ = Describe("Controller Migrator", func() {

By("Set annotation with valid ranges for node1")
node1 := createNode(testNode1)
node1InitialRanges := map[string]*pool.IPPool{pool1Name: {
node1InitialRanges := map[string]*pool.Pool{pool1Name: {
Name: pool1Name,
Subnet: "192.168.0.0/16",
StartIP: "192.168.0.11",
Expand All @@ -129,7 +129,7 @@ var _ = Describe("Controller Migrator", func() {

By("Set annotation with valid ranges for node2")
node2 := createNode(testNode2)
node2InitialRanges := map[string]*pool.IPPool{pool1Name: {
node2InitialRanges := map[string]*pool.Pool{pool1Name: {
Name: pool1Name,
Subnet: "192.168.0.0/16",
StartIP: "192.168.0.21",
Expand Down
10 changes: 5 additions & 5 deletions pkg/ipam-node/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ type IPAllocator interface {
type allocator struct {
rangeSet *RangeSet
session storePkg.Session
poolName string
poolKey string
exclusions *RangeSet
}

// NewIPAllocator create and initialize a new instance of IP allocator
func NewIPAllocator(s *RangeSet, exclusions *RangeSet,
poolName string, session storePkg.Session) IPAllocator {
poolKey string, session storePkg.Session) IPAllocator {
return &allocator{
rangeSet: s,
session: session,
poolName: poolName,
poolKey: poolKey,
exclusions: exclusions,
}
}
Expand All @@ -70,7 +70,7 @@ func (a *allocator) Allocate(id string, ifName string, meta types.ReservationMet
if a.exclusions != nil && a.exclusions.Contains(reservedIP.IP) {
continue
}
err := a.session.Reserve(a.poolName, id, ifName, meta, reservedIP.IP)
err := a.session.Reserve(a.poolKey, id, ifName, meta, reservedIP.IP)
if err == nil {
break
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func (a *allocator) getIter() *RangeIter {

// We might get a last reserved IP that is wrong if the range indexes changed.
// This is not critical, we just lose round-robin this one time.
lastReservedIP := a.session.GetLastReservedIP(a.poolName)
lastReservedIP := a.session.GetLastReservedIP(a.poolKey)
if lastReservedIP != nil {
startFromLastReservedIP = a.rangeSet.Contains(lastReservedIP)
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/ipam-node/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type cleaner struct {
}

func (c *cleaner) Start(ctx context.Context) {
logger := logr.FromContextOrDiscard(ctx).WithName("cleaner")
logger := logr.FromContextOrDiscard(ctx)
for {
loopLogger := logger.WithValues("checkID", uuid.NewString())
loopLogger.Info("check for stale IPs")
Expand All @@ -96,17 +96,17 @@ func (c *cleaner) loop(ctx context.Context) error {
}
allReservations := map[string]struct{}{}
emptyPools := []string{}
for _, poolName := range session.ListPools() {
poolReservations := session.ListReservations(poolName)
for _, poolKey := range session.ListPools() {
poolReservations := session.ListReservations(poolKey)
if len(poolReservations) == 0 {
emptyPools = append(emptyPools, poolName)
emptyPools = append(emptyPools, poolKey)
continue
}
for _, reservation := range poolReservations {
resLogger := logger.WithValues("pool", poolName,
resLogger := logger.WithValues("poolKey", poolKey,
"container_id", reservation.ContainerID, "interface_name", reservation.InterfaceName)
key := c.getStaleAllocKey(poolName, reservation)
allReservations[key] = struct{}{}
staleAllocKey := c.getStaleAllocKey(poolKey, reservation)
allReservations[staleAllocKey] = struct{}{}
if reservation.Metadata.PodName == "" || reservation.Metadata.PodNamespace == "" {
resLogger.V(2).Info("reservation has no required metadata fields, skip")
continue
Expand All @@ -128,10 +128,10 @@ func (c *cleaner) loop(ctx context.Context) error {
}
}
if found {
delete(c.staleAllocations, key)
delete(c.staleAllocations, staleAllocKey)
} else {
c.staleAllocations[key]++
resLogger.V(2).Info("pod not found, increase stale counter", "value", c.staleAllocations[key])
c.staleAllocations[staleAllocKey]++
resLogger.V(2).Info("pod not found, increase stale counter", "value", c.staleAllocations[staleAllocKey])
}
}
}
Expand All @@ -145,15 +145,15 @@ func (c *cleaner) loop(ctx context.Context) error {
// release reservations which were marked as stale multiple times
if count > c.checkCountBeforeRelease {
keyFields := strings.SplitN(k, "|", 3)
poolName, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
logger.Info("stale reservation released", "poolName", poolName,
poolKey, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
logger.Info("stale reservation released", "poolKey", poolKey,
"container_id", containerID, "interface_name", ifName)
session.ReleaseReservationByID(poolName, containerID, ifName)
session.ReleaseReservationByID(poolKey, containerID, ifName)
}
}
// remove empty pools if they don't have configuration in the k8s API
for _, emptyPool := range emptyPools {
if p := c.poolConfReader.GetPoolByName(emptyPool); p == nil {
if p := c.poolConfReader.GetPoolByKey(emptyPool); p == nil {
session.RemovePool(emptyPool)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ipam-node/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ var _ = Describe("Cleaner", func() {

poolManager := poolMockPkg.NewManager(GinkgoT())
// pool2 has no config in the k8s API
poolManager.On("GetPoolByName", testPool2).Return(nil)
poolManager.On("GetPoolByKey", testPool2).Return(nil)
// pool3 has config in the k8s API
poolManager.On("GetPoolByName", testPool3).Return(&poolPkg.IPPool{})
poolManager.On("GetPoolByKey", testPool3).Return(&poolPkg.Pool{})

session, err := store.Open(ctx)
Expect(err).NotTo(HaveOccurred())
Expand Down
11 changes: 7 additions & 4 deletions pkg/ipam-node/controllers/ippool/ippool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
)

Expand All @@ -38,34 +39,36 @@ type IPPoolReconciler struct {
func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLog := log.FromContext(ctx)
ipPool := &ipamv1alpha1.IPPool{}
poolKey := common.GetPoolKey(req.Name, common.PoolTypeIPPool)
err := r.Client.Get(ctx, req.NamespacedName, ipPool)
if err != nil {
if apiErrors.IsNotFound(err) {
reqLog.Info("IPPool not found, removing from PoolManager")
r.PoolManager.RemovePool(req.Name)
r.PoolManager.RemovePool(poolKey)
return ctrl.Result{}, nil
}
reqLog.Error(err, "failed to get IPPool object from the cache")
return ctrl.Result{}, err
}
reqLog.Info("Notification on IPPool", "name", ipPool.Name)
found := false

for _, alloc := range ipPool.Status.Allocations {
if alloc.NodeName == r.NodeName {
ipPool := &pool.IPPool{
ipPool := &pool.Pool{
Name: ipPool.Name,
Subnet: ipPool.Spec.Subnet,
Gateway: ipPool.Spec.Gateway,
StartIP: alloc.StartIP,
EndIP: alloc.EndIP,
}
r.PoolManager.UpdatePool(ipPool)
r.PoolManager.UpdatePool(poolKey, ipPool)
found = true
break
}
}
if !found {
r.PoolManager.RemovePool(req.Name)
r.PoolManager.RemovePool(poolKey)
}
return ctrl.Result{}, nil
}
Expand Down
56 changes: 38 additions & 18 deletions pkg/ipam-node/handlers/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/status"

nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/allocator"
storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types"
Expand Down Expand Up @@ -77,9 +78,9 @@ func (h *Handlers) allocate(reqLog logr.Logger,
session storePkg.Session, params *nodev1.IPAMParameters) ([]PoolAlloc, error) {
var err error
result := make([]PoolAlloc, 0, len(params.Pools))
for _, pool := range params.Pools {
for _, poolName := range params.Pools {
var alloc PoolAlloc
alloc, err = h.allocateInPool(pool, reqLog, session, params)
alloc, err = h.allocateInPool(poolName, reqLog, session, params)
if err != nil {
break
}
Expand All @@ -92,26 +93,28 @@ func (h *Handlers) allocate(reqLog logr.Logger,
return result, nil
}

func (h *Handlers) allocateInPool(pool string, reqLog logr.Logger,
func (h *Handlers) allocateInPool(poolName string, reqLog logr.Logger,
session storePkg.Session, params *nodev1.IPAMParameters) (PoolAlloc, error) {
poolType := poolTypeAsString(params.PoolType)
poolLog := reqLog.WithValues("pool", pool, "poolType", poolType)
poolLog := reqLog.WithValues("pool", poolName, "poolType", poolType)
poolKey := common.GetPoolKey(poolName, poolType)

poolCfg := h.poolConfReader.GetPoolByName(pool)
poolCfg := h.poolConfReader.GetPoolByKey(poolKey)
if poolCfg == nil {
return PoolAlloc{}, status.Errorf(codes.NotFound, "configuration for pool %s not found", pool)
return PoolAlloc{}, status.Errorf(codes.NotFound,
"configuration for pool \"%s\", poolType \"%s\" not found", poolName, poolType)
}
rangeStart := net.ParseIP(poolCfg.StartIP)
if rangeStart == nil {
return PoolAlloc{}, poolCfgError(poolLog, pool, "invalid rangeStart")
return PoolAlloc{}, poolCfgError(poolLog, poolName, poolType, "invalid rangeStart")
}
rangeEnd := net.ParseIP(poolCfg.EndIP)
if rangeEnd == nil {
return PoolAlloc{}, poolCfgError(poolLog, pool, "invalid rangeEnd")
return PoolAlloc{}, poolCfgError(poolLog, poolName, poolType, "invalid rangeEnd")
}
_, subnet, err := net.ParseCIDR(poolCfg.Subnet)
if err != nil || subnet == nil || subnet.IP == nil || subnet.Mask == nil {
return PoolAlloc{}, poolCfgError(poolLog, pool, "invalid subnet")
return PoolAlloc{}, poolCfgError(poolLog, poolName, poolType, "invalid subnet")
}
rangeSet := &allocator.RangeSet{allocator.Range{
RangeStart: rangeStart,
Expand All @@ -120,10 +123,24 @@ func (h *Handlers) allocateInPool(pool string, reqLog logr.Logger,
Gateway: net.ParseIP(poolCfg.Gateway),
}}
if err := rangeSet.Canonicalize(); err != nil {
return PoolAlloc{}, poolCfgError(poolLog, pool,
return PoolAlloc{}, poolCfgError(poolLog, poolName, poolType,
fmt.Sprintf("invalid range config: %s", err.Error()))
}
alloc := h.getAllocFunc(rangeSet, &allocator.RangeSet{}, pool, session)
exclusionRangeSet := make(allocator.RangeSet, 0, len(poolCfg.Exclusions))
for _, e := range poolCfg.Exclusions {
exclusionRangeSet = append(exclusionRangeSet, allocator.Range{
Subnet: cniTypes.IPNet(*subnet),
RangeStart: net.ParseIP(e.StartIP),
RangeEnd: net.ParseIP(e.EndIP),
})
}
if len(exclusionRangeSet) > 0 {
if err := exclusionRangeSet.Canonicalize(); err != nil {
return PoolAlloc{}, poolCfgError(poolLog, poolName, poolType,
fmt.Sprintf("invalid exclusion range config: %s", err.Error()))
}
}
alloc := h.getAllocFunc(rangeSet, &exclusionRangeSet, poolKey, session)
allocMeta := types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PoolConfigSnapshot: poolCfg.String(),
Expand All @@ -139,23 +156,26 @@ func (h *Handlers) allocateInPool(pool string, reqLog logr.Logger,
poolLog.Error(err, "failed to allocate IP address")
if errors.Is(err, storePkg.ErrReservationAlreadyExist) {
return PoolAlloc{}, status.Errorf(codes.AlreadyExists,
"allocation already exist in the pool %s", pool)
"allocation already exist in the pool \"%s\", poolType \"%s\"", poolName, poolType)
}
if errors.Is(err, allocator.ErrNoFreeAddresses) {
return PoolAlloc{}, status.Errorf(codes.ResourceExhausted, "no free addresses in the pool %s", pool)
return PoolAlloc{}, status.Errorf(codes.ResourceExhausted,
"no free addresses in the pool \"%s\", poolType \"%s\"",
poolName, poolType)
}
return PoolAlloc{}, status.Errorf(codes.Internal, "failed to allocate IP address in pool %s", pool)
return PoolAlloc{}, status.Errorf(codes.Internal,
"failed to allocate IP address in pool \"%s\", poolType \"%s\"", poolName, poolType)
}
poolLog.Info("IP address allocated", "allocation", result.String())

return PoolAlloc{
Pool: pool,
Pool: poolName,
IPConfig: result,
}, nil
}

func poolCfgError(reqLog logr.Logger, pool, reason string) error {
reqLog.Error(nil, "invalid pool config", "pool", pool,
func poolCfgError(reqLog logr.Logger, pool, poolType, reason string) error {
reqLog.Error(nil, "invalid pool config", "pool", pool, "poolType", poolType,
"reason", reason)
return status.Errorf(codes.Internal, "invalid config for pool %s", pool)
return status.Errorf(codes.Internal, "invalid config for pool \"%s\", poolType \"%s\"", pool, poolType)
}
6 changes: 4 additions & 2 deletions pkg/ipam-node/handlers/deallocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/go-logr/logr"

nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
)

// Deallocate is the handler for Deallocate GRPC endpoint
Expand All @@ -37,8 +38,9 @@ func (h *Handlers) Deallocate(
if err := checkReqIsCanceled(ctx); err != nil {
return nil, h.closeSession(ctx, store, err)
}
for _, p := range params.Pools {
store.ReleaseReservationByID(p, params.CniContainerid, params.CniIfname)
poolType := poolTypeAsString(params.PoolType)
for _, poolName := range params.Pools {
store.ReleaseReservationByID(common.GetPoolKey(poolName, poolType), params.CniContainerid, params.CniIfname)
}
if err := h.closeSession(ctx, store, nil); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipam-node/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

type GetAllocatorFunc = func(s *allocator.RangeSet, exclusions *allocator.RangeSet,
poolName string, session storePkg.Session) allocator.IPAllocator
poolKey string, session storePkg.Session) allocator.IPAllocator

// New create and initialize new instance of grpc Handlers
func New(poolConfReader poolPkg.ConfigReader, store storePkg.Store, getAllocFunc GetAllocatorFunc) *Handlers {
Expand Down
Loading

0 comments on commit cf4997a

Please sign in to comment.