diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 67d72ab7d57..d3053c4387f 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -66,24 +66,25 @@ const ( type Operator struct { queueinformer.Operator - logger *logrus.Logger - clock utilclock.Clock - opClient operatorclient.ClientInterface - client versioned.Interface - dynamicClient dynamic.Interface - lister operatorlister.OperatorLister - catsrcQueueSet *queueinformer.ResourceQueueSet - subQueueSet *queueinformer.ResourceQueueSet - ipQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.RateLimitingInterface - namespace string - sources *grpc.SourceStore - sourcesLastUpdate sharedtime.SharedTime - resolver resolver.Resolver - reconciler reconciler.RegistryReconcilerFactory - csvProvidedAPIsIndexer map[string]cache.Indexer - clientAttenuator *scoped.ClientAttenuator - serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier + logger *logrus.Logger + clock utilclock.Clock + opClient operatorclient.ClientInterface + client versioned.Interface + dynamicClient dynamic.Interface + lister operatorlister.OperatorLister + catsrcQueueSet *queueinformer.ResourceQueueSet + subQueueSet *queueinformer.ResourceQueueSet + ipQueueSet *queueinformer.ResourceQueueSet + nsResolveQueue workqueue.RateLimitingInterface + namespace string + sources *grpc.SourceStore + sourcesLastUpdate sharedtime.SharedTime + resolver resolver.Resolver + reconciler reconciler.RegistryReconcilerFactory + csvProvidedAPIsIndexer map[string]cache.Indexer + catalogSubscriberIndexer map[string]cache.Indexer + clientAttenuator *scoped.ClientAttenuator + serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Allocate the new instance of an Operator. op := &Operator{ - Operator: queueOperator, - logger: logger, - clock: clock, - opClient: opClient, - dynamicClient: dynamicClient, - client: crClient, - lister: lister, - namespace: operatorNamespace, - resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient), - catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), - subQueueSet: queueinformer.NewEmptyResourceQueueSet(), - csvProvidedAPIsIndexer: map[string]cache.Indexer{}, - serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient), - clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient), + Operator: queueOperator, + logger: logger, + clock: clock, + opClient: opClient, + dynamicClient: dynamicClient, + client: crClient, + lister: lister, + namespace: operatorNamespace, + resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient), + catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), + subQueueSet: queueinformer.NewEmptyResourceQueueSet(), + csvProvidedAPIsIndexer: map[string]cache.Indexer{}, + catalogSubscriberIndexer: map[string]cache.Indexer{}, + serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient), + clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient), } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now) @@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire Subscriptions subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions() op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister()) + if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil { + return nil, err + } + subIndexer := subInformer.Informer().GetIndexer() + op.catalogSubscriberIndexer[namespace] = subIndexer + subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace)) op.subQueueSet.Set(namespace, subQueue) subSyncer, err := subscription.NewSyncer( @@ -339,7 +347,21 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { switch state.State { case connectivity.Ready: - o.nsResolveQueue.Add(state.Key.Namespace) + if o.namespace == state.Key.Namespace { + namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer, + state.Key.Name, state.Key.Namespace) + + if err != nil { + o.nsResolveQueue.Add(state.Key.Namespace) + } else { + namespaces[state.Key.Namespace] = struct{}{} + for ns, _ := range namespaces { + o.nsResolveQueue.Add(ns) + } + } + } else { + o.nsResolveQueue.Add(state.Key.Namespace) + } default: if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change") diff --git a/pkg/lib/index/catalog.go b/pkg/lib/index/catalog.go new file mode 100644 index 00000000000..c23208b6450 --- /dev/null +++ b/pkg/lib/index/catalog.go @@ -0,0 +1,56 @@ +package indexer + +import ( + "fmt" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "k8s.io/client-go/tools/cache" +) + +const ( + // PresentCatalogIndexFuncKey is the recommended key to use for registering + // the index func with an indexer. + PresentCatalogIndexFuncKey string = "presentcatalogindexfunc" +) + +// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace +// of the given object (Subscription) +func PresentCatalogIndexFunc(obj interface{}) ([]string, error) { + sub, ok := obj.(*v1alpha1.Subscription) + if !ok { + return []string{""}, fmt.Errorf("invalid object of type: %T", obj) + } + + index := fmt.Sprintf("%s/%s", sub.Spec.CatalogSource, sub.Spec.CatalogSourceNamespace) + + return []string{index}, nil +} + +// CatalogSubscriberNamespaces returns the list of namespaces of Subscription(s) +// that uses the given CatalogSource (name/namespace) +func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) { + nsSet := map[string]struct{}{} + var index string + + index = fmt.Sprintf("%s/%s", name, namespace) + + for _, indexer := range indexers { + subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index) + if err != nil { + return nil, err + } + for _, item := range subs { + s, ok := item.(*v1alpha1.Subscription) + if !ok { + continue + } + if s.Spec.CatalogSource == "" || s.Spec.CatalogSourceNamespace == "" { + continue + } + // Add to set + nsSet[s.GetNamespace()] = struct{}{} + } + } + + return nsSet, nil +}