Skip to content

Commit

Permalink
Merge pull request #476 from alecmerdler/ALM-722
Browse files Browse the repository at this point in the history
Implement Watch for `PackageManifest` API
  • Loading branch information
openshift-merge-robot authored Sep 28, 2018
2 parents ffa56da + a3e3275 commit 1775600
Show file tree
Hide file tree
Showing 10 changed files with 660 additions and 45 deletions.
6 changes: 3 additions & 3 deletions deploy/chart/templates/30_13-packageserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kind: APIService
metadata:
name: v1alpha1.packages.apps.redhat.com
spec:
caBundle: {{ $ca.Cert | b64enc | quote }}
caBundle: {{ b64enc $ca.Cert }}
group: packages.apps.redhat.com
groupPriorityMinimum: 2000
versionPriority: 15
Expand Down Expand Up @@ -77,8 +77,8 @@ metadata:
data:
{{- $altNames := list ( printf "package-server.%s" .Values.namespace ) ( printf "package-server.%s.svc" .Values.namespace ) -}}
{{- $cert := genSignedCert "package-server" nil $altNames 365 $ca }}
tls.crt: {{ $cert.Cert | b64enc | quote }}
tls.key: {{ $cert.Key | b64enc | quote }}
tls.crt: {{ b64enc $cert.Cert }}
tls.key: {{ b64enc $cert.Key }}
---
apiVersion: apps/v1beta1
kind: Deployment
Expand Down
96 changes: 96 additions & 0 deletions pkg/package-server/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 57 additions & 18 deletions pkg/package-server/provider/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
const (
// ConfigMapPackageName is the key for package ConfigMap data
ConfigMapPackageName = "packages"

// ConfigMapCSVName is the key for CSV ConfigMap data
ConfigMapCSVName = "clusterServiceVersions"
)
Expand All @@ -33,23 +32,26 @@ type packageKey struct {
packageName string
}

// InMemoryProvider syncs and provides PackageManifests from the cluster in an in-memory cache
// InMemoryProvider syncs and provides PackageManifests from the cluster using an in-memory cache.
// Should be a global singleton.
type InMemoryProvider struct {
*queueinformer.Operator
mu sync.RWMutex

mu sync.RWMutex
manifests map[packageKey]packagev1alpha1.PackageManifest

add []chan packagev1alpha1.PackageManifest
modify []chan packagev1alpha1.PackageManifest
delete []chan packagev1alpha1.PackageManifest
}

