Skip to content

Commit

Permalink
SQUASH: make manager cluster.Aware
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
  • Loading branch information
sttts committed Apr 24, 2024
1 parent 489224f commit 56e0c2b
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 442 deletions.
147 changes: 82 additions & 65 deletions examples/fleet-namespace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -99,10 +98,9 @@ func main() {
entryLog.Error(err, "unable to set up provider")
os.Exit(1)
}
provider := &NamespacedClusterProvider{Cluster: cl}
provider := NewNamespacedClusterProvider(cl)

// Setup a cluster-aware Manager, watching the clusters (= namespaces) through
// the cluster provider.
// Setup a cluster-aware Manager, with the provider to lookup clusters.
entryLog.Info("Setting up cluster-aware manager")
mgr, err := manager.New(cfg, manager.Options{
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
Expand Down Expand Up @@ -142,9 +140,15 @@ func main() {
}

entryLog.Info("Starting provider")
if err := provider.Start(ctx, mgr); err != nil { // does not block
entryLog.Error(err, "unable to start provider")
os.Exit(1)
}

entryLog.Info("Starting cluster")
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
if err := ignoreCanceled(provider.Start(ctx)); err != nil {
if err := ignoreCanceled(cl.Start(ctx)); err != nil {
return fmt.Errorf("failed to start provider: %w", err)
}
return nil
Expand All @@ -169,80 +173,93 @@ func main() {
// to "default" and vice versa, simulating a multi-cluster setup. It uses one
// informer to watch objects for all namespaces.
type NamespacedClusterProvider struct {
cluster.Cluster
}
cluster cluster.Cluster

func (p *NamespacedClusterProvider) Get(ctx context.Context, clusterName string, opts ...cluster.Option) (cluster.Cluster, error) {
ns := &corev1.Namespace{}
if err := p.Cluster.GetCache().Get(ctx, client.ObjectKey{Name: clusterName}, ns); err != nil {
return nil, err
}
mgr manager.Manager

return &NamespacedCluster{clusterName: clusterName, Cluster: p.Cluster}, nil
lock sync.RWMutex
clusters map[string]cluster.Cluster
cancelFns map[string]context.CancelFunc
}

func (p *NamespacedClusterProvider) List(ctx context.Context) ([]string, error) {
nss := &corev1.NamespaceList{}
if err := p.Cluster.GetCache().List(ctx, nss); err != nil {
return nil, err
}

res := make([]string, 0, len(nss.Items))
for _, ns := range nss.Items {
res = append(res, ns.Name)
func NewNamespacedClusterProvider(cl cluster.Cluster) *NamespacedClusterProvider {
return &NamespacedClusterProvider{
cluster: cl,
clusters: map[string]cluster.Cluster{},
cancelFns: map[string]context.CancelFunc{},
}
return res, nil
}

func (p *NamespacedClusterProvider) Watch(ctx context.Context) (cluster.Watcher, error) {
inf, err := p.Cluster.GetCache().GetInformer(ctx, &corev1.Namespace{})
func (p *NamespacedClusterProvider) Start(ctx context.Context, mgr manager.Manager) error {
nsInf, err := p.cluster.GetCache().GetInformer(ctx, &corev1.Namespace{})
if err != nil {
return nil, err
return err
}
return &NamespaceWatcher{inf: inf, ch: make(chan cluster.WatchEvent)}, nil
}

type NamespaceWatcher struct {
inf cache.Informer
init sync.Once
ch chan cluster.WatchEvent
reg toolscache.ResourceEventHandlerRegistration
}
if _, err := nsInf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ns := obj.(*corev1.Namespace)

p.lock.RLock()
if _, ok := p.clusters[ns.Name]; ok {
defer p.lock.RUnlock()
return
}

// create new cluster
p.lock.Lock()
clusterCtx, cancel := context.WithCancel(ctx)
cl := &NamespacedCluster{clusterName: ns.Name, Cluster: p.cluster}
p.clusters[ns.Name] = cl
p.cancelFns[ns.Name] = cancel
p.lock.Unlock()

if err := mgr.Engage(clusterCtx, cl); err != nil {
runtime.HandleError(fmt.Errorf("failed to engage manager with cluster %q: %w", ns.Name, err))

// cleanup
p.lock.Lock()
delete(p.clusters, ns.Name)
delete(p.cancelFns, ns.Name)
p.lock.Unlock()
}
},
DeleteFunc: func(obj interface{}) {
ns := obj.(*corev1.Namespace)

p.lock.RLock()
cl, ok := p.clusters[ns.Name]
if !ok {
p.lock.RUnlock()
return
}
p.lock.RUnlock()

if err := mgr.Disengage(ctx, cl); err != nil {
runtime.HandleError(fmt.Errorf("failed to disengage manager with cluster %q: %w", ns.Name, err))
}

func (w *NamespaceWatcher) Stop() {
if w.reg != nil {
_ = w.inf.RemoveEventHandler(w.reg)
// stop and forget
p.lock.Lock()
p.cancelFns[ns.Name]()
delete(p.clusters, ns.Name)
delete(p.cancelFns, ns.Name)
p.lock.Unlock()
},
}); err != nil {
return err
}
close(w.ch)

return nil
}

func (w *NamespaceWatcher) ResultChan() <-chan cluster.WatchEvent {
w.init.Do(func() {
w.reg, _ = w.inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ns := obj.(*corev1.Namespace)
w.ch <- cluster.WatchEvent{
Type: watch.Added,
ClusterName: ns.Name,
}
},
DeleteFunc: func(obj interface{}) {
ns := obj.(*corev1.Namespace)
w.ch <- cluster.WatchEvent{
Type: watch.Deleted,
ClusterName: ns.Name,
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
ns := newObj.(*corev1.Namespace)
w.ch <- cluster.WatchEvent{
Type: watch.Modified,
ClusterName: ns.Name,
}
},
})
})
return w.ch
func (p *NamespacedClusterProvider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if cl, ok := p.clusters[clusterName]; ok {
return cl, nil
}
return nil, fmt.Errorf("cluster %s not found", clusterName)
}

func ignoreCanceled(err error) error {
Expand Down
Loading

0 comments on commit 56e0c2b

Please sign in to comment.