Skip to content

Commit

Permalink
Merge pull request operator-framework#386 from njhale/multi-source-re…
Browse files Browse the repository at this point in the history
…solution

feat(catalog): multiple CatalogSource resolution
  • Loading branch information
njhale authored Jul 24, 2018
2 parents 2ffdd94 + b934bd8 commit 8b0f68e
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 290 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ tags

### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/extensions.json
.history

### VisualStudio ###
Expand Down
6 changes: 6 additions & 0 deletions deploy/chart/templates/06-installplan.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ spec:
- clusterServiceVersionNames
- approval
properties:
source:
type: string
description: Name of the preferred CatalogSource
sourceNamespace:
type: string
description: Namespace that contains the preffered CatalogSource
clusterServiceVersionNames:
type: array
description: A list of the names of the Cluster Services
Expand Down
26 changes: 15 additions & 11 deletions pkg/api/apis/installplan/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
k8sjson "k8s.io/apimachinery/pkg/runtime/serializer/json"

csvv1alpha1 "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/clusterserviceversion/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
)

const (
Expand All @@ -32,6 +33,8 @@ const (

// InstallPlanSpec defines a set of Application resources to be installed
type InstallPlanSpec struct {
CatalogSource string `json:"source"`
CatalogSourceNamespace string `json:"sourceNamespace"`
ClusterServiceVersionNames []string `json:"clusterServiceVersionNames"`
Approval Approval `json:"approval"`
Approved bool `json:"approved"`
Expand Down Expand Up @@ -89,7 +92,7 @@ var ErrInvalidInstallPlan = errors.New("the InstallPlan contains invalid data")
type InstallPlanStatus struct {
Phase InstallPlanPhase `json:"phase"`
Conditions []InstallPlanCondition `json:"conditions,omitempty"`
CatalogSources []string `json:"catalogSources"`
CatalogSources []registry.SourceKey `json:"catalogSources"`
Plan []Step `json:"plan,omitempty"`
}

Expand Down Expand Up @@ -153,12 +156,13 @@ type Step struct {
// StepResource represents the status of a resource to be tracked by an
// InstallPlan.
type StepResource struct {
CatalogSource string `json:"sourceName"`
Group string `json:"group"`
Version string `json:"version"`
Kind string `json:"kind"`
Name string `json:"name"`
Manifest string `json:"manifest,omitempty"`
CatalogSource string `json:"sourceName"`
CatalogSourceNamespace string `json:"sourceNamespace"`
Group string `json:"group"`
Version string `json:"version"`
Kind string `json:"kind"`
Name string `json:"name"`
Manifest string `json:"manifest,omitempty"`
}

// NewStepResourceFromCSV creates an unresolved Step for the provided CSV.
Expand Down Expand Up @@ -217,14 +221,14 @@ type InstallPlan struct {

// EnsureCatalogSource ensures that a CatalogSource is present in the Status
// block of an InstallPlan.
func (p *InstallPlan) EnsureCatalogSource(catalogSourceName string) {
for _, source := range p.Status.CatalogSources {
if source == catalogSourceName {
func (p *InstallPlan) EnsureCatalogSource(sourceKey registry.SourceKey) {
for _, srcKey := range p.Status.CatalogSources {
if srcKey == sourceKey {
return
}
}

p.Status.CatalogSources = append(p.Status.CatalogSources, catalogSourceName)
p.Status.CatalogSources = append(p.Status.CatalogSources, sourceKey)
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/apis/installplan/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
package v1alpha1

import (
registry "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
runtime "k8s.io/apimachinery/pkg/runtime"
)

Expand Down Expand Up @@ -138,7 +139,7 @@ func (in *InstallPlanStatus) DeepCopyInto(out *InstallPlanStatus) {
}
if in.CatalogSources != nil {
in, out := &in.CatalogSources, &out.CatalogSources
*out = make([]string, len(*in))
*out = make([]registry.SourceKey, len(*in))
copy(*out, *in)
}
if in.Plan != nil {
Expand Down
78 changes: 54 additions & 24 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Operator struct {
sources map[registry.SourceKey]registry.Source
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
subscriptions map[registry.SubscriptionKey]subscriptionv1alpha1.Subscription
subscriptionsLock sync.RWMutex
dependencyResolver resolver.DependencyResolver
}

Expand Down Expand Up @@ -88,8 +90,10 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNa
client: crClient,
namespace: operatorNamespace,
sources: make(map[registry.SourceKey]registry.Source),
dependencyResolver: &resolver.SingleSourceResolver{},
subscriptions: make(map[registry.SubscriptionKey]subscriptionv1alpha1.Subscription),
dependencyResolver: &resolver.MultiSourceResolver{},
}

// Register CatalogSource informers.
catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "catalogsources")
catsrcQueueInformer := queueinformer.New(
Expand Down Expand Up @@ -172,6 +176,11 @@ func (o *Operator) syncSubscriptions(obj interface{}) (syncError error) {
}
syncError = fmt.Errorf("error transitioning Subscription: %s and error updating Subscription status: %s", syncError, updateErr)
log.Info(syncError)
} else {
// map subcription
o.subscriptionsLock.Lock()
defer o.subscriptionsLock.Unlock()
o.subscriptions[registry.SubscriptionKey{Name: sub.GetName(), Namespace: sub.GetNamespace()}] = *updatedSub
}
}
return
Expand Down Expand Up @@ -268,36 +277,33 @@ func (o *Operator) ResolvePlan(plan *v1alpha1.InstallPlan) error {
return fmt.Errorf("cannot resolve InstallPlan without any Catalog Sources")
}

// Copy the sources for resolution
o.sourcesLock.RLock()
sourcesSnapshot := make(map[registry.SourceKey]registry.Source)
for key, source := range o.sources {
sourcesSnapshot[key] = source
// Copy the sources for resolution from the included namespaces
includedNamespaces := map[string]struct{}{
o.namespace: struct{}{},
plan.Namespace: struct{}{},
}
o.sourcesLock.RUnlock()

var notFoundErr error
var steps []v1alpha1.Step
for srcKey := range sourcesSnapshot {
log.Debugf("resolving against source %v", srcKey)
plan.EnsureCatalogSource(srcKey.Name)
steps, notFoundErr = o.dependencyResolver.ResolveInstallPlan(sourcesSnapshot, srcKey, CatalogLabel, plan)
if notFoundErr != nil {
continue
}
sourcesSnapshot := o.getSourcesSnapshot(plan, includedNamespaces)

// Set the resolved steps
plan.Status.Plan = steps
// Attempt to resolve the InstallPlan
steps, usedSources, notFoundErr := o.dependencyResolver.ResolveInstallPlan(sourcesSnapshot, CatalogLabel, plan)
if notFoundErr != nil {
return notFoundErr
}

// Set the resolved steps
plan.Status.Plan = steps
plan.Status.CatalogSources = usedSources

// Look up the CatalogSource.
catsrc, err := o.client.CatalogsourceV1alpha1().CatalogSources(o.namespace).Get(srcKey.Name, metav1.GetOptions{})
// Add secrets for each used catalog source
for _, sourceKey := range plan.Status.CatalogSources {
catsrc, err := o.client.CatalogsourceV1alpha1().CatalogSources(sourceKey.Namespace).Get(sourceKey.Name, metav1.GetOptions{})
if err != nil {
return err
}

for _, secretName := range catsrc.Spec.Secrets {
// Attempt to look up the secret.
_, err := o.OpClient.KubernetesInterface().CoreV1().Secrets(plan.Namespace).Get(secretName, metav1.GetOptions{})
_, err := o.OpClient.KubernetesInterface().CoreV1().Secrets(sourceKey.Namespace).Get(secretName, metav1.GetOptions{})
status := v1alpha1.StepStatusUnknown
if k8serrors.IsNotFound(err) {
status = v1alpha1.StepStatusNotPresent
Expand All @@ -319,10 +325,9 @@ func (o *Operator) ResolvePlan(plan *v1alpha1.InstallPlan) error {
Status: status,
}}, plan.Status.Plan...)
}
return nil
}

return notFoundErr
return nil
}

// ExecutePlan applies a planned InstallPlan to a namespace.
Expand Down Expand Up @@ -432,3 +437,28 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {

return nil
}

func (o *Operator) getSourcesSnapshot(plan *v1alpha1.InstallPlan, includedNamespaces map[string]struct{}) []registry.SourceRef {
o.sourcesLock.RLock()
defer o.sourcesLock.RUnlock()
sourcesSnapshot := []registry.SourceRef{}

for key, source := range o.sources {
// Only copy catalog sources in included namespaces
if _, ok := includedNamespaces[key.Namespace]; ok {
ref := registry.SourceRef{
Source: source,
SourceKey: key,
}
if key.Name == plan.Spec.CatalogSource && key.Namespace == plan.Spec.CatalogSourceNamespace {
// Prepend preffered catalog source
sourcesSnapshot = append([]registry.SourceRef{ref}, sourcesSnapshot...)
} else {
// Append the catalog source
sourcesSnapshot = append(sourcesSnapshot, ref)
}
}
}

return sourcesSnapshot
}
7 changes: 7 additions & 0 deletions pkg/controller/operators/catalog/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (o *Operator) syncSubscription(sub *v1alpha1.Subscription) (*v1alpha1.Subsc
log.Infof("installplan %s not found: creating new plan", sub.Status.Install.Name)
sub.Status.Install = nil
}

// Install CSV if doesn't exist
sub.Status.State = v1alpha1.SubscriptionStateUpgradePending
ip := &ipv1alpha1.InstallPlan{
Expand All @@ -96,6 +97,11 @@ func (o *Operator) syncSubscription(sub *v1alpha1.Subscription) (*v1alpha1.Subsc
ownerutil.AddNonBlockingOwner(ip, sub)
ip.SetGenerateName(fmt.Sprintf("install-%s-", sub.Status.CurrentCSV))
ip.SetNamespace(sub.GetNamespace())

// Inherit the subscription's catalog source
ip.Spec.CatalogSource = sub.Spec.CatalogSource
ip.Spec.CatalogSourceNamespace = sub.Spec.CatalogSourceNamespace

res, err := o.client.InstallplanV1alpha1().InstallPlans(sub.GetNamespace()).Create(ip)
if err != nil {
return sub, fmt.Errorf("failed to ensure current CSV %s installed: %v", sub.Status.CurrentCSV, err)
Expand All @@ -122,6 +128,7 @@ func (o *Operator) syncSubscription(sub *v1alpha1.Subscription) (*v1alpha1.Subsc
sub.Status.State = v1alpha1.SubscriptionStateAtLatest
return sub, fmt.Errorf("nil replacement CSV for %s returned from catalog", sub.Status.CurrentCSV)
}

// Update subscription with new latest
sub.Status.CurrentCSV = repl.GetName()
sub.Status.Install = nil
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/operators/catalog/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ func TestSyncSubscription(t *testing.T) {
},
},
Spec: ipv1alpha1.InstallPlanSpec{
CatalogSource: "flying-unicorns",
CatalogSourceNamespace: "",
ClusterServiceVersionNames: []string{"latest-and-greatest"},
Approval: ipv1alpha1.ApprovalAutomatic,
},
Expand Down Expand Up @@ -501,6 +503,8 @@ func TestSyncSubscription(t *testing.T) {
},
},
Spec: ipv1alpha1.InstallPlanSpec{
CatalogSource: "flying-unicorns",
CatalogSourceNamespace: "",
ClusterServiceVersionNames: []string{"latest-and-greatest"},
Approval: ipv1alpha1.ApprovalAutomatic,
},
Expand Down Expand Up @@ -581,6 +585,8 @@ func TestSyncSubscription(t *testing.T) {
},
},
Spec: ipv1alpha1.InstallPlanSpec{
CatalogSource: "flying-unicorns",
CatalogSourceNamespace: "",
ClusterServiceVersionNames: []string{"latest-and-greatest"},
Approval: ipv1alpha1.ApprovalManual,
},
Expand Down Expand Up @@ -658,6 +664,8 @@ func TestSyncSubscription(t *testing.T) {
},
},
Spec: ipv1alpha1.InstallPlanSpec{
CatalogSource: "flying-unicorns",
CatalogSourceNamespace: "",
ClusterServiceVersionNames: []string{"pending"},
Approval: ipv1alpha1.ApprovalAutomatic,
},
Expand Down Expand Up @@ -939,7 +947,7 @@ func TestSyncSubscription(t *testing.T) {
registry.SourceKey{Name: tt.initial.catalogName, Namespace: "ns"}: catalogFake,
},
sourcesLastUpdate: tt.initial.sourcesLastUpdate,
dependencyResolver: &resolver.SingleSourceResolver{},
dependencyResolver: &resolver.MultiSourceResolver{},
}

// run subscription sync
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
var _ Source = &InMem{}

type InMem struct {
// map ClusterServiceVersion name to to their resource definition
// map ClusterServiceVersion name to their resource definition
clusterservices map[string]v1alpha1.ClusterServiceVersion

// map ClusterServiceVersions by name to metadata for the CSV that replaces it
Expand Down
Loading

0 comments on commit 8b0f68e

Please sign in to comment.