diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 67d72ab7d5..8a4171f210 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -66,24 +66,25 @@ const ( type Operator struct { queueinformer.Operator - logger *logrus.Logger - clock utilclock.Clock - opClient operatorclient.ClientInterface - client versioned.Interface - dynamicClient dynamic.Interface - lister operatorlister.OperatorLister - catsrcQueueSet *queueinformer.ResourceQueueSet - subQueueSet *queueinformer.ResourceQueueSet - ipQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.RateLimitingInterface - namespace string - sources *grpc.SourceStore - sourcesLastUpdate sharedtime.SharedTime - resolver resolver.Resolver - reconciler reconciler.RegistryReconcilerFactory - csvProvidedAPIsIndexer map[string]cache.Indexer - clientAttenuator *scoped.ClientAttenuator - serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier + logger *logrus.Logger + clock utilclock.Clock + opClient operatorclient.ClientInterface + client versioned.Interface + dynamicClient dynamic.Interface + lister operatorlister.OperatorLister + catsrcQueueSet *queueinformer.ResourceQueueSet + subQueueSet *queueinformer.ResourceQueueSet + ipQueueSet *queueinformer.ResourceQueueSet + nsResolveQueue workqueue.RateLimitingInterface + namespace string + sources *grpc.SourceStore + sourcesLastUpdate sharedtime.SharedTime + resolver resolver.Resolver + reconciler reconciler.RegistryReconcilerFactory + csvProvidedAPIsIndexer map[string]cache.Indexer + catalogSubscriberIndexer map[string]cache.Indexer + clientAttenuator *scoped.ClientAttenuator + serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Allocate the new instance of an Operator. op := &Operator{ - Operator: queueOperator, - logger: logger, - clock: clock, - opClient: opClient, - dynamicClient: dynamicClient, - client: crClient, - lister: lister, - namespace: operatorNamespace, - resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient), - catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), - subQueueSet: queueinformer.NewEmptyResourceQueueSet(), - csvProvidedAPIsIndexer: map[string]cache.Indexer{}, - serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient), - clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient), + Operator: queueOperator, + logger: logger, + clock: clock, + opClient: opClient, + dynamicClient: dynamicClient, + client: crClient, + lister: lister, + namespace: operatorNamespace, + resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient), + catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), + subQueueSet: queueinformer.NewEmptyResourceQueueSet(), + csvProvidedAPIsIndexer: map[string]cache.Indexer{}, + catalogSubscriberIndexer: map[string]cache.Indexer{}, + serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient), + clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient), } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now) @@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire Subscriptions subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions() op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister()) + if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil { + return nil, err + } + subIndexer := subInformer.Informer().GetIndexer() + op.catalogSubscriberIndexer[namespace] = subIndexer + subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace)) op.subQueueSet.Set(namespace, subQueue) subSyncer, err := subscription.NewSyncer( @@ -339,6 +347,17 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { switch state.State { case connectivity.Ready: + if o.namespace == state.Key.Namespace { + namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer, + state.Key.Name, state.Key.Namespace) + + if err == nil { + for ns := range namespaces { + o.nsResolveQueue.Add(ns) + } + } + } + o.nsResolveQueue.Add(state.Key.Namespace) default: if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { diff --git a/pkg/lib/index/catalog.go b/pkg/lib/index/catalog.go new file mode 100644 index 0000000000..17c4ee9deb --- /dev/null +++ b/pkg/lib/index/catalog.go @@ -0,0 +1,53 @@ +package indexer + +import ( + "fmt" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "k8s.io/client-go/tools/cache" +) + +const ( + // PresentCatalogIndexFuncKey is the recommended key to use for registering + // the index func with an indexer. + PresentCatalogIndexFuncKey string = "presentcatalogindexfunc" +) + +// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace +// of the given object (Subscription) +func PresentCatalogIndexFunc(obj interface{}) ([]string, error) { + sub, ok := obj.(*v1alpha1.Subscription) + if !ok { + return []string{""}, fmt.Errorf("invalid object of type: %T", obj) + } + + if sub.Spec.CatalogSource != "" && sub.Spec.CatalogSourceNamespace != "" { + return []string{sub.Spec.CatalogSource + "/" + sub.Spec.CatalogSourceNamespace}, nil + } + + return []string{""}, nil +} + +// CatalogSubscriberNamespaces returns the list of namespace (as a map with namespace as key) +// which has Suscriptions(s) that subscribe(s) to a given CatalogSource (name/namespace) +func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) { + nsSet := map[string]struct{}{} + index := fmt.Sprintf("%s/%s", name, namespace) + + for _, indexer := range indexers { + subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index) + if err != nil { + return nil, err + } + for _, item := range subs { + s, ok := item.(*v1alpha1.Subscription) + if !ok { + continue + } + // Add to set + nsSet[s.GetNamespace()] = struct{}{} + } + } + + return nsSet, nil +} diff --git a/test/e2e/catalog_e2e_test.go b/test/e2e/catalog_e2e_test.go index 22e677616c..3338329be2 100644 --- a/test/e2e/catalog_e2e_test.go +++ b/test/e2e/catalog_e2e_test.go @@ -81,6 +81,115 @@ func TestCatalogLoadingBetweenRestarts(t *testing.T) { t.Logf("Catalog source sucessfully loaded after rescale") } +func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) { + defer cleaner.NotifyTestComplete(t, true) + + globalNS := operatorNamespace + c := newKubeClient(t) + crc := newCRClient(t) + + // Determine which namespace is global. Should be `openshift-marketplace` for OCP 4.2+. + // Locally it is `olm` + namespaces, _ := c.KubernetesInterface().CoreV1().Namespaces().List(metav1.ListOptions{}) + for _, ns := range namespaces.Items { + if ns.GetName() == "openshift-marketplace" { + globalNS = "openshift-marketplace" + } + } + + mainPackageName := genName("nginx-") + + mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) + mainPackageReplacement := fmt.Sprintf("%s-replacement", mainPackageStable) + + stableChannel := "stable" + + mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) + + crdPlural := genName("ins-") + + mainCRD := newCRD(crdPlural) + mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + replacementCSV := newCSV(mainPackageReplacement, testNamespace, mainPackageStable, semver.MustParse("0.2.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + + mainCatalogName := genName("mock-ocs-main-") + + // Create separate manifests for each CatalogSource + mainManifests := []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: mainPackageStable}, + }, + DefaultChannelName: stableChannel, + }, + } + + // Create the initial catalogsource + createInternalCatalogSource(t, c, crc, mainCatalogName, globalNS, mainManifests, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV}) + + // Attempt to get the catalog source before creating install plan + _, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced) + require.NoError(t, err) + + subscriptionSpec := &v1alpha1.SubscriptionSpec{ + CatalogSource: mainCatalogName, + CatalogSourceNamespace: globalNS, + Package: mainPackageName, + Channel: stableChannel, + StartingCSV: mainCSV.GetName(), + InstallPlanApproval: v1alpha1.ApprovalManual, + } + + // Create Subscription + subscriptionName := genName("sub-") + createSubscriptionForCatalogWithSpec(t, crc, testNamespace, subscriptionName, subscriptionSpec) + + subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker) + require.NoError(t, err) + require.NotNil(t, subscription) + + installPlanName := subscription.Status.Install.Name + requiresApprovalChecker := buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseRequiresApproval) + fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, requiresApprovalChecker) + require.NoError(t, err) + + fetchedInstallPlan.Spec.Approved = true + _, err = crc.OperatorsV1alpha1().InstallPlans(testNamespace).Update(fetchedInstallPlan) + require.NoError(t, err) + + _, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvSucceededChecker) + require.NoError(t, err) + + // Update manifest + mainManifests = []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: replacementCSV.GetName()}, + }, + DefaultChannelName: stableChannel, + }, + } + + // Update catalog configmap + updateInternalCatalog(t, c, crc, mainCatalogName, globalNS, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, replacementCSV}, mainManifests) + + // Get updated catalogsource + fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced) + require.NoError(t, err) + + subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradePendingChecker) + require.NoError(t, err) + require.NotNil(t, subscription) + + // Ensure the timing + catalogConnState := fetchedUpdatedCatalog.Status.GRPCConnectionState + subUpdatedTime := subscription.Status.LastUpdated + timeLapse := subUpdatedTime.Sub(catalogConnState.LastConnectTime.Time).Seconds() + require.True(t, timeLapse < 60) +} + func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) { defer cleaner.NotifyTestComplete(t, true) @@ -153,8 +262,8 @@ func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) { fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool { before := fetchedInitialCatalog.Status.ConfigMapResource after := catalog.Status.ConfigMapResource - if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) && - after.ResourceVersion != before.ResourceVersion { + if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) && + after.ResourceVersion != before.ResourceVersion { fmt.Println("catalog updated") return true } diff --git a/test/e2e/subscription_e2e_test.go b/test/e2e/subscription_e2e_test.go index 982cc47277..02e5e28fa0 100644 --- a/test/e2e/subscription_e2e_test.go +++ b/test/e2e/subscription_e2e_test.go @@ -1491,7 +1491,7 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v require.NoError(t, err) // Get initial configmap - configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{}) + configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{}) require.NoError(t, err) // Update package to point to new csv @@ -1515,11 +1515,11 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v configMap.Data[registry.ConfigMapCSVName] = string(csvsRaw) // Update configmap - _, err = c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Update(configMap) + _, err = c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Update(configMap) require.NoError(t, err) // wait for catalog to update - _, err = fetchCatalogSource(t, crc, catalogSourceName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool { + _, err = fetchCatalogSource(t, crc, catalogSourceName, namespace, func(catalog *v1alpha1.CatalogSource) bool { before := fetchedInitialCatalog.Status.ConfigMapResource after := catalog.Status.ConfigMapResource if after != nil && after.LastUpdateTime.After(before.LastUpdateTime.Time) && after.ResourceVersion != before.ResourceVersion {