Skip to content

Commit

Permalink
feat(catalog): use operatorlister in catalog operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ecordell committed Nov 9, 2018
1 parent 927888b commit 9940b92
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 122 deletions.
196 changes: 140 additions & 56 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ import (
"encoding/json"
"errors"
"fmt"
"k8s.io/client-go/informers"
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

Expand Down Expand Up @@ -46,13 +50,15 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) }
// resolving dependencies in a catalog.
type Operator struct {
*queueinformer.Operator
client versioned.Interface
namespace string
sources map[registry.ResourceKey]registry.Source
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
dependencyResolver resolver.DependencyResolver
subQueue workqueue.RateLimitingInterface
client versioned.Interface
namespace string
sources map[registry.ResourceKey]registry.Source
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
dependencyResolver resolver.DependencyResolver
subQueue workqueue.RateLimitingInterface
catSrcQueue workqueue.RateLimitingInterface
lister operatorlister.OperatorLister
configmapRegistryReconciler *registry.ConfigMapRegistryReconciler
}

Expand Down Expand Up @@ -96,6 +102,7 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapR
Operator: queueOperator,
client: crClient,
namespace: operatorNamespace,
lister: operatorlister.NewLister(),
sources: make(map[registry.ResourceKey]registry.Source),
dependencyResolver: &resolver.MultiSourceResolver{},
}
Expand All @@ -113,6 +120,7 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapR
for _, informer := range catsrcQueueInformer {
op.RegisterQueueInformer(informer)
}
op.catSrcQueue = catsrcQueue

// Register InstallPlan informers.
ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "installplans")
Expand Down Expand Up @@ -143,58 +151,115 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapR
op.RegisterQueueInformer(informer)
}

// Creates registry pods in response to configmaps
informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval)
roleInformer := informerFactory.Rbac().V1().Roles()
roleBindingInformer := informerFactory.Rbac().V1().RoleBindings()
serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts()
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
configMapInformer := informerFactory.Core().V1().ConfigMaps()
handleDelete := &cache.ResourceEventHandlerFuncs{
DeleteFunc: op.handleDeletion,
}
// Set up informers for requeuing catalogs
for _, namespace := range watchedNamespaces {
roleQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "role")
roleBindingQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rolebinding")
serviceAccountQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount")
serviceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "service")
podQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod")
configmapQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "configmap")

informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace))
informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval)
roleInformer := informerFactory.Rbac().V1().Roles()
roleBindingInformer := informerFactory.Rbac().V1().RoleBindings()
serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts()
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
configMapInformer := informerFactory.Core().V1().ConfigMaps()

queueInformers := []*queueinformer.QueueInformer{
queueinformer.NewInformer(roleQueue, roleInformer.Informer(), op.syncObject, handleDelete, "role", metrics.NewMetricsNil()),
queueinformer.NewInformer(roleBindingQueue, roleBindingInformer.Informer(), op.syncObject, handleDelete, "rolebinding", metrics.NewMetricsNil()),
queueinformer.NewInformer(serviceAccountQueue, serviceAccountInformer.Informer(), op.syncObject, handleDelete, "serviceaccount", metrics.NewMetricsNil()),
queueinformer.NewInformer(serviceQueue, serviceInformer.Informer(), op.syncObject, handleDelete, "service", metrics.NewMetricsNil()),
queueinformer.NewInformer(podQueue, podInformer.Informer(), op.syncObject, handleDelete, "pod", metrics.NewMetricsNil()),
queueinformer.NewInformer(configmapQueue, configMapInformer.Informer(), op.syncObject, handleDelete, "configmap", metrics.NewMetricsNil()),
}
for _, q := range queueInformers {
op.RegisterQueueInformer(q)
}

op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister())
op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister())
op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister())
op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister())
op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister())
op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister())
}
op.configmapRegistryReconciler = &registry.ConfigMapRegistryReconciler{
Image: configmapRegistryImage,
Image: configmapRegistryImage,
OpClient: op.OpClient,
RoleLister: roleInformer.Lister(),
RoleBindingLister: roleBindingInformer.Lister(),
ServiceAccountLister: serviceAccountInformer.Lister(),
ServiceLister: serviceInformer.Lister(),
PodLister: podInformer.Lister(),
ConfigMapLister: configMapInformer.Lister(),
}

