diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 33c78b65a1..f6d56cad16 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -189,7 +189,7 @@ func (cm *controllerManager) add(r Runnable) error { if err := cm.SetFields(r); err != nil { return err } - return cm.runnables.Add(r, nil) + return cm.runnables.Add(r) } // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. @@ -456,21 +456,21 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks // to never start because no cache can be populated. - if err := cm.runnables.Webhooks.StartAndWaitReady(cm.internalCtx); err != nil { + if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if err != wait.ErrWaitTimeout { return err } } // Start and wait for caches. - if err := cm.runnables.Caches.StartAndWaitReady(cm.internalCtx); err != nil { + if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if err != wait.ErrWaitTimeout { return err } } // Start the non-leaderelection Runnables after the cache has synced. - if err := cm.runnables.Others.StartAndWaitReady(cm.internalCtx); err != nil { + if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if err != wait.ErrWaitTimeout { return err } @@ -601,7 +601,7 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } func (cm *controllerManager) startLeaderElectionRunnables() error { - return cm.runnables.LeaderElection.StartAndWaitReady(cm.internalCtx) + return cm.runnables.LeaderElection.Start(cm.internalCtx) } func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) { diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 3fff91cbc5..0dd1cc1f5b 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1817,9 +1817,6 @@ func (c *startSignalingInformer) Start(ctx context.Context) error { func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool { defer func() { - for !c.started() { - continue - } c.mu.Lock() c.wasSynced = true c.mu.Unlock() diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index 7c43a20645..ded8aed221 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -4,9 +4,7 @@ import ( "context" "errors" "sync" - "time" - "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -18,8 +16,8 @@ var ( // a ready check. type readyRunnable struct { Runnable - Check runnableCheck - Ready bool + Check runnableCheck + signalReady bool } // runnableCheck can be passed to Add() to let the runnable group determine that a @@ -46,33 +44,27 @@ func newRunnables(errChan chan error) *runnables { } } -// Add adds a runnable and its ready check to the closest -// group of runnable that they belong to. +// Add adds a runnable to closest group of runnable that they belong to. // // Add should be able to be called before and after Start, but not after StopAndWait. // Add should return an error when called during StopAndWait. // The runnables added before Start are started when Start is called. // The runnables added after Start are started directly. -func (r *runnables) Add(fn Runnable, ready runnableCheck) error { - if ready == nil { - // If we don't have a readiness check, always return true. - ready = func(_ context.Context) bool { return true } - } - +func (r *runnables) Add(fn Runnable) error { switch runnable := fn.(type) { case hasCache: return r.Caches.Add(fn, func(ctx context.Context) bool { - return ready(ctx) && runnable.GetCache().WaitForCacheSync(ctx) + return runnable.GetCache().WaitForCacheSync(ctx) }) case *webhook.Server: - return r.Webhooks.Add(fn, ready) + return r.Webhooks.Add(fn, nil) case LeaderElectionRunnable: if !runnable.NeedLeaderElection() { - return r.Others.Add(fn, ready) + return r.Others.Add(fn, nil) } - return r.LeaderElection.Add(fn, ready) + return r.LeaderElection.Add(fn, nil) default: - return r.LeaderElection.Add(fn, ready) + return r.LeaderElection.Add(fn, nil) } } @@ -85,9 +77,11 @@ type runnableGroup struct { ctx context.Context cancel context.CancelFunc - start sync.Mutex - startOnce sync.Once - started bool + start sync.Mutex + startOnce sync.Once + started bool + startQueue []*readyRunnable + startReadyCh chan *readyRunnable stop sync.RWMutex stopOnce sync.Once @@ -104,23 +98,14 @@ type runnableGroup struct { // wg is an internal sync.WaitGroup that allows us to properly stop // and wait for all the runnables to finish before returning. wg *sync.WaitGroup - - // group is a sync.Map that contains every runnable ever. - // The key of the map is the runnable itself (key'd by pointer), - // while the value is its ready state. - // - // The group of runnable is append-only, runnables scheduled - // through this group are going to be stored in this internal map - // until the application exits. The limit is the available memory. - group *sync.Map } func newRunnableGroup(errChan chan error) *runnableGroup { r := &runnableGroup{ - errChan: errChan, - ch: make(chan *readyRunnable), - wg: new(sync.WaitGroup), - group: new(sync.Map), + startReadyCh: make(chan *readyRunnable), + errChan: errChan, + ch: make(chan *readyRunnable), + wg: new(sync.WaitGroup), } r.ctx, r.cancel = context.WithCancel(context.Background()) return r @@ -133,25 +118,57 @@ func (r *runnableGroup) Started() bool { return r.started } -// StartAndWaitReady starts all the runnables previously -// added to the group and waits for all to report ready. -func (r *runnableGroup) StartAndWaitReady(ctx context.Context) error { - r.Start() - return r.WaitReady(ctx) -} +// Start starts the group and waits for all +// initially registered runnables to start. +// It can only be called once, subsequent calls have no effect. +func (r *runnableGroup) Start(ctx context.Context) error { + var retErr error -// Start starts the group, it can only be called once. -func (r *runnableGroup) Start() { r.startOnce.Do(func() { + defer close(r.startReadyCh) + + // Start the internal reconciler. go r.reconcile() + + // Start the group and queue up all + // the runnables that were added prior. r.start.Lock() r.started = true - r.group.Range(func(key, _ interface{}) bool { - r.ch <- key.(*readyRunnable) - return true - }) + for _, rn := range r.startQueue { + rn.signalReady = true + r.ch <- rn + } r.start.Unlock() + + // If we don't have any queue, return. + if len(r.startQueue) == 0 { + return + } + + // Wait for all runnables to signal. + for { + select { + case <-ctx.Done(): + if err := ctx.Err(); !errors.Is(err, context.Canceled) { + retErr = err + } + case rn := <-r.startReadyCh: + for i, existing := range r.startQueue { + if existing == rn { + // Remove the item from the start queue. + r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...) + break + } + } + // We're done waiting if the queue is empty, return. + if len(r.startQueue) == 0 { + return + } + } + } }) + + return retErr } // reconcile is our main entrypoint for every runnable added @@ -185,26 +202,17 @@ func (r *runnableGroup) reconcile() { go func(rn *readyRunnable) { go func() { if rn.Check(r.ctx) { - r.group.Store(rn, true) + if rn.signalReady { + r.startReadyCh <- rn + } } }() // If we return, the runnable ended cleanly // or returned an error to the channel. // - // We should always decrement the WaitGroup and - // mark the runnable as ready. - // - // Think about the group as an append-only system. - // - // A runnable is marked as ready if: - // - The health check return true. - // - The runnable Start() method returned and - // it either finished cleanly (e.g. one shot operations) - // or it failed to run and it returned an error which - // gets propagated to the manager. + // We should always decrement the WaitGroup here. defer r.wg.Done() - defer r.group.Store(rn, true) // Start the runnable. if err := rn.Start(r.ctx); err != nil { @@ -214,27 +222,6 @@ func (r *runnableGroup) reconcile() { } } -// WaitReady polls until the group is ready or until the context is cancelled. -func (r *runnableGroup) WaitReady(ctx context.Context) error { - return wait.PollImmediateInfiniteWithContext(ctx, - 100*time.Millisecond, - func(_ context.Context) (bool, error) { - if !r.Started() { - return false, nil - } - ready, total := 0, 0 - r.group.Range(func(_, value interface{}) bool { - total++ - if rd, ok := value.(bool); ok && rd { - ready++ - } - return true - }) - return ready == total, nil - }, - ) -} - // Add should be able to be called before and after Start, but not after StopAndWait. // Add should return an error when called during StopAndWait. func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { @@ -261,11 +248,10 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { { r.start.Lock() - // Store the runnable in the internal buffer. - r.group.Store(readyRunnable, false) - // Check if we're already started. if !r.started { + // Store the runnable in the internal if not. + r.startQueue = append(r.startQueue, readyRunnable) r.start.Unlock() return nil } @@ -283,7 +269,7 @@ func (r *runnableGroup) StopAndWait(ctx context.Context) { // Close the reconciler channel once we're done. defer close(r.ch) - r.Start() + _ = r.Start(ctx) r.stop.Lock() // Store the stopped variable so we don't accept any new // runnables for the time being. diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index 5c9ee81b64..57e0ad6387 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -24,29 +24,15 @@ var _ = Describe("runnables", func() { It("should add caches to the appropriate group", func() { cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}} r := newRunnables(errCh) - Expect(r.Add(cache, nil)).To(Succeed()) - var found *readyRunnable - r.Caches.group.Range(func(key, value interface{}) bool { - found = key.(*readyRunnable) - // Only iterate once. - return false - }) - Expect(found).ToNot(BeNil()) - Expect(found.Runnable).To(BeIdenticalTo(cache)) + Expect(r.Add(cache)).To(Succeed()) + Expect(r.Caches.startQueue).To(HaveLen(1)) }) It("should add webhooks to the appropriate group", func() { webhook := &webhook.Server{} r := newRunnables(errCh) - Expect(r.Add(webhook, nil)).To(Succeed()) - var found *readyRunnable - r.Webhooks.group.Range(func(key, value interface{}) bool { - found = key.(*readyRunnable) - // Only iterate once. - return false - }) - Expect(found).ToNot(BeNil()) - Expect(found.Runnable).To(BeIdenticalTo(webhook)) + Expect(r.Add(webhook)).To(Succeed()) + Expect(r.Webhooks.startQueue).To(HaveLen(1)) }) It("should add any runnable to the leader election group", func() { @@ -56,18 +42,8 @@ var _ = Describe("runnables", func() { }) r := newRunnables(errCh) - Expect(r.Add(runnable, nil)).To(Succeed()) - var found *readyRunnable - r.LeaderElection.group.Range(func(key, value interface{}) bool { - found = key.(*readyRunnable) - // Only iterate once. - return false - }) - Expect(found).ToNot(BeNil()) - - // Functions are not comparable, we just make sure it's the same type and it returns what we expect - Expect(found.Runnable).To(BeAssignableToTypeOf(runnable)) - Expect(found.Runnable.Start(context.Background())).To(MatchError(err)) + Expect(r.Add(runnable)).To(Succeed()) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) }) }) @@ -94,13 +70,12 @@ var _ = Describe("runnableGroup", func() { <-ctx.Done() return nil }), nil)).To(Succeed()) - rg.Start() + Expect(rg.Start(ctx)).To(Succeed()) Expect(rg.Started()).To(BeTrue()) Expect(rg.Add(RunnableFunc(func(c context.Context) error { <-ctx.Done() return nil }), nil)).To(Succeed()) - Expect(rg.WaitReady(ctx)).To(Succeed()) }) It("should be able to add new runnables before and after start concurrently", func() { @@ -109,8 +84,9 @@ var _ = Describe("runnableGroup", func() { rg := newRunnableGroup(errCh) go func() { + defer GinkgoRecover() <-time.After(50 * time.Millisecond) - rg.Start() + Expect(rg.Start(ctx)).To(Succeed()) }() for i := 0; i < 20; i++ { @@ -124,15 +100,6 @@ var _ = Describe("runnableGroup", func() { }), nil)).To(Succeed()) }(i) } - Expect(rg.WaitReady(ctx)).To(Succeed()) - Eventually(func() int { - i := 0 - rg.group.Range(func(key, value interface{}) bool { - i++ - return true - }) - return i - }).Should(BeNumerically("==", 20)) }) It("should be able to close the group and wait for all runnables to finish", func() { @@ -148,7 +115,7 @@ var _ = Describe("runnableGroup", func() { return nil }), nil)).To(Succeed()) } - Expect(rg.StartAndWaitReady(ctx)).To(Succeed()) + Expect(rg.Start(ctx)).To(Succeed()) // Cancel the context, asking the runnables to exit. cancel() @@ -167,8 +134,9 @@ var _ = Describe("runnableGroup", func() { rg := newRunnableGroup(errCh) go func() { + defer GinkgoRecover() <-time.After(50 * time.Millisecond) - rg.Start() + Expect(rg.Start(ctx)).To(Succeed()) }() for i := 0; i < 20; i++ { @@ -184,15 +152,6 @@ var _ = Describe("runnableGroup", func() { })).To(Succeed()) }(i) } - Expect(rg.WaitReady(ctx)).To(Succeed()) - Eventually(func() int { - i := 0 - rg.group.Range(func(key, value interface{}) bool { - i++ - return true - }) - return i - }).Should(BeNumerically("==", 20)) }) It("should not turn ready if some readiness check fail", func() { @@ -201,8 +160,9 @@ var _ = Describe("runnableGroup", func() { rg := newRunnableGroup(errCh) go func() { + defer GinkgoRecover() <-time.After(50 * time.Millisecond) - rg.Start() + Expect(rg.Start(ctx)).To(Succeed()) }() for i := 0; i < 20; i++ { @@ -218,6 +178,5 @@ var _ = Describe("runnableGroup", func() { })).To(Succeed()) }(i) } - Expect(rg.WaitReady(ctx)).ToNot(Succeed()) }) })