From b627bce0589b24037577a63837cee415a77af23e Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Mon, 4 Oct 2021 11:32:22 -0700 Subject: [PATCH] :seedling: Source should retry to get informers until timeout expires Signed-off-by: Vince Prignano --- pkg/source/source.go | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/pkg/source/source.go b/pkg/source/source.go index a63b37c443..708c5a5bfc 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -21,8 +21,10 @@ import ( "errors" "fmt" "sync" + "time" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w ctx, ks.startCancel = context.WithCancel(ctx) ks.started = make(chan error) go func() { - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.cache.GetInformer(ctx, ks.Type) - if err != nil { - kindMatchErr := &meta.NoKindMatchError{} - if errors.As(err, &kindMatchErr) { - log.Error(err, "if kind is a CRD, it should be installed before calling Start", - "kind", kindMatchErr.GroupKind) + var ( + i cache.Informer + lastErr error + ) + + // Tries to get an informer until it returns true, + // an error or the specified context is cancelled or expired. + if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { + // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + i, lastErr = ks.cache.GetInformer(ctx, ks.Type) + if lastErr != nil { + kindMatchErr := &meta.NoKindMatchError{} + if errors.As(lastErr, &kindMatchErr) { + log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", + "kind", kindMatchErr.GroupKind) + } + return false, nil // Retry. + } + return true, nil + }); err != nil { + if lastErr != nil { + ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr) + return } ks.started <- err return } + i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here