From 9940b92c2ed39d5d4a700738e7485b211c709fb6 Mon Sep 17 00:00:00 2001 From: Evan Cordell Date: Fri, 9 Nov 2018 16:47:50 -0500 Subject: [PATCH] feat(catalog): use operatorlister in catalog operator --- pkg/controller/operators/catalog/operator.go | 196 +++++++++++++----- .../operators/catalog/operator_test.go | 38 ++-- pkg/controller/registry/reconciler.go | 37 ++-- pkg/controller/registry/reconciler_test.go | 64 +++--- pkg/lib/operatorlister/configmap.go | 94 +++++++++ pkg/lib/operatorlister/lister.go | 8 + pkg/lib/operatorlister/pod.go | 94 +++++++++ pkg/lib/operatorlister/service.go | 2 +- 8 files changed, 411 insertions(+), 122 deletions(-) create mode 100644 pkg/lib/operatorlister/configmap.go create mode 100644 pkg/lib/operatorlister/pod.go diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 4f30a3964bb..48e1d08e60f 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -4,16 +4,20 @@ import ( "encoding/json" "errors" "fmt" - "k8s.io/client-go/informers" "sync" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -46,13 +50,15 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) } // resolving dependencies in a catalog. type Operator struct { *queueinformer.Operator - client versioned.Interface - namespace string - sources map[registry.ResourceKey]registry.Source - sourcesLock sync.RWMutex - sourcesLastUpdate metav1.Time - dependencyResolver resolver.DependencyResolver - subQueue workqueue.RateLimitingInterface + client versioned.Interface + namespace string + sources map[registry.ResourceKey]registry.Source + sourcesLock sync.RWMutex + sourcesLastUpdate metav1.Time + dependencyResolver resolver.DependencyResolver + subQueue workqueue.RateLimitingInterface + catSrcQueue workqueue.RateLimitingInterface + lister operatorlister.OperatorLister configmapRegistryReconciler *registry.ConfigMapRegistryReconciler } @@ -96,6 +102,7 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapR Operator: queueOperator, client: crClient, namespace: operatorNamespace, + lister: operatorlister.NewLister(), sources: make(map[registry.ResourceKey]registry.Source), dependencyResolver: &resolver.MultiSourceResolver{}, } @@ -113,6 +120,7 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapR for _, informer := range catsrcQueueInformer { op.RegisterQueueInformer(informer) } + op.catSrcQueue = catsrcQueue // Register InstallPlan informers. ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "installplans") @@ -143,58 +151,115 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapR op.RegisterQueueInformer(informer) } - // Creates registry pods in response to configmaps - informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval) - roleInformer := informerFactory.Rbac().V1().Roles() - roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() - serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() - serviceInformer := informerFactory.Core().V1().Services() - podInformer := informerFactory.Core().V1().Pods() - configMapInformer := informerFactory.Core().V1().ConfigMaps() + handleDelete := &cache.ResourceEventHandlerFuncs{ + DeleteFunc: op.handleDeletion, + } + // Set up informers for requeuing catalogs + for _, namespace := range watchedNamespaces { + roleQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "role") + roleBindingQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rolebinding") + serviceAccountQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount") + serviceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "service") + podQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod") + configmapQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "configmap") + + informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace)) + informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval) + roleInformer := informerFactory.Rbac().V1().Roles() + roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() + serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() + serviceInformer := informerFactory.Core().V1().Services() + podInformer := informerFactory.Core().V1().Pods() + configMapInformer := informerFactory.Core().V1().ConfigMaps() + + queueInformers := []*queueinformer.QueueInformer{ + queueinformer.NewInformer(roleQueue, roleInformer.Informer(), op.syncObject, handleDelete, "role", metrics.NewMetricsNil()), + queueinformer.NewInformer(roleBindingQueue, roleBindingInformer.Informer(), op.syncObject, handleDelete, "rolebinding", metrics.NewMetricsNil()), + queueinformer.NewInformer(serviceAccountQueue, serviceAccountInformer.Informer(), op.syncObject, handleDelete, "serviceaccount", metrics.NewMetricsNil()), + queueinformer.NewInformer(serviceQueue, serviceInformer.Informer(), op.syncObject, handleDelete, "service", metrics.NewMetricsNil()), + queueinformer.NewInformer(podQueue, podInformer.Informer(), op.syncObject, handleDelete, "pod", metrics.NewMetricsNil()), + queueinformer.NewInformer(configmapQueue, configMapInformer.Informer(), op.syncObject, handleDelete, "configmap", metrics.NewMetricsNil()), + } + for _, q := range queueInformers { + op.RegisterQueueInformer(q) + } + + op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister()) + op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister()) + op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister()) + op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister()) + op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) + op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister()) + } op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ - Image: configmapRegistryImage, + Image: configmapRegistryImage, OpClient: op.OpClient, - RoleLister: roleInformer.Lister(), - RoleBindingLister: roleBindingInformer.Lister(), - ServiceAccountLister: serviceAccountInformer.Lister(), - ServiceLister: serviceInformer.Lister(), - PodLister: podInformer.Lister(), - ConfigMapLister: configMapInformer.Lister(), - } - - // register informers for configmapRegistryReconciler - registryInformers := []cache.SharedIndexInformer{ - roleInformer.Informer(), - roleBindingInformer.Informer(), - serviceAccountInformer.Informer(), - serviceInformer.Informer(), - podInformer.Informer(), - configMapInformer.Informer(), - } - - // TODO: won't this possibly conflict since GVK isn't part of the queue entry? - registryQueueInformers := queueinformer.New( - workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "registry"), - registryInformers, - op.syncRegistry, - nil, - "registry", - metrics.NewMetricsNil(), - ) - for _, informer := range registryQueueInformers { - op.RegisterQueueInformer(informer) + Lister: op.lister, } return op, nil } -func (o *Operator) syncRegistry(obj interface{}) (syncError error) { - switch obj.(type) { - case *corev1.ConfigMap: - // requeue catalogsource +func (o *Operator) syncObject(obj interface{}) (syncError error) { + // Assert as runtime.Object + runtimeObj, ok := obj.(runtime.Object) + if !ok { + syncError = errors.New("object sync: casting to runtime.Object failed") + log.Warn(syncError.Error()) + return + } + + gvk := runtimeObj.GetObjectKind().GroupVersionKind() + logger := log.WithFields(log.Fields{ + "group": gvk.Group, + "version": gvk.Version, + "kind": gvk.Kind, + }) + + // Assert as metav1.Object + metaObj, ok := obj.(metav1.Object) + if !ok { + syncError = errors.New("object sync: casting to metav1.Object failed") + logger.Warn(syncError.Error()) + return + } + logger = logger.WithFields(log.Fields{ + "name": metaObj.GetName(), + "namespace": metaObj.GetNamespace(), + }) + + logger.Debug("syncing") + + if ownerutil.IsOwnedByKind(metaObj, v1alpha1.CatalogSourceKind) { + logger.Debug("requeueing owner CatalogSource") + owner := ownerutil.GetOwnerByKind(metaObj, v1alpha1.CatalogSourceKind) + o.catSrcQueue.AddRateLimited(fmt.Sprintf("%s/%s", metaObj.GetNamespace(), owner.Name)) } + return nil } +func (o *Operator) handleDeletion(obj interface{}) { + ownee, ok := obj.(metav1.Object) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + + ownee, ok = tombstone.Obj.(metav1.Object) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Namespace %#v", obj)) + return + } + } + + if ownerutil.IsOwnedByKind(ownee, v1alpha1.CatalogSourceKind) { + owner := ownerutil.GetOwnerByKind(ownee, v1alpha1.CatalogSourceKind) + o.catSrcQueue.AddRateLimited(fmt.Sprintf("%s/%s", ownee.GetNamespace(), owner.Name)) + } +} + func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { catsrc, ok := obj.(*v1alpha1.CatalogSource) if !ok { @@ -203,7 +268,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { } logger := log.WithFields(log.Fields{ - "source": catsrc.GetName(), + "source": catsrc.GetName(), }) if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap { @@ -220,12 +285,14 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) { // Get the catalog source's config map - configMap, err := o.configmapRegistryReconciler.ConfigMapLister.ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap) + configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap) if err != nil { return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err) } - if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID == configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.ResourceVersion { + sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} + + if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() { // configmap ref nonexistant or updated, write out the new configmap ref to status and exit out := catsrc.DeepCopy() out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ @@ -239,7 +306,6 @@ func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.Catal // update source map o.sourcesLock.Lock() defer o.sourcesLock.Unlock() - sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap) o.sources[sourceKey] = src if err != nil { @@ -247,13 +313,26 @@ func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.Catal } // update status - if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil { + if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { return err } o.sourcesLastUpdate = timeNow() return nil } + // configmap not parsed to memory, but also not out of date + if _, ok := o.sources[sourceKey]; !ok { + // update source map + o.sourcesLock.Lock() + defer o.sourcesLock.Unlock() + src, err := registry.NewInMemoryFromConfigMap(o.OpClient, catsrc.GetNamespace(), catsrc.Spec.ConfigMap) + o.sources[sourceKey] = src + if err != nil { + return err + } + o.sourcesLastUpdate = timeNow() + } + // configmap ref is up to date, continue parsing if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) { // if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it @@ -263,8 +342,13 @@ func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.Catal logger.WithError(err).Warn("couldn't ensure registry server") return err } + + if !catsrc.Status.LastSync.Before(&out.Status.LastSync) { + return nil + } + // update status - if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil { + if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { return err } o.sourcesLastUpdate = timeNow() diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index 1cd726bb8c8..028c6ae377b 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -3,11 +3,13 @@ package catalog import ( "errors" "fmt" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" "testing" "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "github.com/ghodss/yaml" "github.com/stretchr/testify/require" @@ -153,7 +155,7 @@ func TestSyncCatalogSources(t *testing.T) { UID: types.UID("catalog-uid"), }, Spec: v1alpha1.CatalogSourceSpec{ - ConfigMap: "cool-configmap", + ConfigMap: "cool-configmap", SourceType: "nope", }, }, @@ -167,7 +169,7 @@ func TestSyncCatalogSources(t *testing.T) { Data: fakeConfigMapData(), }, expectedStatus: nil, - expectedError: nil, + expectedError: nil, }, { testName: "CatalogSourceWithBackingConfigMap", @@ -179,7 +181,7 @@ func TestSyncCatalogSources(t *testing.T) { UID: types.UID("catalog-uid"), }, Spec: v1alpha1.CatalogSourceSpec{ - ConfigMap: "cool-configmap", + ConfigMap: "cool-configmap", SourceType: v1alpha1.SourceTypeInternal, }, }, @@ -212,7 +214,7 @@ func TestSyncCatalogSources(t *testing.T) { UID: types.UID("catalog-uid"), }, Spec: v1alpha1.CatalogSourceSpec{ - ConfigMap: "cool-configmap", + ConfigMap: "cool-configmap", SourceType: v1alpha1.SourceTypeConfigmap, }, Status: v1alpha1.CatalogSourceStatus{ @@ -253,7 +255,7 @@ func TestSyncCatalogSources(t *testing.T) { UID: types.UID("catalog-uid"), }, Spec: v1alpha1.CatalogSourceSpec{ - ConfigMap: "cool-configmap", + ConfigMap: "cool-configmap", SourceType: v1alpha1.SourceTypeConfigmap, }, }, @@ -279,7 +281,7 @@ func TestSyncCatalogSources(t *testing.T) { UID: types.UID("catalog-uid"), }, Spec: v1alpha1.CatalogSourceSpec{ - ConfigMap: "cool-configmap", + ConfigMap: "cool-configmap", SourceType: v1alpha1.SourceTypeConfigmap, }, }, @@ -411,7 +413,7 @@ func fakeConfigMapData() map[string]string { } // NewFakeOperator creates a new operator using fake clients -func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string, stopc <- chan struct{}) (*Operator, error) { +func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string, stopc <-chan struct{}) (*Operator, error) { // Create client fakes clientFake := fake.NewSimpleClientset(clientObjs...) opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), apiextensionsfake.NewSimpleClientset(extObjs...), apiregistrationfake.NewSimpleClientset(regObjs...)) @@ -431,25 +433,29 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO podInformer := informerFactory.Core().V1().Pods() configMapInformer := informerFactory.Core().V1().ConfigMaps() + lister := operatorlister.NewLister() + lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister()) + lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister()) + lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister()) + lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister()) + lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) + lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister()) + // Create the new operator queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake) op := &Operator{ Operator: queueOperator, client: clientFake, + lister: lister, namespace: namespace, sources: make(map[registry.ResourceKey]registry.Source), dependencyResolver: resolver, } op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ - Image: "test:pod", + Image: "test:pod", OpClient: op.OpClient, - RoleLister: roleInformer.Lister(), - RoleBindingLister: roleBindingInformer.Lister(), - ServiceAccountLister: serviceAccountInformer.Lister(), - ServiceLister: serviceInformer.Lister(), - PodLister: podInformer.Lister(), - ConfigMapLister: configMapInformer.Lister(), + Lister: lister, } // register informers for configmapRegistryReconciler diff --git a/pkg/controller/registry/reconciler.go b/pkg/controller/registry/reconciler.go index e2d07a5ea3b..181635a05f5 100644 --- a/pkg/controller/registry/reconciler.go +++ b/pkg/controller/registry/reconciler.go @@ -2,8 +2,11 @@ package registry import ( "fmt" + "time" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" "github.com/sirupsen/logrus" "k8s.io/api/core/v1" @@ -11,9 +14,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" - v1lister "k8s.io/client-go/listers/core/v1" - rbacv1lister "k8s.io/client-go/listers/rbac/v1" - "time" ) var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) } @@ -104,7 +104,7 @@ func (s *catalogSourceDecorator) Pod(image string) *v1.Pod { } func (s *catalogSourceDecorator) ServiceAccount() *v1.ServiceAccount { - sa:= &v1.ServiceAccount{ + sa := &v1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: s.serviceAccountName(), Namespace: s.GetNamespace(), @@ -115,7 +115,7 @@ func (s *catalogSourceDecorator) ServiceAccount() *v1.ServiceAccount { } func (s *catalogSourceDecorator) Role() *rbacv1.Role { - role:= &rbacv1.Role{ + role := &rbacv1.Role{ ObjectMeta: metav1.ObjectMeta{ Name: s.roleName(), Namespace: s.GetNamespace(), @@ -134,7 +134,7 @@ func (s *catalogSourceDecorator) Role() *rbacv1.Role { } func (s *catalogSourceDecorator) RoleBinding() *rbacv1.RoleBinding { - rb:=&rbacv1.RoleBinding{ + rb := &rbacv1.RoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: s.GetName() + "-server-configmap-reader", Namespace: s.GetNamespace(), @@ -161,21 +161,16 @@ type RegistryReconciler interface { } type ConfigMapRegistryReconciler struct { - ConfigMapLister v1lister.ConfigMapLister - ServiceLister v1lister.ServiceLister - RoleBindingLister rbacv1lister.RoleBindingLister - RoleLister rbacv1lister.RoleLister - PodLister v1lister.PodLister - ServiceAccountLister v1lister.ServiceAccountLister - OpClient operatorclient.ClientInterface - Image string + Lister operatorlister.OperatorLister + OpClient operatorclient.ClientInterface + Image string } var _ RegistryReconciler = &ConfigMapRegistryReconciler{} func (c *ConfigMapRegistryReconciler) currentService(source catalogSourceDecorator) *v1.Service { serviceName := source.Service().GetName() - service, err := c.ServiceLister.Services(source.GetNamespace()).Get(serviceName) + service, err := c.Lister.CoreV1().ServiceLister().Services(source.GetNamespace()).Get(serviceName) if err != nil { logrus.WithField("service", serviceName).Warn("couldn't find service in cache") return nil @@ -185,7 +180,7 @@ func (c *ConfigMapRegistryReconciler) currentService(source catalogSourceDecorat func (c *ConfigMapRegistryReconciler) currentServiceAccount(source catalogSourceDecorator) *v1.ServiceAccount { serviceAccountName := source.ServiceAccount().GetName() - serviceAccount, err := c.ServiceAccountLister.ServiceAccounts(source.GetNamespace()).Get(serviceAccountName) + serviceAccount, err := c.Lister.CoreV1().ServiceAccountLister().ServiceAccounts(source.GetNamespace()).Get(serviceAccountName) if err != nil { logrus.WithField("serviceAccouint", serviceAccountName).WithError(err).Warn("couldn't find service account in cache") return nil @@ -195,7 +190,7 @@ func (c *ConfigMapRegistryReconciler) currentServiceAccount(source catalogSource func (c *ConfigMapRegistryReconciler) currentRole(source catalogSourceDecorator) *rbacv1.Role { roleName := source.Role().GetName() - role, err := c.RoleLister.Roles(source.GetNamespace()).Get(roleName) + role, err := c.Lister.RbacV1().RoleLister().Roles(source.GetNamespace()).Get(roleName) if err != nil { logrus.WithField("role", roleName).WithError(err).Warn("couldn't find role in cache") return nil @@ -205,7 +200,7 @@ func (c *ConfigMapRegistryReconciler) currentRole(source catalogSourceDecorator) func (c *ConfigMapRegistryReconciler) currentRoleBinding(source catalogSourceDecorator) *rbacv1.RoleBinding { roleBindingName := source.RoleBinding().GetName() - roleBinding, err := c.RoleBindingLister.RoleBindings(source.GetNamespace()).Get(roleBindingName) + roleBinding, err := c.Lister.RbacV1().RoleBindingLister().RoleBindings(source.GetNamespace()).Get(roleBindingName) if err != nil { logrus.WithField("roleBinding", roleBindingName).WithError(err).Warn("couldn't find role binding in cache") return nil @@ -215,7 +210,7 @@ func (c *ConfigMapRegistryReconciler) currentRoleBinding(source catalogSourceDec func (c *ConfigMapRegistryReconciler) currentPods(source catalogSourceDecorator, image string) []*v1.Pod { podName := source.Pod(image).GetName() - pods, err := c.PodLister.Pods(source.GetNamespace()).List(source.Selector()) + pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(source.Selector()) if err != nil { logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache") return nil @@ -228,7 +223,7 @@ func (c *ConfigMapRegistryReconciler) currentPods(source catalogSourceDecorator, func (c *ConfigMapRegistryReconciler) currentPodsWithCorrectResourceVersion(source catalogSourceDecorator, image string) []*v1.Pod { podName := source.Pod(image).GetName() - pods, err := c.PodLister.Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels())) + pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels())) if err != nil { logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache") return nil @@ -244,7 +239,7 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alph source := catalogSourceDecorator{catalogSource} // fetch configmap first, exit early if we can't find it - configMap, err := c.ConfigMapLister.ConfigMaps(source.GetNamespace()).Get(source.Spec.ConfigMap) + configMap, err := c.Lister.CoreV1().ConfigMapLister().ConfigMaps(source.GetNamespace()).Get(source.Spec.ConfigMap) if err != nil { return fmt.Errorf("unable to get configmap %s/%s from cache", source.GetNamespace(), source.Spec.ConfigMap) } diff --git a/pkg/controller/registry/reconciler_test.go b/pkg/controller/registry/reconciler_test.go index d5e0df867ee..070ae01c381 100644 --- a/pkg/controller/registry/reconciler_test.go +++ b/pkg/controller/registry/reconciler_test.go @@ -2,9 +2,13 @@ package registry import ( "fmt" + "testing" + "time" + "github.com/ghodss/yaml" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -14,12 +18,13 @@ import ( "k8s.io/client-go/informers" k8sfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" - "testing" - "time" ) + const ( registryImageName = "test:image" + testNamespace = "testns" ) + func reconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) (*ConfigMapRegistryReconciler, operatorclient.ClientInterface) { opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), nil, nil) @@ -41,15 +46,18 @@ func reconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) ( configMapInformer.Informer(), } + lister := operatorlister.NewLister() + lister.RbacV1().RegisterRoleLister(testNamespace, roleInformer.Lister()) + lister.RbacV1().RegisterRoleBindingLister(testNamespace, roleBindingInformer.Lister()) + lister.CoreV1().RegisterServiceAccountLister(testNamespace, serviceAccountInformer.Lister()) + lister.CoreV1().RegisterServiceLister(testNamespace, serviceInformer.Lister()) + lister.CoreV1().RegisterPodLister(testNamespace, podInformer.Lister()) + lister.CoreV1().RegisterConfigMapLister(testNamespace, configMapInformer.Lister()) + rec := &ConfigMapRegistryReconciler{ - Image: registryImageName, - OpClient: opClientFake, - RoleLister: roleInformer.Lister(), - RoleBindingLister: roleBindingInformer.Lister(), - ServiceAccountLister: serviceAccountInformer.Lister(), - ServiceLister: serviceInformer.Lister(), - PodLister: podInformer.Lister(), - ConfigMapLister: configMapInformer.Lister(), + Image: registryImageName, + OpClient: opClientFake, + Lister: lister, } var hasSyncedCheckFns []cache.InformerSynced @@ -69,11 +77,11 @@ func crd(name string) v1beta1.CustomResourceDefinition { Name: name, }, Spec: v1beta1.CustomResourceDefinitionSpec{ - Group: name + "group", + Group: name + "group", Versions: []v1beta1.CustomResourceDefinitionVersion{ { - Name: "v1", - Served: true, + Name: "v1", + Served: true, Storage: true, }, }, @@ -91,7 +99,7 @@ func validConfigMap() *corev1.ConfigMap { return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "cool-configmap", - Namespace: "cool-namespace", + Namespace: testNamespace, UID: types.UID("configmap-uid"), ResourceVersion: "resource-version", }, @@ -109,7 +117,7 @@ func validCatalogSource(configMap *corev1.ConfigMap) *v1alpha1.CatalogSource { return &v1alpha1.CatalogSource{ ObjectMeta: metav1.ObjectMeta{ Name: "cool-catalog", - Namespace: "cool-namespace", + Namespace: testNamespace, UID: types.UID("catalog-uid"), }, Spec: v1alpha1.CatalogSourceSpec{ @@ -118,9 +126,9 @@ func validCatalogSource(configMap *corev1.ConfigMap) *v1alpha1.CatalogSource { }, Status: v1alpha1.CatalogSourceStatus{ ConfigMapResource: &v1alpha1.ConfigMapResourceReference{ - Name: configMap.GetName(), - Namespace: configMap.GetNamespace(), - UID: configMap.GetUID(), + Name: configMap.GetName(), + Namespace: configMap.GetNamespace(), + UID: configMap.GetUID(), ResourceVersion: configMap.GetResourceVersion(), }, }, @@ -154,7 +162,7 @@ func objectsForCatalogSource(catsrc *v1alpha1.CatalogSource) []runtime.Object { func modifyObjName(objs []runtime.Object, kind, newName string) []runtime.Object { out := []runtime.Object{} - for _, o :=range objs { + for _, o := range objs { if o.GetObjectKind().GroupVersionKind().Kind == kind { mo := o.(metav1.Object) mo.SetName(newName) @@ -176,7 +184,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { } type in struct { cluster cluster - catsrc *v1alpha1.CatalogSource + catsrc *v1alpha1.CatalogSource } type out struct { status *v1alpha1.RegistryServiceStatus @@ -191,7 +199,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { testName: "NoConfigMap", in: in{ cluster: cluster{}, - catsrc: &v1alpha1.CatalogSource{}, + catsrc: &v1alpha1.CatalogSource{}, }, out: out{ err: fmt.Errorf("unable to get configmap / from cache"), @@ -209,7 +217,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, @@ -226,7 +234,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, @@ -243,7 +251,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, @@ -260,7 +268,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, @@ -277,7 +285,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, @@ -294,7 +302,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, @@ -311,7 +319,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) { status: &v1alpha1.RegistryServiceStatus{ Protocol: "grpc", ServiceName: "cool-catalog", - ServiceNamespace: "cool-namespace", + ServiceNamespace: testNamespace, Port: "50051", }, }, diff --git a/pkg/lib/operatorlister/configmap.go b/pkg/lib/operatorlister/configmap.go new file mode 100644 index 00000000000..e15132103d4 --- /dev/null +++ b/pkg/lib/operatorlister/configmap.go @@ -0,0 +1,94 @@ +package operatorlister + +import ( + "fmt" + "sync" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + corev1 "k8s.io/client-go/listers/core/v1" +) + +type UnionConfigMapLister struct { + configMapListers map[string]corev1.ConfigMapLister + configMapLock sync.RWMutex +} + +// List lists all ConfigMaps in the indexer. +func (usl *UnionConfigMapLister) List(selector labels.Selector) (ret []*v1.ConfigMap, err error) { + usl.configMapLock.RLock() + defer usl.configMapLock.RUnlock() + + set := make(map[types.UID]*v1.ConfigMap) + for _, sl := range usl.configMapListers { + configMaps, err := sl.List(selector) + if err != nil { + return nil, err + } + + for _, configMap := range configMaps { + set[configMap.GetUID()] = configMap + } + } + + for _, configMap := range set { + ret = append(ret, configMap) + } + + return +} + +// ConfigMaps returns an object that can list and get ConfigMaps. +func (usl *UnionConfigMapLister) ConfigMaps(namespace string) corev1.ConfigMapNamespaceLister { + usl.configMapLock.RLock() + defer usl.configMapLock.RUnlock() + + // Check for specific namespace listers + if sl, ok := usl.configMapListers[namespace]; ok { + return sl.ConfigMaps(namespace) + } + + // Check for any namespace-all listers + if sl, ok := usl.configMapListers[metav1.NamespaceAll]; ok { + return sl.ConfigMaps(namespace) + } + + return &NullConfigMapNamespaceLister{} +} + +func (usl *UnionConfigMapLister) RegisterConfigMapLister(namespace string, lister corev1.ConfigMapLister) { + usl.configMapLock.Lock() + defer usl.configMapLock.Unlock() + + if usl.configMapListers == nil { + usl.configMapListers = make(map[string]corev1.ConfigMapLister) + } + usl.configMapListers[namespace] = lister +} + +func (l *coreV1Lister) RegisterConfigMapLister(namespace string, lister corev1.ConfigMapLister) { + l.configMapLister.RegisterConfigMapLister(namespace, lister) +} + +func (l *coreV1Lister) ConfigMapLister() corev1.ConfigMapLister { + return l.configMapLister +} + +// NullConfigMapNamespaceLister is an implementation of a null ConfigMapNamespaceLister. It is +// used to prevent nil pointers when no ConfigMapNamespaceLister has been registered for a given +// namespace. +type NullConfigMapNamespaceLister struct { + corev1.ConfigMapNamespaceLister +} + +// List returns nil and an error explaining that this is a NullConfigMapNamespaceLister. +func (n *NullConfigMapNamespaceLister) List(selector labels.Selector) (ret []*v1.ConfigMap, err error) { + return nil, fmt.Errorf("cannot list ConfigMaps with a NullConfigMapNamespaceLister") +} + +// Get returns nil and an error explaining that this is a NullConfigMapNamespaceLister. +func (n *NullConfigMapNamespaceLister) Get(name string) (*v1.ConfigMap, error) { + return nil, fmt.Errorf("cannot get ConfigMap with a NullConfigMapNamespaceLister") +} diff --git a/pkg/lib/operatorlister/lister.go b/pkg/lib/operatorlister/lister.go index ee6fbcd3eb0..9c589884bbf 100644 --- a/pkg/lib/operatorlister/lister.go +++ b/pkg/lib/operatorlister/lister.go @@ -30,12 +30,16 @@ type CoreV1Lister interface { RegisterSecretLister(namespace string, lister corev1.SecretLister) RegisterServiceLister(namespace string, lister corev1.ServiceLister) RegisterServiceAccountLister(namespace string, lister corev1.ServiceAccountLister) + RegisterPodLister(namespace string, lister corev1.PodLister) + RegisterConfigMapLister(namespace string, lister corev1.ConfigMapLister) RegisterNamespaceLister(lister corev1.NamespaceLister) SecretLister() corev1.SecretLister ServiceLister() corev1.ServiceLister ServiceAccountLister() corev1.ServiceAccountLister NamespaceLister() corev1.NamespaceLister + PodLister() corev1.PodLister + ConfigMapLister() corev1.ConfigMapLister } type RbacV1Lister interface { @@ -83,6 +87,8 @@ type coreV1Lister struct { serviceLister *UnionServiceLister serviceAccountLister *UnionServiceAccountLister namespaceLister *UnionNamespaceLister + podLister *UnionPodLister + configMapLister *UnionConfigMapLister } func newCoreV1Lister() *coreV1Lister { @@ -91,6 +97,8 @@ func newCoreV1Lister() *coreV1Lister { serviceLister: &UnionServiceLister{}, serviceAccountLister: &UnionServiceAccountLister{}, namespaceLister: &UnionNamespaceLister{}, + podLister: &UnionPodLister{}, + configMapLister: &UnionConfigMapLister{}, } } diff --git a/pkg/lib/operatorlister/pod.go b/pkg/lib/operatorlister/pod.go new file mode 100644 index 00000000000..1dcaf0bc117 --- /dev/null +++ b/pkg/lib/operatorlister/pod.go @@ -0,0 +1,94 @@ +package operatorlister + +import ( + "fmt" + "sync" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + corev1 "k8s.io/client-go/listers/core/v1" +) + +type UnionPodLister struct { + podListers map[string]corev1.PodLister + podLock sync.RWMutex +} + +// List lists all Pods in the indexer. +func (usl *UnionPodLister) List(selector labels.Selector) (ret []*v1.Pod, err error) { + usl.podLock.RLock() + defer usl.podLock.RUnlock() + + set := make(map[types.UID]*v1.Pod) + for _, sl := range usl.podListers { + pods, err := sl.List(selector) + if err != nil { + return nil, err + } + + for _, pod := range pods { + set[pod.GetUID()] = pod + } + } + + for _, pod := range set { + ret = append(ret, pod) + } + + return +} + +// Pods returns an object that can list and get Pods. +func (usl *UnionPodLister) Pods(namespace string) corev1.PodNamespaceLister { + usl.podLock.RLock() + defer usl.podLock.RUnlock() + + // Check for specific namespace listers + if sl, ok := usl.podListers[namespace]; ok { + return sl.Pods(namespace) + } + + // Check for any namespace-all listers + if sl, ok := usl.podListers[metav1.NamespaceAll]; ok { + return sl.Pods(namespace) + } + + return &NullPodNamespaceLister{} +} + +func (usl *UnionPodLister) RegisterPodLister(namespace string, lister corev1.PodLister) { + usl.podLock.Lock() + defer usl.podLock.Unlock() + + if usl.podListers == nil { + usl.podListers = make(map[string]corev1.PodLister) + } + usl.podListers[namespace] = lister +} + +func (l *coreV1Lister) RegisterPodLister(namespace string, lister corev1.PodLister) { + l.podLister.RegisterPodLister(namespace, lister) +} + +func (l *coreV1Lister) PodLister() corev1.PodLister { + return l.podLister +} + +// NullPodNamespaceLister is an implementation of a null PodNamespaceLister. It is +// used to prevent nil pointers when no PodNamespaceLister has been registered for a given +// namespace. +type NullPodNamespaceLister struct { + corev1.PodNamespaceLister +} + +// List returns nil and an error explaining that this is a NullPodNamespaceLister. +func (n *NullPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) { + return nil, fmt.Errorf("cannot list Pods with a NullPodNamespaceLister") +} + +// Get returns nil and an error explaining that this is a NullPodNamespaceLister. +func (n *NullPodNamespaceLister) Get(name string) (*v1.Pod, error) { + return nil, fmt.Errorf("cannot get Pod with a NullPodNamespaceLister") +} diff --git a/pkg/lib/operatorlister/service.go b/pkg/lib/operatorlister/service.go index 1660fac3662..75bdccf34fc 100644 --- a/pkg/lib/operatorlister/service.go +++ b/pkg/lib/operatorlister/service.go @@ -111,6 +111,6 @@ func (n *NullServiceNamespaceLister) Get(name string) (*v1.Service, error) { } // GetPodServices returns nil and an error explaining that this is a NullServiceNamespaceLister. -func (n *NullServiceAccountNamespaceLister) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) { +func (n *NullServiceNamespaceLister) GetPodServices(pod *v1.Pod) ([]*v1.Service, error) { return nil, fmt.Errorf("could not get pod services with a NullServiceNamespaceLister") }