diff --git a/cmd/catalog/main.go b/cmd/catalog/main.go index f2848d50739..b103c576410 100644 --- a/cmd/catalog/main.go +++ b/cmd/catalog/main.go @@ -69,7 +69,7 @@ func main() { go http.ListenAndServe(":8080", nil) // Create a new instance of the operator. - catalogOperator, err := catalog.NewOperator(*kubeConfigPath, *wakeupInterval, *catalogNamespace, strings.Split(*watchedNamespaces, ",")...) + catalogOperator, err := catalog.NewOperator(*kubeConfigPath, *wakeupInterval, *configmapServerImage, *catalogNamespace, strings.Split(*watchedNamespaces, ",")...) if err != nil { log.Panicf("error configuring operator: %s", err.Error()) } diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 3b5e8ec9afb..4f30a3964bb 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "k8s.io/client-go/informers" "sync" "time" @@ -52,10 +53,11 @@ type Operator struct { sourcesLastUpdate metav1.Time dependencyResolver resolver.DependencyResolver subQueue workqueue.RateLimitingInterface + configmapRegistryReconciler *registry.ConfigMapRegistryReconciler } // NewOperator creates a new Catalog Operator. -func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) { +func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapRegistryImage string, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) { // Default to watching all namespaces. 
if watchedNamespaces == nil { watchedNamespaces = []string{metav1.NamespaceAll} @@ -141,9 +143,58 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNa op.RegisterQueueInformer(informer) } + // Creates registry pods in response to configmaps + informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval) + roleInformer := informerFactory.Rbac().V1().Roles() + roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() + serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() + serviceInformer := informerFactory.Core().V1().Services() + podInformer := informerFactory.Core().V1().Pods() + configMapInformer := informerFactory.Core().V1().ConfigMaps() + op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ + Image: configmapRegistryImage, + OpClient: op.OpClient, + RoleLister: roleInformer.Lister(), + RoleBindingLister: roleBindingInformer.Lister(), + ServiceAccountLister: serviceAccountInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + PodLister: podInformer.Lister(), + ConfigMapLister: configMapInformer.Lister(), + } + + // register informers for configmapRegistryReconciler + registryInformers := []cache.SharedIndexInformer{ + roleInformer.Informer(), + roleBindingInformer.Informer(), + serviceAccountInformer.Informer(), + serviceInformer.Informer(), + podInformer.Informer(), + configMapInformer.Informer(), + } + + // TODO: won't this possibly conflict since GVK isn't part of the queue entry? 
+ registryQueueInformers := queueinformer.New( + workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "registry"), + registryInformers, + op.syncRegistry, + nil, + "registry", + metrics.NewMetricsNil(), + ) + for _, informer := range registryQueueInformers { + op.RegisterQueueInformer(informer) + } return op, nil } +func (o *Operator) syncRegistry(obj interface{}) (syncError error) { + switch obj.(type) { + case *corev1.ConfigMap: + // requeue catalogsource + } + return nil +} + func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { catsrc, ok := obj.(*v1alpha1.CatalogSource) if !ok { @@ -151,47 +202,76 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return fmt.Errorf("casting CatalogSource failed") } - // Get the catalog source's config map - configMap, err := o.OpClient.KubernetesInterface().CoreV1().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get catalog config map %s when updating status: %s", catsrc.Spec.ConfigMap, err) + logger := log.WithFields(log.Fields{ + "source": catsrc.GetName(), + }) + + if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap { + return o.syncConfigMapSource(logger, catsrc) } - o.sourcesLock.Lock() - defer o.sourcesLock.Unlock() - sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} - _, ok = o.sources[sourceKey] + logger.WithField("sourceType", catsrc.Spec.SourceType).Warn("unknown source type") - // Check for catalog source changes - if ok && catsrc.Status.ConfigMapResource != nil && catsrc.Status.ConfigMapResource.Name == configMap.GetName() && catsrc.Status.ConfigMapResource.ResourceVersion == configMap.GetResourceVersion() { - return nil - } + // TODO: write status about invalid source type - // Update status subresource - out := catsrc.DeepCopy() - 
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ - Name: configMap.GetName(), - Namespace: configMap.GetNamespace(), - UID: configMap.GetUID(), - ResourceVersion: configMap.GetResourceVersion(), - } - out.Status.LastSync = timeNow() + return nil +} - _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out) +func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) { + + // Get the catalog source's config map + configMap, err := o.configmapRegistryReconciler.ConfigMapLister.ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap) if err != nil { - return fmt.Errorf("failed to update catalog source %s status: %s", out.GetName(), err) + return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err) } - // Create a new in-mem registry - src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap) - if err != nil { - return fmt.Errorf("failed to create catalog source from ConfigMap %s: %s", out.Spec.ConfigMap, err) + if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.ResourceVersion { + // configmap ref nonexistent or updated, write out the new configmap ref to status and exit + out := catsrc.DeepCopy() + out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ + Name: configMap.GetName(), + Namespace: configMap.GetNamespace(), + UID: configMap.GetUID(), + ResourceVersion: configMap.GetResourceVersion(), + } + out.Status.LastSync = timeNow() + + // update source map + o.sourcesLock.Lock() + defer o.sourcesLock.Unlock() + sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} + src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap) + if err != nil { + return err + } + o.sources[sourceKey] = src + + 
// update status + if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil { + return err + } + o.sourcesLastUpdate = timeNow() + return nil } - // Update sources map - o.sources[sourceKey] = src - o.sourcesLastUpdate = timeNow() + // configmap ref is up to date, continue parsing + if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) { + // if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it + + out := catsrc.DeepCopy() + if err := o.configmapRegistryReconciler.EnsureRegistryServer(out); err != nil { + logger.WithError(err).Warn("couldn't ensure registry server") + return err + } + // update status + if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil { + return err + } + o.sourcesLastUpdate = timeNow() + return nil + } + // no updates return nil } diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index 874c48bf063..1cd726bb8c8 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -2,7 +2,11 @@ package catalog import ( "errors" + "fmt" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "testing" + "time" "github.com/ghodss/yaml" @@ -139,6 +143,32 @@ func TestSyncCatalogSources(t *testing.T) { expectedStatus *v1alpha1.CatalogSourceStatus expectedError error }{ + { + testName: "CatalogSourceWithInvalidSourceType", + operatorNamespace: "cool-namespace", + catalogSource: &v1alpha1.CatalogSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cool-catalog", + Namespace: "cool-namespace", + UID: types.UID("catalog-uid"), + }, + Spec: v1alpha1.CatalogSourceSpec{ + ConfigMap: "cool-configmap", + SourceType: "nope", + }, + }, + configMap: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
"cool-configmap", + Namespace: "cool-namespace", + UID: types.UID("configmap-uid"), + ResourceVersion: "resource-version", + }, + Data: fakeConfigMapData(), + }, + expectedStatus: nil, + expectedError: nil, + }, { testName: "CatalogSourceWithBackingConfigMap", operatorNamespace: "cool-namespace", @@ -150,6 +180,7 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeInternal, }, }, configMap: &corev1.ConfigMap{ @@ -182,6 +213,7 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeConfigmap, }, Status: v1alpha1.CatalogSourceStatus{ ConfigMapResource: &v1alpha1.ConfigMapResourceReference{ @@ -222,6 +254,7 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeConfigmap, }, }, configMap: &corev1.ConfigMap{ @@ -234,7 +267,7 @@ func TestSyncCatalogSources(t *testing.T) { Data: map[string]string{}, }, expectedStatus: nil, - expectedError: errors.New("failed to create catalog source from ConfigMap cool-configmap: error parsing ConfigMap cool-configmap: no valid resources found"), + expectedError: errors.New("error parsing ConfigMap cool-configmap: no valid resources found"), }, { testName: "CatalogSourceWithMissingConfigMap", @@ -247,21 +280,25 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeConfigmap, }, }, configMap: &corev1.ConfigMap{}, expectedStatus: nil, - expectedError: errors.New("failed to get catalog config map cool-configmap when updating status: configmaps \"cool-configmap\" not found"), + expectedError: errors.New("failed to get catalog config map cool-configmap: configmap \"cool-configmap\" not found"), }, } for _, tt := range tests { t.Run(tt.testName, func(t *testing.T) { + stopc := 
make(chan struct{}) + defer close(stopc) + // Create existing objects clientObjs := []runtime.Object{tt.catalogSource} k8sObjs := []runtime.Object{tt.configMap} // Create test operator - op, err := NewFakeOperator(clientObjs, k8sObjs, nil, nil, resolver, tt.operatorNamespace) + op, err := NewFakeOperator(clientObjs, k8sObjs, nil, nil, resolver, tt.operatorNamespace, stopc) require.NoError(t, err) // Run sync @@ -373,8 +410,8 @@ func fakeConfigMapData() map[string]string { return data } -// NewFakeOprator creates a new operator using fake clients -func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string) (*Operator, error) { +// NewFakeOperator creates a new operator using fake clients +func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string, stopc <- chan struct{}) (*Operator, error) { // Create client fakes clientFake := fake.NewSimpleClientset(clientObjs...) 
opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), apiextensionsfake.NewSimpleClientset(extObjs...), apiregistrationfake.NewSimpleClientset(regObjs...)) @@ -385,6 +422,15 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO return nil, err } + // Creates registry pods in response to configmaps + informerFactory := informers.NewSharedInformerFactory(opClientFake.KubernetesInterface(), 5*time.Second) + roleInformer := informerFactory.Rbac().V1().Roles() + roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() + serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() + serviceInformer := informerFactory.Core().V1().Services() + podInformer := informerFactory.Core().V1().Pods() + configMapInformer := informerFactory.Core().V1().ConfigMaps() + // Create the new operator queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake) op := &Operator{ @@ -395,6 +441,38 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO dependencyResolver: resolver, } + op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ + Image: "test:pod", + OpClient: op.OpClient, + RoleLister: roleInformer.Lister(), + RoleBindingLister: roleBindingInformer.Lister(), + ServiceAccountLister: serviceAccountInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + PodLister: podInformer.Lister(), + ConfigMapLister: configMapInformer.Lister(), + } + + // register informers for configmapRegistryReconciler + registryInformers := []cache.SharedIndexInformer{ + roleInformer.Informer(), + roleBindingInformer.Informer(), + serviceAccountInformer.Informer(), + serviceInformer.Informer(), + podInformer.Informer(), + configMapInformer.Informer(), + } + + var hasSyncedCheckFns []cache.InformerSynced + for _, informer := range registryInformers { + op.RegisterInformer(informer) + hasSyncedCheckFns = append(hasSyncedCheckFns, informer.HasSynced) + go informer.Run(stopc) 
+ } + + if ok := cache.WaitForCacheSync(stopc, hasSyncedCheckFns...); !ok { + return nil, fmt.Errorf("failed to wait for caches to sync") + } + return op, nil } diff --git a/pkg/controller/registry/reconciler.go b/pkg/controller/registry/reconciler.go new file mode 100644 index 00000000000..db125fe5282 --- /dev/null +++ b/pkg/controller/registry/reconciler.go @@ -0,0 +1,360 @@ +package registry + +import ( + "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" + "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + v1lister "k8s.io/client-go/listers/core/v1" + rbacv1lister "k8s.io/client-go/listers/rbac/v1" + "time" +) + +var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) } + +// catalogSourceDecorator wraps CatalogSource to add additional methods +type catalogSourceDecorator struct { + *v1alpha1.CatalogSource +} + +func (s *catalogSourceDecorator) serviceAccountName() string { + return s.GetName() + "-configmap-server" +} + +func (s *catalogSourceDecorator) roleName() string { + return s.GetName() + "-configmap-reader" +} + +func (s *catalogSourceDecorator) Selector() labels.Selector { + return labels.SelectorFromValidatedSet(map[string]string{ + "olm.catalogSource": s.GetName(), + }) +} + +func (s *catalogSourceDecorator) Labels() map[string]string { + return map[string]string{ + "olm.catalogSource": s.GetName(), + "olm.configMapResourceVersion": s.Status.ConfigMapResource.ResourceVersion, + } +} + +func (s *catalogSourceDecorator) ConfigMapChanges(configMap *v1.ConfigMap) bool { + if s.Status.ConfigMapResource == nil { + return true + } + if s.Status.ConfigMapResource.ResourceVersion 
== configMap.GetResourceVersion() { + return false + } + return true +} + +func (s *catalogSourceDecorator) Service() *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.GetName(), + Namespace: s.GetNamespace(), + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "grpc", + Port: 50051, + TargetPort: intstr.FromInt(50051), + }, + }, + Selector: s.Labels(), + }, + } +} + +func (s *catalogSourceDecorator) Pod(image string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: s.GetName() + "-", + Namespace: s.GetNamespace(), + Labels: s.Labels(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "configmap-registry-server", + Image: image, + Args: []string{"-c", s.GetName(), "-n", s.GetNamespace()}, + Ports: []v1.ContainerPort{ + { + Name: "grpc", + ContainerPort: 50051, + }, + }, + }, + }, + ServiceAccountName: s.GetName() + "-configmap-server", + }, + } +} + +func (s *catalogSourceDecorator) ServiceAccount() *v1.ServiceAccount { + return &v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.serviceAccountName(), + Namespace: s.GetNamespace(), + }, + } +} + +func (s *catalogSourceDecorator) Role() *rbacv1.Role { + return &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.roleName(), + Namespace: s.GetNamespace(), + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"get"}, + APIGroups: []string{""}, + Resources: []string{"configmaps"}, + ResourceNames: []string{s.Spec.ConfigMap}, + }, + }, + } +} + +func (s *catalogSourceDecorator) RoleBinding() *rbacv1.RoleBinding { + return &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.GetName() + "-server-configmap-reader", + Namespace: s.GetNamespace(), + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: s.serviceAccountName(), + Namespace: s.GetNamespace(), + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: s.roleName(), + }, + } +} + +type 
RegistryReconciler interface { + EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error +} + +type ConfigMapRegistryReconciler struct { + ConfigMapLister v1lister.ConfigMapLister + ServiceLister v1lister.ServiceLister + RoleBindingLister rbacv1lister.RoleBindingLister + RoleLister rbacv1lister.RoleLister + PodLister v1lister.PodLister + ServiceAccountLister v1lister.ServiceAccountLister + OpClient operatorclient.ClientInterface + Image string +} + +var _ RegistryReconciler = &ConfigMapRegistryReconciler{} + +func (c *ConfigMapRegistryReconciler) currentService(source catalogSourceDecorator) *v1.Service { + serviceName := source.Service().GetName() + service, err := c.ServiceLister.Services(source.GetNamespace()).Get(serviceName) + if err != nil { + logrus.WithField("service", serviceName).Warn("couldn't find service in cache") + return nil + } + return service +} + +func (c *ConfigMapRegistryReconciler) currentServiceAccount(source catalogSourceDecorator) *v1.ServiceAccount { + serviceAccountName := source.ServiceAccount().GetName() + serviceAccount, err := c.ServiceAccountLister.ServiceAccounts(source.GetNamespace()).Get(serviceAccountName) + if err != nil { + logrus.WithField("serviceAccouint", serviceAccountName).WithError(err).Warn("couldn't find service account in cache") + return nil + } + return serviceAccount +} + +func (c *ConfigMapRegistryReconciler) currentRole(source catalogSourceDecorator) *rbacv1.Role { + roleName := source.Role().GetName() + role, err := c.RoleLister.Roles(source.GetNamespace()).Get(roleName) + if err != nil { + logrus.WithField("role", roleName).WithError(err).Warn("couldn't find role in cache") + return nil + } + return role +} + +func (c *ConfigMapRegistryReconciler) currentRoleBinding(source catalogSourceDecorator) *rbacv1.RoleBinding { + roleBindingName := source.RoleBinding().GetName() + roleBinding, err := c.RoleBindingLister.RoleBindings(source.GetNamespace()).Get(roleBindingName) + if err != nil { + 
logrus.WithField("roleBinding", roleBindingName).WithError(err).Warn("couldn't find role binding in cache") + return nil + } + return roleBinding +} + +func (c *ConfigMapRegistryReconciler) currentPods(source catalogSourceDecorator, image string) []*v1.Pod { + podName := source.Pod(image).GetName() + pods, err := c.PodLister.Pods(source.GetNamespace()).List(source.Selector()) + if err != nil { + logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache") + return nil + } + if len(pods) > 1 { + logrus.WithField("selector", source.Selector()).Warn("multiple pods found for selector") + } + return pods +} + +func (c *ConfigMapRegistryReconciler) currentPodsWithCorrectResourceVersion(source catalogSourceDecorator, image string) []*v1.Pod { + podName := source.Pod(image).GetName() + pods, err := c.PodLister.Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels())) + if err != nil { + logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache") + return nil + } + if len(pods) > 1 { + logrus.WithField("selector", source.Selector()).Warn("multiple pods found for selector") + } + return pods +} + +// Ensure that all components of registry server are up to date. 
+func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error { + source := catalogSourceDecorator{catalogSource} + + // fetch configmap first, exit early if we can't find it + configMap, err := c.ConfigMapLister.ConfigMaps(source.GetNamespace()).Get(source.Spec.ConfigMap) + if err != nil { + return fmt.Errorf("unable to get configmap %s/%s from cache", source.GetNamespace(), source.Spec.ConfigMap) + } + + if source.ConfigMapChanges(configMap) { + catalogSource.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ + Name: configMap.GetName(), + Namespace: configMap.GetNamespace(), + UID: configMap.GetUID(), + ResourceVersion: configMap.GetResourceVersion(), + } + } + + // if service status is nil, we force create every object to ensure they're created the first time + overwrite := source.Status.RegistryServiceStatus == nil + + // recreate the pod if there are configmap changes; this causes the db to be rebuilt + // recreate the pod if no existing pod is serving the latest configmap + overwritePod := overwrite || source.ConfigMapChanges(configMap) || len(c.currentPodsWithCorrectResourceVersion(source, c.Image)) == 0 + + //TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated) + if err := c.ensureServiceAccount(source, overwrite); err != nil { + return err + } + if err := c.ensureRole(source, overwrite); err != nil { + return err + } + if err := c.ensureRoleBinding(source, overwrite); err != nil { + return err + } + if err := c.ensurePod(source, overwritePod); err != nil { + return err + } + if err := c.ensureService(source, overwrite); err != nil { + return err + } + + if overwritePod { + catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{ + Protocol: "grpc", + ServiceName: source.Service().GetName(), + ServiceNamespace: source.GetNamespace(), + Port: fmt.Sprintf("%d", source.Service().Spec.Ports[0].Port), + } + 
catalogSource.Status.LastSync = timeNow() + } + return nil +} + +func (c *ConfigMapRegistryReconciler) ensureServiceAccount(source catalogSourceDecorator, overwrite bool) error { + serviceAccount := source.ServiceAccount() + ownerutil.AddOwner(serviceAccount, source, false, false) + if c.currentServiceAccount(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteServiceAccount(serviceAccount.GetNamespace(), serviceAccount.GetName(), metav1.NewDeleteOptions(0)); err != nil { + return err + } + } + _, err := c.OpClient.CreateServiceAccount(serviceAccount) + return err +} + +func (c *ConfigMapRegistryReconciler) ensureRole(source catalogSourceDecorator, overwrite bool) error { + role := source.Role() + ownerutil.AddOwner(role, source, false, false) + if c.currentRole(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteRole(role.GetNamespace(), role.GetName(), metav1.NewDeleteOptions(0)); err != nil { + return err + } + } + _, err := c.OpClient.CreateRole(role) + return err +} + +func (c *ConfigMapRegistryReconciler) ensureRoleBinding(source catalogSourceDecorator, overwrite bool) error { + roleBinding := source.RoleBinding() + ownerutil.AddOwner(roleBinding, source, false, false) + if c.currentRoleBinding(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteRoleBinding(roleBinding.GetNamespace(), roleBinding.GetName(), metav1.NewDeleteOptions(0)); err != nil { + return err + } + } + _, err := c.OpClient.CreateRoleBinding(roleBinding) + return err +} + +func (c *ConfigMapRegistryReconciler) ensurePod(source catalogSourceDecorator, overwrite bool) error { + pod := source.Pod(c.Image) + ownerutil.AddOwner(pod, source, false, false) + if len(c.currentPods(source, c.Image)) > 0 { + if !overwrite { + return nil + } + if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(pod.GetName(), metav1.NewDeleteOptions(0)); err != nil { + return err + } + } + _, err := 
c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Create(pod) + return err +} + +func (c *ConfigMapRegistryReconciler) ensureService(source catalogSourceDecorator, overwrite bool) error { + service := source.Service() + ownerutil.AddOwner(service, source, false, false) + if c.currentService(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteService(service.GetNamespace(), service.GetName(), metav1.NewDeleteOptions(0)); err != nil { + return err + } + } + _, err := c.OpClient.CreateService(service) + return err +} diff --git a/pkg/controller/registry/reconciler_test.go b/pkg/controller/registry/reconciler_test.go new file mode 100644 index 00000000000..26fc0b63d77 --- /dev/null +++ b/pkg/controller/registry/reconciler_test.go @@ -0,0 +1,237 @@ +package registry + +import ( + "fmt" + "github.com/ghodss/yaml" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "testing" + "time" +) +const ( + registryImageName = "test:image" +) +func reconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) (*ConfigMapRegistryReconciler, operatorclient.ClientInterface) { + opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), nil, nil) + + // Creates registry pods in response to configmaps + informerFactory := informers.NewSharedInformerFactory(opClientFake.KubernetesInterface(), 5*time.Second) + roleInformer := informerFactory.Rbac().V1().Roles() + roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() + 
serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() + serviceInformer := informerFactory.Core().V1().Services() + podInformer := informerFactory.Core().V1().Pods() + configMapInformer := informerFactory.Core().V1().ConfigMaps() + + registryInformers := []cache.SharedIndexInformer{ + roleInformer.Informer(), + roleBindingInformer.Informer(), + serviceAccountInformer.Informer(), + serviceInformer.Informer(), + podInformer.Informer(), + configMapInformer.Informer(), + } + + rec := &ConfigMapRegistryReconciler{ + Image: registryImageName, + OpClient: opClientFake, + RoleLister: roleInformer.Lister(), + RoleBindingLister: roleBindingInformer.Lister(), + ServiceAccountLister: serviceAccountInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + PodLister: podInformer.Lister(), + ConfigMapLister: configMapInformer.Lister(), + } + + var hasSyncedCheckFns []cache.InformerSynced + for _, informer := range registryInformers { + hasSyncedCheckFns = append(hasSyncedCheckFns, informer.HasSynced) + go informer.Run(stopc) + } + + require.True(t, cache.WaitForCacheSync(stopc, hasSyncedCheckFns...), "caches failed to sync") + + return rec, opClientFake +} + +func crd(name string) v1beta1.CustomResourceDefinition { + return v1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1beta1.CustomResourceDefinitionSpec{ + Group: name + "group", + Versions: []v1beta1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: true, + }, + }, + Names: v1beta1.CustomResourceDefinitionNames{ + Kind: name, + }, + }, + } +} + +func validConfigMap() *corev1.ConfigMap { + data := make(map[string]string) + dataYaml, _ := yaml.Marshal([]v1beta1.CustomResourceDefinition{crd("fake-crd")}) + data["customResourceDefinitions"] = string(dataYaml) + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cool-configmap", + Namespace: "cool-namespace", + UID: types.UID("configmap-uid"), + ResourceVersion: 
"resource-version", + }, + Data: data, + } +} + +func TestValidConfigMap(t *testing.T) { + cm := validConfigMap() + require.NotNil(t, cm) + require.Contains(t, cm.Data["customResourceDefinitions"], "fake") +} + +func validCatalogSource(configMap *corev1.ConfigMap) *v1alpha1.CatalogSource { + return &v1alpha1.CatalogSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cool-catalog", + Namespace: "cool-namespace", + UID: types.UID("catalog-uid"), + }, + Spec: v1alpha1.CatalogSourceSpec{ + ConfigMap: "cool-configmap", + SourceType: "nope", + }, + Status: v1alpha1.CatalogSourceStatus{ + ConfigMapResource: &v1alpha1.ConfigMapResourceReference{ + Name: configMap.GetName(), + Namespace: configMap.GetNamespace(), + UID: configMap.GetUID(), + ResourceVersion: configMap.GetResourceVersion(), + }, + }, + } +} + +func objectsForCatalogSource(catsrc *v1alpha1.CatalogSource) []runtime.Object { + decorated := catalogSourceDecorator{catsrc} + return []runtime.Object{ + decorated.Pod(registryImageName), + decorated.Service(), + decorated.ServiceAccount(), + decorated.Role(), + decorated.RoleBinding(), + } +} + +func TestConfigMapRegistryReconciler(t *testing.T) { + type cluster struct { + k8sObjs []runtime.Object + } + type in struct { + cluster cluster + catsrc *v1alpha1.CatalogSource + } + type out struct { + cluster cluster + status *v1alpha1.RegistryServiceStatus + err error + } + tests := []struct { + testName string + in in + out out + }{ + { + testName: "NoConfigMap", + in: in{ + cluster: cluster{}, + catsrc: &v1alpha1.CatalogSource{}, + }, + out: out{ + err: fmt.Errorf("unable to get configmap / from cache"), + }, + }, + { + testName: "NoExistingRegistry/CreateSuccessful", + in: in{ + cluster: cluster{ + k8sObjs: []runtime.Object{validConfigMap()}, + }, + catsrc: validCatalogSource(validConfigMap()), + }, + out: out{ + status: &v1alpha1.RegistryServiceStatus{ + Protocol: "grpc", + ServiceName: "cool-catalog", + ServiceNamespace: "cool-namespace", + Port: "50051", + }, + 
cluster: cluster{k8sObjs: objectsForCatalogSource(validCatalogSource(validConfigMap()))}, + }, + }, + // TODO: combinations of existing state + } + for _, tt := range tests { + t.Run(tt.testName, func(t *testing.T) { + stopc := make(chan struct{}) + defer close(stopc) + + rec, client := reconciler(t, tt.in.cluster.k8sObjs, stopc) + + err := rec.EnsureRegistryServer(tt.in.catsrc) + + require.Equal(t, tt.out.err, err) + require.Equal(t, tt.out.status, tt.in.catsrc.Status.RegistryServiceStatus) + + if tt.out.err != nil { + return + } + + // if no error, the reconciler should create the same set of kube objects every time + decorated := catalogSourceDecorator{tt.in.catsrc} + + pod := decorated.Pod(registryImageName) + outPod, err := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, pod, outPod) + + service := decorated.Service() + outService, err := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(service.GetName(), metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, service, outService) + + serviceAccount := decorated.ServiceAccount() + outServiceAccount, err := client.KubernetesInterface().CoreV1().ServiceAccounts(serviceAccount.GetNamespace()).Get(serviceAccount.GetName(), metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, serviceAccount, outServiceAccount) + + role := decorated.Role() + outRole, err := client.KubernetesInterface().RbacV1().Roles(role.GetNamespace()).Get(role.GetName(), metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, role, outRole) + + roleBinding := decorated.RoleBinding() + outRoleBinding, err := client.KubernetesInterface().RbacV1().RoleBindings(roleBinding.GetNamespace()).Get(roleBinding.GetName(), metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, roleBinding, outRoleBinding) + }) + } +} + +// TODO configmap changes detected, pod recreated +// TODO catalogsource 
deleted, pods deleted diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index d1ed758f100..e3862441ada 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -3,11 +3,12 @@ package queueinformer import ( "fmt" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/pkg/errors" ) // An Operator is a collection of QueueInformers