Skip to content

Commit

Permalink
Scope controllers for a logical cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
vincepri committed Feb 27, 2023
1 parent 23cd6a1 commit 64ecb97
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 30 deletions.
20 changes: 18 additions & 2 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/logical-cluster"
)

// Supporting mocking out functions for testing.
var newController = controller.New
var newController = controller.NewUnmanaged
var getGvk = apiutil.GVKForObject

// project represents other forms that the we can use to
Expand All @@ -60,6 +61,7 @@ type Builder struct {
watchesInput []WatchesInput
mgr manager.Manager
cluster cluster.Cluster
logicalName logical.Name
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
Expand All @@ -69,6 +71,7 @@ type Builder struct {
func (blder *Builder) clone() *Builder {
clone := *blder
clone.cluster = nil
clone.logicalName = ""
clone.ctrl = nil
return &clone
}
Expand Down Expand Up @@ -244,9 +247,10 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return nil, blder.forInput.err
}

if err := blder.mgr.AddRunnableBuilder(func(cl cluster.Cluster) (manager.Runnable, error) {
if err := blder.mgr.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (manager.Runnable, error) {
cloned := blder.clone()
cloned.cluster = cl
cloned.logicalName = name
if err := cloned.do(r); err != nil {
return nil, err
}
Expand Down Expand Up @@ -351,6 +355,14 @@ func (blder *Builder) doWatch() error {
return err
}
srckind.Type = typeForSrc
} else if !ok {
// If we're building a logical controller, raw watches are not allowed
// given that the cache cannot be validated to be coming from the same cluter.
// In the future, we could consider allowing this by satisfying a new interface
// that sets and uses the cluster.
if blder.logicalName != "" {
return fmt.Errorf("when using a logical adapter, custom raw watches %T are not allowed", w.src)
}
}

if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
Expand Down Expand Up @@ -378,6 +390,10 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
ctrlOptions.Reconciler = r
}

if blder.logicalName != "" {
ctrlOptions.LogicalCluster = blder.logicalName
}

// Retrieve the GVK from the object we're reconciling
// to prepopulate logger information, and to optionally generate a default name.
var gvk schema.GroupVersionKind
Expand Down
4 changes: 2 additions & 2 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (l *testLogger) WithName(name string) logr.LogSink {

var _ = Describe("application", func() {
BeforeEach(func() {
newController = controller.New
newController = controller.NewUnmanaged
})

noop := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
Expand Down Expand Up @@ -623,7 +623,7 @@ var _ = Describe("application", func() {
},
},
}
cluster1, err := mgr.LogicalClusterGetter()("cluster1")
cluster1, err := mgr.GetCluster("cluster1")
Expect(err).NotTo(HaveOccurred())
Expect(cluster1.GetClient().Create(ctx, dep)).To(Succeed())

Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
)

// LogicalGetterFunc is a function that returns a client for a given logical cluster name.
// LogicalGetterFunc is a function that returns a cluster for a given logical cluster name.
type LogicalGetterFunc func(logical.Name) (Cluster, error)

// Cluster provides various methods to interact with a cluster.
Expand Down
24 changes: 21 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/logical-cluster"
)

// Options are the arguments for creating a new Controller.
Expand All @@ -50,6 +51,10 @@ type Options struct {
// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger

// LogicalCluster is populated when the controller was created for a logical cluster.
// This is used to determine if watch events without a logical.Name should be ignored.
LogicalCluster logical.Name
}

// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
Expand All @@ -76,9 +81,21 @@ type Controller interface {
GetLogger() logr.Logger
}

// New returns a new Controller based on the Manager, the caller is responsible
// for adding the controller to the manager as a Runnable.
// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}

// Add the controller as a Manager components
return c, mgr.Add(c)
}

// NewUnmanaged returns a new Controller based on the Manager, the caller is responsible
// for adding the controller to the manager as a Runnable.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
Expand Down Expand Up @@ -121,7 +138,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)

// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
Cluster: options.LogicalCluster,
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ var _ = Describe("controller.Controller", func() {
It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
Expand All @@ -102,20 +101,21 @@ var _ = Describe("controller.Controller", func() {
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).To(Succeed())
close(controllerFinished)
}()

<-reconcileStarted
Eventually(reconcileStarted).Should(BeClosed())
cancel()
<-controllerFinished
Eventually(controllerFinished).Should(BeClosed())

// force-close keep-alive connections. These'll time anyway (after
// like 30s or so) but force it to speed up the tests.
clientTransport.CloseIdleConnections()
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
Eventually(func() error { return goleak.Find(currentGRs) }, 10*time.Second).Should(Succeed())
})

