Skip to content

Commit

Permalink
⚠️ Propagate context.Context throughout the codebase
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Aug 6, 2020
1 parent 420cd15 commit 5ad43b9
Show file tree
Hide file tree
Showing 30 changed files with 414 additions and 459 deletions.
26 changes: 14 additions & 12 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,10 @@ func (typedNoop) Reconcile(context.Context, reconcile.Request) (reconcile.Result
}

var _ = Describe("application", func() {
var stop chan struct{}

BeforeEach(func() {
stop = make(chan struct{})
newController = controller.New
})

AfterEach(func() {
close(stop)
})

noop := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
})
Expand Down Expand Up @@ -212,17 +205,23 @@ var _ = Describe("application", func() {

Describe("Start with ControllerManagedBy", func() {
It("should Reconcile Owns objects", func(done Done) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

bldr := ControllerManagedBy(m).
For(&appsv1.Deployment{}).
Owns(&appsv1.ReplicaSet{})
doReconcileTest("3", stop, bldr, m, false)
doReconcileTest(ctx, "3", bldr, m, false)
close(done)
}, 10)

It("should Reconcile Watches objects", func(done Done) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

Expand All @@ -231,13 +230,16 @@ var _ = Describe("application", func() {
Watches( // Equivalent of Owns
&source.Kind{Type: &appsv1.ReplicaSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
doReconcileTest("4", stop, bldr, m, true)
doReconcileTest(ctx, "4", bldr, m, true)
close(done)
}, 10)
})

Describe("Set custom predicates", func() {
It("should execute registered predicates only for assigned kind", func(done Done) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -286,7 +288,7 @@ var _ = Describe("application", func() {
Owns(&appsv1.ReplicaSet{}, WithPredicates(replicaSetPrct)).
WithEventFilter(allPrct)

doReconcileTest("5", stop, bldr, m, true)
doReconcileTest(ctx, "5", bldr, m, true)

Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
Expand All @@ -298,7 +300,7 @@ var _ = Describe("application", func() {

})

func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix

Expand Down Expand Up @@ -328,7 +330,7 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
By("Starting the application")
go func() {
defer GinkgoRecover()
Expect(mgr.Start(stop)).NotTo(HaveOccurred())
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
By("Stopping the application")
}()

Expand Down
25 changes: 13 additions & 12 deletions pkg/builder/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package builder

import (
"context"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -89,11 +90,11 @@ var _ = Describe("application", func() {
}
}`)

stopCh := make(chan struct{})
close(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO: we may want to improve it to make it be able to inject dependencies,
// but not always try to load certs and return not found error.
err = svr.Start(stopCh)
err = svr.Start(ctx)
if err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
Expand Down Expand Up @@ -163,11 +164,11 @@ var _ = Describe("application", func() {
}
}`)

stopCh := make(chan struct{})
close(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO: we may want to improve it to make it be able to inject dependencies,
// but not always try to load certs and return not found error.
err = svr.Start(stopCh)
err = svr.Start(ctx)
if err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
Expand Down Expand Up @@ -234,11 +235,11 @@ var _ = Describe("application", func() {
}
}`)

stopCh := make(chan struct{})
close(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO: we may want to improve it to make it be able to inject dependencies,
// but not always try to load certs and return not found error.
err = svr.Start(stopCh)
err = svr.Start(ctx)
if err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
Expand Down Expand Up @@ -308,11 +309,11 @@ var _ = Describe("application", func() {
}
}
}`)
stopCh := make(chan struct{})
close(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO: we may want to improve it to make it be able to inject dependencies,
// but not always try to load certs and return not found error.
err = svr.Start(stopCh)
err = svr.Start(ctx)
if err != nil && !os.IsNotExist(err) {
Expect(err).NotTo(HaveOccurred())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ type Informers interface {

// Start runs all the informers known to this cache until the given channel is closed.
// It blocks.
Start(stopCh <-chan struct{}) error
Start(ctx context.Context) error

// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
WaitForCacheSync(stop <-chan struct{}) bool
WaitForCacheSync(ctx context.Context) bool

// Informers knows how to add indices to the caches (informers) that it manages.
client.FieldIndexer
Expand Down
45 changes: 26 additions & 19 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,17 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error)) {
Describe("Cache test", func() {
var (
informerCache cache.Cache
stop chan struct{}
knownPod1 runtime.Object
knownPod2 runtime.Object
knownPod3 runtime.Object
knownPod4 runtime.Object
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
knownPod1 runtime.Object
knownPod2 runtime.Object
knownPod3 runtime.Object
knownPod4 runtime.Object
)

BeforeEach(func() {
stop = make(chan struct{})
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating three pods")
Expand Down Expand Up @@ -123,11 +124,11 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(stopCh chan struct{}) {
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(stopCh)).To(Succeed())
}(stop)
Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue())
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})

AfterEach(func() {
Expand All @@ -137,7 +138,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
deletePod(knownPod3)
deletePod(knownPod4)

close(stop)
informerCacheCancel()
})

Describe("as a Reader", func() {
Expand Down Expand Up @@ -394,11 +395,13 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(namespacedCache.Start(stop)).To(Succeed())
Expect(namespacedCache.Start(ctx)).To(Succeed())
}()
Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse())
Expect(namespacedCache.WaitForCacheSync(ctx)).NotTo(BeFalse())

By("listing pods in all namespaces")
out := &unstructured.UnstructuredList{}
Expand Down Expand Up @@ -572,11 +575,13 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())

By("running the cache and waiting for it to sync")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(informer.Start(stop)).To(Succeed())
Expect(informer.Start(ctx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse())
Expect(informer.WaitForCacheSync(ctx)).NotTo(BeFalse())

By("listing Pods with restartPolicyOnFailure")
listObj := &kcorev1.PodList{}
Expand Down Expand Up @@ -636,7 +641,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Object: map[string]interface{}{
"spec": map[string]interface{}{
"containers": []map[string]interface{}{
map[string]interface{}{
{
"name": "nginx",
"image": "nginx",
},
Expand Down Expand Up @@ -700,11 +705,13 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(informer.IndexField(context.TODO(), pod, "spec.restartPolicy", indexFunc)).To(Succeed())

By("running the cache and waiting for it to sync")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(informer.Start(stop)).To(Succeed())
Expect(informer.Start(ctx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse())
Expect(informer.WaitForCacheSync(ctx)).NotTo(BeFalse())

By("listing Pods with restartPolicyOnFailure")
listObj := &unstructured.UnstructuredList{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj runtime.Object) (ca
}

// WaitForCacheSync implements Informers
func (c *FakeInformers) WaitForCacheSync(stop <-chan struct{}) bool {
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
return true
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
}

// Start implements Informers
func (c *FakeInformers) Start(stopCh <-chan struct{}) error {
func (c *FakeInformers) Start(ctx context.Context) error {
return c.Error
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,25 @@ func NewInformersMap(config *rest.Config,
}

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
func (m *InformersMap) Start(stop <-chan struct{}) error {
go m.structured.Start(stop)
go m.unstructured.Start(stop)
<-stop
func (m *InformersMap) Start(ctx context.Context) error {
go m.structured.Start(ctx)
go m.unstructured.Start(ctx)
<-ctx.Done()
return nil
}

// WaitForCacheSync waits until all the caches have been started and synced.
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool {
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...)
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...)

if !m.structured.waitForStarted(stop) {
if !m.structured.waitForStarted(ctx) {
return false
}
if !m.unstructured.waitForStarted(stop) {
if !m.unstructured.waitForStarted(ctx) {
return false
}
return cache.WaitForCacheSync(stop, syncedFuncs...)
return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...)
}

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,31 +123,31 @@ type specificInformersMap struct {

// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
func (ip *specificInformersMap) Start(ctx context.Context) {
func() {
ip.mu.Lock()
defer ip.mu.Unlock()

// Set the stop channel so it can be passed to informers that are added later
ip.stop = stop
ip.stop = ctx.Done()

// Start each informer
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(stop)
go informer.Informer.Run(ctx.Done())
}

// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()
<-stop
<-ctx.Done()
}

func (ip *specificInformersMap) waitForStarted(stop <-chan struct{}) bool {
func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool {
select {
case <-ip.startWait:
return true
case <-stop:
case <-ctx.Done():
return false
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,23 +94,23 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
}

func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error {
func (c *multiNamespaceCache) Start(ctx context.Context) error {
for ns, cache := range c.namespaceToCache {
go func(ns string, cache Cache) {
err := cache.Start(stopCh)
err := cache.Start(ctx)
if err != nil {
log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns)
}
}(ns, cache)
}
<-stopCh
<-ctx.Done()
return nil
}

func (c *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool {
func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
synced := true
for _, cache := range c.namespaceToCache {
if s := cache.WaitForCacheSync(stop); !s {
if s := cache.WaitForCacheSync(ctx); !s {
synced = s
}
}
Expand Down
Loading

0 comments on commit 5ad43b9

Please sign in to comment.