diff --git a/chart/flux/templates/helm-operator-crd.yaml b/chart/flux/templates/helm-operator-crd.yaml index 39c0e4142..626cc0ecd 100644 --- a/chart/flux/templates/helm-operator-crd.yaml +++ b/chart/flux/templates/helm-operator-crd.yaml @@ -43,6 +43,21 @@ spec: type: boolean forceUpgrade: type: boolean + rollback: + type: object + properties: + enable: + type: boolean + force: + type: boolean + recreate: + type: boolean + disableHooks: + type: boolean + timeout: + type: int64 + wait: + type: boolean valueFileSecrets: type: array items: diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index df83db562..6091802f3 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -225,8 +225,8 @@ func main() { // the status updater, to keep track of the release status for // every HelmRelease - statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace) - go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) + statusUpdater := status.New(ifClient, fhrInformer.Lister(), helmClient) + go statusUpdater.Loop(shutdown, log.With(logger, "component", "statusupdater")) // start HTTP server go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown) diff --git a/deploy-helm/flux-helm-release-crd.yaml b/deploy-helm/flux-helm-release-crd.yaml index d9bf9a81f..11dae71dd 100644 --- a/deploy-helm/flux-helm-release-crd.yaml +++ b/deploy-helm/flux-helm-release-crd.yaml @@ -35,6 +35,21 @@ spec: type: boolean forceUpgrade: type: boolean + rollback: + type: object + properties: + enable: + type: boolean + force: + type: boolean + recreate: + type: boolean + disableHooks: + type: boolean + timeout: + type: int64 + wait: + type: boolean valueFileSecrets: type: array items: diff --git a/integrations/apis/flux.weave.works/v1beta1/types.go b/integrations/apis/flux.weave.works/v1beta1/types.go index 7b3b0f63e..155e81d2c 100644 --- a/integrations/apis/flux.weave.works/v1beta1/types.go +++ b/integrations/apis/flux.weave.works/v1beta1/types.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/ghodss/yaml" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/helm/pkg/chartutil" @@ -94,7 +94,8 @@ type GitChartSource struct { SkipDepUpdate bool `json:"skipDepUpdate,omitempty"` } -// DefaultGitRef is the ref assumed if the Ref field is not given in a GitChartSource +// DefaultGitRef is the ref assumed if the Ref field is not given in +// a GitChartSource const DefaultGitRef = "master" func (s GitChartSource) RefOrDefault() string { @@ -119,6 +120,22 @@ func (s RepoChartSource) CleanRepoURL() string { return cleanURL + "/" } +type Rollback struct { + Enable bool `json:"enable,omitempty"` + Force bool `json:"force,omitempty"` + Recreate bool `json:"recreate,omitempty"` + DisableHooks bool `json:"disableHooks,omitempty"` + Timeout *int64 `json:"timeout,omitempty"` + Wait bool `json:"wait,omitempty"` +} + +func (r Rollback) GetTimeout() int64 { + if r.Timeout == nil { + return 300 + } + return *r.Timeout +} + // HelmReleaseSpec is the spec for a HelmRelease resource type HelmReleaseSpec struct { ChartSource `json:"chart"` @@ -135,6 +152,9 @@ type HelmReleaseSpec struct { // Force resource update through delete/recreate, allows recovery from a failed state // +optional ForceUpgrade bool `json:"forceUpgrade,omitempty"` + // Enable rollback and configure options + // +optional + Rollback Rollback `json:"rollback,omitempty"` } // GetTimeout returns the install or upgrade timeout (defaults to 300s) @@ -145,6 +165,22 @@ func (r HelmRelease) GetTimeout() int64 { return *r.Spec.Timeout } +// GetValuesFromSources maintains backwards compatibility with +// ValueFileSecrets by merging them into the ValuesFrom array. +func (r HelmRelease) GetValuesFromSources() []ValuesFromSource { + valuesFrom := r.Spec.ValuesFrom + // Maintain backwards compatibility with ValueFileSecrets + if r.Spec.ValueFileSecrets != nil { + var secretKeyRefs []ValuesFromSource + for _, ref := range r.Spec.ValueFileSecrets { + s := &v1.SecretKeySelector{LocalObjectReference: ref} + secretKeyRefs = append(secretKeyRefs, ValuesFromSource{SecretKeyRef: s}) + } + valuesFrom = append(secretKeyRefs, valuesFrom...) + } + return valuesFrom +} + type HelmReleaseStatus struct { // ReleaseName is the name as either supplied or generated. // +optional @@ -154,6 +190,14 @@ type HelmReleaseStatus struct { // managed by this resource. ReleaseStatus string `json:"releaseStatus"` + // ObservedGeneration is the most recent generation observed by + // the controller. + ObservedGeneration int64 `json:"observedGeneration"` + + // ValuesChecksum holds the SHA256 checksum of the last applied + // values. + ValuesChecksum string `json:"valuesChecksum"` + // Revision would define what Git hash or Chart version has currently // been deployed. // +optional @@ -171,6 +215,8 @@ type HelmReleaseCondition struct { Type HelmReleaseConditionType `json:"type"` Status v1.ConditionStatus `json:"status"` // +optional + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + // +optional LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` // +optional Reason string `json:"reason,omitempty"` @@ -187,6 +233,9 @@ const ( // Released means the chart release, as specified in this // HelmRelease, has been processed by Helm. HelmReleaseReleased HelmReleaseConditionType = "Released" + // RolledBack means the chart to which the HelmRelease refers + // has been rolled back + HelmReleaseRolledBack HelmReleaseConditionType = "RolledBack" ) // FluxHelmValues embeds chartutil.Values so we can implement deepcopy on map[string]interface{} diff --git a/integrations/apis/flux.weave.works/v1beta1/zz_generated.deepcopy.go b/integrations/apis/flux.weave.works/v1beta1/zz_generated.deepcopy.go index c3cc46ce6..bf121d906 100644 --- a/integrations/apis/flux.weave.works/v1beta1/zz_generated.deepcopy.go +++ b/integrations/apis/flux.weave.works/v1beta1/zz_generated.deepcopy.go @@ -140,6 +140,7 @@ func (in *HelmRelease) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HelmReleaseCondition) DeepCopyInto(out *HelmReleaseCondition) { *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) return } @@ -209,6 +210,7 @@ func (in *HelmReleaseSpec) DeepCopyInto(out *HelmReleaseSpec) { *out = new(int64) **out = **in } + in.Rollback.DeepCopyInto(&out.Rollback) return } @@ -266,6 +268,27 @@ func (in *RepoChartSource) DeepCopy() *RepoChartSource { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Rollback) DeepCopyInto(out *Rollback) { + *out = *in + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(int64) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Rollback. +func (in *Rollback) DeepCopy() *Rollback { + if in == nil { + return nil + } + out := new(Rollback) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ValuesFromSource) DeepCopyInto(out *ValuesFromSource) { *out = *in diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index 16b0b3e27..7aba21236 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -42,14 +42,13 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/labels" - "github.com/go-kit/kit/log" google_protobuf "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" "github.com/ncabatoff/go-seq/seq" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -74,6 +73,7 @@ const ( ReasonInstallFailed = "HelmInstallFailed" ReasonDependencyFailed = "UpdateDependencyFailed" ReasonUpgradeFailed = "HelmUgradeFailed" + ReasonRollbackFailed = "HelmRollbackFailed" ReasonCloned = "GitRepoCloned" ReasonSuccess = "HelmSuccess" ) @@ -280,17 +280,45 @@ func (chs *ChartChangeSync) maybeMirror(fhr fluxv1beta1.HelmRelease) { } } +// CompareValuesChecksum recalculates the checksum of the values +// and compares it to the last recorded checksum. +func (chs *ChartChangeSync) CompareValuesChecksum(fhr fluxv1beta1.HelmRelease) bool { + chartPath, ok := "", false + if fhr.Spec.ChartSource.GitChartSource != nil { + // We need to hold the lock until have compared the values, + // so that the clone doesn't get swapped out from under us. + chs.clonesMu.Lock() + defer chs.clonesMu.Unlock() + chartPath, _, ok = chs.getGitChartSource(fhr) + if !ok { + return false + } + } else if fhr.Spec.ChartSource.RepoChartSource != nil { + chartPath, _, ok = chs.getRepoChartSource(fhr) + if !ok { + return false + } + } + + values, err := release.Values(chs.kubeClient.CoreV1(), fhr.Namespace, chartPath, fhr.GetValuesFromSources(), fhr.Spec.Values) + if err != nil { + return false + } + + strValues, err := values.YAML() + if err != nil { + return false + } + + return fhr.Status.ValuesChecksum == release.ValuesChecksum([]byte(strValues)) +} + // ReconcileReleaseDef asks the ChartChangeSync to examine the release // associated with a HelmRelease, and install or upgrade the // release if the chart it refers to has changed. func (chs *ChartChangeSync) ReconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { - chs.reconcileReleaseDef(fhr) -} + defer chs.updateObservedGeneration(fhr) -// reconcileReleaseDef looks up the helm release associated with a -// HelmRelease resource, and either installs, upgrades, or does -// nothing, depending on the state (or absence) of the release. -func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { releaseName := fhr.ReleaseName() // Attempt to retrieve an upgradable release, in case no release @@ -303,82 +331,38 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { opts := release.InstallOptions{DryRun: false} - chartPath := "" - chartRevision := "" + chartPath, chartRevision, ok := "", "", false if fhr.Spec.ChartSource.GitChartSource != nil { - chartSource := fhr.Spec.ChartSource.GitChartSource // We need to hold the lock until after we're done releasing // the chart, so that the clone doesn't get swapped out from // under us. TODO(michael) consider having a lock per clone. chs.clonesMu.Lock() defer chs.clonesMu.Unlock() - chartClone, ok := chs.clones[releaseName] - // Validate the clone we have for the release is the same as - // is being referenced in the chart source. - if ok { - ok = chartClone.remote == chartSource.GitURL && chartClone.ref == chartSource.RefOrDefault() - if !ok { - if chartClone.export != nil { - chartClone.export.Clean() - } - delete(chs.clones, releaseName) - } - } - // FIXME(michael): if it's not cloned, and it's not going to - // be, we might not want to wait around until the next tick - // before reporting what's wrong with it. But if we just use - // repo.Ready(), we'll force all charts through that blocking - // code, rather than waiting for things to sync in good time. + chartPath, chartRevision, ok = chs.getGitChartSource(fhr) if !ok { - repo, ok := chs.mirrors.Get(mirrorName(chartSource)) - if !ok { - chs.maybeMirror(fhr) - chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo "+chartSource.GitURL+" not mirrored yet") - chs.logger.Log("info", "chart repo not cloned yet", "resource", fhr.ResourceID().String()) - } else { - status, err := repo.Status() - if status != git.RepoReady { - chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo not mirrored yet: "+err.Error()) - chs.logger.Log("info", "chart repo not ready yet", "resource", fhr.ResourceID().String(), "status", string(status), "err", err) - } - } return } - chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonCloned, "successfully cloned git repo") - chartPath = filepath.Join(chartClone.export.Dir(), chartSource.Path) - chartRevision = chartClone.head - - if chs.config.UpdateDeps && !fhr.Spec.ChartSource.GitChartSource.SkipDepUpdate { - if err := updateDependencies(chartPath, ""); err != nil { - chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonDependencyFailed, err.Error()) - chs.logger.Log("warning", "failed to update chart dependencies", "resource", fhr.ResourceID().String(), "err", err) - return - } - } - } else if fhr.Spec.ChartSource.RepoChartSource != nil { // TODO(michael): make this dispatch more natural, or factor it out - chartSource := fhr.Spec.ChartSource.RepoChartSource - path, err := ensureChartFetched(chs.config.ChartCache, chartSource) - if err != nil { - chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonDownloadFailed, "chart download failed: "+err.Error()) - chs.logger.Log("info", "chart download failed", "resource", fhr.ResourceID().String(), "err", err) + } else if fhr.Spec.ChartSource.RepoChartSource != nil { + chartPath, chartRevision, ok = chs.getRepoChartSource(fhr) + if !ok { return } - chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonDownloaded, "chart fetched: "+filepath.Base(path)) - chartPath = path - chartRevision = chartSource.Version } if rel == nil { - _, err := chs.release.Install(chartPath, releaseName, fhr, release.InstallAction, opts, &chs.kubeClient) + _, checksum, err := chs.release.Install(chartPath, releaseName, fhr, release.InstallAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonInstallFailed, err.Error()) chs.logger.Log("warning", "failed to install chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm install succeeded") - if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { + if err = status.SetReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err) } + if err = status.SetValuesChecksum(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, checksum); err != nil { + chs.logger.Log("warning", "could not update the values checksum", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err) + } return } @@ -404,20 +388,44 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "resource", fhr.ResourceID().String()) return } - _, err = chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) + _, checksum, err := chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonUpgradeFailed, err.Error()) + if err = status.SetValuesChecksum(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, checksum); err != nil { + chs.logger.Log("warning", "could not update the values checksum", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err) + } chs.logger.Log("warning", "failed to upgrade chart", "resource", fhr.ResourceID().String(), "err", err) + chs.RollbackRelease(fhr) return } chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm upgrade succeeded") - if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { + if err = status.SetReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err) } + if err = status.SetValuesChecksum(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, checksum); err != nil { + chs.logger.Log("warning", "could not update the values checksum", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err) + } return } } +// RollbackRelease rolls back a helm release +func (chs *ChartChangeSync) RollbackRelease(fhr fluxv1beta1.HelmRelease) { + defer chs.updateObservedGeneration(fhr) + + if !fhr.Spec.Rollback.Enable { + return + } + + releaseName := fhr.ReleaseName() + _, err := chs.release.Rollback(releaseName, fhr) + if err != nil { + chs.logger.Log("warning", "unable to rollback chart release", "resource", fhr.ResourceID().String(), "release", releaseName, "err", err) + chs.setCondition(fhr, fluxv1beta1.HelmReleaseRolledBack, v1.ConditionFalse, ReasonRollbackFailed, err.Error()) + } + chs.setCondition(fhr, fluxv1beta1.HelmReleaseRolledBack, v1.ConditionTrue, ReasonSuccess, "helm rollback succeeded") +} + // DeleteRelease deletes the helm release associated with a // HelmRelease. This exists mainly so that the operator code can // call it when it is handling a resource deletion. @@ -471,26 +479,96 @@ func (chs *ChartChangeSync) getCustomResourcesForMirror(mirror string) ([]fluxv1 return fhrs, nil } -// setCondition saves the status of a condition, if it's new -// information. New information is something that adds or changes the -// status, reason or message (i.e., anything but the transition time) -// for one of the types of condition. -func (chs *ChartChangeSync) setCondition(fhr fluxv1beta1.HelmRelease, typ fluxv1beta1.HelmReleaseConditionType, st v1.ConditionStatus, reason, message string) error { - for _, c := range fhr.Status.Conditions { - if c.Type == typ && c.Status == st && c.Message == message && c.Reason == reason { - return nil +// setCondition saves the status of a condition. +func (chs *ChartChangeSync) setCondition(hr fluxv1beta1.HelmRelease, typ fluxv1beta1.HelmReleaseConditionType, st v1.ConditionStatus, reason, message string) error { + hrClient := chs.ifClient.FluxV1beta1().HelmReleases(hr.Namespace) + condition := status.NewCondition(typ, st, reason, message) + return status.SetCondition(hrClient, hr, condition) +} + +// updateObservedGeneration updates the observed generation of the +// given HelmRelease to the generation. +func (chs *ChartChangeSync) updateObservedGeneration(hr fluxv1beta1.HelmRelease) error { + hrClient := chs.ifClient.FluxV1beta1().HelmReleases(hr.Namespace) + + return status.SetObservedGeneration(hrClient, hr, hr.Generation) +} + +func (chs *ChartChangeSync) getGitChartSource(fhr v1beta1.HelmRelease) (string, string, bool) { + chartPath, chartRevision := "", "" + chartSource := fhr.Spec.GitChartSource + if chartSource == nil { + return chartPath, chartRevision, false + } + + releaseName := fhr.ReleaseName() + chartClone, ok := chs.clones[releaseName] + // Validate the clone we have for the release is the same as + // is being referenced in the chart source. + if ok { + ok = chartClone.remote == chartSource.GitURL && chartClone.ref == chartSource.RefOrDefault() + if !ok { + if chartClone.export != nil { + chartClone.export.Clean() + } + delete(chs.clones, releaseName) + } + } + + // FIXME(michael): if it's not cloned, and it's not going to + // be, we might not want to wait around until the next tick + // before reporting what's wrong with it. But if we just use + // repo.Ready(), we'll force all charts through that blocking + // code, rather than waiting for things to sync in good time. + if !ok { + repo, ok := chs.mirrors.Get(mirrorName(chartSource)) + if !ok { + chs.maybeMirror(fhr) + chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo "+chartSource.GitURL+" not mirrored yet") + chs.logger.Log("info", "chart repo not cloned yet", "resource", fhr.ResourceID().String()) + } else { + status, err := repo.Status() + if status != git.RepoReady { + chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo not mirrored yet: "+err.Error()) + chs.logger.Log("info", "chart repo not ready yet", "resource", fhr.ResourceID().String(), "status", string(status), "err", err) + } } + return chartPath, chartRevision, false + } + chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonCloned, "successfully cloned git repo") + chartPath = filepath.Join(chartClone.export.Dir(), chartSource.Path) + chartRevision = chartClone.head + + if chs.config.UpdateDeps && !fhr.Spec.ChartSource.GitChartSource.SkipDepUpdate { + if err := updateDependencies(chartPath, ""); err != nil { + chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonDependencyFailed, err.Error()) + chs.logger.Log("warning", "failed to update chart dependencies", "resource", fhr.ResourceID().String(), "err", err) + return chartPath, chartRevision, false + } + } + + return chartPath, chartRevision, true +} + +func (chs *ChartChangeSync) getRepoChartSource(fhr v1beta1.HelmRelease) (string, string, bool) { + chartPath, chartRevision := "", "" + chartSource := fhr.Spec.ChartSource.RepoChartSource + if chartSource == nil { + return chartPath, chartRevision, false } - fhrClient := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace) - cond := fluxv1beta1.HelmReleaseCondition{ - Type: typ, - Status: st, - LastTransitionTime: metav1.Now(), - Reason: reason, - Message: message, + path, err := ensureChartFetched(chs.config.ChartCache, chartSource) + if err != nil { + chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonDownloadFailed, "chart download failed: "+err.Error()) + chs.logger.Log("info", "chart download failed", "resource", fhr.ResourceID().String(), "err", err) + return chartPath, chartRevision, false } - return status.UpdateConditions(fhrClient, fhr, cond) + + chs.setCondition(fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonDownloaded, "chart fetched: "+filepath.Base(path)) + chartPath = path + chartRevision = chartSource.Version + + return chartPath, chartRevision, true } func sortStrings(ss []string) []string { @@ -549,7 +627,7 @@ func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_relea // Get the desired release state opts := release.InstallOptions{DryRun: true} tempRelName := string(fhr.UID) - desRel, err := chs.release.Install(chartsRepo, tempRelName, fhr, release.InstallAction, opts, &chs.kubeClient) + desRel, _, err := chs.release.Install(chartsRepo, tempRelName, fhr, release.InstallAction, opts, &chs.kubeClient) if err != nil { return false, err } diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 622c235ab..c08939fdf 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -23,6 +23,7 @@ import ( fhrv1 "github.com/weaveworks/flux/integrations/client/informers/externalversions/flux.weave.works/v1beta1" iflister "github.com/weaveworks/flux/integrations/client/listers/flux.weave.works/v1beta1" "github.com/weaveworks/flux/integrations/helm/chartsync" + "github.com/weaveworks/flux/integrations/helm/status" ) const ( @@ -96,8 +97,8 @@ func New( // ----- EVENT HANDLERS for HelmRelease resources change --------- fhrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(new interface{}) { - _, ok := checkCustomResourceType(controller.logger, new) - if ok { + fhr, ok := checkCustomResourceType(controller.logger, new) + if ok && !status.HasRolledBack(fhr) { controller.enqueueJob(new) } }, @@ -285,6 +286,15 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) { return } + // Skip if the current HelmRelease generation has been rolled + // back, as otherwise we will end up in a loop of failure, but + // continue if the checksum of the values differs, as the failure + // may have been the result of the values contents. + if newFhr.Spec.Rollback.Enable && status.HasRolledBack(newFhr) && c.sync.CompareValuesChecksum(newFhr) { + c.logger.Log("warning", "release has been rolled back, skipping", "resource", newFhr.ResourceID().String()) + return + } + log := []string{"info", "enqueuing release"} if diff != "" && c.logDiffs { log = append(log, "diff", diff) diff --git a/integrations/helm/release/release.go b/integrations/helm/release/release.go index ca420b62e..a3caeaba6 100644 --- a/integrations/helm/release/release.go +++ b/integrations/helm/release/release.go @@ -2,6 +2,8 @@ package release import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "io/ioutil" "net/url" @@ -14,7 +16,6 @@ import ( "github.com/ghodss/yaml" "github.com/go-kit/kit/log" "github.com/spf13/pflag" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -97,11 +98,31 @@ func (r *Release) GetUpgradableRelease(name string) (*hapi_release.Release, erro } } +// shouldRollback determines if a release should be rolled back +// based on the status of the Helm release. +func (r *Release) shouldRollback(name string) (bool, error) { + rls, err := r.HelmClient.ReleaseStatus(name) + if err != nil { + return false, err + } + + status := rls.GetInfo().GetStatus() + switch status.Code { + case hapi_release.Status_FAILED: + r.logger.Log("info", "rolling back release", "release", name) + return true, nil + case hapi_release.Status_PENDING_ROLLBACK: + r.logger.Log("info", "release already has a rollback pending", "release", name) + return false, nil + default: + return false, fmt.Errorf("release with status %s cannot be rolled back", status.Code.String()) + } +} + func (r *Release) canDelete(name string) (bool, error) { rls, err := r.HelmClient.ReleaseStatus(name) if err != nil { - r.logger.Log("error", fmt.Sprintf("Error finding status for release (%s): %#v", name, err)) return false, err } /* @@ -124,7 +145,6 @@ func (r *Release) canDelete(name string) (bool, error) { r.logger.Log("info", fmt.Sprintf("Release %s already deleted", name)) return false, nil default: - r.logger.Log("info", fmt.Sprintf("Release %s with status %s cannot be deleted", name, status.Code.String())) return false, fmt.Errorf("release %s with status %s cannot be deleted", name, status.Code.String()) } } @@ -137,7 +157,7 @@ func (r *Release) canDelete(name string) (bool, error) { // TODO(michael): cloneDir is only relevant if installing from git; // either split this procedure into two varieties, or make it more // general and calculate the path to the chart in the caller. -func (r *Release) Install(chartPath, releaseName string, fhr flux_v1beta1.HelmRelease, action Action, opts InstallOptions, kubeClient *kubernetes.Clientset) (release *hapi_release.Release, err error) { +func (r *Release) Install(chartPath, releaseName string, fhr flux_v1beta1.HelmRelease, action Action, opts InstallOptions, kubeClient *kubernetes.Clientset) (release *hapi_release.Release, checksum string, err error) { defer func(start time.Time) { ObserveRelease( start, @@ -150,14 +170,14 @@ func (r *Release) Install(chartPath, releaseName string, fhr flux_v1beta1.HelmRe }(time.Now()) if chartPath == "" { - return nil, fmt.Errorf("empty path to chart supplied for resource %q", fhr.ResourceID().String()) + return nil, "", fmt.Errorf("empty path to chart supplied for resource %q", fhr.ResourceID().String()) } _, err = os.Stat(chartPath) switch { case os.IsNotExist(err): - return nil, fmt.Errorf("no file or dir at path to chart: %s", chartPath) + return nil, "", fmt.Errorf("no file or dir at path to chart: %s", chartPath) case err != nil: - return nil, fmt.Errorf("error statting path given for chart %s: %s", chartPath, err.Error()) + return nil, "", fmt.Errorf("error statting path given for chart %s: %s", chartPath, err.Error()) } r.logger.Log("info", fmt.Sprintf("processing release %s (as %s)", fhr.ReleaseName(), releaseName), @@ -165,28 +185,19 @@ func (r *Release) Install(chartPath, releaseName string, fhr flux_v1beta1.HelmRe "options", fmt.Sprintf("%+v", opts), "timeout", fmt.Sprintf("%vs", fhr.GetTimeout())) - valuesFrom := fhr.Spec.ValuesFrom - // Maintain backwards compatibility with ValueFileSecrets - if fhr.Spec.ValueFileSecrets != nil { - var secretKeyRefs []flux_v1beta1.ValuesFromSource - for _, ref := range fhr.Spec.ValueFileSecrets { - s := &v1.SecretKeySelector{LocalObjectReference: ref} - secretKeyRefs = append(secretKeyRefs, flux_v1beta1.ValuesFromSource{SecretKeyRef: s}) - } - valuesFrom = append(secretKeyRefs, valuesFrom...) - } - vals, err := values(kubeClient.CoreV1(), fhr.Namespace, chartPath, valuesFrom, fhr.Spec.Values) + vals, err := Values(kubeClient.CoreV1(), fhr.Namespace, chartPath, fhr.GetValuesFromSources(), fhr.Spec.Values) if err != nil { r.logger.Log("error", fmt.Sprintf("Failed to compose values for Chart release [%s]: %v", fhr.Spec.ReleaseName, err)) - return nil, err + return nil, "", err } strVals, err := vals.YAML() if err != nil { r.logger.Log("error", fmt.Sprintf("Problem with supplied customizations for Chart release [%s]: %v", fhr.Spec.ReleaseName, err)) - return nil, err + return nil, "", err } rawVals := []byte(strVals) + checksum = ValuesChecksum(rawVals) switch action { case InstallAction: @@ -209,15 +220,15 @@ func (r *Release) Install(chartPath, releaseName string, fhr flux_v1beta1.HelmRe _, err = r.HelmClient.DeleteRelease(releaseName, k8shelm.DeletePurge(true)) if err != nil { r.logger.Log("error", fmt.Sprintf("Release deletion error: %#v", err)) - return nil, err + return nil, "", err } } - return nil, err + return nil, checksum, err } if !opts.DryRun { r.annotateResources(res.Release, fhr) } - return res.Release, err + return res.Release, checksum, err case UpgradeAction: res, err := r.HelmClient.UpdateRelease( releaseName, @@ -227,21 +238,53 @@ func (r *Release) Install(chartPath, releaseName string, fhr flux_v1beta1.HelmRe k8shelm.UpgradeTimeout(fhr.GetTimeout()), k8shelm.ResetValues(fhr.Spec.ResetValues), k8shelm.UpgradeForce(fhr.Spec.ForceUpgrade), + k8shelm.UpgradeWait(fhr.Spec.Rollback.Enable), ) if err != nil { r.logger.Log("error", fmt.Sprintf("Chart upgrade release failed: %s: %#v", fhr.Spec.ReleaseName, err)) - return nil, err + return nil, checksum, err } if !opts.DryRun { r.annotateResources(res.Release, fhr) } - return res.Release, err + return res.Release, checksum, err default: err = fmt.Errorf("Valid install options: CREATE, UPDATE. Provided: %s", action) r.logger.Log("error", err.Error()) + return nil, "", err + } +} + +// Rollback rolls back a Chart release if required +func (r *Release) Rollback(releaseName string, fhr flux_v1beta1.HelmRelease) (*hapi_release.Release, error) { + ok, err := r.shouldRollback(releaseName) + if !ok { + if err != nil { + return nil, err + } + return nil, nil + } + + res, err := r.HelmClient.RollbackRelease( + releaseName, + k8shelm.RollbackVersion(0), // '0' makes Helm fetch the latest deployed release + k8shelm.RollbackTimeout(fhr.Spec.Rollback.GetTimeout()), + k8shelm.RollbackForce(fhr.Spec.Rollback.Force), + k8shelm.RollbackRecreate(fhr.Spec.Rollback.Recreate), + k8shelm.RollbackDisableHooks(fhr.Spec.Rollback.DisableHooks), + k8shelm.RollbackWait(fhr.Spec.Rollback.Wait), + k8shelm.RollbackDescription("Automated rollback by Helm operator"), + ) + if err != nil { + r.logger.Log("error", fmt.Sprintf("failed to rollback release: %#v", err)) return nil, err } + + r.annotateResources(res.Release, fhr) + r.logger.Log("info", "rolled back release", "release", releaseName) + + return res.Release, err } // Delete purges a Chart release @@ -326,14 +369,9 @@ func (r *Release) annotateResources(release *hapi_release.Release, fhr flux_v1be } } -// fhrResourceID constructs a resource.ID for a HelmRelease resource. -func fhrResourceID(fhr flux_v1beta1.HelmRelease) resource.ID { - return resource.MakeID(fhr.Namespace, "HelmRelease", fhr.Name) -} - -// values tries to resolve all given value file sources and merges +// Values tries to resolve all given value file sources and merges // them into one Values struct. It returns the merged Values. -func values(corev1 k8sclientv1.CoreV1Interface, ns string, chartPath string, valuesFromSource []flux_v1beta1.ValuesFromSource, values chartutil.Values) (chartutil.Values, error) { +func Values(corev1 k8sclientv1.CoreV1Interface, ns string, chartPath string, valuesFromSource []flux_v1beta1.ValuesFromSource, values chartutil.Values) (chartutil.Values, error) { result := chartutil.Values{} for _, v := range valuesFromSource { @@ -437,6 +475,19 @@ func values(corev1 k8sclientv1.CoreV1Interface, ns string, chartPath string, val return result, nil } +// ValuesChecksum calculates the SHA256 checksum of the given raw +// values. +func ValuesChecksum(rawValues []byte) string { + hasher := sha256.New() + hasher.Write(rawValues) + return hex.EncodeToString(hasher.Sum(nil)) +} + +// fhrResourceID constructs a resource.ID for a HelmRelease resource. +func fhrResourceID(fhr flux_v1beta1.HelmRelease) resource.ID { + return resource.MakeID(fhr.Namespace, "HelmRelease", fhr.Name) +} + // Merges source and destination `chartutils.Values`, preferring values from the source Values // This is slightly adapted from https://github.com/helm/helm/blob/2332b480c9cb70a0d8a85247992d6155fbe82416/cmd/helm/install.go#L359 func mergeValues(dest, src chartutil.Values) chartutil.Values { diff --git a/integrations/helm/status/conditions.go b/integrations/helm/status/conditions.go index 14e892aa5..927e9de43 100644 --- a/integrations/helm/status/conditions.go +++ b/integrations/helm/status/conditions.go @@ -1,43 +1,66 @@ package status import ( + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" v1beta1client "github.com/weaveworks/flux/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// We can't rely on having UpdateStatus, or strategic merge patching -// for custom resources. So we have to create an object which -// represents the merge path or JSON patch to apply. -func UpdateConditionsPatch(status *v1beta1.HelmReleaseStatus, updates ...v1beta1.HelmReleaseCondition) { - newConditions := make([]v1beta1.HelmReleaseCondition, len(status.Conditions)) - oldConditions := status.Conditions - for i, c := range oldConditions { - newConditions[i] = c - } -updates: - for _, up := range updates { - for i, c := range oldConditions { - if c.Type == up.Type { - newConditions[i] = up - continue updates - } - } - newConditions = append(newConditions, up) + +// NewCondition creates a new HelmReleaseCondition. +func NewCondition(conditionType v1beta1.HelmReleaseConditionType, status v1.ConditionStatus, reason, message string) v1beta1.HelmReleaseCondition { + return v1beta1.HelmReleaseCondition{ + Type: conditionType, + Status: status, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, } - status.Conditions = newConditions } -// UpdateConditions retrieves a new copy of the HelmRelease given, -// applies the updates to this copy, and updates the resource in the -// cluster. -func UpdateConditions(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, updates ...v1beta1.HelmReleaseCondition) error { - cFhr, err := client.Get(fhr.Name, v1.GetOptions{}) +// SetCondition updates the HelmRelease to include the given condition. +func SetCondition(client v1beta1client.HelmReleaseInterface, hr v1beta1.HelmRelease, + condition v1beta1.HelmReleaseCondition) error { + + cHr, err := client.Get(hr.Name, metav1.GetOptions{}) if err != nil { return err } - UpdateConditionsPatch(&cFhr.Status, updates...) - _, err = client.UpdateStatus(cFhr) + currCondition := GetCondition(cHr.Status, condition.Type) + if currCondition != nil && currCondition.Status == condition.Status { + condition.LastTransitionTime = currCondition.LastTransitionTime + } + + newConditions := filterOutCondition(cHr.Status.Conditions, condition.Type) + cHr.Status.Conditions = append(newConditions, condition) + + _, err = client.UpdateStatus(cHr) return err } + +// GetCondition returns the condition with the given type. +func GetCondition(status v1beta1.HelmReleaseStatus, conditionType v1beta1.HelmReleaseConditionType) *v1beta1.HelmReleaseCondition { + for i := range status.Conditions { + c := status.Conditions[i] + if c.Type == conditionType { + return &c + } + } + return nil +} + +// filterOutCondition returns a new slice of conditions without the +// conditions of the given type. +func filterOutCondition(conditions []v1beta1.HelmReleaseCondition, conditionType v1beta1.HelmReleaseConditionType) []v1beta1.HelmReleaseCondition { + var newConditions []v1beta1.HelmReleaseCondition + for _, c := range conditions { + if c.Type == conditionType { + continue + } + newConditions = append(newConditions, c) + } + return newConditions +} diff --git a/integrations/helm/status/status.go b/integrations/helm/status/status.go index eab26ae9b..c0881d54e 100644 --- a/integrations/helm/status/status.go +++ b/integrations/helm/status/status.go @@ -17,34 +17,38 @@ import ( "time" "github.com/go-kit/kit/log" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kube "k8s.io/client-go/kubernetes" "k8s.io/helm/pkg/helm" + helmrelease "k8s.io/helm/pkg/proto/hapi/release" "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" - fluxclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" + ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" + iflister "github.com/weaveworks/flux/integrations/client/listers/flux.weave.works/v1beta1" v1beta1client "github.com/weaveworks/flux/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1" ) const period = 10 * time.Second type Updater struct { - fluxhelm fluxclientset.Interface + hrClient ifclientset.Interface + hrLister iflister.HelmReleaseLister kube kube.Interface helmClient *helm.Client namespace string } -func New(fhrClient fluxclientset.Interface, kubeClient kube.Interface, helmClient *helm.Client, namespace string) *Updater { +func New(hrClient ifclientset.Interface, hrLister iflister.HelmReleaseLister, helmClient *helm.Client) *Updater { return &Updater{ - fluxhelm: fhrClient, - kube: kubeClient, + hrClient: hrClient, + hrLister: hrLister, helmClient: helmClient, - namespace: namespace, } } -func (a *Updater) Loop(stop <-chan struct{}, logger log.Logger) { +func (u *Updater) Loop(stop <-chan struct{}, logger log.Logger) { ticker := time.NewTicker(period) var logErr error @@ -55,43 +59,23 @@ bail: break bail case <-ticker.C: } - var namespaces []string - if a.namespace != "" { - namespaces = append(namespaces, a.namespace) - } else { - all, err := a.kube.CoreV1().Namespaces().List(metav1.ListOptions{}) - if err != nil { - logErr = err - break bail - } - for _, ns := range all.Items { - namespaces = append(namespaces, ns.Name) - } + list, err := u.hrLister.List(labels.Everything()) + if err != nil { + logErr = err + break bail } - - // Look up HelmReleases - for _, ns := range namespaces { - fhrClient := a.fluxhelm.FluxV1beta1().HelmReleases(ns) - fhrs, err := fhrClient.List(metav1.ListOptions{}) - if err != nil { - logErr = err - break bail + for _, hr := range list { + nsHrClient := u.hrClient.FluxV1beta1().HelmReleases(hr.Namespace) + releaseName := hr.ReleaseName() + releaseStatus, _ := u.helmClient.ReleaseStatus(releaseName) + // If we are unable to get the status, we do not care why + if releaseStatus == nil { + continue } - for _, fhr := range fhrs.Items { - releaseName := fhr.ReleaseName() - // If we don't get the content, we don't care why - content, _ := a.helmClient.ReleaseContent(releaseName) - if content == nil { - continue - } - status := content.GetRelease().GetInfo().GetStatus() - if status.GetCode().String() != fhr.Status.ReleaseStatus { - err := UpdateReleaseStatus(fhrClient, fhr, releaseName, status.GetCode().String()) - if err != nil { - logger.Log("namespace", ns, "resource", fhr.Name, "err", err) - continue - } - } + statusStr := releaseStatus.Info.Status.Code.String() + if err := SetReleaseStatus(nsHrClient, *hr, releaseName, statusStr); err != nil { + logger.Log("namespace", hr.Namespace, "resource", hr.Name, "err", err) + continue } } } @@ -100,25 +84,115 @@ bail: logger.Log("loop", "stopping", "err", logErr) } -func UpdateReleaseStatus(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, releaseName, releaseStatus string) error { - cFhr, err := client.Get(fhr.Name, metav1.GetOptions{}) + +// SetReleaseStatus updates the status of the HelmRelease to the given +// release name and/or release status. +func SetReleaseStatus(client v1beta1client.HelmReleaseInterface, hr v1beta1.HelmRelease, releaseName, releaseStatus string) error { + cHr, err := client.Get(hr.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + if cHr.Status.ReleaseName == releaseName && cHr.Status.ReleaseStatus == releaseStatus { + return nil + } + + cHr.Status.ReleaseName = releaseName + cHr.Status.ReleaseStatus = releaseStatus + + _, err = client.UpdateStatus(cHr) + return err +} + + +// SetReleaseRevision updates the status of the HelmRelease to the +// given revision. +func SetReleaseRevision(client v1beta1client.HelmReleaseInterface, hr v1beta1.HelmRelease, revision string) error { + cHr, err := client.Get(hr.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + if cHr.Status.Revision == revision { + return nil + } + + cHr.Status.Revision = revision + + _, err = client.UpdateStatus(cHr) + return err +} + +// SetValuesChecksum updates the values checksum of the HelmRelease to +// the given checksum. +func SetValuesChecksum(client v1beta1client.HelmReleaseInterface, hr v1beta1.HelmRelease, valuesChecksum string) error { + cHr, err := client.Get(hr.Name, metav1.GetOptions{}) if err != nil { return err } - cFhr.Status.ReleaseName = releaseName - cFhr.Status.ReleaseStatus = releaseStatus - _, err = client.UpdateStatus(cFhr) + if valuesChecksum == "" || cHr.Status.ValuesChecksum == valuesChecksum { + return nil + } + + cHr.Status.ValuesChecksum = valuesChecksum + + _, err = client.UpdateStatus(cHr) return err } -func UpdateReleaseRevision(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, revision string) error { - cFhr, err := client.Get(fhr.Name, metav1.GetOptions{}) +// SetObservedGeneration updates the observed generation status of the +// HelmRelease to the given generation. +func SetObservedGeneration(client v1beta1client.HelmReleaseInterface, hr v1beta1.HelmRelease, generation int64) error { + cHr, err := client.Get(hr.Name, metav1.GetOptions{}) if err != nil { return err } - cFhr.Status.Revision = revision - _, err = client.UpdateStatus(cFhr) + if cHr.Status.ObservedGeneration >= generation { + return nil + } + + cHr.Status.ObservedGeneration = generation + + _, err = client.UpdateStatus(cHr) return err } + +// ReleaseFailed returns if the roll-out of the HelmRelease failed. +func ReleaseFailed(hr v1beta1.HelmRelease) bool { + return hr.Status.ReleaseStatus == helmrelease.Status_FAILED.String() +} + + +// HasSynced returns if the HelmRelease has been processed by the +// controller. +func HasSynced(hr v1beta1.HelmRelease) bool { + return hr.Status.ObservedGeneration >= hr.Generation +} + +// HasRolledBack returns if the current generation of the HelmRelease +// has been rolled back. +func HasRolledBack(hr v1beta1.HelmRelease) bool { + if !HasSynced(hr) { + return false + } + + rolledBack := GetCondition(hr.Status, v1beta1.HelmReleaseRolledBack) + if rolledBack == nil { + return false + } + + chartFetched := GetCondition(hr.Status, v1beta1.HelmReleaseChartFetched) + if chartFetched != nil { + // NB: as two successful state updates can happen right after + // each other, on which we both want to act, we _must_ compare + // the update timestamps as the transition timestamp will only + // change on a status shift. + if chartFetched.Status == v1.ConditionTrue && rolledBack.LastUpdateTime.Before(&chartFetched.LastUpdateTime) { + return false + } + } + + return rolledBack.Status == v1.ConditionTrue +}