Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-4.3] Bug 1775323: Enable multiple namespaces sync if catsrc is updated in global ns #1166

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 51 additions & 32 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,25 @@ const (
type Operator struct {
queueinformer.Operator

logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources *grpc.SourceStore
sourcesLastUpdate sharedtime.SharedTime
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
clientAttenuator *scoped.ClientAttenuator
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources *grpc.SourceStore
sourcesLastUpdate sharedtime.SharedTime
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
catalogSubscriberIndexer map[string]cache.Indexer
clientAttenuator *scoped.ClientAttenuator
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
dynamicClient: dynamicClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
dynamicClient: dynamicClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
catalogSubscriberIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
Expand Down Expand Up @@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire Subscriptions
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil {
return nil, err
}
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[namespace] = subIndexer

subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace))
op.subQueueSet.Set(namespace, subQueue)
subSyncer, err := subscription.NewSyncer(
Expand Down Expand Up @@ -339,6 +347,17 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
default:
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/lib/index/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package indexer

import (
"fmt"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
"k8s.io/client-go/tools/cache"
)

const (
// PresentCatalogIndexFuncKey is the recommended key to use for registering
// the index func with an indexer.
PresentCatalogIndexFuncKey string = "presentcatalogindexfunc"
)

// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace
// of the given object (Subscription)
func PresentCatalogIndexFunc(obj interface{}) ([]string, error) {
sub, ok := obj.(*v1alpha1.Subscription)
if !ok {
return []string{""}, fmt.Errorf("invalid object of type: %T", obj)
}

if sub.Spec.CatalogSource != "" && sub.Spec.CatalogSourceNamespace != "" {
return []string{sub.Spec.CatalogSource + "/" + sub.Spec.CatalogSourceNamespace}, nil
}

return []string{""}, nil
}

// CatalogSubscriberNamespaces returns the list of namespace (as a map with namespace as key)
// which has Suscriptions(s) that subscribe(s) to a given CatalogSource (name/namespace)
func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) {
nsSet := map[string]struct{}{}
index := fmt.Sprintf("%s/%s", name, namespace)

for _, indexer := range indexers {
subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index)
if err != nil {
return nil, err
}
for _, item := range subs {
s, ok := item.(*v1alpha1.Subscription)
if !ok {
continue
}
// Add to set
nsSet[s.GetNamespace()] = struct{}{}
}
}

return nsSet, nil
}
113 changes: 111 additions & 2 deletions test/e2e/catalog_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,115 @@ func TestCatalogLoadingBetweenRestarts(t *testing.T) {
t.Logf("Catalog source sucessfully loaded after rescale")
}

func TestGlobalCatalogUpdateTriggersSubscriptionSync(t *testing.T) {
defer cleaner.NotifyTestComplete(t, true)

globalNS := operatorNamespace
c := newKubeClient(t)
crc := newCRClient(t)

// Determine which namespace is global. Should be `openshift-marketplace` for OCP 4.2+.
// Locally it is `olm`
namespaces, _ := c.KubernetesInterface().CoreV1().Namespaces().List(metav1.ListOptions{})
for _, ns := range namespaces.Items {
if ns.GetName() == "openshift-marketplace" {
globalNS = "openshift-marketplace"
}
}

mainPackageName := genName("nginx-")

mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName)
mainPackageReplacement := fmt.Sprintf("%s-replacement", mainPackageStable)

stableChannel := "stable"

mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil)

crdPlural := genName("ins-")

mainCRD := newCRD(crdPlural)
mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy)
replacementCSV := newCSV(mainPackageReplacement, testNamespace, mainPackageStable, semver.MustParse("0.2.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy)

mainCatalogName := genName("mock-ocs-main-")

// Create separate manifests for each CatalogSource
mainManifests := []registry.PackageManifest{
{
PackageName: mainPackageName,
Channels: []registry.PackageChannel{
{Name: stableChannel, CurrentCSVName: mainPackageStable},
},
DefaultChannelName: stableChannel,
},
}

// Create the initial catalogsource
createInternalCatalogSource(t, c, crc, mainCatalogName, globalNS, mainManifests, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV})

// Attempt to get the catalog source before creating install plan
_, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced)
require.NoError(t, err)

