Skip to content

Commit

Permalink
fix panics in package-server
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmerdler committed Dec 4, 2018
1 parent cc1b9ef commit 49d87ef
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 36 deletions.
1 change: 0 additions & 1 deletion cmd/package-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var (
func init() {
flags := cmd.Flags()

// flags.BoolVar(&options.InsecureKubeletTLS, "kubelet-insecure-tls", options.InsecureKubeletTLS, "Do not verify CA of serving certificates presented by Kubelets. For testing purposes only.")
flags.DurationVar(&options.WakeupInterval, "interval", options.WakeupInterval, "Interval at which to re-sync CatalogSources")
flags.StringVar(&options.GlobalNamespace, "global-namespace", options.GlobalNamespace, "Name of the namespace where the global CatalogSources are located")
flags.StringSliceVar(&options.WatchedNamespaces, "watched-namespaces", options.WatchedNamespaces, "List of namespaces the package-server will watch watch for CatalogSources")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ require (
k8s.io/code-generator v0.0.0-20180904193909-8c97d6ab64da
k8s.io/gengo v0.0.0-20181106084056-51747d6e00da // indirect
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92 // indirect
k8s.io/kube-aggregator v0.0.0-20181121072050-af204e4cff09
k8s.io/kube-aggregator v0.0.0-20180905000155-efa32eb095fe
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd
k8s.io/kubernetes v1.11.6-beta.0.0.20181126160157-5933b9771b71
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92 h1:PgoMI/L1Nu5Vmvgm+vGheLuxKST8h6
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-aggregator v0.0.0-20180905000155-efa32eb095fe h1:LM48rywzVEPRg+Os2oUL9/vsztPQGoxmiD3m5VySchw=
k8s.io/kube-aggregator v0.0.0-20180905000155-efa32eb095fe/go.mod h1:8sbzT4QQKDEmSCIbfqjV0sd97GpUT7A4W626sBiYJmU=
k8s.io/kube-aggregator v0.0.0-20181121072050-af204e4cff09 h1:v5wOckd8yeVJcWcnE0xLdW60/Qrd17gXxW24O3aiNxg=
k8s.io/kube-aggregator v0.0.0-20181121072050-af204e4cff09/go.mod h1:8sbzT4QQKDEmSCIbfqjV0sd97GpUT7A4W626sBiYJmU=
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd h1:ggv/Vfza0i5xuhUZyYyxcc25AmQvHY8Zi1C2m8WgBvA=
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.11.6-beta.0.0.20181126160157-5933b9771b71 h1:ZiDzUVY+KNDO1sbcG0hHZokQsNIhjCCCsy06Z4Ck4JA=
Expand Down
73 changes: 41 additions & 32 deletions pkg/package-server/provider/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,22 @@ func parsePackageManifestsFromConfigMap(cm *corev1.ConfigMap, catsrc *operatorsv
logger.Debug("ConfigMap contains CSVs")
csvListJSON, err := yaml.YAMLToJSON([]byte(csvListYaml))
if err != nil {
logrus.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
logger.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
return nil, fmt.Errorf("error loading CSV list yaml from ConfigMap %s: %s", cmName, err)
}

var parsedCSVList []operatorsv1alpha1.ClusterServiceVersion
err = json.Unmarshal([]byte(csvListJSON), &parsedCSVList)
if err != nil {
logrus.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
logger.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
return nil, fmt.Errorf("error parsing CSV list (json) from ConfigMap %s: %s", cmName, err)
}

for _, csv := range parsedCSVList {
found = true

// TODO: add check for invalid CSV definitions
logrus.Debugf("found csv %s", csv.GetName())
logger.Debugf("found csv %s", csv.GetName())
csvs[csv.GetName()] = csv
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func parsePackageManifestsFromConfigMap(cm *corev1.ConfigMap, catsrc *operatorsv
manifest.ObjectMeta.Labels[k] = v
}

logrus.Debugf("retrieved packagemanifest %s", manifest.GetName())
logger.Debugf("retrieved packagemanifest %s", manifest.GetName())
manifests = append(manifests, manifest)
}
}
Expand All @@ -196,6 +196,12 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
return fmt.Errorf("casting catalog source failed")
}

logger := logrus.WithFields(logrus.Fields{
"Action": "Sync CatalogSource",
"name": catsrc.GetName(),
"namespace": catsrc.GetNamespace(),
})

var manifests []packagev1alpha1.PackageManifest

// handle by sourceType
Expand All @@ -217,7 +223,7 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
return fmt.Errorf("catalog source %s in namespace %s source type %s not recognized", catsrc.GetName(), catsrc.GetNamespace(), catsrc.Spec.SourceType)
}

// update manifests
logger.Debug("updating in-memory PackageManifests")
m.mu.Lock()
defer m.mu.Unlock()
for _, manifest := range manifests {
Expand All @@ -228,18 +234,18 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
}

if pm, ok := m.manifests[key]; ok {
// use existing CreationTimestamp
logger.Debugf("package %s already exists", key.packageName)
manifest.CreationTimestamp = pm.ObjectMeta.CreationTimestamp
} else {
// set CreationTimestamp if first time seeing the PackageManifest
logger.Debugf("new package %s found", key.packageName)
manifest.CreationTimestamp = metav1.NewTime(time.Now())
for _, add := range m.add {
if add.namespace == manifest.Status.CatalogSourceNamespace || add.namespace == metav1.NamespaceAll || manifest.Status.CatalogSourceNamespace == m.globalNamespace {
logger.Debugf("sending new package %s to watcher for namespace %s", key.packageName, add.namespace)
add.ch <- manifest
}
}
}

m.manifests[key] = manifest
}

