Skip to content

Commit

Permalink
🌱 Source should retry to get informers until timeout expires
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Oct 4, 2021
1 parent 13f1400 commit b627bce
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"errors"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
var (
i cache.Informer
lastErr error
)

// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(lastErr, &kindMatchErr) {
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
if lastErr != nil {
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
return
}
ks.started <- err
return
}

i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
Expand Down

0 comments on commit b627bce

Please sign in to comment.