diff --git a/examples/fleet-namespace/main.go b/examples/fleet-namespace/main.go index 06facec1c7..c060b6240c 100644 --- a/examples/fleet-namespace/main.go +++ b/examples/fleet-namespace/main.go @@ -27,7 +27,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" toolscache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -99,10 +98,9 @@ func main() { entryLog.Error(err, "unable to set up provider") os.Exit(1) } - provider := &NamespacedClusterProvider{Cluster: cl} + provider := NewNamespacedClusterProvider(cl) - // Setup a cluster-aware Manager, watching the clusters (= namespaces) through - // the cluster provider. + // Setup a cluster-aware Manager, with the provider to lookup clusters. entryLog.Info("Setting up cluster-aware manager") mgr, err := manager.New(cfg, manager.Options{ NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { @@ -142,9 +140,15 @@ func main() { } entryLog.Info("Starting provider") + if err := provider.Start(ctx, mgr); err != nil { // does not block + entryLog.Error(err, "unable to start provider") + os.Exit(1) + } + + entryLog.Info("Starting cluster") g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - if err := ignoreCanceled(provider.Start(ctx)); err != nil { + if err := ignoreCanceled(cl.Start(ctx)); err != nil { return fmt.Errorf("failed to start provider: %w", err) } return nil @@ -169,80 +173,93 @@ func main() { // to "default" and vice versa, simulating a multi-cluster setup. It uses one // informer to watch objects for all namespaces. type NamespacedClusterProvider struct { - cluster.Cluster -} + cluster cluster.Cluster -func (p *NamespacedClusterProvider) Get(ctx context.Context, clusterName string, opts ...cluster.Option) (cluster.Cluster, error) { - ns := &corev1.Namespace{} - if err := p.Cluster.GetCache().Get(ctx, client.ObjectKey{Name: clusterName}, ns); err != nil { - return nil, err - } + mgr manager.Manager - return &NamespacedCluster{clusterName: clusterName, Cluster: p.Cluster}, nil + lock sync.RWMutex + clusters map[string]cluster.Cluster + cancelFns map[string]context.CancelFunc } -func (p *NamespacedClusterProvider) List(ctx context.Context) ([]string, error) { - nss := &corev1.NamespaceList{} - if err := p.Cluster.GetCache().List(ctx, nss); err != nil { - return nil, err - } - - res := make([]string, 0, len(nss.Items)) - for _, ns := range nss.Items { - res = append(res, ns.Name) +func NewNamespacedClusterProvider(cl cluster.Cluster) *NamespacedClusterProvider { + return &NamespacedClusterProvider{ + cluster: cl, + clusters: map[string]cluster.Cluster{}, + cancelFns: map[string]context.CancelFunc{}, } - return res, nil } -func (p *NamespacedClusterProvider) Watch(ctx context.Context) (cluster.Watcher, error) { - inf, err := p.Cluster.GetCache().GetInformer(ctx, &corev1.Namespace{}) +func (p *NamespacedClusterProvider) Start(ctx context.Context, mgr manager.Manager) error { + nsInf, err := p.cluster.GetCache().GetInformer(ctx, &corev1.Namespace{}) if err != nil { - return nil, err + return err } - return &NamespaceWatcher{inf: inf, ch: make(chan cluster.WatchEvent)}, nil -} -type NamespaceWatcher struct { - inf cache.Informer - init sync.Once - ch chan cluster.WatchEvent - reg toolscache.ResourceEventHandlerRegistration -} + if _, err := nsInf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ns := obj.(*corev1.Namespace) + + p.lock.RLock() + 
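+			// Skip namespaces that are already tracked as clusters; repeated Add
+			// events for the same namespace are a no-op.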
if _, ok := p.clusters[ns.Name]; ok { + defer p.lock.RUnlock() + return + } + + // create new cluster + p.lock.Lock() + clusterCtx, cancel := context.WithCancel(ctx) + cl := &NamespacedCluster{clusterName: ns.Name, Cluster: p.cluster} + p.clusters[ns.Name] = cl + p.cancelFns[ns.Name] = cancel + p.lock.Unlock() + + if err := mgr.Engage(clusterCtx, cl); err != nil { + runtime.HandleError(fmt.Errorf("failed to engage manager with cluster %q: %w", ns.Name, err)) + + // cleanup + p.lock.Lock() + delete(p.clusters, ns.Name) + delete(p.cancelFns, ns.Name) + p.lock.Unlock() + } + }, + DeleteFunc: func(obj interface{}) { + ns := obj.(*corev1.Namespace) + + p.lock.RLock() + cl, ok := p.clusters[ns.Name] + if !ok { + p.lock.RUnlock() + return + } + p.lock.RUnlock() + + if err := mgr.Disengage(ctx, cl); err != nil { + runtime.HandleError(fmt.Errorf("failed to disengage manager with cluster %q: %w", ns.Name, err)) + } -func (w *NamespaceWatcher) Stop() { - if w.reg != nil { - _ = w.inf.RemoveEventHandler(w.reg) + // stop and forget + p.lock.Lock() + p.cancelFns[ns.Name]() + delete(p.clusters, ns.Name) + delete(p.cancelFns, ns.Name) + p.lock.Unlock() + }, + }); err != nil { + return err } - close(w.ch) + + return nil } -func (w *NamespaceWatcher) ResultChan() <-chan cluster.WatchEvent { - w.init.Do(func() { - w.reg, _ = w.inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - ns := obj.(*corev1.Namespace) - w.ch <- cluster.WatchEvent{ - Type: watch.Added, - ClusterName: ns.Name, - } - }, - DeleteFunc: func(obj interface{}) { - ns := obj.(*corev1.Namespace) - w.ch <- cluster.WatchEvent{ - Type: watch.Deleted, - ClusterName: ns.Name, - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - ns := newObj.(*corev1.Namespace) - w.ch <- cluster.WatchEvent{ - Type: watch.Modified, - ClusterName: ns.Name, - } - }, - }) - }) - return w.ch +func (p *NamespacedClusterProvider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) { + p.lock.RLock() + defer p.lock.RUnlock() + if cl, ok := p.clusters[clusterName]; ok { + return cl, nil + } + return nil, fmt.Errorf("cluster %s not found", clusterName) } func ignoreCanceled(err error) error { diff --git a/examples/fleet/main.go b/examples/fleet/main.go index 4e950969b7..4dd366cfd8 100644 --- a/examples/fleet/main.go +++ b/examples/fleet/main.go @@ -18,14 +18,16 @@ package main import ( "context" + "fmt" "os" "strings" "sync" "time" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -44,6 +46,7 @@ func init() { } func main() { + ctx := signals.SetupSignalHandler() entryLog := log.Log.WithName("entrypoint") testEnv := &envtest.Environment{} @@ -62,11 +65,15 @@ func main() { } }() - // Setup a Manager + // Setup a Manager, note that this not yet engages clusters, only makes them available. 
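+	// The cluster provider is created first so it can be passed to the manager
+	// options below and later run against the manager to engage discovered clusters.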
entryLog.Info("Setting up manager") + provider := &KindClusterProvider{ + log: log.Log.WithName("kind-cluster-provider"), + clusters: map[string]cluster.Cluster{}, + } mgr, err := manager.New( cfg, - manager.Options{ExperimentalClusterProvider: &KindClusterProvider{}}, + manager.Options{ExperimentalClusterProvider: provider}, ) if err != nil { entryLog.Error(err, "unable to set up overall controller manager") @@ -102,109 +109,145 @@ func main() { }, )) + entryLog.Info("Starting provider") + go func() { + if err := provider.Run(ctx, mgr); err != nil { + entryLog.Error(err, "unable to run provider") + os.Exit(1) + } + }() + entryLog.Info("Starting manager") - if err := mgr.Start(signals.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { entryLog.Error(err, "unable to run manager") os.Exit(1) } } // KindClusterProvider is a cluster provider that works with a local Kind instance. -type KindClusterProvider struct{} +type KindClusterProvider struct { + Options []cluster.Option + log logr.Logger + lock sync.RWMutex + clusters map[string]cluster.Cluster + cancelFns map[string]context.CancelFunc +} -func (k *KindClusterProvider) Get(ctx context.Context, clusterName string, opts ...cluster.Option) (cluster.Cluster, error) { - provider := kind.NewProvider() - kubeconfig, err := provider.KubeConfig(clusterName, false) - if err != nil { - return nil, err - } - // Parse the kubeconfig into a rest.Config. - cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfig)) - if err != nil { - return nil, err +var _ cluster.Provider = &KindClusterProvider{} + +func (k *KindClusterProvider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) { + k.lock.RLock() + defer k.lock.RUnlock() + if cl, ok := k.clusters[clusterName]; ok { + return cl, nil } - return cluster.New(cfg, opts...) + + return nil, fmt.Errorf("cluster %s not found", clusterName) } -func (k *KindClusterProvider) List(ctx context.Context) ([]string, error) { +func (k *KindClusterProvider) Run(ctx context.Context, mgr manager.Manager) error { provider := kind.NewProvider() - list, err := provider.List() - if err != nil { - return nil, err + + // initial list to smoke test + if _, err := provider.List(); err != nil { + return err } - res := make([]string, 0, len(list)) - for _, cluster := range list { - if !strings.HasPrefix(cluster, "fleet-") { - continue + + return wait.PollUntilContextCancel(ctx, time.Second*2, true, func(ctx context.Context) (done bool, err error) { + list, err := provider.List() + if err != nil { + k.log.Info("failed to list kind clusters", "error", err) + return false, nil // keep going } - res = append(res, cluster) - } - return res, nil -} -func (k *KindClusterProvider) Watch(_ context.Context) (cluster.Watcher, error) { - return &KindWatcher{ch: make(chan cluster.WatchEvent)}, nil -} + // start new clusters + for _, clusterName := range list { + log := k.log.WithValues("cluster", clusterName) -type KindWatcher struct { - init sync.Once - wg sync.WaitGroup - ch chan cluster.WatchEvent - cancel context.CancelFunc -} + // skip? 
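+			// Only kind clusters named "fleet-*" are considered part of the fleet;
+			// everything else, and clusters that are already engaged, is skipped.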
+ if !strings.HasPrefix(clusterName, "fleet-") { + continue + } + k.lock.RLock() + if _, ok := k.clusters[clusterName]; ok { + continue + } + k.lock.RUnlock() -func (k *KindWatcher) Stop() { - if k.cancel != nil { - k.cancel() - } - k.wg.Wait() - close(k.ch) -} + // create a new cluster + kubeconfig, err := provider.KubeConfig(clusterName, false) + if err != nil { + k.log.Info("failed to get kind kubeconfig", "error", err) + return false, nil // keep going + } + cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfig)) + if err != nil { + k.log.Info("failed to create rest config", "error", err) + return false, nil // keep going + } + cl, err := cluster.New(cfg, k.Options...) + if err != nil { + k.log.Info("failed to create cluster", "error", err) + return false, nil // keep going + } + clusterCtx, cancel := context.WithCancel(ctx) + go func() { + if err := cl.Start(clusterCtx); err != nil { + log.Error(err, "failed to start cluster") + return + } + }() + if !cl.GetCache().WaitForCacheSync(ctx) { + cancel() + log.Info("failed to sync cache") + return false, nil + } -func (k *KindWatcher) ResultChan() <-chan cluster.WatchEvent { - k.init.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - k.cancel = cancel - set := sets.New[string]() - k.wg.Add(1) - go func() { - defer k.wg.Done() - for { - select { - case <-time.After(2 * time.Second): - provider := kind.NewProvider() - list, err := provider.List() - if err != nil { - klog.Error(err) - continue - } - newSet := sets.New(list...) - // Check for new clusters. - for _, cl := range newSet.Difference(set).UnsortedList() { - if !strings.HasPrefix(cl, "fleet-") { - continue - } - k.ch <- cluster.WatchEvent{ - Type: watch.Added, - ClusterName: cl, - } - } - // Check for deleted clusters. - for _, cl := range set.Difference(newSet).UnsortedList() { - if !strings.HasPrefix(cl, "fleet-") { - continue - } - k.ch <- cluster.WatchEvent{ - Type: watch.Deleted, - ClusterName: cl, - } + // remember + k.lock.Lock() + k.clusters[clusterName] = cl + k.cancelFns[clusterName] = cancel + k.lock.Unlock() + + // engage manager + if mgr != nil { + if err := mgr.Engage(clusterCtx, cl); err != nil { + log.Error(err, "failed to engage manager") + k.lock.Lock() + delete(k.clusters, clusterName) + delete(k.cancelFns, clusterName) + k.lock.Unlock() + return false, nil + } + } + } + + // remove old clusters + kindNames := sets.New(list...) + k.lock.Lock() + clusterNames := make([]string, 0, len(k.clusters)) + for name := range k.clusters { + clusterNames = append(clusterNames, name) + } + k.lock.Unlock() + for _, name := range clusterNames { + if !kindNames.Has(name) { + // disengage manager + if mgr != nil { + if err := mgr.Disengage(ctx, k.clusters[name]); err != nil { + k.log.Error(err, "failed to disengage manager") } - set = newSet - case <-ctx.Done(): - return } + + // stop and forget + k.lock.Lock() + k.cancelFns[name]() + delete(k.clusters, name) + delete(k.cancelFns, name) + k.lock.Unlock() } - }() + } + + return false, nil }) - return k.ch } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 0e92697989..732cbbc895 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,28 +35,6 @@ import ( intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" ) -// Aware is an interface that can be implemented by components that -// can engage and disengange when clusters are added or removed. -type Aware interface { - // Engage gets called when the component should start operations for the given Cluster. 
- // The given context is tied to the Cluster's lifecycle and will be cancelled when the - // Cluster is removed or an error occurs. - // - // Implementers should return an error if they cannot start operations for the given Cluster, - // and should ensure this operation is re-entrant and non-blocking. - // - // \_________________|)____.---'--`---.____ - // || \----.________.----/ - // || / / `--' - // __||____/ /_ - // |___ \ - // `--------' - Engage(context.Context, Cluster) error - - // Disengage gets called when the component should stop operations for the given Cluster. - Disengage(context.Context, Cluster) error -} - // Cluster provides various methods to interact with a cluster. type Cluster interface { // Name returns the name of the cluster. It identifies the cluster in the diff --git a/pkg/cluster/multicluster.go b/pkg/cluster/multicluster.go new file mode 100644 index 0000000000..73c0fdd9b5 --- /dev/null +++ b/pkg/cluster/multicluster.go @@ -0,0 +1,39 @@ +package cluster + +import ( + "context" +) + +// Aware is an interface that can be implemented by components that +// can engage and disengage when clusters are added or removed at runtime. +type Aware interface { + // Engage gets called when the component should start operations for the given Cluster. + // The given context is tied to the Cluster's lifecycle and will be cancelled when the + // Cluster is removed or an error occurs. + // + // Implementers should return an error if they cannot start operations for the given Cluster, + // and should ensure this operation is re-entrant and non-blocking. + // + // \_________________|)____.---'--`---.____ + // || \----.________.----/ + // || / / `--' + // __||____/ /_ + // |___ \ + // `--------' + Engage(context.Context, Cluster) error + + // Disengage gets called when the component should stop operations for the given Cluster. + Disengage(context.Context, Cluster) error +} + +// Provider defines methods to retrieve clusters by name. The provider is +// responsible for discovering and managing the lifecycle of each cluster. +// +// Example: A Cluster API provider would be responsible for discovering and +// managing clusters that are backed by Cluster API resources, which can live +// in multiple namespaces in a single management cluster. +type Provider interface { + // Get returns a cluster for the given identifying cluster name. Get + // returns an existing cluster if it has been created before. + Get(ctx context.Context, clusterName string) (Cluster, error) +} diff --git a/pkg/cluster/provider.go b/pkg/cluster/provider.go deleted file mode 100644 index f4e769344e..0000000000 --- a/pkg/cluster/provider.go +++ /dev/null @@ -1,62 +0,0 @@ -package cluster - -import ( - "context" - - "k8s.io/apimachinery/pkg/watch" -) - -// Provider defines methods to retrieve, list, and watch fleet of clusters. -// The provider is responsible for discovering and managing the lifecycle of each -// cluster. -// -// Example: A Cluster API provider would be responsible for discovering and managing -// clusters that are backed by Cluster API resources, which can live -// in multiple namespaces in a single management cluster. -type Provider interface { - // Get returns a cluster for the given identifying cluster name. The - // options are passed to the cluster constructor in case the cluster has - // not been created yet. Get returns an existing cluster if it has been - // created before. 
- Get(ctx context.Context, clusterName string, opts ...Option) (Cluster, error) - - // List returns a list of known identifying clusters names. - // This method is used to discover the initial set of known cluster names - // and to refresh the list of cluster names periodically. - List(ctx context.Context) ([]string, error) - - // Watch returns a Watcher that watches for changes to a list of known clusters - // and react to potential changes. - Watch(ctx context.Context) (Watcher, error) -} - -// Watcher watches for changes to clusters and provides events to a channel -// for the Manager to react to. -type Watcher interface { - // Stop stops watching. Will close the channel returned by ResultChan(). Releases - // any resources used by the watch. - Stop() - - // ResultChan returns a chan which will receive all the events. If an error occurs - // or Stop() is called, the implementation will close this channel and - // release any resources used by the watch. - ResultChan() <-chan WatchEvent -} - -// WatchEvent is an event that is sent when a cluster is added, modified, or deleted. -type WatchEvent struct { - // Type is the type of event that occurred. - // - // - ADDED or MODIFIED - // The cluster was added or updated: a new RESTConfig is available, or needs to be refreshed. - // - DELETED - // The cluster was deleted: the cluster is removed. - // - ERROR - // An error occurred while watching the cluster: the cluster is removed. - // - BOOKMARK - // A periodic event is sent that contains no new data: ignored. - Type watch.EventType - - // ClusterName is the identifying name of the cluster related to the event. - ClusterName string -} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index c1801c27d1..40e5172f9e 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -31,9 +31,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -62,14 +59,9 @@ const ( ) var _ Runnable = &controllerManager{} +var _ cluster.Aware = &controllerManager{} var _ Manager = &controllerManager{} -type engagedCluster struct { - cluster.Cluster - ctx context.Context - cancel context.CancelFunc -} - type controllerManager struct { sync.Mutex started bool @@ -82,10 +74,9 @@ type controllerManager struct { defaultCluster cluster.Cluster defaultClusterOptions cluster.Option - clusterProvider cluster.Provider - - clusterLock sync.RWMutex // protects clusters - clusters map[string]*engagedCluster + // engagedCluster is a map of engaged clusters. The can come and go as the manager is running. + engagedClustersLock sync.RWMutex + engagedClusters map[string]cluster.Cluster clusterAwareRunnables []cluster.Aware // recorderProvider is used to generate event recorders that will be injected into Controllers @@ -178,6 +169,9 @@ type controllerManager struct { // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} + + // clusterProvider is used to get clusters by name, beyond the default cluster. 
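+	// It is optional: when nil, GetCluster can only return the default cluster.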
+ clusterProvider cluster.Provider } type hasCache interface { @@ -193,10 +187,43 @@ func (cm *controllerManager) Add(r Runnable) error { } func (cm *controllerManager) add(r Runnable) error { + var engaged []cluster.Aware + var errs []error + disengage := func() { + for _, aware := range engaged { + if err := aware.Disengage(cm.internalCtx, cm.defaultCluster); err != nil { + errs = append(errs, err) + } + } + } + + // engage with existing clusters (this is reversible) + if aware, ok := r.(cluster.Aware); ok { + cm.engagedClustersLock.RLock() + defer cm.engagedClustersLock.RUnlock() + for _, cl := range cm.engagedClusters { + if err := aware.Engage(cm.internalCtx, cl); err != nil { + errs = append(errs, err) + break + } + engaged = append(engaged, aware) + } + if len(errs) > 0 { + disengage() + return kerrors.NewAggregate(errs) + } + } + + if err := cm.runnables.Add(r); err != nil { + disengage() + return err + } + if aware, ok := r.(cluster.Aware); ok { cm.clusterAwareRunnables = append(cm.clusterAwareRunnables, aware) } - return cm.runnables.Add(r) + + return nil } // AddHealthzCheck allows you to add Healthz checker. @@ -238,7 +265,17 @@ func (cm *controllerManager) Name() string { } func (cm *controllerManager) GetCluster(ctx context.Context, clusterName string) (cluster.Cluster, error) { - return cm.getCluster(ctx, clusterName) + if clusterName == "" || clusterName == cm.defaultCluster.Name() { + return cm.defaultCluster, nil + } + + if cm.clusterProvider == nil { + return nil, fmt.Errorf("cluster %q not found, cluster provider is not set", clusterName) + } + + // intentionally not returning from engaged clusters. This can be used + // without engaging clusters. + return cm.clusterProvider.Get(ctx, clusterName) } func (cm *controllerManager) GetHTTPClient() *http.Client { @@ -277,118 +314,6 @@ func (cm *controllerManager) GetAPIReader() client.Reader { return cm.defaultCluster.GetAPIReader() } -func (cm *controllerManager) engageClusterAwareRunnables() { - cm.Lock() - defer cm.Unlock() - - // If we don't have a cluster provider, we cannot sync the runnables. - if cm.clusterProvider == nil { - return - } - - // If we successfully retrieved the cluster, check - // that we schedule all the cluster aware runnables. - for name, cluster := range cm.clusters { - for _, aware := range cm.clusterAwareRunnables { - if err := aware.Engage(cluster.ctx, cluster); err != nil { - cm.logger.Error(err, "failed to engage cluster with runnable, won't retry", "clusterName", name, "runnable", aware) - continue - } - } - } -} - -func (cm *controllerManager) getCluster(ctx context.Context, clusterName string) (c *engagedCluster, err error) { - // Check if the manager was configured with a cluster provider, - // otherwise we cannot retrieve the cluster. - if cm.clusterProvider == nil { - return nil, fmt.Errorf("manager was not configured with a cluster provider, cannot retrieve %q", clusterName) - } - - // Check if the cluster already exists. - cm.clusterLock.RLock() - c, ok := cm.clusters[clusterName] - cm.clusterLock.RUnlock() - if ok { - return c, nil - } - - // Lock the whole function to avoid creating multiple clusters for the same name. - cm.clusterLock.Lock() - defer cm.clusterLock.Unlock() - - // Check again in case another goroutine already created the cluster. - c, ok = cm.clusters[clusterName] - if ok { - return c, nil - } - - // Create a new cluster. - var cl cluster.Cluster - { - // TODO(vincepri): Make this timeout configurable. 
- ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - var watchErr error - if err := wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (done bool, err error) { - cl, watchErr = cm.clusterProvider.Get(ctx, clusterName, cm.defaultClusterOptions, cluster.WithName(clusterName)) - if watchErr != nil { - return false, nil //nolint:nilerr // We want to keep trying. - } - return true, nil - }); err != nil { - return nil, fmt.Errorf("failed create cluster %q: %w", clusterName, kerrors.NewAggregate([]error{err, watchErr})) - } - } - - // We add the Cluster to the manager as a Runnable, which is going to be categorized - // as a Cache-backed Runnable. - // - // Once added, if the manager has already started, it waits for the Cache to sync before returning - // otherwise it enqueues the Runnable to be started when the manager starts. - if err := cm.Add(cl); err != nil { - return nil, fmt.Errorf("cannot add cluster %q to manager: %w", clusterName, err) - } - - // Create a new context for the Cluster, so that it can be stopped independently. - ctx, cancel := context.WithCancel(context.Background()) - - c = &engagedCluster{ - Cluster: cl, - ctx: ctx, - cancel: cancel, - } - cm.clusters[clusterName] = c - return c, nil -} - -func (cm *controllerManager) removeNamedCluster(clusterName string) error { - // Check if the manager was configured with a cluster provider, - // otherwise we cannot retrieve the cluster. - if cm.clusterProvider == nil { - return fmt.Errorf("manager was not configured with a cluster provider, cannot retrieve %q", clusterName) - } - - cm.clusterLock.Lock() - defer cm.clusterLock.Unlock() - c, ok := cm.clusters[clusterName] - if !ok { - return nil - } - - // Disengage all the runnables. - for _, aware := range cm.clusterAwareRunnables { - if err := aware.Disengage(c.ctx, c); err != nil { - return fmt.Errorf("failed to disengage cluster aware runnable: %w", err) - } - } - - // Cancel the context and remove the cluster from the map. - c.cancel() - delete(cm.clusters, clusterName) - return nil -} - func (cm *controllerManager) GetWebhookServer() webhook.Server { cm.webhookServerOnce.Do(func() { if cm.webhookServer == nil { @@ -570,80 +495,6 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { //nolint:g }() } - // If the manager has been configured with a cluster provider, start it. - if cm.clusterProvider != nil { - if err := cm.add(RunnableFunc(func(ctx context.Context) error { - resync := func() error { - clusterList, err := cm.clusterProvider.List(ctx) - if err != nil { - return err - } - for _, name := range clusterList { - if _, err := cm.getCluster(ctx, name); err != nil { - return err - } - } - clusterListSet := sets.New(clusterList...) - for name := range cm.clusters { - if clusterListSet.Has(name) { - continue - } - if err := cm.removeNamedCluster(name); err != nil { - return err - } - } - cm.engageClusterAwareRunnables() - return nil - } - - // Always do an initial full resync. - if err := resync(); err != nil { - return err - } - - // Create a watcher and start watching for changes. - watcher, err := cm.clusterProvider.Watch(ctx) - if err != nil { - return err - } - defer func() { - go func() { - // Drain the watcher result channel to prevent a goroutine leak. 
- for range watcher.ResultChan() { - } - }() - watcher.Stop() - }() - - for { - select { - case <-time.After(10 * time.Minute): - if err := resync(); err != nil { - return err - } - case event := <-watcher.ResultChan(): - switch event.Type { - case watch.Added, watch.Modified: - if _, err := cm.getCluster(ctx, event.ClusterName); err != nil { - return err - } - cm.engageClusterAwareRunnables() - case watch.Deleted, watch.Error: - if err := cm.removeNamedCluster(event.ClusterName); err != nil { - return err - } - case watch.Bookmark: - continue - } - case <-ctx.Done(): - return nil - } - } - })); err != nil { - return fmt.Errorf("failed to add cluster provider to runnables: %w", err) - } - } - // Manager is ready. ready = true cm.Unlock() @@ -761,6 +612,69 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e return nil } +func (cm *controllerManager) Engage(ctx context.Context, cl cluster.Cluster) error { + cm.Lock() + defer cm.Unlock() + + // be reentrant via noop + cm.engagedClustersLock.RLock() + if _, ok := cm.engagedClusters[cl.Name()]; ok { + cm.engagedClustersLock.RUnlock() + return nil + } + + // add early because any engaged runnable could access it + cm.engagedClustersLock.Lock() + cm.engagedClusters[cl.Name()] = cl + cm.engagedClustersLock.Unlock() + + // engage known runnables + var errs []error + var engaged []cluster.Aware + for _, r := range cm.clusterAwareRunnables { + if err := r.Engage(ctx, cl); err != nil { + errs = append(errs, err) + break + } + engaged = append(engaged, r) + } + + // clean-up + if len(errs) > 0 { + for _, aware := range engaged { + if err := aware.Disengage(ctx, cl); err != nil { + errs = append(errs, err) + } + } + + cm.engagedClustersLock.Lock() + delete(cm.engagedClusters, cl.Name()) + cm.engagedClustersLock.Unlock() + + return kerrors.NewAggregate(errs) + } + + return nil +} + +func (cm *controllerManager) Disengage(ctx context.Context, cl cluster.Cluster) error { + cm.Lock() + defer cm.Unlock() + + var errs []error + for _, r := range cm.clusterAwareRunnables { + if err := r.Disengage(ctx, cl); err != nil { + errs = append(errs, err) + } + } + + cm.engagedClustersLock.Lock() + delete(cm.engagedClusters, cl.Name()) + cm.engagedClustersLock.Unlock() + + return kerrors.NewAggregate(errs) +} + func (cm *controllerManager) startLeaderElectionRunnables() error { return cm.runnables.LeaderElection.Start(cm.internalCtx) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 776fee5337..d9f7acbf88 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -51,6 +51,11 @@ type Manager interface { // Cluster holds a variety of methods to interact with a cluster. cluster.Cluster + // Aware is an interface for dynamic cluster addition and removal. The + // Manager will call Engage and Disengage on cluster-aware runnables like + // controllers to e.g. watch multiple clusters. + cluster.Aware + // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. 
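+	// If the Runnable also implements cluster.Aware, it is engaged with all
+	// currently engaged clusters when added, and with clusters engaged later.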
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either @@ -334,7 +339,7 @@ func New(config *rest.Config, options Options) (Manager, error) { clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck } - cluster, err := cluster.New(config, clusterOptions) + cl, err := cluster.New(config, clusterOptions) if err != nil { return nil, err } @@ -347,7 +352,7 @@ func New(config *rest.Config, options Options) (Manager, error) { // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. - recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) + recorderProvider, err := options.newRecorderProvider(config, cl.GetHTTPClient(), cl.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } @@ -361,7 +366,7 @@ func New(config *rest.Config, options Options) (Manager, error) { leaderRecorderProvider = recorderProvider } else { leaderConfig = rest.CopyConfig(options.LeaderElectionConfig) - scheme := cluster.GetScheme() + scheme := cl.GetScheme() err := corev1.AddToScheme(scheme) if err != nil { return nil, err @@ -396,7 +401,7 @@ func New(config *rest.Config, options Options) (Manager, error) { } // Create the metrics server. - metricsServer, err := options.newMetricsServer(options.Metrics, config, cluster.GetHTTPClient()) + metricsServer, err := options.newMetricsServer(options.Metrics, config, cl.GetHTTPClient()) if err != nil { return nil, err } @@ -418,10 +423,10 @@ func New(config *rest.Config, options Options) (Manager, error) { errChan := make(chan error, 1) return &controllerManager{ stopProcedureEngaged: ptr.To(int64(0)), - defaultCluster: cluster, + defaultCluster: cl, defaultClusterOptions: clusterOptions, clusterProvider: options.ExperimentalClusterProvider, - clusters: make(map[string]*engagedCluster), + engagedClusters: make(map[string]cluster.Cluster), runnables: newRunnables(options.BaseContext, errChan), errChan: errChan, recorderProvider: recorderProvider,
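To illustrate how the new hooks are meant to be consumed, here is a minimal sketch (not part of this diff) of a hypothetical cluster-aware runnable that merely records which clusters the manager has engaged it with. The type and field names are invented for the example; such a runnable would be registered with mgr.Add and then receive Engage/Disengage calls as clusters appear and disappear.

```go
// Illustrative sketch only. engagedSet and its fields are hypothetical.
package example

import (
	"context"
	"sync"

	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// engagedSet tracks the clusters the manager has engaged it with.
type engagedSet struct {
	mu    sync.Mutex
	names map[string]struct{}
}

// Start blocks until the manager stops; all work in this sketch happens in
// Engage and Disengage.
func (s *engagedSet) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// Engage is called by the manager for every engaged cluster. It is expected
// to be re-entrant and non-blocking.
func (s *engagedSet) Engage(_ context.Context, cl cluster.Cluster) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.names == nil {
		s.names = map[string]struct{}{}
	}
	s.names[cl.Name()] = struct{}{}
	return nil
}

// Disengage is called when a cluster goes away.
func (s *engagedSet) Disengage(_ context.Context, cl cluster.Cluster) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.names, cl.Name())
	return nil
}
```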