-
Notifications
You must be signed in to change notification settings - Fork 544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(catalog): add grpc sourcetype to CatalogSources #656
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ import ( | |
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" | ||
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" | ||
olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors" | ||
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" | ||
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler" | ||
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver" | ||
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" | ||
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" | ||
|
@@ -54,16 +54,16 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) } | |
// resolving dependencies in a catalog. | ||
type Operator struct { | ||
*queueinformer.Operator | ||
client versioned.Interface | ||
lister operatorlister.OperatorLister | ||
namespace string | ||
sources map[resolver.CatalogKey]resolver.SourceRef | ||
sourcesLock sync.RWMutex | ||
sourcesLastUpdate metav1.Time | ||
resolver resolver.Resolver | ||
subQueue workqueue.RateLimitingInterface | ||
catSrcQueue workqueue.RateLimitingInterface | ||
configmapRegistryReconciler registry.RegistryReconciler | ||
client versioned.Interface | ||
lister operatorlister.OperatorLister | ||
namespace string | ||
sources map[resolver.CatalogKey]resolver.SourceRef | ||
sourcesLock sync.RWMutex | ||
sourcesLastUpdate metav1.Time | ||
resolver resolver.Resolver | ||
subQueue workqueue.RateLimitingInterface | ||
catSrcQueue workqueue.RateLimitingInterface | ||
reconciler reconciler.ReconcilerReconciler | ||
} | ||
|
||
// NewOperator creates a new Catalog Operator. | ||
|
@@ -207,10 +207,10 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti | |
op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) | ||
op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister()) | ||
} | ||
op.configmapRegistryReconciler = ®istry.ConfigMapRegistryReconciler{ | ||
Image: configmapRegistryImage, | ||
OpClient: op.OpClient, | ||
Lister: op.lister, | ||
op.reconciler = &reconciler.RegistryReconcilerReconciler{ | ||
ConfigMapServerImage: configmapRegistryImage, | ||
OpClient: op.OpClient, | ||
Lister: op.lister, | ||
} | ||
return op, nil | ||
} | ||
|
@@ -281,15 +281,29 @@ func (o *Operator) handleDeletion(obj interface{}) { | |
} | ||
|
||
func (o *Operator) handleCatSrcDeletion(obj interface{}) { | ||
if catsrc, ok := obj.(*v1alpha1.CatalogSource); ok { | ||
sourceKey := resolver.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} | ||
func() { | ||
o.sourcesLock.Lock() | ||
defer o.sourcesLock.Unlock() | ||
delete(o.sources, sourceKey) | ||
}() | ||
o.Log.WithField("source", sourceKey).Info("removed client for deleted catalogsource") | ||
catsrc, ok := obj.(metav1.Object) | ||
if !ok { | ||
if !ok { | ||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) | ||
if !ok { | ||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) | ||
return | ||
} | ||
|
||
catsrc, ok = tombstone.Obj.(metav1.Object) | ||
if !ok { | ||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Namespace %#v", obj)) | ||
return | ||
} | ||
} | ||
} | ||
sourceKey := resolver.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} | ||
func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this need to execute in an anonymous function if the outer function returns immediately after this (ie. No real work is being done between the lock and the return)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For future changes, this keeps the unlock close to to the reason we're locking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand that - was just commenting on this specific case - it makes sense to be consistent with the rest of the codebase. |
||
o.sourcesLock.Lock() | ||
defer o.sourcesLock.Unlock() | ||
delete(o.sources, sourceKey) | ||
}() | ||
o.Log.WithField("source", sourceKey).Info("removed client for deleted catalogsource") | ||
} | ||
|
||
func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { | ||
|
@@ -303,53 +317,45 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { | |
"source": catsrc.GetName(), | ||
}) | ||
|
||
if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap { | ||
return o.syncConfigMapSource(logger, catsrc) | ||
} | ||
|
||
logger.WithField("sourceType", catsrc.Spec.SourceType).Warn("unknown source type") | ||
|
||
// TODO: write status about invalid source type | ||
|
||
return nil | ||
} | ||
|
||
func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) { | ||
// Get the catalog source's config map | ||
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap) | ||
if err != nil { | ||
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err) | ||
} | ||
|
||
out := catsrc.DeepCopy() | ||
sourceKey := resolver.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()} | ||
|
||
if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() { | ||
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit | ||
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ | ||
Name: configMap.GetName(), | ||
Namespace: configMap.GetNamespace(), | ||
UID: configMap.GetUID(), | ||
ResourceVersion: configMap.GetResourceVersion(), | ||
if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap { | ||
// Get the catalog source's config map | ||
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap) | ||
if err != nil { | ||
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err) | ||
} | ||
out.Status.LastSync = timeNow() | ||
|
||
// update status | ||
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { | ||
return err | ||
} | ||
if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() { | ||
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit | ||
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{ | ||
Name: configMap.GetName(), | ||
Namespace: configMap.GetNamespace(), | ||
UID: configMap.GetUID(), | ||
ResourceVersion: configMap.GetResourceVersion(), | ||
} | ||
out.Status.LastSync = timeNow() | ||
|
||
o.sourcesLastUpdate = timeNow() | ||
// update status | ||
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
o.sourcesLastUpdate = timeNow() | ||
|
||
return nil | ||
} | ||
} | ||
|
||
// configmap ref is up to date, continue parsing | ||
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) { | ||
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it | ||
reconciler := o.reconciler.ReconcilerForSourceType(catsrc.Spec.SourceType) | ||
if reconciler == nil { | ||
return fmt.Errorf("no reconciler for source type %s", catsrc.Spec.SourceType) | ||
} | ||
|
||
out := catsrc.DeepCopy() | ||
if err := o.configmapRegistryReconciler.EnsureRegistryServer(out); err != nil { | ||
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it | ||
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) { | ||
if err := reconciler.EnsureRegistryServer(out); err != nil { | ||
logger.WithError(err).Warn("couldn't ensure registry server") | ||
return err | ||
} | ||
|
@@ -359,7 +365,7 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca | |
} | ||
|
||
// update status | ||
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { | ||
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { | ||
return err | ||
} | ||
|
||
|
@@ -368,6 +374,7 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca | |
return nil | ||
} | ||
|
||
// update operator's view of sources | ||
sourcesUpdated := false | ||
func() { | ||
o.sourcesLock.Lock() | ||
|
@@ -389,13 +396,14 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca | |
} | ||
}() | ||
|
||
if sourcesUpdated { | ||
// record that we've done work here onto the status | ||
out := catsrc.DeepCopy() | ||
out.Status.LastSync = timeNow() | ||
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { | ||
return err | ||
} | ||
if !sourcesUpdated { | ||
return nil | ||
} | ||
|
||
// record that we've done work here onto the status | ||
out.Status.LastSync = timeNow() | ||
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil { | ||
return err | ||
} | ||
|
||
// Sync any dependent Subscriptions | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔥 still prefer
ReconcilerFactory
.