subscriptionSpec := &v1alpha1.SubscriptionSpec{
CatalogSource: mainCatalogName,
CatalogSourceNamespace: globalNS,
Package: mainPackageName,
Channel: stableChannel,
StartingCSV: mainCSV.GetName(),
InstallPlanApproval: v1alpha1.ApprovalManual,
}

// Create Subscription
subscriptionName := genName("sub-")
createSubscriptionForCatalogWithSpec(t, crc, testNamespace, subscriptionName, subscriptionSpec)

subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker)
require.NoError(t, err)
require.NotNil(t, subscription)

installPlanName := subscription.Status.Install.Name
requiresApprovalChecker := buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseRequiresApproval)
fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, requiresApprovalChecker)
require.NoError(t, err)

fetchedInstallPlan.Spec.Approved = true
_, err = crc.OperatorsV1alpha1().InstallPlans(testNamespace).Update(fetchedInstallPlan)
require.NoError(t, err)

_, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvSucceededChecker)
require.NoError(t, err)

// Update manifest
mainManifests = []registry.PackageManifest{
{
PackageName: mainPackageName,
Channels: []registry.PackageChannel{
{Name: stableChannel, CurrentCSVName: replacementCSV.GetName()},
},
DefaultChannelName: stableChannel,
},
}

// Update catalog configmap
updateInternalCatalog(t, c, crc, mainCatalogName, globalNS, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, replacementCSV}, mainManifests)

// Get updated catalogsource
fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, globalNS, catalogSourceRegistryPodSynced)
require.NoError(t, err)

subscription, err = fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateUpgradePendingChecker)
require.NoError(t, err)
require.NotNil(t, subscription)

// Ensure the timing
catalogConnState := fetchedUpdatedCatalog.Status.GRPCConnectionState
subUpdatedTime := subscription.Status.LastUpdated
timeLapse := subUpdatedTime.Sub(catalogConnState.LastConnectTime.Time).Seconds()
require.True(t, timeLapse < 60)
}

func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) {
defer cleaner.NotifyTestComplete(t, true)

Expand Down Expand Up @@ -153,8 +262,8 @@ func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) {
fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool {
before := fetchedInitialCatalog.Status.ConfigMapResource
after := catalog.Status.ConfigMapResource
if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) &&
after.ResourceVersion != before.ResourceVersion {
if after != nil && before.LastUpdateTime.Before(&after.LastUpdateTime) &&
after.ResourceVersion != before.ResourceVersion {
fmt.Println("catalog updated")
return true
}
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/subscription_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1491,7 +1491,7 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v
require.NoError(t, err)

// Get initial configmap
configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{})
configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{})
require.NoError(t, err)

// Update package to point to new csv
Expand All @@ -1515,11 +1515,11 @@ func updateInternalCatalog(t *testing.T, c operatorclient.ClientInterface, crc v
configMap.Data[registry.ConfigMapCSVName] = string(csvsRaw)

// Update configmap
_, err = c.KubernetesInterface().CoreV1().ConfigMaps(testNamespace).Update(configMap)
_, err = c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Update(configMap)
require.NoError(t, err)

// wait for catalog to update
_, err = fetchCatalogSource(t, crc, catalogSourceName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool {
_, err = fetchCatalogSource(t, crc, catalogSourceName, namespace, func(catalog *v1alpha1.CatalogSource) bool {
before := fetchedInitialCatalog.Status.ConfigMapResource
after := catalog.Status.ConfigMapResource
if after != nil && after.LastUpdateTime.After(before.LastUpdateTime.Time) && after.ResourceVersion != before.ResourceVersion {
Expand Down