// NewInMemoryProvider returns a pointer to a new InMemoryProvider instance
func NewInMemoryProvider(informers []cache.SharedIndexInformer, queueOperator *queueinformer.Operator) *InMemoryProvider {
// instantiate the in-mem provider
prov := &InMemoryProvider{
Operator: queueOperator,
manifests: make(map[packageKey]packagev1alpha1.PackageManifest),
}

// register CatalogSource informers.
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "catalogsources")
queueInformers := queueinformer.New(
queue,
Expand Down Expand Up @@ -78,7 +80,7 @@ func parsePackageManifestsFromConfigMap(cm *corev1.ConfigMap, catalogSourceName,
csvs := make(map[string]operatorsv1alpha1.ClusterServiceVersion)
csvListYaml, ok := cm.Data[ConfigMapCSVName]
if ok {
logger.Debug("ConfigMap contains CSVsf")
logger.Debug("ConfigMap contains CSVs")
csvListJSON, err := yaml.YAMLToJSON([]byte(csvListYaml))
if err != nil {
log.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
Expand Down Expand Up @@ -218,16 +220,32 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
} else {
// set CreationTimestamp if first time seeing the PackageManifest
manifest.CreationTimestamp = metav1.NewTime(time.Now())
for _, ch := range m.add {
ch <- manifest
}
}

log.Debugf("storing packagemanifest at %+v", key)
m.manifests[key] = manifest
}

return nil
}

func (m *InMemoryProvider) ListPackageManifests(namespace string) (*packagev1alpha1.PackageManifestList, error) {
func (m *InMemoryProvider) Get(namespace, name string) (*packagev1alpha1.PackageManifest, error) {
m.mu.RLock()
defer m.mu.RUnlock()

var manifest packagev1alpha1.PackageManifest
for key, pm := range m.manifests {
if key.packageName == name && key.catalogSourceNamespace == namespace {
manifest = pm
}
}

return &manifest, nil
}

func (m *InMemoryProvider) List(namespace string) (*packagev1alpha1.PackageManifestList, error) {
manifestList := &packagev1alpha1.PackageManifestList{}

m.mu.RLock()
Expand All @@ -237,7 +255,6 @@ func (m *InMemoryProvider) ListPackageManifests(namespace string) (*packagev1alp
var matching []packagev1alpha1.PackageManifest
for _, manifest := range m.manifests {
if namespace == metav1.NamespaceAll || manifest.GetNamespace() == namespace {
// tack on the csv spec for each channel
matching = append(matching, manifest)
}
}
Expand All @@ -248,16 +265,38 @@ func (m *InMemoryProvider) ListPackageManifests(namespace string) (*packagev1alp
return manifestList, nil
}

func (m *InMemoryProvider) GetPackageManifest(namespace, name string) (*packagev1alpha1.PackageManifest, error) {
m.mu.RLock()
defer m.mu.RUnlock()
func (m *InMemoryProvider) Subscribe(stopCh <-chan struct{}) (PackageChan, PackageChan, PackageChan, error) {
m.mu.Lock()
defer m.mu.Unlock()

var manifest packagev1alpha1.PackageManifest
for key, pm := range m.manifests {
if key.packageName == name && key.catalogSourceNamespace == namespace {
manifest = pm
add := make(chan packagev1alpha1.PackageManifest)
modify := make(chan packagev1alpha1.PackageManifest)
delete := make(chan packagev1alpha1.PackageManifest)
addIndex := len(m.add)
modifyIndex := len(m.modify)
deleteIndex := len(m.delete)
m.add = append(m.add, add)
m.modify = append(m.modify, modify)
m.delete = append(m.delete, delete)

go func() {
<-stopCh
m.mu.Lock()
defer m.mu.Unlock()
for _, add := range m.add {
m.add = append(m.add[:addIndex], m.add[:addIndex+1]...)
close(add)
}
}
for _, modify := range m.modify {
m.modify = append(m.modify[:modifyIndex], m.modify[:modifyIndex+1]...)
close(modify)
}
for _, delete := range m.delete {
m.delete = append(m.delete[:deleteIndex], m.delete[:deleteIndex+1]...)
close(delete)
}
return
}()

return &manifest, nil
return add, modify, delete, nil
}
74 changes: 70 additions & 4 deletions pkg/package-server/provider/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func packageManifest(value packageValue) packagev1alpha1.PackageManifest {
}
}

func TestListPackageManifests(t *testing.T) {
func TestList(t *testing.T) {
tests := []struct {
namespace string
storedPackages []packageValue
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestListPackageManifests(t *testing.T) {
manifests: storedPackages,
}

manifests, err := prov.ListPackageManifests(test.namespace)
manifests, err := prov.List(test.namespace)

require.NoError(t, err)
require.Equal(t, len(test.expectedPackages), len(manifests.Items))
Expand All @@ -74,7 +74,7 @@ func TestListPackageManifests(t *testing.T) {
}
}

func TestGetPackageManifest(t *testing.T) {
func TestGet(t *testing.T) {
tests := []struct {
namespace string
packageName string
Expand Down Expand Up @@ -110,10 +110,76 @@ func TestGetPackageManifest(t *testing.T) {
manifests: storedPackages,
}

manifest, err := prov.GetPackageManifest(test.namespace, test.packageName)
manifest, err := prov.Get(test.namespace, test.packageName)

require.NoError(t, err)
require.EqualValues(t, packageManifest(test.expectedPackage), *manifest)
})
}
}

func TestSubscribe(t *testing.T) {
tests := []struct {
namespace string
storedPackages []packageValue
subscribers int
description string
}{
{
namespace: "default",
storedPackages: []packageValue{},
subscribers: 1,
description: "NoPackages",
},
{
namespace: "default",
storedPackages: []packageValue{{name: "etcd", namespace: "default"}, {name: "prometheus", namespace: "local"}},
subscribers: 1,
description: "SingleSubscriber",
},
{
namespace: metav1.NamespaceAll,
storedPackages: []packageValue{{name: "etcd", namespace: "default"}, {name: "prometheus", namespace: "local"}},
subscribers: 5,
description: "ManySubscribers",
},
}

type subscriber struct {
add <-chan packagev1alpha1.PackageManifest
modify <-chan packagev1alpha1.PackageManifest
delete <-chan packagev1alpha1.PackageManifest
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
prov := &InMemoryProvider{Operator: &queueinformer.Operator{}}
stopCh := make(chan struct{})

subscribers := make([]subscriber, test.subscribers)
for i := range subscribers {
add, modify, delete, err := prov.Subscribe(stopCh)
require.NoError(t, err)
subscribers[i] = subscriber{add, modify, delete}
}

for _, add := range prov.add {
go func(addCh chan packagev1alpha1.PackageManifest) {
for _, value := range test.storedPackages {
addCh <- packageManifest(value)
}
close(addCh)
return
}(add)
}

for _, sub := range subscribers {
i := 0
for manifest := range sub.add {
require.EqualValues(t, manifest, packageManifest(test.storedPackages[i]))
i = i + 1
}
}
})
}
}
7 changes: 5 additions & 2 deletions pkg/package-server/provider/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apis/packagemanifest/v1alpha1"
)

type PackageChan <-chan v1alpha1.PackageManifest

type PackageManifestProvider interface {
ListPackageManifests(namespace string) (*v1alpha1.PackageManifestList, error)
GetPackageManifest(namespace, name string) (*v1alpha1.PackageManifest, error)
Get(namespace, name string) (*v1alpha1.PackageManifest, error)
List(namespace string) (*v1alpha1.PackageManifestList, error)
Subscribe(stopCh <-chan struct{}) (add, modify, delete PackageChan, err error)
}
Loading

0 comments on commit 1775600

Please sign in to comment.