Skip to content

Commit

Permalink
ipam-node: add periodic cleanup loop for stale reservations
Browse files Browse the repository at this point in the history
Signed-off-by: Yury Kulazhenkov <ykulazhenkov@nvidia.com>
  • Loading branch information
ykulazhenkov committed Jul 19, 2023
1 parent 5222efe commit 196946c
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 5 deletions.
24 changes: 19 additions & 5 deletions cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/google/renameio/v2"
Expand Down Expand Up @@ -53,6 +54,7 @@ import (
cniTypes "github.com/Mellanox/nvidia-k8s-ipam/pkg/cni/types"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/common"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/allocator"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner"
nodectrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/node"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/grpc/middleware"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/handlers"
Expand Down Expand Up @@ -160,24 +162,26 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
return err
}

grpcServer, listener, err := initGRPCServer(opts, logger, poolManager)
storeMgr := store.NewManager(opts.StoreFile)

grpcServer, listener, err := initGRPCServer(opts, logger, poolManager, storeMgr)
if err != nil {
return err
}

wg := sync.WaitGroup{}
wg.Add(3)

errCh := make(chan error, 1)

innerCtx, innerCFunc := context.WithCancel(ctx)
defer innerCFunc()

wg.Add(1)
go func() {
defer wg.Done()
<-innerCtx.Done()
grpcServer.GracefulStop()
}()
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("start grpc server")
Expand All @@ -190,6 +194,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}
logger.Info("grpc server stopped")
}()
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("start manager")
Expand All @@ -202,6 +207,15 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}
logger.Info("manager stopped")
}()
mgr.GetCache().WaitForCacheSync(ctx)

wg.Add(1)
go func() {
defer wg.Done()
logger.Info("start stale reservations cleaner")
c := cleaner.New(mgr.GetClient(), storeMgr, time.Minute, 3)
c.Start(ctx)
}()

select {
case <-ctx.Done():
Expand All @@ -215,7 +229,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}

func initGRPCServer(opts *options.Options,
log logr.Logger, poolManager poolPkg.Manager) (*grpc.Server, net.Listener, error) {
log logr.Logger, poolManager poolPkg.Manager, storeMgr store.Manager) (*grpc.Server, net.Listener, error) {
network, address, err := options.ParseBindAddress(opts.BindAddress)
if err != nil {
return nil, nil, err
Expand All @@ -235,7 +249,7 @@ func initGRPCServer(opts *options.Options,
middleware.LogResponseMiddleware))

nodev1.RegisterIPAMBackendServiceServer(grpcServer,
handlers.New(poolManager, store.NewManager(opts.StoreFile), allocator.NewIPAllocator))
handlers.New(poolManager, storeMgr, allocator.NewIPAllocator))
return grpcServer, listener, nil
}

Expand Down
136 changes: 136 additions & 0 deletions pkg/ipam-node/cleaner/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2023, NVIDIA CORPORATION & AFFILIATES
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cleaner

import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
apiTypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types"
)

// Cleaner periodically checks the store for stale reservations and releases them.
type Cleaner interface {
// Start runs the check loop and blocks until ctx is cancelled.
Start(ctx context.Context)
}

// New builds a Cleaner.
//
// client is used to look up the Pod a reservation belongs to, storeMgr gives
// access to the reservation store, checkInterval is the pause between two
// checks, and checkCountBeforeRelease is the number of stale checks a
// reservation has to exceed before it is released.
func New(client client.Client, storeMgr storePkg.Manager,
	checkInterval time.Duration,
	checkCountBeforeRelease int) Cleaner {
	c := new(cleaner)
	c.client = client
	c.storeMgr = storeMgr
	c.checkInterval = checkInterval
	c.checkCountBeforeRelease = checkCountBeforeRelease
	// the counters start empty; they are filled by the check loop
	c.staleAllocations = make(map[string]int)
	return c
}

// cleaner is the default Cleaner implementation.
type cleaner struct {
// client is used to fetch the Pod referenced by a reservation
client client.Client
// storeMgr is opened once per check to list and release reservations
storeMgr storePkg.Manager
// checkInterval is the time to wait after a check before starting the next one
checkInterval time.Duration
// checkCountBeforeRelease is the threshold for the stale counter;
// a reservation is released when its counter becomes greater than this value
checkCountBeforeRelease int
// key is <pool_name>|<container_id>|<interface_name>, value is count of failed checks
staleAllocations map[string]int
}

// Start runs the stale reservation check in a loop and blocks until ctx is
// cancelled.
//
// The first check runs immediately; every following check starts
// checkInterval after the previous one has finished. A failed check is logged
// and does not stop the loop. Each check gets its own logger with a unique
// "checkID" value, passed to the check through the context.
func (c *cleaner) Start(ctx context.Context) {
	logger := logr.FromContextOrDiscard(ctx).WithName("cleaner")
	for {
		loopLogger := logger.WithValues("checkID", uuid.NewString())
		loopLogger.Info("check for stale IPs")
		// Derive a per-check context instead of overwriting ctx: reassigning
		// ctx would wrap the previous context once more on every iteration,
		// so the context chain would grow for as long as the cleaner runs.
		loopCtx := logr.NewContext(ctx, loopLogger)
		if err := c.loop(loopCtx); err != nil {
			loopLogger.Error(err, "check failed")
		}
		// use an explicit timer so it can be released right away on shutdown
		timer := time.NewTimer(c.checkInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			logger.Info("shutdown cleaner")
			return
		case <-timer.C:
		}
	}
}

