Skip to content

Commit

Permalink
Rework controllers to be multi-cluster aware
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <vincepri@redhat.com>
  • Loading branch information
vincepri committed Mar 1, 2023
1 parent ebfcabf commit 8d3b8f7
Show file tree
Hide file tree
Showing 19 changed files with 394 additions and 203 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module sigs.k8s.io/controller-runtime
go 1.19

require (
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch/v5 v5.6.0
github.com/fsnotify/fsnotify v1.6.0
github.com/go-logr/logr v1.2.3
Expand Down Expand Up @@ -32,6 +31,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
Expand Down
21 changes: 0 additions & 21 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,6 @@ type Builder struct {
name string
}

func (blder *Builder) clone() *Builder {
clone := *blder
clone.cluster = nil
clone.logicalName = ""
clone.ctrl = nil
return &clone
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
func ControllerManagedBy(m manager.Manager) *Builder {
return &Builder{cluster: m, mgr: m}
Expand Down Expand Up @@ -246,19 +238,6 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
if blder.forInput.err != nil {
return nil, blder.forInput.err
}

if err := blder.mgr.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (manager.Runnable, error) {
cloned := blder.clone()
cloned.cluster = cl
cloned.logicalName = name
if err := cloned.do(r); err != nil {
return nil, err
}
return cloned.ctrl, nil
}); err != nil {
return nil, err
}

if err := blder.do(r); err != nil {
return nil, err
}
Expand Down
43 changes: 27 additions & 16 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strings"
"sync/atomic"

"github.com/davecgh/go-spew/spew"
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -571,14 +570,36 @@ var _ = Describe("application", func() {
mgr, err := manager.New(cfg, manager.Options{}.WithExperimentalLogicalAdapter(adapter))
Expect(err).NotTo(HaveOccurred())

ch1 := make(chan reconcile.Request)
ch2 := make(chan reconcile.Request)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
By("Starting the manager")
go func() {
defer GinkgoRecover()
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
}()

cluster1, err := mgr.GetCluster(ctx, "cluster1")
Expect(err).NotTo(HaveOccurred())

By("Creating a custom namespace")
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-multi-cluster-",
},
}
Expect(cluster1.GetClient().Create(ctx, ns)).To(Succeed())

ch1 := make(chan reconcile.Request, 1)
ch2 := make(chan reconcile.Request, 1)
Expect(
ControllerManagedBy(mgr).
For(&appsv1.Deployment{}).
Owns(&appsv1.ReplicaSet{}).
Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
spew.Dump(req)
if req.Namespace != ns.Name {
return reconcile.Result{}, nil
}

defer GinkgoRecover()
switch req.Cluster {
case "cluster1":
Expand All @@ -592,19 +613,11 @@ var _ = Describe("application", func() {
})),
).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
By("Starting the manager")
go func() {
defer GinkgoRecover()
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
}()

By("Creating a deployment")
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "deploy-multi-cluster",
Namespace: "default",
Namespace: ns.Name,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
Expand All @@ -623,8 +636,6 @@ var _ = Describe("application", func() {
},
},
}
cluster1, err := mgr.GetCluster(ctx, "cluster1")
Expect(err).NotTo(HaveOccurred())
Expect(cluster1.GetClient().Create(ctx, dep)).To(Succeed())

By("Waiting for the Deployment Reconcile on both clusters")
Expand All @@ -647,7 +658,7 @@ var _ = Describe("application", func() {
// Expect a Reconcile when an Owned object is managedObjects.
rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Namespace: dep.Namespace,
Name: "rs-multi-cluster",
Labels: dep.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{
Expand Down
46 changes: 46 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,43 @@ import (
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
)

// AwareRunnable is an interface that can be implemented by runnable types
// that are cluster-aware.
type AwareRunnable interface {
// Engage gets called when the runnable should start operations for the given Cluster.
// The given context is tied to the Cluster's lifecycle and will be cancelled when the
// Cluster is removed or an error occurs.
//
// Implementers should return an error if they cannot start operations for the given Cluster,
// and should ensure this operation is re-entrant and non-blocking.
//
// \_________________|)____.---'--`---.____
// || \----.________.----/
// || / / `--'
// __||____/ /_
// |___ \
// `--------'
Engage(context.Context, Cluster) error

// Disengage gets called when the runnable should stop operations for the given Cluster.
Disengage(context.Context, Cluster) error
}

// AwareDeepCopy is an interface that can be implemented by types
// that are cluster-aware, and can return a copy of themselves
// for a given cluster.
type AwareDeepCopy[T any] interface {
DeepCopyFor(Cluster) T
}

// LogicalGetterFunc is a function that returns a cluster for a given logical cluster name.
type LogicalGetterFunc func(context.Context, logical.Name) (Cluster, error)

// Cluster provides various methods to interact with a cluster.
type Cluster interface {
// Name returns the unique logical name of the cluster.
Name() logical.Name

// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver
GetHTTPClient() *http.Client

Expand Down Expand Up @@ -81,6 +113,9 @@ type Cluster interface {

// Options are the possible options that can be configured for a Cluster.
type Options struct {
// Name is the unique name of the cluster.
Name logical.Name

// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
// idea to pass your own scheme in. See the documentation in pkg/scheme for more information.
Expand Down Expand Up @@ -279,6 +314,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
}

return &cluster{
name: options.Name,
config: config,
httpClient: options.HTTPClient,
scheme: options.Scheme,
Expand Down Expand Up @@ -347,3 +383,13 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {

return options, nil
}

// WithName sets the name of the cluster.
func WithName(name logical.Name) Option {
return func(o *Options) {
if o.Name != "" {
panic("cluster name cannot be set more than once")
}
o.Name = name
}
}
7 changes: 7 additions & 0 deletions pkg/cluster/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/logical-cluster"
)

type cluster struct {
name logical.Name

// config is the rest.config used to talk to the apiserver. Required.
config *rest.Config

Expand Down Expand Up @@ -59,6 +62,10 @@ type cluster struct {
logger logr.Logger
}

func (c *cluster) Name() logical.Name {
return c.name
}

func (c *cluster) GetConfig() *rest.Config {
return c.config
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller

// Create controller with dependencies set
return &controller.Controller{
Cluster: options.LogicalCluster,
Do: options.Reconciler,
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
Expand Down
37 changes: 30 additions & 7 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -36,16 +37,22 @@ var _ EventHandler = &EnqueueRequestForObject{}
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type EnqueueRequestForObject struct{}
type EnqueueRequestForObject struct {
cluster cluster.Cluster
}

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
var logicalClusterName logical.Name
if e.cluster != nil {
logicalClusterName = e.cluster.Name()
}
q.Add(reconcile.Request{
Cluster: logical.FromContext(ctx),
Cluster: logicalClusterName,
NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
Expand All @@ -55,20 +62,23 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv

// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
logicalName := logical.FromContext(ctx)
var logicalClusterName logical.Name
if e.cluster != nil {
logicalClusterName = e.cluster.Name()
}

switch {
case evt.ObjectNew != nil:
q.Add(reconcile.Request{
Cluster: logicalName,
Cluster: logicalClusterName,
NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
},
})
case evt.ObjectOld != nil:
q.Add(reconcile.Request{
Cluster: logicalName,
Cluster: logicalClusterName,
NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
Expand All @@ -85,8 +95,12 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
var logicalClusterName logical.Name
if e.cluster != nil {
logicalClusterName = e.cluster.Name()
}
q.Add(reconcile.Request{
Cluster: logical.FromContext(ctx),
Cluster: logicalClusterName,
NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
Expand All @@ -100,11 +114,20 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
}
var logicalClusterName logical.Name
if e.cluster != nil {
logicalClusterName = e.cluster.Name()
}
q.Add(reconcile.Request{
Cluster: logical.FromContext(ctx),
Cluster: logicalClusterName,
NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
},
})
}

// DeepCopyFor implements cluster.AwareDeepCopy[EventHandler].
func (e *EnqueueRequestForObject) DeepCopyFor(c cluster.Cluster) EventHandler {
return &EnqueueRequestForObject{cluster: c}
}
15 changes: 12 additions & 3 deletions pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/logical-cluster"
)

// MapFunc is the signature required for enqueueing requests from a generic function.
Expand All @@ -49,6 +49,8 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
var _ EventHandler = &enqueueRequestsFromMapFunc{}

type enqueueRequestsFromMapFunc struct {
cluster cluster.Cluster

// Mapper transforms the argument into a slice of keys to be reconciled
toRequests MapFunc
}
Expand Down Expand Up @@ -85,11 +87,18 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqu
continue
}
// If the request doesn't specify a cluster, use the cluster from the context.
if req.Cluster == "" {
req.Cluster = logical.FromContext(ctx)
if req.Cluster == "" && e.cluster != nil {
req.Cluster = e.cluster.Name()
}
// Enqueue the request and track it.
q.Add(req)
reqs[req] = empty{}
}
}

func (e *enqueueRequestsFromMapFunc) DeepCopyFor(c cluster.Cluster) EventHandler {
return &enqueueRequestsFromMapFunc{
cluster: c,
toRequests: e.toRequests,
}
}
Loading

0 comments on commit 8d3b8f7

Please sign in to comment.