From eccd78fe7f0acd3b9511d51f0ec5b97cc0723aa0 Mon Sep 17 00:00:00 2001
From: Yury Kulazhenkov
Date: Tue, 19 Mar 2024 14:39:03 +0200
Subject: [PATCH 1/2] Fix namespace in Pod watch for nv-ipam-node

Before the fix, nv-ipam-node was watching only for Pods in the namespace
provided by the `--ippools-namespace` arg. This is incorrect and causes the
"stale IP cleaner" to work incorrectly.

Watch Pods in all namespaces instead.

Signed-off-by: Yury Kulazhenkov
---
 cmd/ipam-node/app/app.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/cmd/ipam-node/app/app.go b/cmd/ipam-node/app/app.go
index f18dcab..1c45103 100644
--- a/cmd/ipam-node/app/app.go
+++ b/cmd/ipam-node/app/app.go
@@ -31,6 +31,7 @@ import (
     "github.com/google/renameio/v2"
     "github.com/spf13/cobra"
     "google.golang.org/grpc"
+    corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/runtime"
     clientgoscheme "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/rest"
@@ -39,6 +40,7 @@ import (
     "k8s.io/klog/v2"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/cache"
+    "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 
@@ -141,7 +143,11 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
         Scheme:                 scheme,
         Metrics:                metricsserver.Options{BindAddress: opts.MetricsAddr},
         HealthProbeBindAddress: opts.ProbeAddr,
-        Cache:                  cache.Options{DefaultNamespaces: map[string]cache.Config{opts.PoolsNamespace: {}}},
+        Cache: cache.Options{
+            DefaultNamespaces: map[string]cache.Config{opts.PoolsNamespace: {}},
+            ByObject: map[client.Object]cache.ByObject{
+                &corev1.Pod{}: {Namespaces: map[string]cache.Config{cache.AllNamespaces: {}}}},
+        },
     })
     if err != nil {
         logger.Error(err, "unable to initialize manager")

From 1e29014f51e2a2f23a46c0cc530c190e7936eb1c Mon Sep 17 00:00:00 2001
From: Yury Kulazhenkov
Date: Tue, 19 Mar 2024 15:51:42 +0200
Subject: [PATCH 2/2] Add additional check to the cleaner

Add an extra check to the cleaner that validates against the API
(in addition to the cache) before deciding to clean up a stale allocation.
Signed-off-by: Yury Kulazhenkov
---
 cmd/ipam-node/app/app.go                    |  7 ++-
 go.mod                                      |  1 +
 pkg/ipam-node/cleaner/cleaner.go            | 56 +++++++++++++++------
 pkg/ipam-node/cleaner/cleaner_suite_test.go | 15 ++++--
 pkg/ipam-node/cleaner/cleaner_test.go       |  2 +-
 5 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/cmd/ipam-node/app/app.go b/cmd/ipam-node/app/app.go
index 1c45103..a61071e 100644
--- a/cmd/ipam-node/app/app.go
+++ b/cmd/ipam-node/app/app.go
@@ -154,6 +154,11 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
         return err
     }
 
+    k8sClient, err := client.New(config, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
+    if err != nil {
+        logger.Error(err, "unable to direct k8s client")
+    }
+
     if err = (&ippoolctrl.IPPoolReconciler{
         PoolManager: poolManager,
         Client:      mgr.GetClient(),
@@ -258,7 +263,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
             }
             return
         }
-        c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3)
+        c := cleaner.New(mgr.GetClient(), k8sClient, store, poolManager, time.Minute, 3)
         c.Start(innerCtx)
         logger.Info("cleaner stopped")
     }()
diff --git a/go.mod b/go.mod
index 158436e..ab05416 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
     github.com/cespare/xxhash/v2 v2.2.0 // indirect
     github.com/davecgh/go-spew v1.1.1 // indirect
     github.com/emicklei/go-restful/v3 v3.12.0 // indirect
+    github.com/evanphx/json-patch v4.12.0+incompatible // indirect
     github.com/evanphx/json-patch/v5 v5.9.0 // indirect
     github.com/fsnotify/fsnotify v1.7.0 // indirect
     github.com/go-logr/zapr v1.3.0 // indirect
