Skip to content

Commit

Permalink
feat(registry): when syncing catalogsources, generate pods that serve
Browse files Browse the repository at this point in the history
the operator registry api for configmaps
  • Loading branch information
ecordell committed Nov 8, 2018
1 parent fb9d49a commit e2aa9e5
Show file tree
Hide file tree
Showing 6 changed files with 795 additions and 39 deletions.
2 changes: 1 addition & 1 deletion cmd/catalog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
go http.ListenAndServe(":8080", nil)

// Create a new instance of the operator.
catalogOperator, err := catalog.NewOperator(*kubeConfigPath, *wakeupInterval, *catalogNamespace, strings.Split(*watchedNamespaces, ",")...)
catalogOperator, err := catalog.NewOperator(*kubeConfigPath, *wakeupInterval, *configmapServerImage, *catalogNamespace, strings.Split(*watchedNamespaces, ",")...)
if err != nil {
log.Panicf("error configuring operator: %s", err.Error())
}
Expand Down
142 changes: 111 additions & 31 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"k8s.io/client-go/informers"
"sync"
"time"

Expand Down Expand Up @@ -52,10 +53,11 @@ type Operator struct {
sourcesLastUpdate metav1.Time
dependencyResolver resolver.DependencyResolver
subQueue workqueue.RateLimitingInterface
configmapRegistryReconciler *registry.ConfigMapRegistryReconciler
}

// NewOperator creates a new Catalog Operator.
func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) {
func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, configmapRegistryImage string, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) {
// Default to watching all namespaces.
if watchedNamespaces == nil {
watchedNamespaces = []string{metav1.NamespaceAll}
Expand Down Expand Up @@ -141,57 +143,135 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNa
op.RegisterQueueInformer(informer)
}

// Creates registry pods in response to configmaps
informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval)
roleInformer := informerFactory.Rbac().V1().Roles()
roleBindingInformer := informerFactory.Rbac().V1().RoleBindings()
serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts()
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
configMapInformer := informerFactory.Core().V1().ConfigMaps()
op.configmapRegistryReconciler = &registry.ConfigMapRegistryReconciler{
Image: configmapRegistryImage,
OpClient: op.OpClient,
RoleLister: roleInformer.Lister(),
RoleBindingLister: roleBindingInformer.Lister(),
ServiceAccountLister: serviceAccountInformer.Lister(),
ServiceLister: serviceInformer.Lister(),
PodLister: podInformer.Lister(),
ConfigMapLister: configMapInformer.Lister(),
}

// register informers for configmapRegistryReconciler
registryInformers := []cache.SharedIndexInformer{
roleInformer.Informer(),
roleBindingInformer.Informer(),
serviceAccountInformer.Informer(),
serviceInformer.Informer(),
podInformer.Informer(),
configMapInformer.Informer(),
}

// TODO: won't this possibly conflict since GVK isn't part of the queue entry?
registryQueueInformers := queueinformer.New(
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "registry"),
registryInformers,
op.syncRegistry,
nil,
"registry",
metrics.NewMetricsNil(),
)
for _, informer := range registryQueueInformers {
op.RegisterQueueInformer(informer)
}
return op, nil
}

// syncRegistry is the sync handler wired to the registry informers (roles,
// role bindings, service accounts, services, pods, and configmaps). At this
// point it only distinguishes ConfigMap events; the catalogsource requeue is
// not implemented yet, so every event is acknowledged without error.
func (o *Operator) syncRegistry(obj interface{}) (syncError error) {
	if _, isConfigMap := obj.(*corev1.ConfigMap); isConfigMap {
		// requeue catalogsource
	}
	return nil
}

func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
catsrc, ok := obj.(*v1alpha1.CatalogSource)
if !ok {
log.Debugf("wrong type: %#v", obj)
return fmt.Errorf("casting CatalogSource failed")
}

// Get the catalog source's config map
configMap, err := o.OpClient.KubernetesInterface().CoreV1().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get catalog config map %s when updating status: %s", catsrc.Spec.ConfigMap, err)
logger := log.WithFields(log.Fields{
"source": catsrc.GetName(),
})

if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
return o.syncConfigMapSource(logger, catsrc)
}

