Skip to content

Commit

Permalink
feat(catalog): add grpc sourcetype to CatalogSources
Browse files Browse the repository at this point in the history
grpc sourcetype takes a new `image` field which specifies an image
which is expected to serve a registry API over grpc on port 50051.
  • Loading branch information
ecordell committed Jan 9, 2019
1 parent 431a700 commit da77153
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 98 deletions.
5 changes: 5 additions & 0 deletions deploy/chart/templates/0000_30_05-catalogsource.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ spec:
enum:
- internal # deprecated
- configmap
- grpc

configMap:
type: string
description: The name of a ConfigMap that holds the entries for an in-memory catalog.

image:
type: string
description: An image that serves a grpc registry. Only valid for `grpc` sourceType.

displayName:
type: string
description: Pretty name for display
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/apis/operators/v1alpha1/catalogsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ type SourceType string
const (
SourceTypeInternal SourceType = "internal"
SourceTypeConfigmap SourceType = "configmap"
SourceTypeGrpc SourceType = "grpc"
)

type CatalogSourceSpec struct {
SourceType SourceType `json:"sourceType"`
ConfigMap string `json:"configMap,omitempty"`
Image string `json:"image,omitempty"`
Secrets []string `json:"secrets,omitempty"`

// Metadata
Expand Down
101 changes: 45 additions & 56 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) }
// resolving dependencies in a catalog.
type Operator struct {
*queueinformer.Operator
client versioned.Interface
lister operatorlister.OperatorLister
namespace string
sources map[resolver.CatalogKey]resolver.SourceRef
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
resolver resolver.Resolver
subQueue workqueue.RateLimitingInterface
catSrcQueue workqueue.RateLimitingInterface
configmapRegistryReconciler registry.RegistryReconciler
client versioned.Interface
lister operatorlister.OperatorLister
namespace string
sources map[resolver.CatalogKey]resolver.SourceRef
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
resolver resolver.Resolver
subQueue workqueue.RateLimitingInterface
catSrcQueue workqueue.RateLimitingInterface
registryPodReconciler registry.RegistryReconciler
}

// NewOperator creates a new Catalog Operator.
Expand Down Expand Up @@ -207,7 +207,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister())
op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister())
}
op.configmapRegistryReconciler = &registry.ConfigMapRegistryReconciler{
op.registryPodReconciler = &registry.RegistryPodReconciler{
Image: configmapRegistryImage,
OpClient: op.OpClient,
Lister: op.lister,
Expand Down Expand Up @@ -303,53 +303,40 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
"source": catsrc.GetName(),
})

if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
return o.syncConfigMapSource(logger, catsrc)
}

logger.WithField("sourceType", catsrc.Spec.SourceType).Warn("unknown source type")

// TODO: write status about invalid source type

return nil
}

func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.CatalogSource) (syncError error) {
// Get the catalog source's config map
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err)
}

out := catsrc.DeepCopy()
sourceKey := resolver.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}

if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() {
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Namespace: configMap.GetNamespace(),
UID: configMap.GetUID(),
ResourceVersion: configMap.GetResourceVersion(),
if catsrc.Spec.SourceType == v1alpha1.SourceTypeInternal || catsrc.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
// Get the catalog source's config map
configMap, err := o.lister.CoreV1().ConfigMapLister().ConfigMaps(catsrc.GetNamespace()).Get(catsrc.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("failed to get catalog config map %s: %s", catsrc.Spec.ConfigMap, err)
}
out.Status.LastSync = timeNow()

// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}
if catsrc.Status.ConfigMapResource == nil || catsrc.Status.ConfigMapResource.UID != configMap.GetUID() || catsrc.Status.ConfigMapResource.ResourceVersion != configMap.GetResourceVersion() {
// configmap ref nonexistent or updated, write out the new configmap ref to status and exit
out.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Namespace: configMap.GetNamespace(),
UID: configMap.GetUID(),
ResourceVersion: configMap.GetResourceVersion(),
}
out.Status.LastSync = timeNow()

o.sourcesLastUpdate = timeNow()
// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}

return nil
o.sourcesLastUpdate = timeNow()

return nil
}
}

// configmap ref is up to date, continue parsing
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) {
// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it

out := catsrc.DeepCopy()
if err := o.configmapRegistryReconciler.EnsureRegistryServer(out); err != nil {
if err := o.registryPodReconciler.EnsureRegistryServer(out); err != nil {
logger.WithError(err).Warn("couldn't ensure registry server")
return err
}
Expand All @@ -359,7 +346,7 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca
}

// update status
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}

Expand All @@ -368,6 +355,7 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca
return nil
}

// update operator's view of sources
sourcesUpdated := false
func() {
o.sourcesLock.Lock()
Expand All @@ -389,13 +377,14 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca
}
}()

if sourcesUpdated {
// record that we've done work here onto the status
out := catsrc.DeepCopy()
out.Status.LastSync = timeNow()
if _, err = o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}
if !sourcesUpdated {
return nil
}

// record that we've done work here onto the status
out.Status.LastSync = timeNow()
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
return err
}

// Sync any dependent Subscriptions
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO
resolver: &fakes.FakeResolver{},
}