// loop performs a single check for stale reservations.
//
// For every reservation in the store that carries Pod name and namespace
// metadata, the matching Pod is fetched through c.client. If the Pod is not
// found, or the reservation records a Pod UID that differs from the UID of
// the Pod that was found, the stale counter of the reservation is increased;
// otherwise the counter is removed. Reservations whose counter is greater
// than checkCountBeforeRelease are released from the store.
//
// If reading a Pod fails with an error other than NotFound, the store session
// is cancelled and the error is returned. Errors are wrapped with %w so that
// callers can inspect the cause with errors.Is/errors.As.
func (c *cleaner) loop(ctx context.Context) error {
	logger := logr.FromContextOrDiscard(ctx)
	store, err := c.storeMgr.Open(ctx)
	if err != nil {
		return fmt.Errorf("failed to open store: %w", err)
	}
	// keys of all reservations which currently exist in the store
	allReservations := map[string]struct{}{}
	for _, poolName := range store.ListPools() {
		for _, reservation := range store.ListReservations(poolName) {
			resLogger := logger.WithValues("pool", poolName,
				"container_id", reservation.ContainerID, "interface_name", reservation.InterfaceName)
			key := c.getStaleAllocKey(poolName, reservation)
			allReservations[key] = struct{}{}
			if reservation.Metadata.PodName == "" || reservation.Metadata.PodNamespace == "" {
				// without the Pod reference there is nothing to check against
				resLogger.V(2).Info("reservation has no required metadata fields, skip")
				continue
			}
			pod := &corev1.Pod{}
			err := c.client.Get(ctx, apiTypes.NamespacedName{
				Namespace: reservation.Metadata.PodNamespace,
				Name:      reservation.Metadata.PodName,
			}, pod)
			if err != nil && !apiErrors.IsNotFound(err) {
				store.Cancel()
				return fmt.Errorf("failed to read Pod info from the cache: %w", err)
			}
			if apiErrors.IsNotFound(err) ||
				(reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID)) {
				c.staleAllocations[key]++
				resLogger.V(2).Info("pod not found in the API, increase stale counter",
					"value", c.staleAllocations[key])
			} else {
				delete(c.staleAllocations, key)
			}
		}
	}

	// keys of the reservations released during this check
	var released []string
	for k, count := range c.staleAllocations {
		// remove unknown reservations from c.staleAllocations
		if _, isKnownReservation := allReservations[k]; !isKnownReservation {
			delete(c.staleAllocations, k)
			continue
		}
		// release reservations which were marked as stale multiple times
		if count > c.checkCountBeforeRelease {
			keyFields := strings.SplitN(k, "|", 3)
			pool, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
			logger.Info("stale reservation released", "pool", pool,
				"container_id", containerID, "interface_name", ifName)
			store.ReleaseReservationByID(pool, containerID, ifName)
			released = append(released, k)
		}
	}
	if err := store.Commit(); err != nil {
		return fmt.Errorf("failed to commit changes to the store: %w", err)
	}
	// Reset the counters only after the release was committed: if the commit
	// fails, the counters are kept and the release is retried by the next
	// check; if it succeeds, a later reservation with the same key starts
	// counting from zero instead of inheriting the old value.
	for _, k := range released {
		delete(c.staleAllocations, k)
	}
	return nil
}

// getStaleAllocKey returns the c.staleAllocations map key for reservation r
// in pool poolName: the pool name, container ID and interface name joined
// with "|".
func (c *cleaner) getStaleAllocKey(poolName string, r types.Reservation) string {
	const keyFormat = "%s|%s|%s"
	key := fmt.Sprintf(keyFormat, poolName, r.ContainerID, r.InterfaceName)
	return key
}
66 changes: 66 additions & 0 deletions pkg/ipam-node/cleaner/cleaner_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2023, NVIDIA CORPORATION & AFFILIATES
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cleaner_test

import (
"context"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/klog/v2"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
)

// State shared by the specs of the suite; initialized in BeforeSuite.
var (
// cfg is the REST config returned by testEnv.Start
cfg *rest.Config
// k8sClient is the API client created from cfg
k8sClient client.Client
// testEnv is the envtest environment started in BeforeSuite and stopped in AfterSuite
testEnv *envtest.Environment
// cFunc cancels ctx; it is called in AfterSuite
cFunc context.CancelFunc
// ctx is the suite context; it carries the klog based logger
ctx context.Context
)

// TestCleaner is the "go test" entry point that hands control over to Ginkgo,
// with Gomega failures routed to Ginkgo's Fail handler.
func TestCleaner(t *testing.T) {
	const suiteName = "Cleaner Suite"

	RegisterFailHandler(Fail)
	RunSpecs(t, suiteName)
}

// Suite setup: starts the envtest environment, prepares the suite context
// with a klog based logger and creates the API client used by the specs.
var _ = BeforeSuite(func() {
	By("bootstrapping test environment")
	testEnv = &envtest.Environment{}

	// cancellable context which carries the logger for the code under test
	var baseCtx context.Context
	baseCtx, cFunc = context.WithCancel(context.Background())
	ctx = logr.NewContext(baseCtx, klog.NewKlogr())

	// cfg is the package level variable, it is assigned here on purpose
	var startErr error
	cfg, startErr = testEnv.Start()
	Expect(startErr).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	// k8sClient is the package level variable as well
	var clientErr error
	k8sClient, clientErr = client.New(cfg, client.Options{})
	Expect(clientErr).NotTo(HaveOccurred())
	Expect(k8sClient).NotTo(BeNil())
})

// Suite teardown: cancels the suite context first, then stops the envtest
// environment.
var _ = AfterSuite(func() {
	cFunc()
	By("tearing down the test environment")
	Expect(testEnv.Stop()).NotTo(HaveOccurred())
})
Loading

0 comments on commit 196946c

Please sign in to comment.