diff --git a/cmd/ipam-node/app/app.go b/cmd/ipam-node/app/app.go index 55ee06a..8438981 100644 --- a/cmd/ipam-node/app/app.go +++ b/cmd/ipam-node/app/app.go @@ -243,7 +243,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio } return } - c := cleaner.New(mgr.GetClient(), store, time.Minute, 3) + c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3) c.Start(innerCtx) logger.Info("cleaner stopped") }() diff --git a/pkg/ipam-node/cleaner/cleaner.go b/pkg/ipam-node/cleaner/cleaner.go index 93c5112..184a0f0 100644 --- a/pkg/ipam-node/cleaner/cleaner.go +++ b/pkg/ipam-node/cleaner/cleaner.go @@ -28,12 +28,14 @@ import ( storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types" + "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" ) // Cleaner is the interface of the cleaner package. // The cleaner periodically scan the store and check for allocations which doesn't have // related Pod in the k8s API. If allocation has no Pod for more than X checks, then the cleaner -// will release the allocation. +// will release the allocation. Also, the cleaner will remove pool entries in the store if the pool has no +// allocations and the pool configuration is unavailable in the Kubernetes API. type Cleaner interface { // Start starts the cleaner loop. // The cleaner loop discovers stale allocations and clean up them. @@ -43,12 +45,13 @@ type Cleaner interface { // New creates and initialize new cleaner instance // "checkInterval" defines delay between checks for stale allocations. 
// "checkCountBeforeRelease: defines how many check to do before remove the allocation -func New(client client.Client, store storePkg.Store, +func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader, checkInterval time.Duration, checkCountBeforeRelease int) Cleaner { return &cleaner{ client: client, store: store, + poolConfReader: poolConfReader, checkInterval: checkInterval, checkCountBeforeRelease: checkCountBeforeRelease, staleAllocations: make(map[string]int), @@ -58,6 +61,7 @@ func New(client client.Client, store storePkg.Store, type cleaner struct { client client.Client store storePkg.Store + poolConfReader pool.ConfigReader checkInterval time.Duration checkCountBeforeRelease int // key is ||, value is count of failed checks @@ -84,13 +88,19 @@ func (c *cleaner) Start(ctx context.Context) { func (c *cleaner) loop(ctx context.Context) error { logger := logr.FromContextOrDiscard(ctx) - store, err := c.store.Open(ctx) + session, err := c.store.Open(ctx) if err != nil { return fmt.Errorf("failed to open store: %v", err) } allReservations := map[string]struct{}{} - for _, poolName := range store.ListPools() { - for _, reservation := range store.ListReservations(poolName) { + emptyPools := []string{} + for _, poolName := range session.ListPools() { + poolReservations := session.ListReservations(poolName) + if len(poolReservations) == 0 { + emptyPools = append(emptyPools, poolName) + continue + } + for _, reservation := range poolReservations { resLogger := logger.WithValues("pool", poolName, "container_id", reservation.ContainerID, "interface_name", reservation.InterfaceName) key := c.getStaleAllocKey(poolName, reservation) @@ -105,7 +115,7 @@ func (c *cleaner) loop(ctx context.Context) error { Name: reservation.Metadata.PodName, }, pod) if err != nil && !apiErrors.IsNotFound(err) { - store.Cancel() + session.Cancel() return fmt.Errorf("failed to read Pod info from the cache: %v", err) } if apiErrors.IsNotFound(err) || @@ -128,13 
+138,19 @@ func (c *cleaner) loop(ctx context.Context) error { // release reservations which were marked as stale multiple times if count > c.checkCountBeforeRelease { keyFields := strings.SplitN(k, "|", 3) - pool, containerID, ifName := keyFields[0], keyFields[1], keyFields[2] - logger.Info("stale reservation released", "pool", pool, + poolName, containerID, ifName := keyFields[0], keyFields[1], keyFields[2] + logger.Info("stale reservation released", "poolName", poolName, "container_id", containerID, "interface_name", ifName) - store.ReleaseReservationByID(pool, containerID, ifName) + session.ReleaseReservationByID(poolName, containerID, ifName) + } + } + // remove empty pools if they don't have configuration in the k8s API + for _, emptyPool := range emptyPools { + if p := c.poolConfReader.GetPoolByName(emptyPool); p == nil { + session.RemovePool(emptyPool) } } - if err := store.Commit(); err != nil { + if err := session.Commit(); err != nil { return fmt.Errorf("failed to commit changes to the store: %v", err) } return nil diff --git a/pkg/ipam-node/cleaner/cleaner_test.go b/pkg/ipam-node/cleaner/cleaner_test.go index 14433cd..cfe63e0 100644 --- a/pkg/ipam-node/cleaner/cleaner_test.go +++ b/pkg/ipam-node/cleaner/cleaner_test.go @@ -27,6 +27,8 @@ import ( cleanerPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner" storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types" + poolPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" + poolMockPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool/mocks" ) const ( @@ -35,6 +37,7 @@ const ( testPodName2 = "test-pod2" testPool1 = "pool1" testPool2 = "pool2" + testPool3 = "pool3" testIFName = "net0" ) @@ -54,52 +57,63 @@ var _ = Describe("Cleaner", func() { It("Cleanup test", func() { done := make(chan interface{}) go func() { + defer GinkgoRecover() + defer close(done) storePath := filepath.Join(GinkgoT().TempDir(), "test_store") - storeMgr := 
storePkg.New(storePath) - cleaner := cleanerPkg.New(k8sClient, storeMgr, time.Millisecond*100, 3) + store := storePkg.New(storePath) - pod1UID := createPod(testPodName1, testNamespace) - _ = createPod(testPodName2, testNamespace) + poolManager := poolMockPkg.NewManager(GinkgoT()) + // pool2 has no config in the k8s API + poolManager.On("GetPoolByName", testPool2).Return(nil) + // pool3 has config in the k8s API + poolManager.On("GetPoolByName", testPool3).Return(&poolPkg.IPPool{}) - store, err := storeMgr.Open(ctx) + session, err := store.Open(ctx) Expect(err).NotTo(HaveOccurred()) + // this will create empty pool config + session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100")) + + cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3) + + pod1UID := createPod(testPodName1, testNamespace) + _ = createPod(testPodName2, testNamespace) // should keep these reservations - Expect(store.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodUUID: pod1UID, PodName: testPodName1, PodNamespace: testNamespace, }, net.ParseIP("192.168.1.100"))).NotTo(HaveOccurred()) - Expect(store.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{}, + Expect(session.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{}, net.ParseIP("192.168.1.101"))).NotTo(HaveOccurred()) // should remove these reservations - Expect(store.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodName: "unknown", PodNamespace: testNamespace, }, net.ParseIP("192.168.1.102"))).NotTo(HaveOccurred()) - Expect(store.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{ CreateTime: 
time.Now().Format(time.RFC3339Nano), PodName: "unknown2", PodNamespace: testNamespace, }, net.ParseIP("192.168.2.100"))).NotTo(HaveOccurred()) - Expect(store.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{ + Expect(session.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{ CreateTime: time.Now().Format(time.RFC3339Nano), PodUUID: "something", // differ from the reservation PodName: testPodName2, PodNamespace: testNamespace, }, net.ParseIP("192.168.2.101"))).NotTo(HaveOccurred()) - Expect(store.Commit()).NotTo(HaveOccurred()) + Expect(session.Commit()).NotTo(HaveOccurred()) go func() { cleaner.Start(ctx) }() Eventually(func(g Gomega) { - store, err := storeMgr.Open(ctx) + store, err := store.Open(ctx) g.Expect(err).NotTo(HaveOccurred()) defer store.Cancel() g.Expect(store.GetReservationByID(testPool1, "id1", testIFName)).NotTo(BeNil()) @@ -107,9 +121,11 @@ var _ = Describe("Cleaner", func() { g.Expect(store.GetReservationByID(testPool1, "id3", testIFName)).To(BeNil()) g.Expect(store.GetReservationByID(testPool2, "id4", testIFName)).To(BeNil()) g.Expect(store.GetReservationByID(testPool2, "id5", testIFName)).To(BeNil()) + g.Expect(store.ListPools()).To(And( + ContainElements(testPool1, testPool3), + Not(ContainElement(testPool2)))) }, 10).Should(Succeed()) - close(done) }() Eventually(done, time.Minute).Should(BeClosed()) }) diff --git a/pkg/ipam-node/store/mocks/Session.go b/pkg/ipam-node/store/mocks/Session.go index 2cfa5d5..ff07eeb 100644 --- a/pkg/ipam-node/store/mocks/Session.go +++ b/pkg/ipam-node/store/mocks/Session.go @@ -308,6 +308,39 @@ func (_c *Session_ReleaseReservationByID_Call) RunAndReturn(run func(string, str return _c } +// RemovePool provides a mock function with given fields: pool +func (_m *Session) RemovePool(pool string) { + _m.Called(pool) +} + +// Session_RemovePool_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemovePool' +type Session_RemovePool_Call struct { + 
*mock.Call +} + +// RemovePool is a helper method to define mock.On call +// - pool string +func (_e *Session_Expecter) RemovePool(pool interface{}) *Session_RemovePool_Call { + return &Session_RemovePool_Call{Call: _e.mock.On("RemovePool", pool)} +} + +func (_c *Session_RemovePool_Call) Run(run func(pool string)) *Session_RemovePool_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *Session_RemovePool_Call) Return() *Session_RemovePool_Call { + _c.Call.Return() + return _c +} + +func (_c *Session_RemovePool_Call) RunAndReturn(run func(string)) *Session_RemovePool_Call { + _c.Call.Return(run) + return _c +} + // Reserve provides a mock function with given fields: pool, id, ifName, meta, address func (_m *Session) Reserve(pool string, id string, ifName string, meta types.ReservationMetadata, address net.IP) error { ret := _m.Called(pool, id, ifName, meta, address) diff --git a/pkg/ipam-node/store/store.go b/pkg/ipam-node/store/store.go index fb4a0b7..0c53577 100644 --- a/pkg/ipam-node/store/store.go +++ b/pkg/ipam-node/store/store.go @@ -60,6 +60,8 @@ type Session interface { ListReservations(pool string) []types.Reservation // ListPools return list with names of all known pools ListPools() []string + // RemovePool removes information about the pool from the store + RemovePool(pool string) // GetLastReservedIP returns last reserved IP for the pool or nil GetLastReservedIP(pool string) net.IP // SetLastReservedIP set last reserved IP fot the pool @@ -256,6 +258,13 @@ func (s *session) ListPools() []string { return pools } +// RemovePool is the Session interface implementation for session +func (s *session) RemovePool(pool string) { + s.checkClosed() + delete(s.tmpData.Pools, pool) + s.isModified = true +} + // GetLastReservedIP is the Session interface implementation for session func (s *session) GetLastReservedIP(pool string) net.IP { s.checkClosed() diff --git a/pkg/ipam-node/store/store_test.go 
b/pkg/ipam-node/store/store_test.go index 8d6aa79..160e56f 100644 --- a/pkg/ipam-node/store/store_test.go +++ b/pkg/ipam-node/store/store_test.go @@ -230,4 +230,12 @@ var _ = Describe("Store", func() { s.Reserve(testPoolName, "other", testNetIfName, types.ReservationMetadata{}, net.ParseIP(testIP))).To(MatchError(storePkg.ErrIPAlreadyReserved)) }) + It("Remove pool data", func() { + s, err := store.Open(context.Background()) + Expect(err).NotTo(HaveOccurred()) + createTestReservation(s) + Expect(s.ListReservations(testPoolName)).NotTo(BeEmpty()) + s.RemovePool(testPoolName) + Expect(s.ListReservations(testPoolName)).To(BeEmpty()) + }) })