From b9c9be8db09c9dab6b469b7d00f9aa808abc6e7c Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Thu, 4 Apr 2024 15:36:07 -0700 Subject: [PATCH] :bug: Prevent race when informers are started more than once If `Informers` are started a second time, there is a possibility for a data race because it sets a `ctx` field on itself. This write is protected by a mutex, but reads from that field are not. --- pkg/cache/cache.go | 2 -- pkg/cache/cache_test.go | 6 ++++++ pkg/cache/internal/informers.go | 7 +++++++ pkg/cache/multi_namespace_cache.go | 14 +++++++++----- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5e841ee179..e23045bf40 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -39,11 +39,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - logf "sigs.k8s.io/controller-runtime/pkg/internal/log" ) var ( - log = logf.RuntimeLog.WithName("object-cache") defaultSyncPeriod = 10 * time.Hour ) diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index cfe0856a1e..2d88b43ef3 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1849,6 +1849,12 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca ) }) Describe("as an Informer", func() { + It("should error when starting the cache a second time", func() { + err := informerCache.Start(context.Background()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("Informer already started")) + }) + Context("with structured objects", func() { It("should be able to get informer for the object", func() { By("getting a shared index informer for a pod") diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index c270e809ca..f1e7a00eb7 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -18,6 +18,7 @@ package internal import ( "context" + "errors" "fmt" "math/rand" "net/http" @@ -186,6 +187,12 @@ type Informers struct { // Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. func (ip *Informers) Start(ctx context.Context) error { + select { + case <-ip.startWait: + return errors.New("Informer already started") //nolint:stylecheck + default: + // continue + } func() { ip.mu.Lock() defer ip.mu.Unlock() diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index e38da1455c..da69f40f65 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -163,12 +163,13 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema } func (c *multiNamespaceCache) Start(ctx context.Context) error { + errs := make(chan error) // start global cache if c.clusterCache != nil { go func() { err := c.clusterCache.Start(ctx) if err != nil { - log.Error(err, "cluster scoped cache failed to start") + errs <- fmt.Errorf("failed to start cluster-scoped cache: %w", err) } }() } @@ -177,13 +178,16 @@ func (c *multiNamespaceCache) Start(ctx context.Context) error { for ns, cache := range c.namespaceToCache { go func(ns string, cache Cache) { if err := cache.Start(ctx); err != nil { - log.Error(err, "multi-namespace cache failed to start namespaced informer", "namespace", ns) + errs <- fmt.Errorf("failed to start cache for namespace %s: %w", ns, err) } }(ns, cache) } - - <-ctx.Done() - return nil + select { + case <-ctx.Done(): + return nil + case err := <-errs: + return err + } } func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {