Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1779313: Enable multiple namespaces sync if catsrc is updated in global ns #1125

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 51 additions & 32 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,25 @@ const (
type Operator struct {
queueinformer.Operator

logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources *grpc.SourceStore
sourcesLastUpdate sharedtime.SharedTime
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
clientAttenuator *scoped.ClientAttenuator
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources *grpc.SourceStore
sourcesLastUpdate sharedtime.SharedTime
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
catalogSubscriberIndexer map[string]cache.Indexer
clientAttenuator *scoped.ClientAttenuator
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
dynamicClient: dynamicClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
dynamicClient: dynamicClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
catalogSubscriberIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
Expand Down Expand Up @@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire Subscriptions
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil {
dinhxuanvu marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[namespace] = subIndexer

subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace))
op.subQueueSet.Set(namespace, subQueue)
subSyncer, err := subscription.NewSyncer(
Expand Down Expand Up @@ -339,6 +347,17 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
default:
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/lib/index/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package indexer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: although there are some exceptions, this probably shouldn't be in the lib folder since it contains OLM specific logic.


import (
"fmt"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
"k8s.io/client-go/tools/cache"
)

const (
// PresentCatalogIndexFuncKey is the recommended key to use for registering
// the index func with an indexer.
PresentCatalogIndexFuncKey string = "presentcatalogindexfunc"
)

// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace
// of the given object (Subscription)
func PresentCatalogIndexFunc(obj interface{}) ([]string, error) {
sub, ok := obj.(*v1alpha1.Subscription)
if !ok {
return []string{""}, fmt.Errorf("invalid object of type: %T", obj)
dinhxuanvu marked this conversation as resolved.
Show resolved Hide resolved
}

if sub.Spec.CatalogSource != "" && sub.Spec.CatalogSourceNamespace != "" {
return []string{sub.Spec.CatalogSource + "/" + sub.Spec.CatalogSourceNamespace}, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the common convention is namespace/name.

}

return []string{""}, nil
}

// CatalogSubscriberNamespaces returns the list of namespace (as a map with namespace as key)
// which has Suscriptions(s) that subscribe(s) to a given CatalogSource (name/namespace)
func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the result map isn't being used as a map outside of this function -- seems kinda leaky

nsSet := map[string]struct{}{}
index := fmt.Sprintf("%s/%s", name, namespace)

for _, indexer := range indexers {
subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index)
if err != nil {
return nil, err
}
for _, item := range subs {
s, ok := item.(*v1alpha1.Subscription)
if !ok {
continue
}
// Add to set
nsSet[s.GetNamespace()] = struct{}{}
}
}

return nsSet, nil
}
113 changes: 111 additions & 2 deletions test/e2e/catalog_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,115 @@ func TestCatalogLoadingBetweenRestarts(t *testing.T) {
t.Logf("Catalog source sucessfully loaded after rescale")
}

func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) {
defer cleaner.NotifyTestComplete(t, true)

globalNS := operatorNamespace
c := newKubeClient(t)
crc := newCRClient(t)

// Determine which namespace is global. Should be `openshift-marketplace` for OCP 4.2+.
// Locally it is `olm`
namespaces, _ := c.KubernetesInterface().CoreV1().Namespaces().List(metav1.ListOptions{})
for _, ns := range namespaces.Items {
if ns.GetName() == "openshift-marketplace" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it would be nice to take this as a test argument so we can avoid making configuration assumptions in our test logic.

globalNS = "openshift-marketplace"
}
}

mainPackageName := genName("nginx-")

mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName)
mainPackageReplacement := fmt.Sprintf("%s-replacement", mainPackageStable)

stableChannel := "stable"

mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil)

crdPlural := genName("ins-")