op.configmapRegistryReconciler = &registry.ConfigMapRegistryReconciler{
op.registryPodReconciler = &registry.RegistryPodReconciler{
Image: "test:pod",
OpClient: op.OpClient,
Lister: lister,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func TestSyncSubscriptions(t *testing.T) {
o, _, err := NewFakeOperator(tt.fields.existingOLMObjs, tt.fields.existingObjects, nil, nil, testNamespace, stopCh)
require.NoError(t, err)

o.configmapRegistryReconciler = &fakes.FakeRegistryReconciler{
o.registryPodReconciler = &fakes.FakeRegistryReconciler{
EnsureRegistryServerStub: func(source *v1alpha1.CatalogSource) error {
return nil
},
Expand Down
84 changes: 51 additions & 33 deletions pkg/controller/registry/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ func (s *catalogSourceDecorator) Selector() labels.Selector {
}

func (s *catalogSourceDecorator) Labels() map[string]string {
return map[string]string{
"olm.catalogSource": s.GetName(),
"olm.configMapResourceVersion": s.Status.ConfigMapResource.ResourceVersion,
labels := map[string]string{
"olm.catalogSource": s.GetName(),
}
if s.Spec.SourceType == v1alpha1.SourceTypeInternal || s.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
labels["olm.configMapResourceVersion"] = s.Status.ConfigMapResource.ResourceVersion
}
return labels
}

func (s *catalogSourceDecorator) ConfigMapChanges(configMap *v1.ConfigMap) bool {
Expand Down Expand Up @@ -178,15 +181,15 @@ type RegistryReconciler interface {
EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error
}

type ConfigMapRegistryReconciler struct {
type RegistryPodReconciler struct {
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
Image string
}

var _ RegistryReconciler = &ConfigMapRegistryReconciler{}
var _ RegistryReconciler = &RegistryPodReconciler{}

func (c *ConfigMapRegistryReconciler) currentService(source catalogSourceDecorator) *v1.Service {
func (c *RegistryPodReconciler) currentService(source catalogSourceDecorator) *v1.Service {
serviceName := source.Service().GetName()
service, err := c.Lister.CoreV1().ServiceLister().Services(source.GetNamespace()).Get(serviceName)
if err != nil {
Expand All @@ -196,7 +199,7 @@ func (c *ConfigMapRegistryReconciler) currentService(source catalogSourceDecorat
return service
}

func (c *ConfigMapRegistryReconciler) currentServiceAccount(source catalogSourceDecorator) *v1.ServiceAccount {
func (c *RegistryPodReconciler) currentServiceAccount(source catalogSourceDecorator) *v1.ServiceAccount {
serviceAccountName := source.ServiceAccount().GetName()
serviceAccount, err := c.Lister.CoreV1().ServiceAccountLister().ServiceAccounts(source.GetNamespace()).Get(serviceAccountName)
if err != nil {
Expand All @@ -206,7 +209,7 @@ func (c *ConfigMapRegistryReconciler) currentServiceAccount(source catalogSource
return serviceAccount
}

func (c *ConfigMapRegistryReconciler) currentRole(source catalogSourceDecorator) *rbacv1.Role {
func (c *RegistryPodReconciler) currentRole(source catalogSourceDecorator) *rbacv1.Role {
roleName := source.Role().GetName()
role, err := c.Lister.RbacV1().RoleLister().Roles(source.GetNamespace()).Get(roleName)
if err != nil {
Expand All @@ -216,7 +219,7 @@ func (c *ConfigMapRegistryReconciler) currentRole(source catalogSourceDecorator)
return role
}

func (c *ConfigMapRegistryReconciler) currentRoleBinding(source catalogSourceDecorator) *rbacv1.RoleBinding {
func (c *RegistryPodReconciler) currentRoleBinding(source catalogSourceDecorator) *rbacv1.RoleBinding {
roleBindingName := source.RoleBinding().GetName()
roleBinding, err := c.Lister.RbacV1().RoleBindingLister().RoleBindings(source.GetNamespace()).Get(roleBindingName)
if err != nil {
Expand All @@ -226,7 +229,7 @@ func (c *ConfigMapRegistryReconciler) currentRoleBinding(source catalogSourceDec
return roleBinding
}

func (c *ConfigMapRegistryReconciler) currentPods(source catalogSourceDecorator, image string) []*v1.Pod {
func (c *RegistryPodReconciler) currentPods(source catalogSourceDecorator, image string) []*v1.Pod {
podName := source.Pod(image).GetName()
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(source.Selector())
if err != nil {
Expand All @@ -239,7 +242,7 @@ func (c *ConfigMapRegistryReconciler) currentPods(source catalogSourceDecorator,
return pods
}

func (c *ConfigMapRegistryReconciler) currentPodsWithCorrectResourceVersion(source catalogSourceDecorator, image string) []*v1.Pod {
func (c *RegistryPodReconciler) currentPodsWithCorrectResourceVersion(source catalogSourceDecorator, image string) []*v1.Pod {
podName := source.Pod(image).GetName()
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels()))
if err != nil {
Expand All @@ -253,30 +256,45 @@ func (c *ConfigMapRegistryReconciler) currentPodsWithCorrectResourceVersion(sour
}

// Ensure that all components of registry server are up to date.
func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error {
func (c *RegistryPodReconciler) EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error {
source := catalogSourceDecorator{catalogSource}

// fetch configmap first, exit early if we can't find it
configMap, err := c.Lister.CoreV1().ConfigMapLister().ConfigMaps(source.GetNamespace()).Get(source.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("unable to get configmap %s/%s from cache", source.GetNamespace(), source.Spec.ConfigMap)
image := c.Image
if source.Spec.SourceType == "grpc" {
image = source.Spec.Image
}

if source.ConfigMapChanges(configMap) {
catalogSource.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Namespace: configMap.GetNamespace(),
UID: configMap.GetUID(),
ResourceVersion: configMap.GetResourceVersion(),
}
if image == "" {
return fmt.Errorf("no image for registry")
}

// if service status is nil, we force create every object to ensure they're created the first time
overwrite := source.Status.RegistryServiceStatus == nil
overwritePod := overwrite

if source.Spec.SourceType == v1alpha1.SourceTypeConfigmap || source.Spec.SourceType == v1alpha1.SourceTypeInternal {
// fetch configmap first, exit early if we can't find it
configMap, err := c.Lister.CoreV1().ConfigMapLister().ConfigMaps(source.GetNamespace()).Get(source.Spec.ConfigMap)
if err != nil {
return fmt.Errorf("unable to get configmap %s/%s from cache", source.GetNamespace(), source.Spec.ConfigMap)
}

// recreate the pod if there are configmap changes; this causes the db to be rebuilt
// recreate the pod if no existing pod is serving the latest configmap
overwritePod := overwrite || source.ConfigMapChanges(configMap) || len(c.currentPodsWithCorrectResourceVersion(source, c.Image)) == 0
if source.ConfigMapChanges(configMap) {
catalogSource.Status.ConfigMapResource = &v1alpha1.ConfigMapResourceReference{
Name: configMap.GetName(),
Namespace: configMap.GetNamespace(),
UID: configMap.GetUID(),
ResourceVersion: configMap.GetResourceVersion(),
}

// recreate the pod if there are configmap changes; this causes the db to be rebuilt
overwritePod = true
}

// recreate the pod if no existing pod is serving the latest image
if len(c.currentPodsWithCorrectResourceVersion(source, image)) == 0 {
overwritePod = true
}
}

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
if err := c.ensureServiceAccount(source, overwrite); err != nil {
Expand All @@ -289,7 +307,7 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alph
return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
}
if err := c.ensurePod(source, overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(c.Image).GetName())
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(image).GetName())
}
if err := c.ensureService(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service: %s", source.Service().GetName())
Expand All @@ -308,7 +326,7 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alph
return nil
}

func (c *ConfigMapRegistryReconciler) ensureServiceAccount(source catalogSourceDecorator, overwrite bool) error {
func (c *RegistryPodReconciler) ensureServiceAccount(source catalogSourceDecorator, overwrite bool) error {
serviceAccount := source.ServiceAccount()
if c.currentServiceAccount(source) != nil {
if !overwrite {
Expand All @@ -322,7 +340,7 @@ func (c *ConfigMapRegistryReconciler) ensureServiceAccount(source catalogSourceD
return err
}

func (c *ConfigMapRegistryReconciler) ensureRole(source catalogSourceDecorator, overwrite bool) error {
func (c *RegistryPodReconciler) ensureRole(source catalogSourceDecorator, overwrite bool) error {
role := source.Role()
if c.currentRole(source) != nil {
if !overwrite {
Expand All @@ -336,7 +354,7 @@ func (c *ConfigMapRegistryReconciler) ensureRole(source catalogSourceDecorator,
return err
}

func (c *ConfigMapRegistryReconciler) ensureRoleBinding(source catalogSourceDecorator, overwrite bool) error {
func (c *RegistryPodReconciler) ensureRoleBinding(source catalogSourceDecorator, overwrite bool) error {
roleBinding := source.RoleBinding()
if c.currentRoleBinding(source) != nil {
if !overwrite {
Expand All @@ -350,7 +368,7 @@ func (c *ConfigMapRegistryReconciler) ensureRoleBinding(source catalogSourceDeco
return err
}

func (c *ConfigMapRegistryReconciler) ensurePod(source catalogSourceDecorator, overwrite bool) error {
func (c *RegistryPodReconciler) ensurePod(source catalogSourceDecorator, overwrite bool) error {
pod := source.Pod(c.Image)
currentPods := c.currentPods(source, c.Image)
if len(currentPods) > 0 {
Expand All @@ -370,7 +388,7 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source catalogSourceDecorator, o
return errors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
}

func (c *ConfigMapRegistryReconciler) ensureService(source catalogSourceDecorator, overwrite bool) error {
func (c *RegistryPodReconciler) ensureService(source catalogSourceDecorator, overwrite bool) error {
service := source.Service()
if c.currentService(source) != nil {
if !overwrite {
Expand Down
Loading

0 comments on commit da77153

Please sign in to comment.