From 782b674e602e0870d9b7aab4d18cdfc1b315c28f Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 5 Oct 2019 17:44:41 +0200 Subject: [PATCH] :bug: Return error when trying to read from an unstarted cache --- pkg/cache/informer_cache.go | 23 +++++++++++++++---- pkg/cache/internal/cache_reader.go | 2 +- pkg/cache/internal/deleg_map.go | 2 +- pkg/cache/internal/informers_map.go | 8 +++---- pkg/controller/controller_integration_test.go | 8 +++++++ 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index b906a2e08f..036c95e5c3 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -38,6 +38,13 @@ var ( _ Cache = &informerCache{} ) +// ErrCacheNotStarted is returned when trying to read from the cache that wasn't started. +type ErrCacheNotStarted struct{} + +func (*ErrCacheNotStarted) Error() string { + return "the cache is not started, can not read objects" +} + // informerCache is a Kubernetes Object cache populated from InformersMap. informerCache wraps an InformersMap. type informerCache struct { *internal.InformersMap @@ -50,10 +57,14 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt return err } - cache, err := ip.InformersMap.Get(gvk, out) + started, cache, err := ip.InformersMap.Get(gvk, out) if err != nil { return err } + + if !started { + return &ErrCacheNotStarted{} + } return cache.Reader.Get(ctx, key, out) } @@ -90,11 +101,15 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c } } - cache, err := ip.InformersMap.Get(gvk, cacheTypeObj) + started, cache, err := ip.InformersMap.Get(gvk, cacheTypeObj) if err != nil { return err } + if !started { + return &ErrCacheNotStarted{} + } + return cache.Reader.List(ctx, out, opts...) } @@ -105,7 +120,7 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform if err != nil { return nil, err } - i, err := ip.InformersMap.Get(gvk, obj) + _, i, err := ip.InformersMap.Get(gvk, obj) if err != nil { return nil, err } @@ -118,7 +133,7 @@ func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) { if err != nil { return nil, err } - i, err := ip.InformersMap.Get(gvk, obj) + _, i, err := ip.InformersMap.Get(gvk, obj) if err != nil { return nil, err } diff --git a/pkg/cache/internal/cache_reader.go b/pkg/cache/internal/cache_reader.go index a9b7ae347c..62152130ff 100644 --- a/pkg/cache/internal/cache_reader.go +++ b/pkg/cache/internal/cache_reader.go @@ -33,7 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/internal/objectutil" ) -// CacheReader is a CacheReader +// CacheReader is a client.Reader var _ client.Reader = &CacheReader{} // CacheReader wraps a cache.Index to implement the client.CacheReader interface for a single type diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 978002d9cf..8e2c65f4ba 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -73,7 +73,7 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool { // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns // the Informer from the map. -func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) { +func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { _, isUnstructured := obj.(*unstructured.Unstructured) _, isUnstructuredList := obj.(*unstructured.UnstructuredList) isUnstructured = isUnstructured || isUnstructuredList diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index b2787adfc8..7345c54b49 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -145,7 +145,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) { +func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -157,18 +157,18 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj if !ok { var err error if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { - return nil, err + return started, nil, err } } if started && !i.Informer.HasSynced() { // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) { - return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj) + return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj) } } - return i, nil + return started, i, nil } func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index df2d9c1f90..392fb285c1 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -17,11 +17,14 @@ limitations under the License. package controller_test import ( + "context" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -73,6 +76,11 @@ var _ = Describe("controller", func() { err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) + err = cm.GetClient().Get(context.Background(), types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) + Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) + err = cm.GetClient().List(context.Background(), &corev1.NamespaceList{}) + Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) + By("Starting the Manager") go func() { defer GinkgoRecover()