mainCRD := newCRD(crdPlural)
mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy)
replacementCSV := newCSV(mainPackageReplacement, testNamespace, mainPackageStable, semver.MustParse("0.2.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy)

mainCatalogName := genName("mock-ocs-main-")

// Create separate manifests for each CatalogSource
mainManifests := []registry.PackageManifest{
{
PackageName: mainPackageName,
Channels: []registry.PackageChannel{
{Name: stableChannel, CurrentCSVName: mainPackageStable},
},
DefaultChannelName: stableChannel,
},
}

// Create the initial catalogsource
createInternalCatalogSource(t, c, crc, mainCatalogName, globalNS, mainManifests, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV})

// Attempt to get the catalog source before creating install plan
_, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced)
require.NoError(t, err)

subscriptionSpec := &v1alpha1.SubscriptionSpec{
CatalogSource: mainCatalogName,
CatalogSourceNamespace: globalNS,
Package: mainPackageName,
Channel: stableChannel,
StartingCSV: mainCSV.GetName(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no need to specify a StartingCSV or a manual approval strategy

InstallPlanApproval: v1alpha1.ApprovalManual,
}

// Create Subscription
subscriptionName := genName("sub-")
createSubscriptionForCatalogWithSpec(t, crc, testNamespace, subscriptionName, subscriptionSpec)

subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker)
require.NoError(t, err)
require.NotNil(t, subscription)

installPlanName := subscription.Status.Install.Name
requiresApprovalChecker := buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseRequiresApproval)
fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, requiresApprovalChecker)
require.NoError(t, err)

fetchedInstallPlan.Spec.Approved = true
_, err = crc.OperatorsV1alpha1().InstallPlans(testNamespace).Update(fetchedInstallPlan)
require.NoError(t, err)
Comment on lines +152 to +159
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you only put the manifests for the first CSV into the catalog, there's no need to step through updates using a manual approval strategy.


_, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvSucceededChecker)
require.NoError(t, err)

// Update manifest
mainManifests = []registry.PackageManifest{
{
PackageName: mainPackageName,
Channels: []registry.PackageChannel{
{Name: stableChannel, CurrentCSVName: replacementCSV.GetName()},
},
DefaultChannelName: stableChannel,
},
}

// Update catalog configmap
updateInternalCatalog(t, c, crc, mainCatalogName, globalNS, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, replacementCSV}, mainManifests)

// Get updated catalogsource
fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced)
require.NoError(t, err)

subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradePendingChecker)
require.NoError(t, err)
require.NotNil(t, subscription)

// Ensure the timing
catalogConnState := fetchedUpdatedCatalog.Status.GRPCConnectionState
subUpdatedTime := subscription.Status.LastUpdated
timeLapse := subUpdatedTime.Sub(catalogConnState.LastConnectTime.Time).Seconds()
require.True(t, timeLapse < 60)
}

func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) {
defer cleaner.NotifyTestComplete(t, true)

Expand Down Expand Up @@ -153,8 +262,8 @@ func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) {
fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool {
before := fetchedInitialCatalog.Status.ConfigMapResource
after := catalog.Status.ConfigMapResource
if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) &&
after.ResourceVersion != before.ResourceVersion {
if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) &&
after.ResourceVersion != before.ResourceVersion {
fmt.Println("catalog updated")
return true
}
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/subscription_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1491,7 +1491,7 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v
require.NoError(t, err)

// Get initial configmap
configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{})
configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{})
require.NoError(t, err)

// Update package to point to new csv
Expand All @@ -1515,11 +1515,11 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v
configMap.Data[registry.ConfigMapCSVName] = string(csvsRaw)

// Update configmap
_, err = c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Update(configMap)
_, err = c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Update(configMap)
require.NoError(t, err)

// wait for catalog to update
_, err = fetchCatalogSource(t, crc, catalogSourceName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool {
_, err = fetchCatalogSource(t, crc, catalogSourceName, namespace, func(catalog *v1alpha1.CatalogSource) bool {
before := fetchedInitialCatalog.Status.ConfigMapResource
after := catalog.Status.ConfigMapResource
if after != nil && after.LastUpdateTime.After(before.LastUpdateTime.Time) && after.ResourceVersion != before.ResourceVersion {
Expand Down