diff --git a/pkg/ipam-node/cleaner/cleaner.go b/pkg/ipam-node/cleaner/cleaner.go
index 184a0f0..a2c1f4f 100644
--- a/pkg/ipam-node/cleaner/cleaner.go
+++ b/pkg/ipam-node/cleaner/cleaner.go
@@ -45,11 +45,12 @@ type Cleaner interface {
 
 // New creates and initialize new cleaner instance
 // "checkInterval" defines delay between checks for stale allocations.
 // "checkCountBeforeRelease: defines how many check to do before remove the allocation
-func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
+func New(cachedClient client.Client, directClient client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
     checkInterval time.Duration, checkCountBeforeRelease int) Cleaner {
     return &cleaner{
-        client:         client,
+        cachedClient:   cachedClient,
+        directClient:   directClient,
         store:          store,
         poolConfReader: poolConfReader,
         checkInterval:  checkInterval,
@@ -59,7 +60,8 @@ func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigR
 }
 
 type cleaner struct {
-    client         client.Client
+    cachedClient   client.Client
+    directClient   client.Client
     store          storePkg.Store
     poolConfReader pool.ConfigReader
     checkInterval  time.Duration
@@ -109,22 +111,27 @@ func (c *cleaner) loop(ctx context.Context) error {
                 resLogger.V(2).Info("reservation has no required metadata fields, skip")
                 continue
             }
-            pod := &corev1.Pod{}
-            err := c.client.Get(ctx, apiTypes.NamespacedName{
-                Namespace: reservation.Metadata.PodNamespace,
-                Name:      reservation.Metadata.PodName,
-            }, pod)
-            if err != nil && !apiErrors.IsNotFound(err) {
+            // first check in the cache
+            found, err := c.checkPod(ctx, c.cachedClient, reservation)
+            if err != nil {
                 session.Cancel()
                 return fmt.Errorf("failed to read Pod info from the cache: %v", err)
             }
-            if apiErrors.IsNotFound(err) ||
-                (reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID)) {
-                c.staleAllocations[key]++
-                resLogger.V(2).Info("pod not found in the API, increase stale counter",
-                    "value", c.staleAllocations[key])
-            } else {
+            if !found {
+                resLogger.V(2).Info("cache check failed for the Pod, check in the API")
+                // pod not found in the cache, try to query API directly to make sure
+                // that we use latest state
+                found, err = c.checkPod(ctx, c.directClient, reservation)
+                if err != nil {
+                    session.Cancel()
+                    return fmt.Errorf("failed to read Pod info from the API: %v", err)
+                }
+            }
+            if found {
                 delete(c.staleAllocations, key)
+            } else {
+                c.staleAllocations[key]++
+                resLogger.V(2).Info("pod not found, increase stale counter", "value", c.staleAllocations[key])
             }
         }
     }
@@ -156,6 +163,25 @@ func (c *cleaner) loop(ctx context.Context) error {
     return nil
 }
 
+// return true if pod exist, error in case if an unknown error occurred
+func (c *cleaner) checkPod(ctx context.Context, k8sClient client.Client, reservation types.Reservation) (bool, error) {
+    pod := &corev1.Pod{}
+    err := k8sClient.Get(ctx, apiTypes.NamespacedName{
+        Namespace: reservation.Metadata.PodNamespace,
+        Name:      reservation.Metadata.PodName,
+    }, pod)
+    if err != nil && !apiErrors.IsNotFound(err) {
+        return false, fmt.Errorf("failed to read Pod info: %v", err)
+    }
+    if apiErrors.IsNotFound(err) ||
+        (reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID)) {
+        // pod not found or it has different UUID (was recreated)
+        return false, nil
+    }
+    // pod exist
+    return true, nil
+}
+
 func (c *cleaner) getStaleAllocKey(poolName string, r types.Reservation) string {
     return fmt.Sprintf("%s|%s|%s", poolName, r.ContainerID, r.InterfaceName)
 }
diff --git a/pkg/ipam-node/cleaner/cleaner_suite_test.go b/pkg/ipam-node/cleaner/cleaner_suite_test.go
index 5ab4121..9f03f39 100644
--- a/pkg/ipam-node/cleaner/cleaner_suite_test.go
+++ b/pkg/ipam-node/cleaner/cleaner_suite_test.go
@@ -24,15 +24,17 @@ import (
 
     "k8s.io/client-go/rest"
     "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
    "sigs.k8s.io/controller-runtime/pkg/envtest"
 )
 
 var (
-    cfg       *rest.Config
-    k8sClient client.Client
-    testEnv   *envtest.Environment
-    cFunc     context.CancelFunc
-    ctx       context.Context
+    cfg        *rest.Config
+    k8sClient  client.Client
+    fakeClient client.Client
+    testEnv    *envtest.Environment
+    cFunc      context.CancelFunc
+    ctx        context.Context
 )
 
 func TestCleaner(t *testing.T) {
@@ -56,6 +58,9 @@ var _ = BeforeSuite(func() {
     k8sClient, err = client.New(cfg, client.Options{})
     Expect(err).NotTo(HaveOccurred())
     Expect(k8sClient).NotTo(BeNil())
+
+    fakeClient = fake.NewFakeClient()
+    Expect(fakeClient).NotTo(BeNil())
 })
 
 var _ = AfterSuite(func() {
diff --git a/pkg/ipam-node/cleaner/cleaner_test.go b/pkg/ipam-node/cleaner/cleaner_test.go
index cfe63e0..8cc56f5 100644
--- a/pkg/ipam-node/cleaner/cleaner_test.go
+++ b/pkg/ipam-node/cleaner/cleaner_test.go
@@ -73,7 +73,7 @@ var _ = Describe("Cleaner", func() {
         // this will create empty pool config
         session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100"))
 
-        cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3)
+        cleaner := cleanerPkg.New(fakeClient, k8sClient, store, poolManager, time.Millisecond*100, 3)
 
         pod1UID := createPod(testPodName1, testNamespace)
         _ = createPod(testPodName2, testNamespace)