From 753f644ca0a06ea4b7dddf5fbcfb64e30487bca1 Mon Sep 17 00:00:00 2001 From: Jordan Keister Date: Fri, 14 Jun 2024 15:50:23 -0500 Subject: [PATCH] predicate error tracking sketch Signed-off-by: Jordan Keister --- .../filter/bundle_predicates.go | 72 +- .../filter/bundle_predicates_test.go | 2 +- internal/catalogmetadata/filter/filter.go | 39 +- .../clusterextension_controller.go | 23 +- .../clusterextension_controller.go.orig | 769 ++++++++++++++++++ 5 files changed, 857 insertions(+), 48 deletions(-) create mode 100644 internal/controllers/clusterextension_controller.go.orig diff --git a/internal/catalogmetadata/filter/bundle_predicates.go b/internal/catalogmetadata/filter/bundle_predicates.go index 8f16c6c5e..2ee4838c2 100644 --- a/internal/catalogmetadata/filter/bundle_predicates.go +++ b/internal/catalogmetadata/filter/bundle_predicates.go @@ -1,6 +1,8 @@ package filter import ( + "fmt" + mmsemver "github.com/Masterminds/semver/v3" bsemver "github.com/blang/semver/v4" @@ -11,16 +13,20 @@ import ( ) func WithPackageName(packageName string) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { - return bundle.Package == packageName + return func(bundle *catalogmetadata.Bundle) (bool, []string) { + value := bundle.Package == packageName + if !value { + return false, []string{packageName} + } + return value, nil } } func InMastermindsSemverRange(semverRange *mmsemver.Constraints) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { + return func(bundle *catalogmetadata.Bundle) (bool, []string) { bVersion, err := bundle.Version() if err != nil { - return false + return false, []string{err.Error()} } // No error should occur here because the simple version was successfully parsed by blang // We are unaware of any tests cases that would cause one to fail but not the other @@ -28,42 +34,59 @@ func InMastermindsSemverRange(semverRange *mmsemver.Constraints) Predicate[catal // there might be that one extreme edge case that might cause one to fail but not the other mVersion, err := mmsemver.NewVersion(bVersion.String()) if err != nil { - return false + return false, []string{err.Error()} + } + res := semverRange.Check(mVersion) + if !res { + return false, []string{fmt.Sprintf("no package %q matching version %q found", bundle.Package, semverRange.String())} } - return semverRange.Check(mVersion) + return true, nil } } -func InBlangSemverRange(semverRange bsemver.Range) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { +func InBlangSemverRange(semverRange bsemver.Range, semverRangeString string) Predicate[catalogmetadata.Bundle] { + return func(bundle *catalogmetadata.Bundle) (bool, []string) { bundleVersion, err := bundle.Version() if err != nil { - return false + return false, []string{err.Error()} } - return semverRange(*bundleVersion) + if !semverRange(*bundleVersion) { + return false, []string{fmt.Sprintf("no package %q matching version %q found", bundle.Package, semverRangeString)} + } + return true, nil } } func InChannel(channelName string) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { + return func(bundle *catalogmetadata.Bundle) (bool, []string) { for _, ch := range bundle.InChannels { if ch.Name == channelName { - return true + return true, nil } } - return false + return false, []string{fmt.Sprintf("no package %q found in channel %q", bundle.Package, channelName)} } } func WithBundleImage(bundleImage string) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { - return bundle.Image == bundleImage + return func(bundle *catalogmetadata.Bundle) (bool, []string) { + res := bundle.Image == bundleImage + if !res { + return false, []string{fmt.Sprintf("no matching bundle image %q found for package %s", bundleImage, bundle.Package)} + } else { + return true, nil + } } } func WithBundleName(bundleName string) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { - return bundle.Name == bundleName + return func(bundle *catalogmetadata.Bundle) (bool, []string) { + res := bundle.Name == bundleName + if !res { + return false, []string{fmt.Sprintf("no matching bundle name %q found for package %s", bundleName, bundle.Package)} + } else { + return true, nil + } } } @@ -87,20 +110,25 @@ func LegacySuccessor(installedBundle *ocv1alpha1.BundleMetadata) Predicate[catal return false } - return func(candidateBundle *catalogmetadata.Bundle) bool { + return func(candidateBundle *catalogmetadata.Bundle) (bool, []string) { for _, ch := range candidateBundle.InChannels { for _, chEntry := range ch.Entries { if candidateBundle.Name == chEntry.Name && isSuccessor(chEntry) { - return true + return true, nil } } } - return false + return false, []string{fmt.Sprintf("no legacy successor found for bundle name %q", candidateBundle.Name)} } } func WithDeprecation(deprecated bool) Predicate[catalogmetadata.Bundle] { - return func(bundle *catalogmetadata.Bundle) bool { - return bundle.HasDeprecation() == deprecated + return func(bundle *catalogmetadata.Bundle) (bool, []string) { + res := bundle.HasDeprecation() == deprecated + if !res { + return false, []string{fmt.Sprintf("no bundle %q found with desired deprecation status %t", bundle.Name, deprecated)} + } else { + return true, nil + } } } diff --git a/internal/catalogmetadata/filter/bundle_predicates_test.go b/internal/catalogmetadata/filter/bundle_predicates_test.go index 51bb5746a..6b743306a 100644 --- a/internal/catalogmetadata/filter/bundle_predicates_test.go +++ b/internal/catalogmetadata/filter/bundle_predicates_test.go @@ -92,7 +92,7 @@ func TestInBlangSemverRange(t *testing.T) { vRange := bsemver.MustParseRange(">=1.0.0") - f := filter.InBlangSemverRange(vRange) + f := filter.InBlangSemverRange(vRange, ">=1.0.0") assert.True(t, f(b1)) assert.False(t, f(b2)) diff --git a/internal/catalogmetadata/filter/filter.go b/internal/catalogmetadata/filter/filter.go index 36af20661..935011abe 100644 --- a/internal/catalogmetadata/filter/filter.go +++ b/internal/catalogmetadata/filter/filter.go @@ -5,43 +5,54 @@ import ( ) // Predicate returns true if the object should be kept when filtering -type Predicate[T catalogmetadata.Schemas] func(entity *T) bool +type Predicate[T catalogmetadata.Schemas] func(entity *T) (bool, []string) // Filter filters a slice accordingly to -func Filter[T catalogmetadata.Schemas](in []*T, test Predicate[T]) []*T { +func Filter[T catalogmetadata.Schemas](in []*T, test Predicate[T]) ([]*T, []string) { out := []*T{} + var errs []string for i := range in { - if test(in[i]) { + res, e := test(in[i]) + if res { out = append(out, in[i]) + } else { + errs = append(errs, e...) } } - return out + return out, errs } func And[T catalogmetadata.Schemas](predicates ...Predicate[T]) Predicate[T] { - return func(obj *T) bool { + return func(obj *T) (bool, []string) { for _, predicate := range predicates { - if !predicate(obj) { - return false + if res, errs := predicate(obj); !res { + return false, errs } } - return true + return true, nil } } func Or[T catalogmetadata.Schemas](predicates ...Predicate[T]) Predicate[T] { - return func(obj *T) bool { + return func(obj *T) (bool, []string) { + var errs []string for _, predicate := range predicates { - if predicate(obj) { - return true + if res, e := predicate(obj); !res { + errs = append(errs, e...) + } else { + return true, nil } } - return false + return false, errs } } func Not[T catalogmetadata.Schemas](predicate Predicate[T]) Predicate[T] { - return func(obj *T) bool { - return !predicate(obj) + return func(obj *T) (bool, []string) { + predicateTrue, errs := predicate(obj) + if predicateTrue { + return false, errs + } + return true, nil } } diff --git a/internal/controllers/clusterextension_controller.go b/internal/controllers/clusterextension_controller.go index 9a791fef5..bf49c6572 100644 --- a/internal/controllers/clusterextension_controller.go +++ b/internal/controllers/clusterextension_controller.go @@ -402,7 +402,7 @@ func (r *ClusterExtensionReconciler) resolve(ctx context.Context, ext ocv1alpha1 predicates = append(predicates, upgradePredicate) } - resultSet := catalogfilter.Filter(allBundles, catalogfilter.And(predicates...)) + resultSet, errs := catalogfilter.Filter(allBundles, catalogfilter.And(predicates...)) var upgradeErrorPrefix string if installedBundle != nil { @@ -413,16 +413,17 @@ func (r *ClusterExtensionReconciler) resolve(ctx context.Context, ext ocv1alpha1 upgradeErrorPrefix = fmt.Sprintf("error upgrading from currently installed version %q: ", installedBundleVersion.String()) } if len(resultSet) == 0 { - switch { - case versionRange != "" && channelName != "": - return nil, fmt.Errorf("%sno package %q matching version %q in channel %q found", upgradeErrorPrefix, packageName, versionRange, channelName) - case versionRange != "": - return nil, fmt.Errorf("%sno package %q matching version %q found", upgradeErrorPrefix, packageName, versionRange) - case channelName != "": - return nil, fmt.Errorf("%sno package %q in channel %q found", upgradeErrorPrefix, packageName, channelName) - default: - return nil, fmt.Errorf("%sno package %q found", upgradeErrorPrefix, packageName) - } + return nil, fmt.Errorf("%s %s", upgradeErrorPrefix, errs) + // switch { + // case versionRange != "" && channelName != "": + // return nil, fmt.Errorf("%sno package %q matching version %q in channel %q found", upgradeErrorPrefix, packageName, versionRange, channelName) + // case versionRange != "": + // return nil, fmt.Errorf("%sno package %q matching version %q found", upgradeErrorPrefix, packageName, versionRange) + // case channelName != "": + // return nil, fmt.Errorf("%sno package %q in channel %q found", upgradeErrorPrefix, packageName, channelName) + // default: + // return nil, fmt.Errorf("%sno package %q found", upgradeErrorPrefix, packageName) + // } } sort.SliceStable(resultSet, func(i, j int) bool { diff --git a/internal/controllers/clusterextension_controller.go.orig b/internal/controllers/clusterextension_controller.go.orig new file mode 100644 index 000000000..9a791fef5 --- /dev/null +++ b/internal/controllers/clusterextension_controller.go.orig @@ -0,0 +1,769 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "sort" + "strings" + "sync" + "time" + + mmsemver "github.com/Masterminds/semver/v3" + bsemver "github.com/blang/semver/v4" + "github.com/go-logr/logr" + "helm.sh/helm/v3/pkg/action" + "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/chartutil" + "helm.sh/helm/v3/pkg/postrender" + "helm.sh/helm/v3/pkg/release" + "helm.sh/helm/v3/pkg/storage/driver" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + apimachyaml "k8s.io/apimachinery/pkg/util/yaml" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + crcontroller "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + crhandler "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/operator-framework/api/pkg/operators/v1alpha1" + catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" + helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client" + "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/alpha/property" + rukpakv1alpha2 "github.com/operator-framework/rukpak/api/v1alpha2" + registryv1handler "github.com/operator-framework/rukpak/pkg/handler" + helmpredicate "github.com/operator-framework/rukpak/pkg/helm-operator-plugins/predicate" + rukpaksource "github.com/operator-framework/rukpak/pkg/source" + "github.com/operator-framework/rukpak/pkg/storage" + "github.com/operator-framework/rukpak/pkg/util" + + ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" + "github.com/operator-framework/operator-controller/internal/catalogmetadata" + catalogfilter "github.com/operator-framework/operator-controller/internal/catalogmetadata/filter" + catalogsort "github.com/operator-framework/operator-controller/internal/catalogmetadata/sort" + "github.com/operator-framework/operator-controller/internal/conditionsets" + "github.com/operator-framework/operator-controller/internal/labels" +) + +// ClusterExtensionReconciler reconciles a ClusterExtension object +type ClusterExtensionReconciler struct { + client.Client + ReleaseNamespace string + BundleProvider BundleProvider + Unpacker rukpaksource.Unpacker + ActionClientGetter helmclient.ActionClientGetter + Storage storage.Storage + Handler registryv1handler.Handler + dynamicWatchMutex sync.RWMutex + dynamicWatchGVKs sets.Set[schema.GroupVersionKind] + controller crcontroller.Controller + cache cache.Cache + InstalledBundleGetter InstalledBundleGetter +} + +type InstalledBundleGetter interface { + GetInstalledBundle(ctx context.Context, ext *ocv1alpha1.ClusterExtension) (*ocv1alpha1.BundleMetadata, error) +} + +const ( + bundleConnectionAnnotation string = "bundle.connection.config/insecureSkipTLSVerify" +) + +//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions,verbs=get;list;watch +//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions/status,verbs=update;patch +//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=pods,verbs=list;watch;create;delete +//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=list;watch +//+kubebuilder:rbac:groups=core,resources=pods/log,verbs=get +//+kubebuilder:rbac:groups=*,resources=*,verbs=* + +//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=clustercatalogs,verbs=list;watch +//+kubebuilder:rbac:groups=catalogd.operatorframework.io,resources=catalogmetadata,verbs=list;watch + +// The operator controller needs to watch all the bundle objects and reconcile accordingly. Though not ideal, but these permissions are required. +// This has been taken from rukpak, and an issue was created before to discuss it: https://github.com/operator-framework/rukpak/issues/800. +func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + l := log.FromContext(ctx).WithName("operator-controller") + l.V(1).Info("starting") + defer l.V(1).Info("ending") + + var existingExt = &ocv1alpha1.ClusterExtension{} + if err := r.Client.Get(ctx, req.NamespacedName, existingExt); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + var updateError error + + reconciledExt := existingExt.DeepCopy() + res, err := r.reconcile(ctx, reconciledExt) + updateError = errors.Join(updateError, err) + + // Do checks before any Update()s, as Update() may modify the resource structure! + updateStatus := !equality.Semantic.DeepEqual(existingExt.Status, reconciledExt.Status) + updateFinalizers := !equality.Semantic.DeepEqual(existingExt.Finalizers, reconciledExt.Finalizers) + unexpectedFieldsChanged := checkForUnexpectedFieldChange(*existingExt, *reconciledExt) + + if updateStatus { + err = r.Client.Status().Update(ctx, reconciledExt) + updateError = errors.Join(updateError, err) + } + + if unexpectedFieldsChanged { + panic("spec or metadata changed by reconciler") + } + + if updateFinalizers { + err = r.Client.Update(ctx, reconciledExt) + updateError = errors.Join(updateError, err) + } + + return res, updateError +} + +// ensureAllConditionsWithReason checks that all defined condition types exist in the given ClusterExtension, +// and assigns a specified reason and custom message to any missing condition. +func ensureAllConditionsWithReason(ext *ocv1alpha1.ClusterExtension, reason v1alpha1.ConditionReason, message string) { + for _, condType := range conditionsets.ConditionTypes { + cond := apimeta.FindStatusCondition(ext.Status.Conditions, condType) + if cond == nil { + // Create a new condition with a valid reason and add it + newCond := metav1.Condition{ + Type: condType, + Status: metav1.ConditionFalse, + Reason: string(reason), + Message: message, + ObservedGeneration: ext.GetGeneration(), + LastTransitionTime: metav1.NewTime(time.Now()), + } + ext.Status.Conditions = append(ext.Status.Conditions, newCond) + } + } +} + +// Compare resources - ignoring status & metadata.finalizers +func checkForUnexpectedFieldChange(a, b ocv1alpha1.ClusterExtension) bool { + a.Status, b.Status = ocv1alpha1.ClusterExtensionStatus{}, ocv1alpha1.ClusterExtensionStatus{} + a.Finalizers, b.Finalizers = []string{}, []string{} + return !equality.Semantic.DeepEqual(a, b) +} + +// Helper function to do the actual reconcile +// +// Today we always return ctrl.Result{} and an error. +// But in the future we might update this function +// to return different results (e.g. requeue). +// +/* The reconcile functions performs the following major tasks: +1. Resolution: Run the resolution to find the bundle from the catalog which needs to be installed. +2. Validate: Ensure that the bundle returned from the resolution for install meets our requirements. +3. Unpack: Unpack the contents from the bundle and store in a localdir in the pod. +4. Install: The process of installing involves: +4.1 Converting the CSV in the bundle into a set of plain k8s objects. +4.2 Generating a chart from k8s objects. +4.3 Apply the release on cluster. +*/ +//nolint:unparam +func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alpha1.ClusterExtension) (ctrl.Result, error) { + // run resolution + bundle, err := r.resolve(ctx, *ext) + if err != nil { + // Note: We don't distinguish between resolution-specific errors and generic errors + ext.Status.ResolvedBundle = nil + ext.Status.InstalledBundle = nil + setResolvedStatusConditionFailed(ext, err.Error()) + ensureAllConditionsWithReason(ext, ocv1alpha1.ReasonResolutionFailed, err.Error()) + return ctrl.Result{}, err + } + + if err := r.validateBundle(bundle); err != nil { + ext.Status.ResolvedBundle = nil + ext.Status.InstalledBundle = nil + setResolvedStatusConditionFailed(ext, err.Error()) + setInstalledStatusConditionFailed(ext, err.Error()) + setDeprecationStatusesUnknown(ext, "deprecation checks have not been attempted as installation has failed") + return ctrl.Result{}, err + } + // set deprecation status after _successful_ resolution + SetDeprecationStatus(ext, bundle) + + bundleVersion, err := bundle.Version() + if err != nil { + ext.Status.ResolvedBundle = nil + ext.Status.InstalledBundle = nil + setResolvedStatusConditionFailed(ext, err.Error()) + setInstalledStatusConditionFailed(ext, err.Error()) + return ctrl.Result{}, err + } + + ext.Status.ResolvedBundle = bundleMetadataFor(bundle) + setResolvedStatusConditionSuccess(ext, fmt.Sprintf("resolved to %q", bundle.Image)) + + // Generate a BundleDeployment from the ClusterExtension to Unpack. + // Note: The BundleDeployment here is not a k8s API, its a simple Go struct which + // necessary embedded values. + bd := r.generateBundleDeploymentForUnpack(bundle.Image, ext) + unpackResult, err := r.Unpacker.Unpack(ctx, bd) + if err != nil { + setStatusUnpackFailed(ext, err.Error()) + return ctrl.Result{}, err + } + + switch unpackResult.State { + case rukpaksource.StatePending: + setStatusUnpackPending(ext, unpackResult.Message) + // There must be a limit to number of entries if status is stuck at + // unpack pending. + setHasValidBundleUnknown(ext, "unpack pending") + setInstalledStatusConditionUnknown(ext, "installation has not been attempted as unpack is pending") + + return ctrl.Result{}, nil + case rukpaksource.StateUnpacked: + // TODO: Add finalizer to clean the stored bundles, after https://github.com/operator-framework/rukpak/pull/897 + // merges. + if err := r.Storage.Store(ctx, ext, unpackResult.Bundle); err != nil { + setStatusUnpackFailed(ext, err.Error()) + return ctrl.Result{}, err + } + setStatusUnpacked(ext, fmt.Sprintf("unpack successful: %v", unpackResult.Message)) + default: + setStatusUnpackFailed(ext, "unexpected unpack status") + // We previously exit with a failed status if error is not nil. + return ctrl.Result{}, fmt.Errorf("unexpected unpack status: %v", unpackResult.Message) + } + + bundleFS, err := r.Storage.Load(ctx, ext) + if err != nil { + setHasValidBundleFailed(ext, err.Error()) + return ctrl.Result{}, err + } + + chrt, values, err := r.Handler.Handle(ctx, bundleFS, bd) + if err != nil { + setInstalledStatusConditionFailed(ext, err.Error()) + return ctrl.Result{}, err + } + + ac, err := r.ActionClientGetter.ActionClientFor(ctx, ext) + if err != nil { + ext.Status.InstalledBundle = nil + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonErrorGettingClient, err)) + return ctrl.Result{}, err + } + + post := &postrenderer{ + labels: map[string]string{ + labels.OwnerKindKey: ocv1alpha1.ClusterExtensionKind, + labels.OwnerNameKey: ext.GetName(), + labels.BundleNameKey: bundle.Name, + labels.PackageNameKey: bundle.Package, + labels.BundleVersionKey: bundleVersion.String(), + }, + } + + rel, state, err := r.getReleaseState(ac, ext, chrt, values, post) + if err != nil { + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonErrorGettingReleaseState, err)) + return ctrl.Result{}, err + } + + switch state { + case stateNeedsInstall: + rel, err = ac.Install(ext.GetName(), r.ReleaseNamespace, chrt, values, func(install *action.Install) error { + install.CreateNamespace = false + install.Labels = map[string]string{labels.BundleNameKey: bundle.Name, labels.PackageNameKey: bundle.Package, labels.BundleVersionKey: bundleVersion.String()} + return nil + }, helmclient.AppendInstallPostRenderer(post)) + if err != nil { + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err)) + return ctrl.Result{}, err + } + case stateNeedsUpgrade: + rel, err = ac.Upgrade(ext.GetName(), r.ReleaseNamespace, chrt, values, helmclient.AppendUpgradePostRenderer(post)) + if err != nil { + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonUpgradeFailed, err)) + return ctrl.Result{}, err + } + case stateUnchanged: + if err := ac.Reconcile(rel); err != nil { + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonResolutionFailed, err)) + return ctrl.Result{}, err + } + default: + return ctrl.Result{}, fmt.Errorf("unexpected release state %q", state) + } + + relObjects, err := util.ManifestObjects(strings.NewReader(rel.Manifest), fmt.Sprintf("%s-release-manifest", rel.Name)) + if err != nil { + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonCreateDynamicWatchFailed, err)) + return ctrl.Result{}, err + } + + for _, obj := range relObjects { + if err := func() error { + r.dynamicWatchMutex.Lock() + defer r.dynamicWatchMutex.Unlock() + + _, isWatched := r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()] + if !isWatched { + if err := r.controller.Watch( + source.Kind(r.cache, + obj, + crhandler.EnqueueRequestForOwner(r.Scheme(), r.RESTMapper(), ext, crhandler.OnlyControllerOwner()), + helmpredicate.DependentPredicateFuncs[client.Object](), + ), + ); err != nil { + return err + } + r.dynamicWatchGVKs[obj.GetObjectKind().GroupVersionKind()] = sets.Empty{} + } + return nil + }(); err != nil { + ext.Status.InstalledBundle = nil + setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonCreateDynamicWatchFailed, err)) + return ctrl.Result{}, err + } + } + ext.Status.InstalledBundle = bundleMetadataFor(bundle) + setInstalledStatusConditionSuccess(ext, fmt.Sprintf("Instantiated bundle %s successfully", ext.GetName())) + + return ctrl.Result{}, nil +} + +// resolve returns a Bundle from the catalog that needs to get installed on the cluster. +func (r *ClusterExtensionReconciler) resolve(ctx context.Context, ext ocv1alpha1.ClusterExtension) (*catalogmetadata.Bundle, error) { + allBundles, err := r.BundleProvider.Bundles(ctx) + if err != nil { + return nil, fmt.Errorf("error fetching bundles: %w", err) + } + + packageName := ext.Spec.PackageName + channelName := ext.Spec.Channel + versionRange := ext.Spec.Version + + installedBundle, err := r.InstalledBundleGetter.GetInstalledBundle(ctx, &ext) + if err != nil { + return nil, err + } + + predicates := []catalogfilter.Predicate[catalogmetadata.Bundle]{ + catalogfilter.WithPackageName(packageName), + } + + if channelName != "" { + predicates = append(predicates, catalogfilter.InChannel(channelName)) + } + + if versionRange != "" { + vr, err := mmsemver.NewConstraint(versionRange) + if err != nil { + return nil, fmt.Errorf("invalid version range %q: %w", versionRange, err) + } + predicates = append(predicates, catalogfilter.InMastermindsSemverRange(vr)) + } + + if ext.Spec.UpgradeConstraintPolicy != ocv1alpha1.UpgradeConstraintPolicyIgnore && installedBundle != nil { + upgradePredicate, err := SuccessorsPredicate(ext.Spec.PackageName, installedBundle) + if err != nil { + return nil, err + } + + predicates = append(predicates, upgradePredicate) + } + + resultSet := catalogfilter.Filter(allBundles, catalogfilter.And(predicates...)) + + var upgradeErrorPrefix string + if installedBundle != nil { + installedBundleVersion, err := mmsemver.NewVersion(installedBundle.Version) + if err != nil { + return nil, err + } + upgradeErrorPrefix = fmt.Sprintf("error upgrading from currently installed version %q: ", installedBundleVersion.String()) + } + if len(resultSet) == 0 { + switch { + case versionRange != "" && channelName != "": + return nil, fmt.Errorf("%sno package %q matching version %q in channel %q found", upgradeErrorPrefix, packageName, versionRange, channelName) + case versionRange != "": + return nil, fmt.Errorf("%sno package %q matching version %q found", upgradeErrorPrefix, packageName, versionRange) + case channelName != "": + return nil, fmt.Errorf("%sno package %q in channel %q found", upgradeErrorPrefix, packageName, channelName) + default: + return nil, fmt.Errorf("%sno package %q found", upgradeErrorPrefix, packageName) + } + } + + sort.SliceStable(resultSet, func(i, j int) bool { + return catalogsort.ByVersion(resultSet[i], resultSet[j]) + }) + sort.SliceStable(resultSet, func(i, j int) bool { + return catalogsort.ByDeprecated(resultSet[i], resultSet[j]) + }) + + return resultSet[0], nil +} + +// SetDeprecationStatus will set the appropriate deprecation statuses for a ClusterExtension +// based on the provided bundle +func SetDeprecationStatus(ext *ocv1alpha1.ClusterExtension, bundle *catalogmetadata.Bundle) { + // reset conditions to false + conditionTypes := []string{ + ocv1alpha1.TypeDeprecated, + ocv1alpha1.TypePackageDeprecated, + ocv1alpha1.TypeChannelDeprecated, + ocv1alpha1.TypeBundleDeprecated, + } + + for _, conditionType := range conditionTypes { + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: conditionType, + Reason: ocv1alpha1.ReasonDeprecated, + Status: metav1.ConditionFalse, + Message: "", + ObservedGeneration: ext.Generation, + }) + } + + // There are two early return scenarios here: + // 1) The bundle is not deprecated (i.e bundle deprecations) + // AND there are no other deprecations associated with the bundle + // 2) The bundle is not deprecated, there are deprecations associated + // with the bundle (i.e at least one channel the bundle is present in is deprecated OR whole package is deprecated), + // and the ClusterExtension does not specify a channel. This is because the channel deprecations + // are a loose deprecation coupling on the bundle. A ClusterExtension installation is only + // considered deprecated by a channel deprecation when a deprecated channel is specified via + // the spec.channel field. + if (!bundle.IsDeprecated() && !bundle.HasDeprecation()) || (!bundle.IsDeprecated() && ext.Spec.Channel == "") { + return + } + + deprecationMessages := []string{} + + for _, deprecation := range bundle.Deprecations { + switch deprecation.Reference.Schema { + case declcfg.SchemaPackage: + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypePackageDeprecated, + Reason: ocv1alpha1.ReasonDeprecated, + Status: metav1.ConditionTrue, + Message: deprecation.Message, + ObservedGeneration: ext.Generation, + }) + case declcfg.SchemaChannel: + if ext.Spec.Channel != deprecation.Reference.Name { + continue + } + + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypeChannelDeprecated, + Reason: ocv1alpha1.ReasonDeprecated, + Status: metav1.ConditionTrue, + Message: deprecation.Message, + ObservedGeneration: ext.Generation, + }) + case declcfg.SchemaBundle: + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypeBundleDeprecated, + Reason: ocv1alpha1.ReasonDeprecated, + Status: metav1.ConditionTrue, + Message: deprecation.Message, + ObservedGeneration: ext.Generation, + }) + } + + deprecationMessages = append(deprecationMessages, deprecation.Message) + } + + if len(deprecationMessages) > 0 { + apimeta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypeDeprecated, + Reason: ocv1alpha1.ReasonDeprecated, + Status: metav1.ConditionTrue, + Message: strings.Join(deprecationMessages, ";"), + ObservedGeneration: ext.Generation, + }) + } +} + +func (r *ClusterExtensionReconciler) generateBundleDeploymentForUnpack(bundlePath string, ce *ocv1alpha1.ClusterExtension) *rukpakv1alpha2.BundleDeployment { + return &rukpakv1alpha2.BundleDeployment{ + TypeMeta: metav1.TypeMeta{ + Kind: ce.Kind, + APIVersion: ce.APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: ce.Name, + UID: ce.UID, + }, + Spec: rukpakv1alpha2.BundleDeploymentSpec{ + InstallNamespace: ce.Spec.InstallNamespace, + Source: rukpakv1alpha2.BundleSource{ + Type: rukpakv1alpha2.SourceTypeImage, + Image: &rukpakv1alpha2.ImageSource{ + Ref: bundlePath, + InsecureSkipTLSVerify: isInsecureSkipTLSVerifySet(ce), + }, + }, + }, + } +} + +func isInsecureSkipTLSVerifySet(ce *ocv1alpha1.ClusterExtension) bool { + if ce == nil { + return false + } + value, ok := ce.Annotations[bundleConnectionAnnotation] + if !ok { + return false + } + return value == "true" +} + +// SetupWithManager sets up the controller with the Manager. +func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error { + controller, err := ctrl.NewControllerManagedBy(mgr). + For(&ocv1alpha1.ClusterExtension{}). + Watches(&catalogd.ClusterCatalog{}, + crhandler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))). + WithEventFilter(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + oldObject, isOldCatalog := ue.ObjectOld.(*catalogd.ClusterCatalog) + newObject, isNewCatalog := ue.ObjectNew.(*catalogd.ClusterCatalog) + + if !isOldCatalog || !isNewCatalog { + return true + } + + if oldObject.Status.ResolvedSource != nil && newObject.Status.ResolvedSource != nil { + if oldObject.Status.ResolvedSource.Image != nil && newObject.Status.ResolvedSource.Image != nil { + return oldObject.Status.ResolvedSource.Image.ResolvedRef != newObject.Status.ResolvedSource.Image.ResolvedRef + } + } + return true + }, + }). + Watches(&corev1.Pod{}, mapOwneeToOwnerHandler(mgr.GetClient(), mgr.GetLogger(), &ocv1alpha1.ClusterExtension{})). + Build(r) + + if err != nil { + return err + } + r.controller = controller + r.cache = mgr.GetCache() + r.dynamicWatchGVKs = sets.New[schema.GroupVersionKind]() + + return nil +} + +func mapOwneeToOwnerHandler(cl client.Client, log logr.Logger, owner client.Object) crhandler.EventHandler { + return crhandler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { + ownerGVK, err := apiutil.GVKForObject(owner, cl.Scheme()) + if err != nil { + log.Error(err, "map ownee to owner: lookup GVK for owner") + return nil + } + + type ownerInfo struct { + key types.NamespacedName + gvk schema.GroupVersionKind + } + var oi *ownerInfo + + for _, ref := range obj.GetOwnerReferences() { + gv, err := schema.ParseGroupVersion(ref.APIVersion) + if err != nil { + log.Error(err, fmt.Sprintf("map ownee to owner: parse ownee's owner reference group version %q", ref.APIVersion)) + return nil + } + refGVK := gv.WithKind(ref.Kind) + if refGVK == ownerGVK && ref.Controller != nil && *ref.Controller { + oi = &ownerInfo{ + key: types.NamespacedName{Name: ref.Name}, + gvk: ownerGVK, + } + break + } + } + if oi == nil { + return nil + } + return []reconcile.Request{{NamespacedName: oi.key}} + }) +} + +// Generate reconcile requests for all cluster extensions affected by a catalog change +func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) crhandler.MapFunc { + return func(ctx context.Context, _ client.Object) []reconcile.Request { + // no way of associating an extension to a catalog so create reconcile requests for everything + clusterExtensions := ocv1alpha1.ClusterExtensionList{} + err := c.List(ctx, &clusterExtensions) + if err != nil { + logger.Error(err, "unable to enqueue cluster extensions for catalog reconcile") + return nil + } + var requests []reconcile.Request + for _, ext := range clusterExtensions.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: ext.GetNamespace(), + Name: ext.GetName(), + }, + }) + } + return requests + } +} + +type releaseState string + +const ( + stateNeedsInstall releaseState = "NeedsInstall" + stateNeedsUpgrade releaseState = "NeedsUpgrade" + stateUnchanged releaseState = "Unchanged" + stateError releaseState = "Error" +) + +func (r *ClusterExtensionReconciler) getReleaseState(cl helmclient.ActionInterface, obj metav1.Object, chrt *chart.Chart, values chartutil.Values, post *postrenderer) (*release.Release, releaseState, error) { + currentRelease, err := cl.Get(obj.GetName()) + if err != nil && !errors.Is(err, driver.ErrReleaseNotFound) { + return nil, stateError, err + } + if errors.Is(err, driver.ErrReleaseNotFound) { + return nil, stateNeedsInstall, nil + } + + desiredRelease, err := cl.Upgrade(obj.GetName(), r.ReleaseNamespace, chrt, values, func(upgrade *action.Upgrade) error { + upgrade.DryRun = true + return nil + }, helmclient.AppendUpgradePostRenderer(post)) + if err != nil { + return currentRelease, stateError, err + } + if desiredRelease.Manifest != currentRelease.Manifest || + currentRelease.Info.Status == release.StatusFailed || + currentRelease.Info.Status == release.StatusSuperseded { + return currentRelease, stateNeedsUpgrade, nil + } + return currentRelease, stateUnchanged, nil +} + +type DefaultInstalledBundleGetter struct { + helmclient.ActionClientGetter +} + +func (d *DefaultInstalledBundleGetter) GetInstalledBundle(ctx context.Context, ext *ocv1alpha1.ClusterExtension) (*ocv1alpha1.BundleMetadata, error) { + cl, err := d.ActionClientFor(ctx, ext) + if err != nil { + return nil, err + } + + release, err := cl.Get(ext.GetName()) + if err != nil && !errors.Is(err, driver.ErrReleaseNotFound) { + return nil, err + } + if release == nil { + return nil, nil + } + + return &ocv1alpha1.BundleMetadata{ + Name: release.Labels[labels.BundleNameKey], + Version: release.Labels[labels.BundleVersionKey], + }, nil +} + +type postrenderer struct { + labels map[string]string + cascade postrender.PostRenderer +} + +func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) { + var buf bytes.Buffer + dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024) + for { + obj := unstructured.Unstructured{} + err := dec.Decode(&obj) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels)) + b, err := obj.MarshalJSON() + if err != nil { + return nil, err + } + buf.Write(b) + } + if p.cascade != nil { + return p.cascade.Run(&buf) + } + return &buf, nil +} + +// bundleMetadataFor returns a BundleMetadata for the given bundle. If the provided bundle is nil, +// this function panics. It is up to the caller to ensure that the bundle is non-nil. +func bundleMetadataFor(bundle *catalogmetadata.Bundle) *ocv1alpha1.BundleMetadata { + if bundle == nil { + panic("programmer error: provided bundle must be non-nil to create BundleMetadata") + } + ver, err := bundle.Version() + if err != nil { + ver = &bsemver.Version{} + } + return &ocv1alpha1.BundleMetadata{ + Name: bundle.Name, + Version: ver.String(), + } +} + +func (r *ClusterExtensionReconciler) validateBundle(bundle *catalogmetadata.Bundle) error { + unsupportedProps := sets.New[string]( + property.TypePackageRequired, + property.TypeGVKRequired, + property.TypeConstraint, + ) + for i := range bundle.Properties { + if unsupportedProps.Has(bundle.Properties[i].Type) { + return fmt.Errorf( + "bundle %q has a dependency declared via property %q which is currently not supported", + bundle.Name, + bundle.Properties[i].Type, + ) + } + } + + return nil +}