Expand Down Expand Up @@ -276,45 +282,48 @@ func (m *InMemoryProvider) List(namespace string) (*packagev1alpha1.PackageManif
matching = append(matching, pm)
}
}

manifestList.Items = matching
}

return manifestList, nil
}

func (m *InMemoryProvider) Subscribe(namespace string, stopCh <-chan struct{}) (PackageChan, PackageChan, PackageChan, error) {
logger := logrus.WithFields(logrus.Fields{
"Action": "PackageManifest Subscribe",
"namespace": namespace,
})

m.mu.Lock()
defer m.mu.Unlock()

add := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
modify := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
delete := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
addIndex := len(m.add)
modifyIndex := len(m.modify)
deleteIndex := len(m.delete)
m.add = append(m.add, add)
m.modify = append(m.modify, modify)
m.delete = append(m.delete, delete)
addEvent := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
modifyEvent := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
deleteEvent := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
m.add = append(m.add, addEvent)
m.modify = append(m.modify, modifyEvent)
m.delete = append(m.delete, deleteEvent)

removeChan := func(target chan packagev1alpha1.PackageManifest, all []eventChan) []eventChan {
for i, event := range all {
if event.ch == target {
logger.Debugf("closing channel")
close(event.ch)
return append(all[:i], all[i+1:]...)
}
}
return all
}

go func() {
<-stopCh
m.mu.Lock()
defer m.mu.Unlock()
for _, add := range m.add {
m.add = append(m.add[:addIndex], m.add[:addIndex+1]...)
close(add.ch)
}
for _, modify := range m.modify {
m.modify = append(m.modify[:modifyIndex], m.modify[:modifyIndex+1]...)
close(modify.ch)
}
for _, delete := range m.delete {
m.delete = append(m.delete[:deleteIndex], m.delete[:deleteIndex+1]...)
close(delete.ch)
}

m.add = removeChan(addEvent.ch, m.add)
m.modify = removeChan(modifyEvent.ch, m.modify)
m.delete = removeChan(deleteEvent.ch, m.delete)
return
}()

return add.ch, modify.ch, delete.ch, nil
return addEvent.ch, modifyEvent.ch, deleteEvent.ch, nil
}
21 changes: 21 additions & 0 deletions test/e2e/packagemanifest_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func TestPackageManifestLoading(t *testing.T) {
}

watcher, err := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})
defer watcher.Stop()
require.NoError(t, err)
receivedPackage := make(chan bool)
go func() {
event := <-watcher.ResultChan()
pkg := event.Object.(*packagev1alpha1.PackageManifest)
Expand All @@ -93,6 +95,7 @@ func TestPackageManifestLoading(t *testing.T) {
require.NotNil(t, pkg)
require.Equal(t, packageName, pkg.GetName())
require.Equal(t, expectedStatus, pkg.Status)
receivedPackage <- true
return
}()

Expand All @@ -102,8 +105,26 @@ func TestPackageManifestLoading(t *testing.T) {

pm, err := fetchPackageManifest(t, pmc, testNamespace, packageName, packageManifestHasStatus)

require.True(t, <-receivedPackage)
require.NoError(t, err, "error getting package manifest")
require.NotNil(t, pm)
require.Equal(t, packageName, pm.GetName())
require.Equal(t, expectedStatus, pm.Status)
}

func TestPackageManifestMultipleWatches(t *testing.T) {
pmc := newPMClient(t)

watcherA, _ := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})
watcherB, _ := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})
watcherC, _ := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})

defer watcherB.Stop()
defer watcherC.Stop()
watcherA.Stop()

list, err := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).List(metav1.ListOptions{})

require.NoError(t, err)
require.NotEqual(t, 0, len(list.Items))
}

0 comments on commit 49d87ef

Please sign in to comment.