Skip to content

Commit

Permalink
Make sure to only init watcher once, RESTConfig should have backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <vincepri@redhat.com>
  • Loading branch information
vincepri committed Mar 1, 2023
1 parent 689d426 commit 88a7b48
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 45 deletions.
79 changes: 41 additions & 38 deletions examples/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func main() {
For(&corev1.Pod{}).Complete(reconcile.Func(
func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("Reconciling pod", "name", req.Name)

cluster, err := mgr.GetCluster(req.Cluster)
if err != nil {
Expand All @@ -75,6 +74,7 @@ func main() {
if err := client.Get(ctx, req.NamespacedName, pod); err != nil {
return reconcile.Result{}, err
}
log.Info("Reconciling pod", "name", pod.Name, "uuid", pod.UID)

// Print any annotations that start with fleet.
for k, v := range pod.Labels {
Expand All @@ -87,7 +87,7 @@ func main() {
},
))

entryLog.Info("starting manager")
entryLog.Info("Starting manager")
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
entryLog.Error(err, "unable to run manager")
os.Exit(1)
Expand Down Expand Up @@ -129,6 +129,7 @@ func (k *KindAdapter) Watch() (logical.Watcher, error) {
}

type KindWatcher struct {
init sync.Once
wg sync.WaitGroup
ch chan logical.Event
cancel context.CancelFunc
Expand All @@ -142,47 +143,49 @@ func (k *KindWatcher) Stop() {
close(k.ch)
}
func (k *KindWatcher) ResultChan() <-chan logical.Event {
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
set := sets.New[string]()
k.wg.Add(1)
go func() {
defer k.wg.Done()
for {
select {
case <-time.After(2 * time.Second):
provider := kind.NewProvider()
list, err := provider.List()
if err != nil {
klog.Error(err)
continue
}
newSet := sets.New(list...)
// Check for new clusters.
for _, cluster := range newSet.Difference(set).UnsortedList() {
if !strings.HasPrefix(cluster, "fleet-") {
k.init.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
set := sets.New[string]()
k.wg.Add(1)
go func() {
defer k.wg.Done()
for {
select {
case <-time.After(2 * time.Second):
provider := kind.NewProvider()
list, err := provider.List()
if err != nil {
klog.Error(err)
continue
}
k.ch <- logical.Event{
Type: watch.Added,
Name: logical.Name(cluster),
}
}
// Check for deleted clusters.
for _, cluster := range set.Difference(newSet).UnsortedList() {
if !strings.HasPrefix(cluster, "fleet-") {
continue
newSet := sets.New(list...)
// Check for new clusters.
for _, cluster := range newSet.Difference(set).UnsortedList() {
if !strings.HasPrefix(cluster, "fleet-") {
continue
}
k.ch <- logical.Event{
Type: watch.Added,
Name: logical.Name(cluster),
}
}
k.ch <- logical.Event{
Type: watch.Deleted,
Name: logical.Name(cluster),
// Check for deleted clusters.
for _, cluster := range set.Difference(newSet).UnsortedList() {
if !strings.HasPrefix(cluster, "fleet-") {
continue
}
k.ch <- logical.Event{
Type: watch.Deleted,
Name: logical.Name(cluster),
}
}
set = newSet
case <-ctx.Done():
return
}
set = newSet
case <-ctx.Done():
return
}
}
}()
}()
})
return k.ch
}
26 changes: 19 additions & 7 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
}

func (cm *controllerManager) GetCluster(name logical.Name) (cluster.Cluster, error) {
return cm.getLogicalCluster(name)
return cm.getLogicalCluster(context.TODO(), name)
}

func (cm *controllerManager) GetHTTPClient() *http.Client {
Expand Down Expand Up @@ -337,7 +337,7 @@ func (cm *controllerManager) syncClusterAwareRunnables() {
}
}

func (cm *controllerManager) getLogicalCluster(name logical.Name) (c *logicalCluster, err error) {
func (cm *controllerManager) getLogicalCluster(ctx context.Context, name logical.Name) (c *logicalCluster, err error) {
// Check if the manager was configured with a logical adapter,
// otherwise we cannot retrieve the cluster.
if cm.logicalAdapter == nil {
Expand All @@ -363,10 +363,22 @@ func (cm *controllerManager) getLogicalCluster(name logical.Name) (c *logicalClu
}

// Create a new cluster.
cfg, err := cm.logicalAdapter.RESTConfig(name)
if err != nil {
return nil, fmt.Errorf("cannot find logical cluster %q from adapter: %w", name, err)
var cfg *rest.Config
{
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var watchErr error
if err := wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (done bool, err error) {
cfg, watchErr = cm.logicalAdapter.RESTConfig(name)
if watchErr != nil {
return false, nil // retry
}
return true, nil
}); err != nil {
return nil, fmt.Errorf("failed to retrieve RESTConfig: %w", kerrors.NewAggregate([]error{err, watchErr}))
}
}

cl, err := cluster.New(cfg, cm.defaultClusterOptions)
if err != nil {
return nil, fmt.Errorf("cannot create logical cluster %q: %w", name, err)
Expand Down Expand Up @@ -627,7 +639,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g
return err
}
for _, name := range clusterList {
if _, err := cm.getLogicalCluster(name); err != nil {
if _, err := cm.getLogicalCluster(ctx, name); err != nil {
return err
}
}
Expand Down Expand Up @@ -672,7 +684,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g
case event := <-watcher.ResultChan():
switch event.Type {
case watch.Added, watch.Modified:
if _, err := cm.getLogicalCluster(event.Name); err != nil {
if _, err := cm.getLogicalCluster(ctx, event.Name); err != nil {
return err
}
cm.syncClusterAwareRunnables()
Expand Down

0 comments on commit 88a7b48

Please sign in to comment.