diff --git a/cmd/catalog/main.go b/cmd/catalog/main.go index f2848d50739..b103c576410 100644 --- a/cmd/catalog/main.go +++ b/cmd/catalog/main.go @@ -69,7 +69,7 @@ func main() { go http.ListenAndServe(":8080", nil) // Create a new instance of the operator. - catalogOperator, err := catalog.NewOperator(*kubeConfigPath, *wakeupInterval, *catalogNamespace, strings.Split(*watchedNamespaces, ",")...) + catalogOperator, err := catalog.NewOperator(*kubeConfigPath, *wakeupInterval, *configmapServerImage, *catalogNamespace, strings.Split(*watchedNamespaces, ",")...) if err != nil { log.Panicf("error configuring operator: %s", err.Error()) } diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 3b5e8ec9afb..4f30a3964bb 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "k8s.io/client-go/informers" "sync" "time" @@ -52,10 +53,11 @@ type Operator struct { sourcesLastUpdate metav1.Time dependencyResolver resolver.DependencyResolver subQueue workqueue.RateLimitingInterface + configmapRegistryReconciler *registry.ConfigMapRegistryReconciler } // NewOperator creates a new Catalog Operator. -func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) { +func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapRegistryImage string, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) { // Default to watching all namespaces. if watchedNamespaces == nil { watchedNamespaces = []string{metav1.NamespaceAll} @@ -141,9 +143,58 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNa op.RegisterQueueInformer(informer) } + // Creates registry pods in response to configmaps + informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval) + roleInformer := informerFactory.Rbac().V1().Roles() + roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() + serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() + serviceInformer := informerFactory.Core().V1().Services() + podInformer := informerFactory.Core().V1().Pods() + configMapInformer := informerFactory.Core().V1().ConfigMaps() + op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ + Image: configmapRegistryImage, + OpClient: op.OpClient, + RoleLister: roleInformer.Lister(), + RoleBindingLister: roleBindingInformer.Lister(), + ServiceAccountLister: serviceAccountInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + PodLister: podInformer.Lister(), + ConfigMapLister: configMapInformer.Lister(), + } + + // register informers for configmapRegistryReconciler + registryInformers := []cache.SharedIndexInformer{ + roleInformer.Informer(), + roleBindingInformer.Informer(), + serviceAccountInformer.Informer(), + serviceInformer.Informer(), + podInformer.Informer(), + configMapInformer.Informer(), + } + + // TODO: won't this possibly conflict since GVK isn't part of the queue entry? + registryQueueInformers := queueinformer.New( + workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "registry"), + registryInformers, + op.syncRegistry, + nil, + "registry", + metrics.NewMetricsNil(), + ) + for _, informer := range registryQueueInformers { + op.RegisterQueueInformer(informer) + } return op, nil } +func (o *Operator) syncRegistry(obj interface{}) (syncError error) { + switch obj.(type) { + case *corev1.ConfigMap: + // requeue catalogsource + } + return nil +} + func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { catsrc, ok := obj.(*v1alpha1.CatalogSource) if !ok { @@ -151,47 +202,76 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return fmt.Errorf("casting CatalogSource failed") } - // Get the catalog source's config map - configMap, err := o.OpClient.KubernetesInterface().CoreV1().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get catalog config map %s when updating status: %s", catsrc.Spec.ConfigMap, err) + logger := log.WithFields(log.Fields{ + "source": catsrc.GetName(), + }) + + if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap { + return o.syncConfigMapSource(logger, catsrc) } - o.sourcesLock.Lock() - defer o.sourcesLock.Unlock() - sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} - _, ok = o.sources[sourceKey] + logger.WithField("sourceType", catsrc.Spec.SourceType).Warn("unknown source type") - // Check for catalog source changes - if ok && catsrc.Status.ConfigMapResource != nil && catsrc.Status.ConfigMapResource.Name == configMap.GetName() && catsrc.Status.ConfigMapResource.ResourceVersion == configMap.GetResourceVersion() { - return nil - } + // TODO: write status about invalid source type - // Update status subresource - out := catsrc.DeepCopy() - out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ - Name: configMap.GetName(), - Namespace: configMap.GetNamespace(), - UID: configMap.GetUID(), - ResourceVersion: configMap.GetResourceVersion(), - } - out.Status.LastSync = timeNow() + return nil +} - _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out) +func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) { + + // Get the catalog source's config map + configMap, err := o.configmapRegistryReconciler.ConfigMapLister.ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap) if err != nil { - return fmt.Errorf("failed to update catalog source %s status: %s", out.GetName(), err) + return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err) } - // Create a new in-mem registry - src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap) - if err != nil { - return fmt.Errorf("failed to create catalog source from ConfigMap %s: %s", out.Spec.ConfigMap, err) + if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID == configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.ResourceVersion { + // configmap ref nonexistant or updated, write out the new configmap ref to status and exit + out := catsrc.DeepCopy() + out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ + Name: configMap.GetName(), + Namespace: configMap.GetNamespace(), + UID: configMap.GetUID(), + ResourceVersion: configMap.GetResourceVersion(), + } + out.Status.LastSync = timeNow() + + // update source map + o.sourcesLock.Lock() + defer o.sourcesLock.Unlock() + sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} + src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap) + o.sources[sourceKey] = src + if err != nil { + return err + } + + // update status + if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil { + return err + } + o.sourcesLastUpdate = timeNow() + return nil } - // Update sources map - o.sources[sourceKey] = src - o.sourcesLastUpdate = timeNow() + // configmap ref is up to date, continue parsing + if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) { + // if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it + + out := catsrc.DeepCopy() + if err := o.configmapRegistryReconciler.EnsureRegistryServer(out); err != nil { + logger.WithError(err).Warn("couldn't ensure registry server") + return err + } + // update status + if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil { + return err + } + o.sourcesLastUpdate = timeNow() + return nil + } + // no updates return nil } diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index 874c48bf063..1cd726bb8c8 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -2,7 +2,11 @@ package catalog import ( "errors" + "fmt" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "testing" + "time" "github.com/ghodss/yaml" @@ -139,6 +143,32 @@ func TestSyncCatalogSources(t *testing.T) { expectedStatus *v1alpha1.CatalogSourceStatus expectedError error }{ + { + testName: "CatalogSourceWithInvalidSourceType", + operatorNamespace: "cool-namespace", + catalogSource: &v1alpha1.CatalogSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cool-catalog", + Namespace: "cool-namespace", + UID: types.UID("catalog-uid"), + }, + Spec: v1alpha1.CatalogSourceSpec{ + ConfigMap: "cool-configmap", + SourceType: "nope", + }, + }, + configMap: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cool-configmap", + Namespace: "cool-namespace", + UID: types.UID("configmap-uid"), + ResourceVersion: "resource-version", + }, + Data: fakeConfigMapData(), + }, + expectedStatus: nil, + expectedError: nil, + }, { testName: "CatalogSourceWithBackingConfigMap", operatorNamespace: "cool-namespace", @@ -150,6 +180,7 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeInternal, }, }, configMap: &corev1.ConfigMap{ @@ -182,6 +213,7 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeConfigmap, }, Status: v1alpha1.CatalogSourceStatus{ ConfigMapResource: &v1alpha1.ConfigMapResourceReference{ @@ -222,6 +254,7 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeConfigmap, }, }, configMap: &corev1.ConfigMap{ @@ -234,7 +267,7 @@ func TestSyncCatalogSources(t *testing.T) { Data: map[string]string{}, }, expectedStatus: nil, - expectedError: errors.New("failed to create catalog source from ConfigMap cool-configmap: error parsing ConfigMap cool-configmap: no valid resources found"), + expectedError: errors.New("error parsing ConfigMap cool-configmap: no valid resources found"), }, { testName: "CatalogSourceWithMissingConfigMap", @@ -247,21 +280,25 @@ func TestSyncCatalogSources(t *testing.T) { }, Spec: v1alpha1.CatalogSourceSpec{ ConfigMap: "cool-configmap", + SourceType: v1alpha1.SourceTypeConfigmap, }, }, configMap: &corev1.ConfigMap{}, expectedStatus: nil, - expectedError: errors.New("failed to get catalog config map cool-configmap when updating status: configmaps \"cool-configmap\" not found"), + expectedError: errors.New("failed to get catalog config map cool-configmap: configmap \"cool-configmap\" not found"), }, } for _, tt := range tests { t.Run(tt.testName, func(t *testing.T) { + stopc := make(chan struct{}) + defer close(stopc) + // Create existing objects clientObjs := []runtime.Object{tt.catalogSource} k8sObjs := []runtime.Object{tt.configMap} // Create test operator - op, err := NewFakeOperator(clientObjs, k8sObjs, nil, nil, resolver, tt.operatorNamespace) + op, err := NewFakeOperator(clientObjs, k8sObjs, nil, nil, resolver, tt.operatorNamespace, stopc) require.NoError(t, err) // Run sync @@ -373,8 +410,8 @@ func fakeConfigMapData() map[string]string { return data } -// NewFakeOprator creates a new operator using fake clients -func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string) (*Operator, error) { +// NewFakeOperator creates a new operator using fake clients +func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string, stopc <- chan struct{}) (*Operator, error) { // Create client fakes clientFake := fake.NewSimpleClientset(clientObjs...) opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), apiextensionsfake.NewSimpleClientset(extObjs...), apiregistrationfake.NewSimpleClientset(regObjs...)) @@ -385,6 +422,15 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO return nil, err } + // Creates registry pods in response to configmaps + informerFactory := informers.NewSharedInformerFactory(opClientFake.KubernetesInterface(), 5*time.Second) + roleInformer := informerFactory.Rbac().V1().Roles() + roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() + serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() + serviceInformer := informerFactory.Core().V1().Services() + podInformer := informerFactory.Core().V1().Pods() + configMapInformer := informerFactory.Core().V1().ConfigMaps() + // Create the new operator queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake) op := &Operator{ @@ -395,6 +441,38 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO dependencyResolver: resolver, } + op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ + Image: "test:pod", + OpClient: op.OpClient, + RoleLister: roleInformer.Lister(), + RoleBindingLister: roleBindingInformer.Lister(), + ServiceAccountLister: serviceAccountInformer.Lister(), + ServiceLister: serviceInformer.Lister(), + PodLister: podInformer.Lister(), + ConfigMapLister: configMapInformer.Lister(), + } + + // register informers for configmapRegistryReconciler + registryInformers := []cache.SharedIndexInformer{ + roleInformer.Informer(), + roleBindingInformer.Informer(), + serviceAccountInformer.Informer(), + serviceInformer.Informer(), + podInformer.Informer(), + configMapInformer.Informer(), + } + + var hasSyncedCheckFns []cache.InformerSynced + for _, informer := range registryInformers { + op.RegisterInformer(informer) + hasSyncedCheckFns = append(hasSyncedCheckFns, informer.HasSynced) + go informer.Run(stopc) + } + + if ok := cache.WaitForCacheSync(stopc, hasSyncedCheckFns...); !ok { + return nil, fmt.Errorf("failed to wait for caches to sync") + } + return op, nil } diff --git a/pkg/controller/registry/reconciler.go b/pkg/controller/registry/reconciler.go new file mode 100644 index 00000000000..fc8d8ed48fe --- /dev/null +++ b/pkg/controller/registry/reconciler.go @@ -0,0 +1,316 @@ +package registry + +import ( + "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/util/intstr" + v1lister "k8s.io/client-go/listers/core/v1" + rbacv1lister "k8s.io/client-go/listers/rbac/v1" + "time" +) + +//for test stubbing and for ensuring standardization of timezones to UTC +var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) } + +// catalogsource wraps CatalogSource to add our derivation methods +type catalogSourceDeriver struct { + v1alpha1.CatalogSource +} + +func (s *catalogSourceDeriver) serviceAccountName() string { + return s.GetName()+"-configmap-server" +} + +func (s *catalogSourceDeriver) roleName() string { + return s.GetName() + "configmap-reader" +} + + +func (s *catalogSourceDeriver) ConfigMapChanges(configMap *v1.ConfigMap) bool { + if s.Status.ConfigMapResource == nil { + return false + } + if s.Status.ConfigMapResource.Name != configMap.GetName() { + return false + } + if s.Status.ConfigMapResource.ResourceVersion == configMap.GetResourceVersion() { + return false + } + return true +} + +func (s *catalogSourceDeriver) Service() *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.GetName(), + Namespace: s.GetNamespace(), + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "grpc", + Port: 50051, + TargetPort: intstr.FromInt(50051), + }, + }, + Selector: map[string]string{ + "catalogSourceDeriver": s.GetName(), + }, + }, + } +} + +func (s *catalogSourceDeriver) Pod(image string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: s.GetName(), + Namespace: s.GetNamespace(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "configmap-registry-server", + Image: image, + Args: []string{"-c", s.GetName(), "-n", s.GetNamespace()}, + }, + }, + ServiceAccountName: s.GetName()+"-configmap-server", + }, + } +} + +func (s *catalogSourceDeriver) ServiceAccount() *v1.ServiceAccount { + return &v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.serviceAccountName(), + Namespace: s.GetNamespace(), + }, + } +} + +func (s *catalogSourceDeriver) Role() *rbacv1.Role { + return &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.roleName(), + Namespace: s.GetNamespace(), + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"get"}, + APIGroups: []string{""}, + Resources: []string{"ConfigMap"}, + ResourceNames: []string{s.Spec.ConfigMap}, + }, + }, + } +} + +func (s *catalogSourceDeriver) RoleBinding() *rbacv1.RoleBinding { + return &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.GetName() + "-server-configmap-reader", + Namespace: s.GetNamespace(), + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + APIGroup: "rbac.authorization.k8s.io", + Name: s.serviceAccountName(), + Namespace: s.GetNamespace(), + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: s.roleName(), + }, + } +} + +type RegistryReconciler interface { + EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error +} + +type ConfigMapRegistryReconciler struct { + ConfigMapLister v1lister.ConfigMapLister + ServiceLister v1lister.ServiceLister + RoleBindingLister rbacv1lister.RoleBindingLister + RoleLister rbacv1lister.RoleLister + PodLister v1lister.PodLister + ServiceAccountLister v1lister.ServiceAccountLister + OpClient operatorclient.ClientInterface + Image string +} + +var _ RegistryReconciler = &ConfigMapRegistryReconciler{} + +func (c *ConfigMapRegistryReconciler) currentService(source catalogSourceDeriver) *v1.Service { + serviceName := source.Service().GetName() + service, err := c.ServiceLister.Services(source.GetNamespace()).Get(serviceName) + if err!= nil { + logrus.WithField("service", serviceName).Warn("couldn't find service in cache") + return nil + } + return service +} + +func (c *ConfigMapRegistryReconciler) currentServiceAccount(source catalogSourceDeriver) *v1.ServiceAccount { + serviceAccountName := source.ServiceAccount().GetName() + serviceAccount, err := c.ServiceAccountLister.ServiceAccounts(source.GetNamespace()).Get(serviceAccountName) + if err!= nil { + logrus.WithField("serviceAccouint", serviceAccountName).WithError(err).Warn("couldn't find service account in cache") + return nil + } + return serviceAccount +} + +func (c *ConfigMapRegistryReconciler) currentRole(source catalogSourceDeriver) *rbacv1.Role { + roleName := source.Role().GetName() + role, err := c.RoleLister.Roles(source.GetNamespace()).Get(roleName) + if err!= nil { + logrus.WithField("role", roleName).WithError(err).Warn("couldn't find role in cache") + return nil + } + return role +} + +func (c *ConfigMapRegistryReconciler) currentRoleBinding(source catalogSourceDeriver) *rbacv1.RoleBinding { + roleBindingName := source.RoleBinding().GetName() + roleBinding, err := c.RoleBindingLister.RoleBindings(source.GetNamespace()).Get(roleBindingName) + if err!= nil { + logrus.WithField("roleBinding", roleBindingName).WithError(err).Warn("couldn't find role binding in cache") + return nil + } + return roleBinding +} + +func (c *ConfigMapRegistryReconciler) currentPod(source catalogSourceDeriver, image string) *v1.Pod { + podName := source.Pod(image).GetName() + pod, err := c.PodLister.Pods(source.GetNamespace()).Get(podName) + if err!= nil { + logrus.WithField("pod", podName).WithError(err).Warn("couldn't find pod in cache") + return nil + } + return pod +} + +// Ensure that all components of registry server are up to date. +func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error { + source := catalogSourceDeriver{*catalogSource} + + if source.Status.ConfigMapResource == nil || source.Status.ConfigMapResource.UID == "" { + return fmt.Errorf("no configmap in catalogsource status") + } + + // fetch configmap first, exit early if we can't find it + configMap, err := c.ConfigMapLister.ConfigMaps(source.GetNamespace()).Get(source.Spec.ConfigMap) + if err != nil { + return err + } + + // if service status is nil, we force create every object to ensure they're created the first time + overwrite := source.Status.RegistryServiceStatus == nil + + // recreate the pod if there are configmap changes; this causes the db to be rebuilt + overwritePod := overwrite || source.ConfigMapChanges(configMap) + + if err := c.ensureServiceAccount(source, overwrite); err != nil { + return err + } + if err := c.ensureRole(source, overwrite); err != nil { + return err + } + if err := c.ensureRoleBinding(source, overwrite); err != nil { + return err + } + if err := c.ensurePod(source, overwritePod); err != nil { + return err + } + if err := c.ensureService(source, overwrite); err!= nil { + return err + } + catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{ + Protocol: "grpc", + ServiceName: source.Service().GetName(), + ServiceNamespace: source.GetNamespace(), + Port: string(source.Service().Spec.Ports[0].Port), + } + catalogSource.Status.LastSync = timeNow() + return nil +} + +func (c *ConfigMapRegistryReconciler) ensureServiceAccount(source catalogSourceDeriver, overwrite bool) error { + serviceAccount := source.ServiceAccount() + if c.currentServiceAccount(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteServiceAccount(serviceAccount.GetNamespace(), serviceAccount.GetName(), metav1.NewDeleteOptions(0)); err!=nil { + return err + } + } + _, err := c.OpClient.CreateServiceAccount(serviceAccount) + return err +} + +func (c *ConfigMapRegistryReconciler) ensureRole(source catalogSourceDeriver, overwrite bool) error { + role := source.Role() + if c.currentRole(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteRole(role.GetNamespace(), role.GetName(), metav1.NewDeleteOptions(0)); err!=nil { + return err + } + } + _, err := c.OpClient.CreateRole(role) + return err +} + +func (c *ConfigMapRegistryReconciler) ensureRoleBinding(source catalogSourceDeriver, overwrite bool) error { + roleBinding := source.RoleBinding() + if c.currentRoleBinding(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteRoleBinding(roleBinding.GetNamespace(), roleBinding.GetName(), metav1.NewDeleteOptions(0)); err!=nil { + return err + } + } + _, err := c.OpClient.CreateRoleBinding(roleBinding) + return err +} + +func (c *ConfigMapRegistryReconciler) ensurePod(source catalogSourceDeriver, overwrite bool) error { + pod := source.Pod(c.Image) + if c.currentPod(source, c.Image) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(pod.GetName(), metav1.NewDeleteOptions(0)); err!=nil { + return err + } + } + _, err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Create(pod) + return err +} + +func (c *ConfigMapRegistryReconciler) ensureService(source catalogSourceDeriver, overwrite bool) error { + service := source.Service() + if c.currentService(source) != nil { + if !overwrite { + return nil + } + if err := c.OpClient.DeleteService(service.GetNamespace(), service.GetName(), metav1.NewDeleteOptions(0)); err!=nil { + return err + } + } + _, err := c.OpClient.CreateService(service) + return err +} + diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index d1ed758f100..e3862441ada 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -3,11 +3,12 @@ package queueinformer import ( "fmt" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/pkg/errors" ) // An Operator is a collection of QueueInformers