// register informers for configmapRegistryReconciler
registryInformers := []cache.SharedIndexInformer{
roleInformer.Informer(),
roleBindingInformer.Informer(),
serviceAccountInformer.Informer(),
serviceInformer.Informer(),
podInformer.Informer(),
configMapInformer.Informer(),
}

// TODO: won't this possibly conflict since GVK isn't part of the queue entry?
registryQueueInformers := queueinformer.New(
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "registry"),
registryInformers,
op.syncRegistry,
nil,
"registry",
metrics.NewMetricsNil(),
)
for _, informer := range registryQueueInformers {
op.RegisterQueueInformer(informer)
Lister: op.lister,
}
return op, nil
}

func (o *Operator) syncRegistry(obj interface{}) (syncError error) {
switch obj.(type) {
case *corev1.ConfigMap:
// requeue catalogsource
func (o *Operator) syncObject(obj interface{}) (syncError error) {
// Assert as runtime.Object
runtimeObj, ok := obj.(runtime.Object)
if !ok {
syncError = errors.New("object sync: casting to runtime.Object failed")
log.Warn(syncError.Error())
return
}

gvk := runtimeObj.GetObjectKind().GroupVersionKind()
logger := log.WithFields(log.Fields{
"group": gvk.Group,
"version": gvk.Version,
"kind": gvk.Kind,
})

// Assert as metav1.Object
metaObj, ok := obj.(metav1.Object)
if !ok {
syncError = errors.New("object sync: casting to metav1.Object failed")
logger.Warn(syncError.Error())
return
}
logger = logger.WithFields(log.Fields{
"name": metaObj.GetName(),
"namespace": metaObj.GetNamespace(),
})

logger.Debug("syncing")

if ownerutil.IsOwnedByKind(metaObj, v1alpha1.CatalogSourceKind) {
logger.Debug("requeueing owner CatalogSource")
owner := ownerutil.GetOwnerByKind(metaObj, v1alpha1.CatalogSourceKind)
o.catSrcQueue.AddRateLimited(fmt.Sprintf("%s/%s", metaObj.GetNamespace(), owner.Name))
}

return nil
}

func (o *Operator) handleDeletion(obj interface{}) {
ownee, ok := obj.(metav1.Object)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}

ownee, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Namespace %#v", obj))
return
}
}

if ownerutil.IsOwnedByKind(ownee, v1alpha1.CatalogSourceKind) {
owner := ownerutil.GetOwnerByKind(ownee, v1alpha1.CatalogSourceKind)
o.catSrcQueue.AddRateLimited(fmt.Sprintf("%s/%s", ownee.GetNamespace(), owner.Name))
}
}

func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
catsrc, ok := obj.(*v1alpha1.CatalogSource)
if !ok {
Expand All @@ -203,7 +268,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
}

logger := log.WithFields(log.Fields{
"source": catsrc.GetName(),
"source": catsrc.GetName(),
})

if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
Expand All @@ -220,12 +285,14 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) {

// Get the catalog source's config map
configMap, err := o.configmapRegistryReconciler.ConfigMapLister.ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap)
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err)
}

if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID == configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.ResourceVersion {
sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}

if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() {
// configmap ref nonexistant or updated, write out the new configmap ref to status and exit
out := catsrc.DeepCopy()
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Expand All @@ -239,21 +306,33 @@ func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.Catal
// update source map
o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap)
o.sources[sourceKey] = src
if err != nil {
return err
}

// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil {
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}
o.sourcesLastUpdate = timeNow()
return nil
}

// configmap not parsed to memory, but also not out of date
if _, ok := o.sources[sourceKey]; !ok {
// update source map
o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
src, err := registry.NewInMemoryFromConfigMap(o.OpClient, catsrc.GetNamespace(), catsrc.Spec.ConfigMap)
o.sources[sourceKey] = src
if err != nil {
return err
}
o.sourcesLastUpdate = timeNow()
}

// configmap ref is up to date, continue parsing
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) {
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
Expand All @@ -263,8 +342,13 @@ func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.Catal
logger.WithError(err).Warn("couldn't ensure registry server")
return err
}

if !catsrc.Status.LastSync.Before(&out.Status.LastSync) {
return nil
}

// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil {
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}
o.sourcesLastUpdate = timeNow()
Expand Down
Loading

0 comments on commit 9940b92

Please sign in to comment.