Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP adds Context support to manager in place of stop channels. #164

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"os"

Expand Down Expand Up @@ -128,7 +129,13 @@ func main() {
os.Exit(1)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-signals.SetupSignalHandler()
cancel()
}()

if err := mgr.Start(ctx); err != nil {
entryLog.Error(err, "unable to run manager")
os.Exit(1)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/builder/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ import (

var _ = Describe("application", func() {
var stop chan struct{}
var ctx context.Context

BeforeEach(func() {
stop = make(chan struct{})
ctx = context.Background()
getConfig = func() (*rest.Config, error) { return cfg, nil }
newController = controller.New
newManager = manager.New
Expand Down Expand Up @@ -138,7 +140,7 @@ var _ = Describe("application", func() {
By("Starting the application")
go func() {
defer GinkgoRecover()
Expect(instance.Start(stop)).NotTo(HaveOccurred())
Expect(instance.Start(ctx)).NotTo(HaveOccurred())
By("Stopping the application")
}()

Expand Down
3 changes: 1 addition & 2 deletions pkg/builder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
)

// NB: don't call SetLogger in init(), or else you'll mess up logging in the main suite.
Expand All @@ -49,7 +48,7 @@ func ExampleBuilder() {
os.Exit(1)
}

if err := rs.Start(signals.SetupSignalHandler()); err != nil {
if err := rs.Start(context.TODO()); err != nil {
log.Error(err, "Unable to run controller")
os.Exit(1)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package controller_test

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,9 +37,11 @@ import (
var _ = Describe("controller", func() {
var reconciled chan reconcile.Request
var stop chan struct{}
var ctx context.Context

BeforeEach(func() {
stop = make(chan struct{})
ctx = context.Background()
reconciled = make(chan reconcile.Request)
Expect(cfg).NotTo(BeNil())
})
Expand Down Expand Up @@ -76,7 +80,7 @@ var _ = Describe("controller", func() {
By("Starting the Manager")
go func() {
defer GinkgoRecover()
Expect(cm.Start(stop)).NotTo(HaveOccurred())
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
}()

deployment := &appsv1.Deployment{
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller_test

import (
"context"
"os"

"k8s.io/api/core/v1"
Expand All @@ -25,7 +26,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
"sigs.k8s.io/controller-runtime/pkg/source"
)

Expand Down Expand Up @@ -75,5 +75,5 @@ func ExampleController() {
}

// Start the Controller through the manager.
mgr.Start(signals.SetupSignalHandler())
mgr.Start(context.TODO())
}
6 changes: 5 additions & 1 deletion pkg/internal/recorder/recorder_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package recorder_test

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,9 +37,11 @@ import (

var _ = Describe("recorder", func() {
var stop chan struct{}
var ctx context.Context

BeforeEach(func() {
stop = make(chan struct{})
ctx = context.Background()
Expect(cfg).NotTo(BeNil())
})

Expand Down Expand Up @@ -71,7 +75,7 @@ var _ = Describe("recorder", func() {
By("Starting the Manager")
go func() {
defer GinkgoRecover()
Expect(cm.Start(stop)).NotTo(HaveOccurred())
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
}()

deployment := &appsv1.Deployment{
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ limitations under the License.
package manager_test

import (
"context"
"os"

"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
)

var (
Expand Down Expand Up @@ -61,7 +61,7 @@ func ExampleManager_Add() {

// This example starts a Manager that has had Runnables added.
func ExampleManager_Start() {
err := mgr.Start(signals.SetupSignalHandler())
err := mgr.Start(context.TODO())
if err != nil {
log.Error(err, "unable start the manager")
os.Exit(1)
Expand Down
44 changes: 26 additions & 18 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package manager

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -73,7 +74,7 @@ type controllerManager struct {
mu sync.Mutex
started bool
errChan chan error
stop <-chan struct{}
stop chan struct{}

startCache func(stop <-chan struct{}) error
}
Expand Down Expand Up @@ -157,11 +158,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
return cm.mapper
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
func (cm *controllerManager) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(cm.stop)

if cm.resourceLock == nil {
go cm.start(stop)
go cm.start(ctx)
select {
case <-stop:
case <-ctx.Done():
// we are done
return nil
case err := <-cm.errChan:
Expand All @@ -178,7 +183,16 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: cm.start,
// Ignore the passed-in stop channel from leaderelection. The next
// thing it does anyway after closing its stop channel is call
// OnStoppedLeading.
// With the 1.12 release, this will accept a Context, which is a
// descendant of the same context that will be passed into
// `l.Run(ctx)` below. Then it will be safe to pass the context
// through to cm.start here.
OnStartedLeading: func(_ <-chan struct{}) {
cm.start(ctx)
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
Expand All @@ -194,7 +208,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
go l.Run()

select {
case <-stop:
case <-ctx.Done():
// We are done
return nil
case err := <-cm.errChan:
Expand All @@ -203,43 +217,37 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start(stop <-chan struct{}) {
func (cm *controllerManager) start(ctx context.Context) {
func() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.stop = stop

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(stop); err != nil {
if err := cm.startCache(ctx.Done()); err != nil {
cm.errChan <- err
}
}()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(stop)
cm.cache.WaitForCacheSync(ctx.Done())

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(stop)
// Because a controller can be added and started at any time, we always
// give them cm.stop.
cm.errChan <- ctrl.Start(cm.stop)
}()
}

cm.started = true
}()

select {
case <-stop:
// We are done
return
}
}
4 changes: 3 additions & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package manager

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -52,7 +53,7 @@ type Manager interface {

// Start starts all registered Controllers and blocks until the Stop channel is closed.
// Returns an error if there is an error starting any controller.
Start(<-chan struct{}) error
Start(context.Context) error

// GetConfig returns an initialized Config
GetConfig() *rest.Config
Expand Down Expand Up @@ -204,6 +205,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
stop: make(chan struct{}),
}, nil
}

Expand Down
22 changes: 12 additions & 10 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package manager

import (
"context"
"fmt"

"github.com/go-logr/logr"
Expand All @@ -38,9 +39,11 @@ import (

var _ = Describe("manger.Manager", func() {
var stop chan struct{}
var ctx context.Context

BeforeEach(func() {
stop = make(chan struct{})
ctx = context.Background()
})

AfterEach(func() {
Expand Down Expand Up @@ -146,7 +149,7 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Start(stop)).NotTo(HaveOccurred())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()
<-c1
<-c2
Expand All @@ -157,9 +160,9 @@ var _ = Describe("manger.Manager", func() {
It("should stop when stop is called", func(done Done) {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
s := make(chan struct{})
close(s)
Expect(m.Start(s)).NotTo(HaveOccurred())
ctx, cancel := context.WithCancel(ctx)
cancel()
Expect(m.Start(ctx)).NotTo(HaveOccurred())

close(done)
})
Expand All @@ -172,7 +175,7 @@ var _ = Describe("manger.Manager", func() {
mgr.startCache = func(stop <-chan struct{}) error {
return fmt.Errorf("expected error")
}
Expect(m.Start(stop).Error()).To(ContainSubstring("expected error"))
Expect(m.Start(ctx).Error()).To(ContainSubstring("expected error"))

close(done)
})
Expand Down Expand Up @@ -203,7 +206,7 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Start(stop)).NotTo(HaveOccurred())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
close(done)
}()
<-c1
Expand Down Expand Up @@ -244,7 +247,7 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Start(stop)).NotTo(HaveOccurred())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()

// Wait for the Manager to start
Expand All @@ -271,7 +274,7 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Start(stop)).NotTo(HaveOccurred())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()

// Wait for the Manager to start
Expand Down Expand Up @@ -327,8 +330,7 @@ var _ = Describe("manger.Manager", func() {
},
stop: func(stop <-chan struct{}) error {
defer GinkgoRecover()
// Manager stop chan has not been initialized.
Expect(stop).To(BeNil())
Expect(stop).NotTo(BeNil())
return nil
},
f: func(f inject.Func) error {
Expand Down