It("should not create goroutines if never started", func() {
Expand Down
9 changes: 8 additions & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string

// Cluster is the logical cluster that this controller is running against.
// +optional
Cluster logical.Name

// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int

Expand Down Expand Up @@ -317,7 +321,10 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
ctx = addReconcileID(ctx, reconcileID)

// Set the Cluster on the request if it is set on the context.
req.Cluster = logical.FromContext(ctx)
if req.Cluster != c.Cluster {
panic(fmt.Sprintf("controller was setup for logical cluster %q, got a request for a cluster %q, not allowed!", c.Cluster, req.Cluster))
}
req.Cluster = c.Cluster

// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
// resource to be synced.
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/testing/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var _ = Describe("Start method", func() {
HealthCheck: HealthCheck{
URL: getServerURL(server),
},
StopTimeout: 2 * time.Second,
}
processState.Path = "bash"
processState.Args = simpleBashScript

})
Expand Down
17 changes: 8 additions & 9 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (

var _ Runnable = &controllerManager{}
var _ Manager = &controllerManager{}
var _ cluster.LogicalGetterFunc = (&controllerManager{}).GetCluster

type logicalCluster struct {
cluster.Cluster
Expand All @@ -81,7 +82,7 @@ type controllerManager struct {
errChan chan error
runnables *runnables

runnableBuilders []func(cl cluster.Cluster) (Runnable, error)
runnableBuilders []func(logical.Name, cluster.Cluster) (Runnable, error)

// defaultCluster holds a variety of methods to interact with a defaultCluster. Required.
defaultCluster cluster.Cluster
Expand Down Expand Up @@ -256,6 +257,10 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
return nil
}

func (cm *controllerManager) GetCluster(name logical.Name) (cluster.Cluster, error) {
return cm.getLogicalCluster(name)
}

func (cm *controllerManager) GetHTTPClient() *http.Client {
return cm.defaultCluster.GetHTTPClient()
}
Expand Down Expand Up @@ -292,12 +297,6 @@ func (cm *controllerManager) GetAPIReader() client.Reader {
return cm.defaultCluster.GetAPIReader()
}

func (cm *controllerManager) LogicalClusterGetter() cluster.LogicalGetterFunc {
return func(name logical.Name) (cluster.Cluster, error) {
return cm.getLogicalCluster(name)
}
}

func (cm *controllerManager) syncClusterAwareRunnables() {
cm.Lock()
defer cm.Unlock()
Expand All @@ -319,7 +318,7 @@ func (cm *controllerManager) syncClusterAwareRunnables() {
}

// Build the runnable.
runnable, err := build(cluster)
runnable, err := build(name, cluster)
if err != nil {
cluster.runnableBuilds = append(cluster.runnableBuilds, err)
cm.logger.Error(err, "failed to build cluster aware runnable, won't retry", "clusterName", name)
Expand Down Expand Up @@ -410,7 +409,7 @@ func (cm *controllerManager) removeLogicalCluster(name logical.Name) error {
return nil
}

func (cm *controllerManager) AddRunnableBuilder(fn func(cl cluster.Cluster) (Runnable, error)) error {
func (cm *controllerManager) AddLogicalRunnableBuilder(fn func(name logical.Name, cl cluster.Cluster) (Runnable, error)) error {
cm.Lock()
defer cm.Unlock()
cm.runnableBuilders = append(cm.runnableBuilders, fn)
Expand Down
10 changes: 5 additions & 5 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ type Manager interface {
// Cluster holds a variety of methods to interact with a cluster.
cluster.Cluster

// LogicalClientGetter can be set on reconcilers to retrieve a Cluster from a logical.Name.
LogicalClusterGetter() cluster.LogicalGetterFunc

// Add will set requested dependencies on the component, and cause the component to be
// started when Start is called.
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
Expand All @@ -67,9 +64,9 @@ type Manager interface {
// election was configured.
Elected() <-chan struct{}

// AddRunnableBuilder adds a controller builder to the manager, which is used to build
// AddLogicalRunnableBuilder adds a controller builder to the manager, which is used to build
// controllers for a given cluster. This is useful when the Manager is running against many logical clusters.
AddRunnableBuilder(func(cluster.Cluster) (Runnable, error)) error
AddLogicalRunnableBuilder(func(logical.Name, cluster.Cluster) (Runnable, error)) error

// AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
// Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
Expand All @@ -92,6 +89,9 @@ type Manager interface {
// lock was lost.
Start(ctx context.Context) error

// GetCluster retrieves a Cluster from a given logical name.
GetCluster(logical.Name) (cluster.Cluster, error)

// GetWebhookServer returns a webhook.Server
GetWebhookServer() *webhook.Server

Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,7 @@ var _ = Describe("manger.Manager", func() {

var built atomic.Bool
var started atomic.Bool
err = m.AddRunnableBuilder(func(cl cluster.Cluster) (Runnable, error) {
err = m.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (Runnable, error) {
built.Store(true)
return RunnableFunc(func(ctx context.Context) error {
Expect(logical.FromContext(ctx)).To(Equal(logical.Name("test-cluster")))
Expand Down Expand Up @@ -1735,7 +1735,7 @@ var _ = Describe("manger.Manager", func() {
var started atomic.Int64
var completed atomic.Int64
removedCh := make(chan struct{})
err = m.AddRunnableBuilder(func(cl cluster.Cluster) (Runnable, error) {
err = m.AddLogicalRunnableBuilder(func(name logical.Name, cl cluster.Cluster) (Runnable, error) {
built.Add(1)
return RunnableFunc(func(ctx context.Context) error {
defer completed.Add(1)
Expand Down

0 comments on commit 64ecb97

Please sign in to comment.