o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
_, ok = o.sources[sourceKey]
logger.WithField("sourceType", catsrc.Spec.SourceType).Warn("unknown source type")

// Check for catalog source changes
if ok && catsrc.Status.ConfigMapResource != nil && catsrc.Status.ConfigMapResource.Name == configMap.GetName() && catsrc.Status.ConfigMapResource.ResourceVersion == configMap.GetResourceVersion() {
return nil
}
// TODO: write status about invalid source type

// Update status subresource
out := catsrc.DeepCopy()
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Namespace: configMap.GetNamespace(),
UID: configMap.GetUID(),
ResourceVersion: configMap.GetResourceVersion(),
}
out.Status.LastSync = timeNow()
return nil
}

_, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out)
func (o *Operator) syncConfigMapSource(logger *log.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) {

// Get the catalog source's config map
configMap, err := o.configmapRegistryReconciler.ConfigMapLister.ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("failed to update catalog source %s status: %s", out.GetName(), err)
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err)
}

// Create a new in-mem registry
src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("failed to create catalog source from ConfigMap %s: %s", out.Spec.ConfigMap, err)
if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID == configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.ResourceVersion {
// configmap ref nonexistant or updated, write out the new configmap ref to status and exit
out := catsrc.DeepCopy()
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Namespace: configMap.GetNamespace(),
UID: configMap.GetUID(),
ResourceVersion: configMap.GetResourceVersion(),
}
out.Status.LastSync = timeNow()

// update source map
o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
sourceKey := registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
src, err := registry.NewInMemoryFromConfigMap(o.OpClient, out.GetNamespace(), out.Spec.ConfigMap)
o.sources[sourceKey] = src
if err != nil {
return err
}

// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil {
return err
}
o.sourcesLastUpdate = timeNow()
return nil
}

// Update sources map
o.sources[sourceKey] = src
o.sourcesLastUpdate = timeNow()
// configmap ref is up to date, continue parsing
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) {
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it

out := catsrc.DeepCopy()
if err := o.configmapRegistryReconciler.EnsureRegistryServer(out); err != nil {
logger.WithError(err).Warn("couldn't ensure registry server")
return err
}
// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err!= nil {
return err
}
o.sourcesLastUpdate = timeNow()
return nil
}

// no updates
return nil
}

Expand Down
88 changes: 83 additions & 5 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package catalog

