Skip to content

Commit

Permalink
🐛 Return error when trying to read from an unstarted cache
Browse files Browse the repository at this point in the history
  • Loading branch information
alvaroaleman committed Oct 5, 2019
1 parent 801e12a commit 782b674
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 10 deletions.
23 changes: 19 additions & 4 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ var (
_ Cache = &informerCache{}
)

// ErrCacheNotStarted is returned when trying to read from the cache that wasn't started.
type ErrCacheNotStarted struct{}

func (*ErrCacheNotStarted) Error() string {
return "the cache is not started, can not read objects"
}

// informerCache is a Kubernetes Object cache populated from InformersMap. informerCache wraps an InformersMap.
type informerCache struct {
*internal.InformersMap
Expand All @@ -50,10 +57,14 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
return err
}

cache, err := ip.InformersMap.Get(gvk, out)
started, cache, err := ip.InformersMap.Get(gvk, out)
if err != nil {
return err
}

if !started {
return &ErrCacheNotStarted{}
}
return cache.Reader.Get(ctx, key, out)
}

Expand Down Expand Up @@ -90,11 +101,15 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
}
}

cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj)
if err != nil {
return err
}

if !started {
return &ErrCacheNotStarted{}
}

return cache.Reader.List(ctx, out, opts...)
}

Expand All @@ -105,7 +120,7 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform
if err != nil {
return nil, err
}
i, err := ip.InformersMap.Get(gvk, obj)
_, i, err := ip.InformersMap.Get(gvk, obj)
if err != nil {
return nil, err
}
Expand All @@ -118,7 +133,7 @@ func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
if err != nil {
return nil, err
}
i, err := ip.InformersMap.Get(gvk, obj)
_, i, err := ip.InformersMap.Get(gvk, obj)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/internal/objectutil"
)

// CacheReader is a CacheReader
// CacheReader is a client.Reader
var _ client.Reader = &CacheReader{}

// CacheReader wraps a cache.Index to implement the client.CacheReader interface for a single type
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -157,18 +157,18 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj
if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
return nil, err
return started, nil, err
}
}

if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
}
}

return i, nil
return started, i, nil
}

func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package controller_test

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -73,6 +76,11 @@ var _ = Describe("controller", func() {
err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{})
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(context.Background(), types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expect(err).To(Equal(&cache.ErrCacheNotStarted{}))
err = cm.GetClient().List(context.Background(), &corev1.NamespaceList{})
Expect(err).To(Equal(&cache.ErrCacheNotStarted{}))

By("Starting the Manager")
go func() {
defer GinkgoRecover()
Expand Down

0 comments on commit 782b674

Please sign in to comment.