From a9a97cc0609adfb4062b936ec6db137d3407cc7d Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Thu, 14 Nov 2019 02:24:10 -0500 Subject: [PATCH 1/4] Enable multiple namespaces sync if catsrc is updated in global ns Currently, if CatalogSource is updated and connection is Ready, only the namespace where CatalogSource resides gets resynced. Now, a list of namespaces that contain Subscriptions that use updated CatalogSource will get resynced instead. Signed-off-by: Vu Dinh --- pkg/controller/operators/catalog/operator.go | 83 ++++++++++++-------- pkg/lib/index/catalog.go | 55 +++++++++++++ 2 files changed, 106 insertions(+), 32 deletions(-) create mode 100644 pkg/lib/index/catalog.go diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 67d72ab7d5..4bd98ad031 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -66,24 +66,25 @@ const ( type Operator struct { queueinformer.Operator - logger *logrus.Logger - clock utilclock.Clock - opClient operatorclient.ClientInterface - client versioned.Interface - dynamicClient dynamic.Interface - lister operatorlister.OperatorLister - catsrcQueueSet *queueinformer.ResourceQueueSet - subQueueSet *queueinformer.ResourceQueueSet - ipQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.RateLimitingInterface - namespace string - sources *grpc.SourceStore - sourcesLastUpdate sharedtime.SharedTime - resolver resolver.Resolver - reconciler reconciler.RegistryReconcilerFactory - csvProvidedAPIsIndexer map[string]cache.Indexer - clientAttenuator *scoped.ClientAttenuator - serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier + logger *logrus.Logger + clock utilclock.Clock + opClient operatorclient.ClientInterface + client versioned.Interface + dynamicClient dynamic.Interface + lister operatorlister.OperatorLister + catsrcQueueSet *queueinformer.ResourceQueueSet + subQueueSet *queueinformer.ResourceQueueSet + ipQueueSet *queueinformer.ResourceQueueSet + nsResolveQueue workqueue.RateLimitingInterface + namespace string + sources *grpc.SourceStore + sourcesLastUpdate sharedtime.SharedTime + resolver resolver.Resolver + reconciler reconciler.RegistryReconcilerFactory + csvProvidedAPIsIndexer map[string]cache.Indexer + catalogSubscriberIndexer map[string]cache.Indexer + clientAttenuator *scoped.ClientAttenuator + serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Allocate the new instance of an Operator. op := &Operator{ - Operator: queueOperator, - logger: logger, - clock: clock, - opClient: opClient, - dynamicClient: dynamicClient, - client: crClient, - lister: lister, - namespace: operatorNamespace, - resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient), - catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), - subQueueSet: queueinformer.NewEmptyResourceQueueSet(), - csvProvidedAPIsIndexer: map[string]cache.Indexer{}, - serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient), - clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient), + Operator: queueOperator, + logger: logger, + clock: clock, + opClient: opClient, + dynamicClient: dynamicClient, + client: crClient, + lister: lister, + namespace: operatorNamespace, + resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient), + catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), + subQueueSet: queueinformer.NewEmptyResourceQueueSet(), + csvProvidedAPIsIndexer: map[string]cache.Indexer{}, + catalogSubscriberIndexer: map[string]cache.Indexer{}, + serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient), + clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient), } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now) @@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire Subscriptions subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions() op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister()) + if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil { + return nil, err + } + subIndexer := subInformer.Informer().GetIndexer() + op.catalogSubscriberIndexer[namespace] = subIndexer + subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace)) op.subQueueSet.Set(namespace, subQueue) subSyncer, err := subscription.NewSyncer( @@ -339,6 +347,17 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { switch state.State { case connectivity.Ready: + if o.namespace == state.Key.Namespace { + subs, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer, + state.Key.Name, state.Key.Namespace) + + if err == nil { + for _, ns := range subs { + o.nsResolveQueue.Add(ns) + } + } + } + o.nsResolveQueue.Add(state.Key.Namespace) default: if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { diff --git a/pkg/lib/index/catalog.go b/pkg/lib/index/catalog.go new file mode 100644 index 0000000000..513054f941 --- /dev/null +++ b/pkg/lib/index/catalog.go @@ -0,0 +1,55 @@ +package indexer + +import ( + "fmt" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "k8s.io/client-go/tools/cache" +) + +const ( + // PresentCatalogIndexFuncKey is the recommended key to use for registering + // the index func with an indexer. + PresentCatalogIndexFuncKey string = "presentcatalogindexfunc" +) + +// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace +// of the given object (Subscription) +func PresentCatalogIndexFunc(obj interface{}) ([]string, error) { + indicies := []string{} + + sub, ok := obj.(*v1alpha1.Subscription) + if !ok { + return indicies, fmt.Errorf("invalid object of type: %T", obj) + } + + indicies = append(indicies, fmt.Sprintf("%s/%s", sub.Spec.CatalogSource, + sub.Spec.CatalogSourceNamespace)) + + return indicies, nil +} + +// CatalogSubscriberNamespaces returns the list of Suscriptions' name and namespace +// (name/namespace as key and namespace as value) that uses the given CatalogSource (name/namespace) +func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]string, error) { + nsSet := map[string]string{} + index := fmt.Sprintf("%s/%s", name, namespace) + + for _, indexer := range indexers { + subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index) + if err != nil { + return nil, err + } + for _, item := range subs { + s, ok := item.(*v1alpha1.Subscription) + if !ok { + continue + } + // Add to set + key := fmt.Sprintf("%s/%s", s.GetName(), s.GetNamespace()) + nsSet[key] = s.GetNamespace() + } + } + + return nsSet, nil +} From bd50fd2879b21a71c15c5c3ca7516bd30227cbfc Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Thu, 21 Nov 2019 17:24:32 -0500 Subject: [PATCH 2/4] Add e2e test case for syncSourceState Signed-off-by: Vu Dinh --- test/e2e/catalog_e2e_test.go | 112 ++++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 2 deletions(-) diff --git a/test/e2e/catalog_e2e_test.go b/test/e2e/catalog_e2e_test.go index 22e677616c..bab3c3700f 100644 --- a/test/e2e/catalog_e2e_test.go +++ b/test/e2e/catalog_e2e_test.go @@ -81,6 +81,114 @@ func TestCatalogLoadingBetweenRestarts(t *testing.T) { t.Logf("Catalog source sucessfully loaded after rescale") } +func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) { + defer cleaner.NotifyTestComplete(t, true) + + globalNS := operatorNamespace + c := newKubeClient(t) + crc := newCRClient(t) + + // Determine which namespace is global. Should be `openshift-marketplace` for OCP 4.2+. + // Locally it is `olm` + namespaces, _ := c.KubernetesInterface().CoreV1().Namespaces().List(metav1.ListOptions{}) + for _, ns := range namespaces.Items { + if ns.GetName() == "openshift-marketplace" { + globalNS = "openshift-marketplace" + } + } + + mainPackageName := genName("nginx-") + + mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) + mainPackageReplacement := fmt.Sprintf("%s-replacement", mainPackageStable) + + stableChannel := "stable" + + mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) + + crdPlural := genName("ins-") + + mainCRD := newCRD(crdPlural) + mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + replacementCSV := newCSV(mainPackageReplacement, testNamespace, mainPackageStable, semver.MustParse("0.2.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + + mainCatalogName := genName("mock-ocs-main-") + + // Create separate manifests for each CatalogSource + mainManifests := []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: mainPackageStable}, + }, + DefaultChannelName: stableChannel, + }, + } + + // Create the initial catalogsource + createInternalCatalogSource(t, c, crc, mainCatalogName, globalNS, mainManifests, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV}) + + // Attempt to get the catalog source before creating install plan + _, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced) + require.NoError(t, err) + + subscriptionSpec := &v1alpha1.SubscriptionSpec{ + CatalogSource: mainCatalogName, + CatalogSourceNamespace: globalNS, + Package: mainPackageName, + Channel: stableChannel, + StartingCSV: mainCSV.GetName(), + InstallPlanApproval: v1alpha1.ApprovalAutomatic, + } + + // Create Subscription + subscriptionName := genName("sub-") + createSubscriptionForCatalogWithSpec(t, crc, testNamespace, subscriptionName, subscriptionSpec) + + subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateAtLatestChecker) + require.NoError(t, err) + require.NotNil(t, subscription) + _, err = fetchCSV(t, crc, subscription.Status.CurrentCSV, testNamespace, buildCSVConditionChecker(v1alpha1.CSVPhaseSucceeded)) + require.NoError(t, err) + + // Update manifest + mainManifests = []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: replacementCSV.GetName()}, + }, + DefaultChannelName: stableChannel, + }, + } + + // Update catalog configmap + updateInternalCatalog(t, c, crc, mainCatalogName, globalNS, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, replacementCSV}, mainManifests) + + // Get updated catalogsource + fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, func(catalog *v1alpha1.CatalogSource) bool { + registry := catalog.Status.RegistryServiceStatus + connState := catalog.Status.GRPCConnectionState + if registry != nil && connState != nil && connState.LastObservedState == "READY" && !connState.LastConnectTime.IsZero() { + fmt.Printf("catalog %s pod with address %s\n", catalog.GetName(), registry.Address()) + return registryPodHealthy(registry.Address()) + } + fmt.Printf("waiting for catalog pod %v to be available (for sync)\n", catalog.GetName()) + return false + }) + require.NoError(t, err) + + subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradeAvailableChecker) + require.NoError(t, err) + require.NotNil(t, subscription) + + // Ensure the timing + catalogConnState := fetchedUpdatedCatalog.Status.GRPCConnectionState + subUpdatedTime := subscription.Status.LastUpdated + timeLapse := subUpdatedTime.Sub(catalogConnState.LastConnectTime.Time).Seconds() + require.True(t, timeLapse < 60) +} + func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) { defer cleaner.NotifyTestComplete(t, true) @@ -153,8 +261,8 @@ func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) { fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool { before := fetchedInitialCatalog.Status.ConfigMapResource after := catalog.Status.ConfigMapResource - if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) && - after.ResourceVersion != before.ResourceVersion { + if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) && + after.ResourceVersion != before.ResourceVersion { fmt.Println("catalog updated") return true } From 9f85abd42a3d1925578d4773895711f32a549e03 Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Fri, 22 Nov 2019 03:20:46 -0500 Subject: [PATCH 3/4] Fix updateInternalCatalog namespace bug Signed-off-by: Vu Dinh --- test/e2e/subscription_e2e_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/subscription_e2e_test.go b/test/e2e/subscription_e2e_test.go index 982cc47277..02e5e28fa0 100644 --- a/test/e2e/subscription_e2e_test.go +++ b/test/e2e/subscription_e2e_test.go @@ -1491,7 +1491,7 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v require.NoError(t, err) // Get initial configmap - configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{}) + configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{}) require.NoError(t, err) // Update package to point to new csv @@ -1515,11 +1515,11 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v configMap.Data[registry.ConfigMapCSVName] = string(csvsRaw) // Update configmap - _, err = c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Update(configMap) + _, err = c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Update(configMap) require.NoError(t, err) // wait for catalog to update - _, err = fetchCatalogSource(t, crc, catalogSourceName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool { + _, err = fetchCatalogSource(t, crc, catalogSourceName, namespace, func(catalog *v1alpha1.CatalogSource) bool { before := fetchedInitialCatalog.Status.ConfigMapResource after := catalog.Status.ConfigMapResource if after != nil && after.LastUpdateTime.After(before.LastUpdateTime.Time) && after.ResourceVersion != before.ResourceVersion { From 56c7cb854df8f5a794520b8805a4ff2d1598d811 Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Tue, 26 Nov 2019 10:54:39 -0500 Subject: [PATCH 4/4] Change index func to return list of ns instead Signed-off-by: Vu Dinh --- pkg/controller/operators/catalog/operator.go | 4 +-- pkg/lib/index/catalog.go | 22 +++++++-------- test/e2e/catalog_e2e_test.go | 29 ++++++++++---------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 4bd98ad031..8a4171f210 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -348,11 +348,11 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { switch state.State { case connectivity.Ready: if o.namespace == state.Key.Namespace { - subs, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer, + namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer, state.Key.Name, state.Key.Namespace) if err == nil { - for _, ns := range subs { + for ns := range namespaces { o.nsResolveQueue.Add(ns) } } diff --git a/pkg/lib/index/catalog.go b/pkg/lib/index/catalog.go index 513054f941..17c4ee9deb 100644 --- a/pkg/lib/index/catalog.go +++ b/pkg/lib/index/catalog.go @@ -16,23 +16,22 @@ const ( // PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace // of the given object (Subscription) func PresentCatalogIndexFunc(obj interface{}) ([]string, error) { - indicies := []string{} - sub, ok := obj.(*v1alpha1.Subscription) if !ok { - return indicies, fmt.Errorf("invalid object of type: %T", obj) + return []string{""}, fmt.Errorf("invalid object of type: %T", obj) } - indicies = append(indicies, fmt.Sprintf("%s/%s", sub.Spec.CatalogSource, - sub.Spec.CatalogSourceNamespace)) + if sub.Spec.CatalogSource != "" && sub.Spec.CatalogSourceNamespace != "" { + return []string{sub.Spec.CatalogSource + "/" + sub.Spec.CatalogSourceNamespace}, nil + } - return indicies, nil + return []string{""}, nil } -// CatalogSubscriberNamespaces returns the list of Suscriptions' name and namespace -// (name/namespace as key and namespace as value) that uses the given CatalogSource (name/namespace) -func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]string, error) { - nsSet := map[string]string{} +// CatalogSubscriberNamespaces returns the list of namespace (as a map with namespace as key) +// which has Suscriptions(s) that subscribe(s) to a given CatalogSource (name/namespace) +func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) { + nsSet := map[string]struct{}{} index := fmt.Sprintf("%s/%s", name, namespace) for _, indexer := range indexers { @@ -46,8 +45,7 @@ func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namesp continue } // Add to set - key := fmt.Sprintf("%s/%s", s.GetName(), s.GetNamespace()) - nsSet[key] = s.GetNamespace() + nsSet[s.GetNamespace()] = struct{}{} } } diff --git a/test/e2e/catalog_e2e_test.go b/test/e2e/catalog_e2e_test.go index bab3c3700f..3338329be2 100644 --- a/test/e2e/catalog_e2e_test.go +++ b/test/e2e/catalog_e2e_test.go @@ -138,17 +138,27 @@ func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) { Package: mainPackageName, Channel: stableChannel, StartingCSV: mainCSV.GetName(), - InstallPlanApproval: v1alpha1.ApprovalAutomatic, + InstallPlanApproval: v1alpha1.ApprovalManual, } // Create Subscription subscriptionName := genName("sub-") createSubscriptionForCatalogWithSpec(t, crc, testNamespace, subscriptionName, subscriptionSpec) - subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateAtLatestChecker) + subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker) require.NoError(t, err) require.NotNil(t, subscription) - _, err = fetchCSV(t, crc, subscription.Status.CurrentCSV, testNamespace, buildCSVConditionChecker(v1alpha1.CSVPhaseSucceeded)) + + installPlanName := subscription.Status.Install.Name + requiresApprovalChecker := buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseRequiresApproval) + fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, requiresApprovalChecker) + require.NoError(t, err) + + fetchedInstallPlan.Spec.Approved = true + _, err = crc.OperatorsV1alpha1().InstallPlans(testNamespace).Update(fetchedInstallPlan) + require.NoError(t, err) + + _, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvSucceededChecker) require.NoError(t, err) // Update manifest @@ -166,19 +176,10 @@ func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) { updateInternalCatalog(t, c, crc, mainCatalogName, globalNS, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, replacementCSV}, mainManifests) // Get updated catalogsource - fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, func(catalog *v1alpha1.CatalogSource) bool { - registry := catalog.Status.RegistryServiceStatus - connState := catalog.Status.GRPCConnectionState - if registry != nil && connState != nil && connState.LastObservedState == "READY" && !connState.LastConnectTime.IsZero() { - fmt.Printf("catalog %s pod with address %s\n", catalog.GetName(), registry.Address()) - return registryPodHealthy(registry.Address()) - } - fmt.Printf("waiting for catalog pod %v to be available (for sync)\n", catalog.GetName()) - return false - }) + fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced) require.NoError(t, err) - subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradeAvailableChecker) + subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradePendingChecker) require.NoError(t, err) require.NotNil(t, subscription)