import (
"errors"
"fmt"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"testing"
"time"

"github.com/ghodss/yaml"

Expand Down Expand Up @@ -139,6 +143,32 @@ func TestSyncCatalogSources(t *testing.T) {
expectedStatus *v1alpha1.CatalogSourceStatus
expectedError error
}{
{
testName: "CatalogSourceWithInvalidSourceType",
operatorNamespace: "cool-namespace",
catalogSource: &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Name: "cool-catalog",
Namespace: "cool-namespace",
UID: types.UID("catalog-uid"),
},
Spec: v1alpha1.CatalogSourceSpec{
ConfigMap: "cool-configmap",
SourceType: "nope",
},
},
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "cool-configmap",
Namespace: "cool-namespace",
UID: types.UID("configmap-uid"),
ResourceVersion: "resource-version",
},
Data: fakeConfigMapData(),
},
expectedStatus: nil,
expectedError: nil,
},
{
testName: "CatalogSourceWithBackingConfigMap",
operatorNamespace: "cool-namespace",
Expand All @@ -150,6 +180,7 @@ func TestSyncCatalogSources(t *testing.T) {
},
Spec: v1alpha1.CatalogSourceSpec{
ConfigMap: "cool-configmap",
SourceType: v1alpha1.SourceTypeInternal,
},
},
configMap: &corev1.ConfigMap{
Expand Down Expand Up @@ -182,6 +213,7 @@ func TestSyncCatalogSources(t *testing.T) {
},
Spec: v1alpha1.CatalogSourceSpec{
ConfigMap: "cool-configmap",
SourceType: v1alpha1.SourceTypeConfigmap,
},
Status: v1alpha1.CatalogSourceStatus{
ConfigMapResource: &v1alpha1.ConfigMapResourceReference{
Expand Down Expand Up @@ -222,6 +254,7 @@ func TestSyncCatalogSources(t *testing.T) {
},
Spec: v1alpha1.CatalogSourceSpec{
ConfigMap: "cool-configmap",
SourceType: v1alpha1.SourceTypeConfigmap,
},
},
configMap: &corev1.ConfigMap{
Expand All @@ -234,7 +267,7 @@ func TestSyncCatalogSources(t *testing.T) {
Data: map[string]string{},
},
expectedStatus: nil,
expectedError: errors.New("failed to create catalog source from ConfigMap cool-configmap: error parsing ConfigMap cool-configmap: no valid resources found"),
expectedError: errors.New("error parsing ConfigMap cool-configmap: no valid resources found"),
},
{
testName: "CatalogSourceWithMissingConfigMap",
Expand All @@ -247,21 +280,25 @@ func TestSyncCatalogSources(t *testing.T) {
},
Spec: v1alpha1.CatalogSourceSpec{
ConfigMap: "cool-configmap",
SourceType: v1alpha1.SourceTypeConfigmap,
},
},
configMap: &corev1.ConfigMap{},
expectedStatus: nil,
expectedError: errors.New("failed to get catalog config map cool-configmap when updating status: configmaps \"cool-configmap\" not found"),
expectedError: errors.New("failed to get catalog config map cool-configmap: configmap \"cool-configmap\" not found"),
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
stopc := make(chan struct{})
defer close(stopc)

// Create existing objects
clientObjs := []runtime.Object{tt.catalogSource}
k8sObjs := []runtime.Object{tt.configMap}

// Create test operator
op, err := NewFakeOperator(clientObjs, k8sObjs, nil, nil, resolver, tt.operatorNamespace)
op, err := NewFakeOperator(clientObjs, k8sObjs, nil, nil, resolver, tt.operatorNamespace, stopc)
require.NoError(t, err)

// Run sync
Expand Down Expand Up @@ -373,8 +410,8 @@ func fakeConfigMapData() map[string]string {
return data
}

// NewFakeOprator creates a new operator using fake clients
func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string) (*Operator, error) {
// NewFakeOperator creates a new operator using fake clients
func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extObjs []runtime.Object, regObjs []runtime.Object, resolver resolver.DependencyResolver, namespace string, stopc <- chan struct{}) (*Operator, error) {
// Create client fakes
clientFake := fake.NewSimpleClientset(clientObjs...)
opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), apiextensionsfake.NewSimpleClientset(extObjs...), apiregistrationfake.NewSimpleClientset(regObjs...))
Expand All @@ -385,6 +422,15 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO
return nil, err
}

// Creates registry pods in response to configmaps
informerFactory := informers.NewSharedInformerFactory(opClientFake.KubernetesInterface(), 5*time.Second)
roleInformer := informerFactory.Rbac().V1().Roles()
roleBindingInformer := informerFactory.Rbac().V1().RoleBindings()
serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts()
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
configMapInformer := informerFactory.Core().V1().ConfigMaps()

// Create the new operator
queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake)
op := &Operator{
Expand All @@ -395,6 +441,38 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO
dependencyResolver: resolver,
}

op.configmapRegistryReconciler = &registry.ConfigMapRegistryReconciler{
Image: "test:pod",
OpClient: op.OpClient,
RoleLister: roleInformer.Lister(),
RoleBindingLister: roleBindingInformer.Lister(),
ServiceAccountLister: serviceAccountInformer.Lister(),
ServiceLister: serviceInformer.Lister(),
PodLister: podInformer.Lister(),
ConfigMapLister: configMapInformer.Lister(),
}

// register informers for configmapRegistryReconciler
registryInformers := []cache.SharedIndexInformer{
roleInformer.Informer(),
roleBindingInformer.Informer(),
serviceAccountInformer.Informer(),
serviceInformer.Informer(),
podInformer.Informer(),
configMapInformer.Informer(),
}

var hasSyncedCheckFns []cache.InformerSynced
for _, informer := range registryInformers {
op.RegisterInformer(informer)
hasSyncedCheckFns = append(hasSyncedCheckFns, informer.HasSynced)
go informer.Run(stopc)
}

if ok := cache.WaitForCacheSync(stopc, hasSyncedCheckFns...); !ok {
return nil, fmt.Errorf("failed to wait for caches to sync")
}

return op, nil
}

Expand Down
Loading

0 comments on commit e2aa9e5

Please sign in to comment.