From 9d0c23e323bd5107337a8ff74e18cf47837a10ff Mon Sep 17 00:00:00 2001 From: Yury Kulazhenkov Date: Fri, 4 Aug 2023 12:22:25 +0300 Subject: [PATCH] ipam-node: Remove stale pool entries from the store remove the pool data from the store if the pool has no allocations and if information about the pool is unavailable in the Kubernetes API Signed-off-by: Yury Kulazhenkov --- cmd/ipam-node/app/app.go | 2 +- pkg/ipam-node/cleaner/cleaner.go | 36 ++++++++++++++++------- pkg/ipam-node/cleaner/cleaner_test.go | 42 ++++++++++++++++++--------- pkg/ipam-node/store/mocks/Session.go | 33 +++++++++++++++++++++ pkg/ipam-node/store/store.go | 9 ++++++ pkg/ipam-node/store/store_test.go | 8 +++++ 6 files changed, 106 insertions(+), 24 deletions(-) diff --git a/cmd/ipam-node/app/app.go b/cmd/ipam-node/app/app.go index 55ee06a..8438981 100644 --- a/cmd/ipam-node/app/app.go +++ b/cmd/ipam-node/app/app.go @@ -243,7 +243,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio } return } - c := cleaner.New(mgr.GetClient(), store, time.Minute, 3) + c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3) c.Start(innerCtx) logger.Info("cleaner stopped") }() diff --git a/pkg/ipam-node/cleaner/cleaner.go b/pkg/ipam-node/cleaner/cleaner.go index 93c5112..184a0f0 100644 --- a/pkg/ipam-node/cleaner/cleaner.go +++ b/pkg/ipam-node/cleaner/cleaner.go @@ -28,12 +28,14 @@ import ( storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types" + "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" ) // Cleaner is the interface of the cleaner package. // The cleaner periodically scan the store and check for allocations which doesn't have // related Pod in the k8s API. If allocation has no Pod for more than X checks, then the cleaner -// will release the allocation. +// will release the allocation. Also, the cleaner will remove pool entries in the store if the pool has no +// allocation and pool configuration is unavailable in the Kubernetes API. type Cleaner interface { // Start starts the cleaner loop. // The cleaner loop discovers stale allocations and clean up them. @@ -43,12 +45,13 @@ type Cleaner interface { // New creates and initialize new cleaner instance // "checkInterval" defines delay between checks for stale allocations. // "checkCountBeforeRelease: defines how many check to do before remove the allocation -func New(client client.Client, store storePkg.Store, +func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader, checkInterval time.Duration, checkCountBeforeRelease int) Cleaner { return &cleaner{ client: client, store: store, + poolConfReader: poolConfReader, checkInterval: checkInterval, checkCountBeforeRelease: checkCountBeforeRelease, staleAllocations: make(map[string]int), @@ -58,6 +61,7 @@ func New(client client.Client, store storePkg.Store, type cleaner struct { client client.Client store storePkg.Store + poolConfReader pool.ConfigReader checkInterval time.Duration checkCountBeforeRelease int // key is ||, value is count of failed checks @@ -84,13 +88,19 @@ func (c *cleaner) Start(ctx context.Context) { func (c *cleaner) loop(ctx context.Context) error { logger := logr.FromContextOrDiscard(ctx) - store, err := c.store.Open(ctx) + session, err := c.store.Open(ctx) if err != nil { return fmt.Errorf("failed to open store: %v", err) } allReservations := map[string]struct{}{} - for _, poolName := range store.ListPools() { - for _, reservation := range store.ListReservations(poolName) { + emptyPools := []string{} + for _, poolName := range session.ListPools() { + poolReservations := session.ListReservations(poolName) + if len(poolReservations) == 0 { + emptyPools = append(emptyPools, poolName) + continue + } + for _, reservation := range poolReservations { resLogger := logger.WithValues("pool", poolName, "container_id", reservation.ContainerID, "interface_name", reservation.InterfaceName) key := c.getStaleAllocKey(poolName, reservation) @@ -105,7 +115,7 @@ func (c *cleaner) loop(ctx context.Context) error { Name: reservation.Metadata.PodName, }, pod) if err != nil && !apiErrors.IsNotFound(err) { - store.Cancel() + session.Cancel() return fmt.Errorf("failed to read Pod info from the cache: %v", err) } if apiErrors.IsNotFound(err) || @@ -128,13 +138,19 @@ func (c *cleaner) loop(ctx context.Context) error { // release reservations which were marked as stale multiple times if count > c.checkCountBeforeRelease { keyFields := strings.SplitN(k, "|", 3) - pool, containerID, ifName := keyFields[0], keyFields[1], keyFields[2] - logger.Info("stale reservation released", "pool", pool, + poolName, containerID, ifName := keyFields[0], keyFields[1], keyFields[2] + logger.Info("stale reservation released", "poolName", poolName, "container_id", containerID, "interface_name", ifName) - store.ReleaseReservationByID(pool, containerID, ifName) + session.ReleaseReservationByID(poolName, containerID, ifName) + } + } + // remove empty pools if they don't have configuration in the k8s API + for _, emptyPool := range emptyPools { + if p := c.poolConfReader.GetPoolByName(emptyPool); p == nil { + session.RemovePool(emptyPool) } } - if err := store.Commit(); err != nil { + if err := session.Commit(); err != nil { return fmt.Errorf("failed to commit changes to the store: %v", err) } return nil diff --git a/pkg/ipam-node/cleaner/cleaner_test.go b/pkg/ipam-node/cleaner/cleaner_test.go index 14433cd..cfe63e0 100644 --- a/pkg/ipam-node/cleaner/cleaner_test.go +++ b/pkg/ipam-node/cleaner/cleaner_test.go @@ -27,6 +27,8 @@ import ( cleanerPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner" storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types" + poolPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" + poolMockPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool/mocks" ) const ( @@ -35,6 +37,7 @@ const ( testPodName2 = "test-pod2" testPool1 = "pool1" testPool2 = "pool2" + testPool3 = "pool3" testIFName = "net0" ) @@ -54,52 +57,63 @@ var _ = Describe("Cleaner", func() { It("Cleanup test", func() { done := make(chan interface{}) go func() { + defer GinkgoRecover() + defer close(done) storePath := filepath.Join(GinkgoT().TempDir(), "test_store") - storeMgr := storePkg.New(storePath) - cleaner := cleanerPkg.New(k8sClient, storeMgr, time.Millisecond*100, 3) + store := storePkg.New(storePath) - pod1UID := createPod(testPodName1, testNamespace) - _ = createPod(testPodName2, testNamespace) + poolManager := poolMockPkg.NewManager(GinkgoT()) + // pool2 has no config in the k8s API + poolManager.On("GetPoolByName", testPool2).Return(nil) + // pool3 has config in the k8s API + poolManager.On("GetPoolByName", testPool3).Return(&poolPkg.IPPool{}) - store, err := storeMgr.Open(ctx) + session, err := store.Open(ctx) Expect(err).NotTo(HaveOccurred()) + // this will create empty pool config + session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100")) + + cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3) + + pod1UID := createPod(testPodName1, testNamespace) + _ = createPod(testPodName2, testNamespace) // should keep these reservations - Expect(store.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodUUID: pod1UID, PodName: testPodName1, PodNamespace: testNamespace, }, net.ParseIP("192.168.1.100"))).NotTo(HaveOccurred()) - Expect(store.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{}, + Expect(session.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{}, net.ParseIP("192.168.1.101"))).NotTo(HaveOccurred()) // should remove these reservations - Expect(store.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodName: "unknown", PodNamespace: testNamespace, }, net.ParseIP("192.168.1.102"))).NotTo(HaveOccurred()) - Expect(store.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodName: "unknown2", PodNamespace: testNamespace, }, net.ParseIP("192.168.2.100"))).NotTo(HaveOccurred()) - Expect(store.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodUUID: "something", // differ from the reservation PodName: testPodName2, PodNamespace: testNamespace, }, net.ParseIP("192.168.2.101"))).NotTo(HaveOccurred()) - Expect(store.Commit()).NotTo(HaveOccurred()) + Expect(session.Commit()).NotTo(HaveOccurred()) go func() { cleaner.Start(ctx) }() Eventually(func(g Gomega) { - store, err := storeMgr.Open(ctx) + store, err := store.Open(ctx) g.Expect(err).NotTo(HaveOccurred()) defer store.Cancel() g.Expect(store.GetReservationByID(testPool1, "id1", testIFName)).NotTo(BeNil()) @@ -107,9 +121,11 @@ var _ = Describe("Cleaner", func() { g.Expect(store.GetReservationByID(testPool1, "id3", testIFName)).To(BeNil()) g.Expect(store.GetReservationByID(testPool2, "id4", testIFName)).To(BeNil()) g.Expect(store.GetReservationByID(testPool2, "id5", testIFName)).To(BeNil()) + g.Expect(store.ListPools()).To(And( + ContainElements(testPool1, testPool3), + Not(ContainElement(testPool2)))) }, 10).Should(Succeed()) - close(done) }() Eventually(done, time.Minute).Should(BeClosed()) }) diff --git a/pkg/ipam-node/store/mocks/Session.go b/pkg/ipam-node/store/mocks/Session.go index 2cfa5d5..ff07eeb 100644 --- a/pkg/ipam-node/store/mocks/Session.go +++ b/pkg/ipam-node/store/mocks/Session.go @@ -308,6 +308,39 @@ func (_c *Session_ReleaseReservationByID_Call) RunAndReturn(run func(string, str return _c } +// RemovePool provides a mock function with given fields: pool +func (_m *Session) RemovePool(pool string) { + _m.Called(pool) +} + +// Session_RemovePool_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemovePool' +type Session_RemovePool_Call struct { + *mock.Call +} + +// RemovePool is a helper method to define mock.On call +// - pool string +func (_e *Session_Expecter) RemovePool(pool interface{}) *Session_RemovePool_Call { + return &Session_RemovePool_Call{Call: _e.mock.On("RemovePool", pool)} +} + +func (_c *Session_RemovePool_Call) Run(run func(pool string)) *Session_RemovePool_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *Session_RemovePool_Call) Return() *Session_RemovePool_Call { + _c.Call.Return() + return _c +} + +func (_c *Session_RemovePool_Call) RunAndReturn(run func(string)) *Session_RemovePool_Call { + _c.Call.Return(run) + return _c +} + // Reserve provides a mock function with given fields: pool, id, ifName, meta, address func (_m *Session) Reserve(pool string, id string, ifName string, meta types.ReservationMetadata, address net.IP) error { ret := _m.Called(pool, id, ifName, meta, address) diff --git a/pkg/ipam-node/store/store.go b/pkg/ipam-node/store/store.go index fb4a0b7..0c53577 100644 --- a/pkg/ipam-node/store/store.go +++ b/pkg/ipam-node/store/store.go @@ -60,6 +60,8 @@ type Session interface { ListReservations(pool string) []types.Reservation // ListPools return list with names of all known pools ListPools() []string + // RemovePool removes information about the pool from the store + RemovePool(pool string) // GetLastReservedIP returns last reserved IP for the pool or nil GetLastReservedIP(pool string) net.IP // SetLastReservedIP set last reserved IP fot the pool @@ -256,6 +258,13 @@ func (s *session) ListPools() []string { return pools } +// RemovePool is the Session interface implementation for session +func (s *session) RemovePool(pool string) { + s.checkClosed() + delete(s.tmpData.Pools, pool) + s.isModified = true +} + // GetLastReservedIP is the Session interface implementation for session func (s *session) GetLastReservedIP(pool string) net.IP { s.checkClosed() diff --git a/pkg/ipam-node/store/store_test.go b/pkg/ipam-node/store/store_test.go index 8d6aa79..160e56f 100644 --- a/pkg/ipam-node/store/store_test.go +++ b/pkg/ipam-node/store/store_test.go @@ -230,4 +230,12 @@ var _ = Describe("Store", func() { s.Reserve(testPoolName, "other", testNetIfName, types.ReservationMetadata{}, net.ParseIP(testIP))).To(MatchError(storePkg.ErrIPAlreadyReserved)) }) + It("Remove pool data", func() { + s, err := store.Open(context.Background()) + Expect(err).NotTo(HaveOccurred()) + createTestReservation(s) + Expect(s.ListReservations(testPoolName)).NotTo(BeEmpty()) + s.RemovePool(testPoolName) + Expect(s.ListReservations(testPoolName)).To(BeEmpty()) + }) })