Skip to content

Commit

Permalink
Enable multiple namespaces sync if catsrc is updated in global ns
Browse files Browse the repository at this point in the history
Currently, if CatalogSource is updated and connection is Ready,
only the namespace where CatalogSource resides gets resynced. Now, a list
of namespaces that contain Subscriptions that use updated CatalogSource
will get resynced instead.

Signed-off-by: Vu Dinh <vdinh@redhat.com>
  • Loading branch information
dinhxuanvu committed Nov 15, 2019
1 parent 1debd99 commit cfd087c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 33 deletions.
88 changes: 55 additions & 33 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,25 @@ const (
type Operator struct {
queueinformer.Operator

logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources *grpc.SourceStore
sourcesLastUpdate sharedtime.SharedTime
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
clientAttenuator *scoped.ClientAttenuator
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources *grpc.SourceStore
sourcesLastUpdate sharedtime.SharedTime
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
catalogSubscriberIndexer map[string]cache.Indexer
clientAttenuator *scoped.ClientAttenuator
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -124,20 +125,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
dynamicClient: dynamicClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
dynamicClient: dynamicClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
resolver: resolver.NewOperatorsV1alpha1Resolver(lister, crClient),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
catalogSubscriberIndexer: map[string]cache.Indexer{},
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
Expand Down Expand Up @@ -202,6 +204,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire Subscriptions
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
if err := subInformer.Informer().AddIndexers(cache.Indexers{index.PresentCatalogIndexFuncKey: index.PresentCatalogIndexFunc}); err != nil {
return nil, err
}
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[namespace] = subIndexer

subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/subs", namespace))
op.subQueueSet.Set(namespace, subQueue)
subSyncer, err := subscription.NewSyncer(
Expand Down Expand Up @@ -339,7 +347,21 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
o.nsResolveQueue.Add(state.Key.Namespace)
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)

if err != nil {
o.nsResolveQueue.Add(state.Key.Namespace)
} else {
namespaces[state.Key.Namespace] = struct{}{}
for ns, _ := range namespaces {
o.nsResolveQueue.Add(ns)
}
}
} else {
o.nsResolveQueue.Add(state.Key.Namespace)
}
default:
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
Expand Down
56 changes: 56 additions & 0 deletions pkg/lib/index/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package indexer

import (
"fmt"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
"k8s.io/client-go/tools/cache"
)

const (
// PresentCatalogIndexFuncKey is the recommended key to use for registering
// the index func with an indexer.
PresentCatalogIndexFuncKey string = "presentcatalogindexfunc"
)

// PresentCatalogIndexFunc returns index from CatalogSource/CatalogSourceNamespace
// of the given object (Subscription)
func PresentCatalogIndexFunc(obj interface{}) ([]string, error) {
sub, ok := obj.(*v1alpha1.Subscription)
if !ok {
return []string{""}, fmt.Errorf("invalid object of type: %T", obj)
}

index := fmt.Sprintf("%s/%s", sub.Spec.CatalogSource, sub.Spec.CatalogSourceNamespace)

return []string{index}, nil
}

// CatalogSubscriberNamespaces returns the list of namespaces of Subscription(s)
// that uses the given CatalogSource (name/namespace)
func CatalogSubscriberNamespaces(indexers map[string]cache.Indexer, name, namespace string) (map[string]struct{}, error) {
nsSet := map[string]struct{}{}
var index string

index = fmt.Sprintf("%s/%s", name, namespace)

for _, indexer := range indexers {
subs, err := indexer.ByIndex(PresentCatalogIndexFuncKey, index)
if err != nil {
return nil, err
}
for _, item := range subs {
s, ok := item.(*v1alpha1.Subscription)
if !ok {
continue
}
if s.Spec.CatalogSource == "" || s.Spec.CatalogSourceNamespace == "" {
continue
}
// Add to set
nsSet[s.GetNamespace()] = struct{}{}
}
}

return nsSet, nil
}

0 comments on commit cfd087c

Please sign in to comment.