Skip to content

Commit

Permalink
Add --hook-parallelism flag
Browse files Browse the repository at this point in the history
Add --hook-parallelism flag to helm deployment hooks but not the testing hook

What this PR does / why we need it:
Adds a --hook-parallelism flag, with a default of 1, to the applicable commands. Parallelism is not added for test hooks, since that appears to be the reason helm/community#165 was closed, but I think there is still value in letting deployment hooks run in parallel.

Special notes for your reviewer:
Based off of helm#8946 by abaehremc and
helm#7792 by akhilles
  • Loading branch information
JvD-Ericsson committed Feb 9, 2023
1 parent 5abcf74 commit b1bade4
Show file tree
Hide file tree
Showing 15 changed files with 880 additions and 103 deletions.
1 change: 1 addition & 0 deletions cmd/helm/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
f.BoolVar(&client.DryRun, "dry-run", false, "simulate an install")
f.BoolVar(&client.Force, "force", false, "force resource updates through a replacement strategy")
f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during install")
f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel")
f.BoolVar(&client.Replace, "replace", false, "re-use the given name, only if that name is a deleted release which remains in the history. This is unsafe in production")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
Expand Down
1 change: 1 addition & 0 deletions cmd/helm/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable")
f.BoolVar(&client.Force, "force", false, "force resource update through delete/recreate if needed")
f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback")
f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
Expand Down
1 change: 1 addition & 0 deletions cmd/helm/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func newUninstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f := cmd.Flags()
f.BoolVar(&client.DryRun, "dry-run", false, "simulate a uninstall")
f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during uninstallation")
f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel")
f.BoolVar(&client.KeepHistory, "keep-history", false, "remove all associated resources and mark the release as deleted, but retain the release history")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all the resources are deleted before returning. It will wait for as long as --timeout")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
Expand Down
2 changes: 2 additions & 0 deletions cmd/helm/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
instClient.Force = client.Force
instClient.DryRun = client.DryRun
instClient.DisableHooks = client.DisableHooks
instClient.HookParallelism = client.HookParallelism
instClient.SkipCRDs = client.SkipCRDs
instClient.Timeout = client.Timeout
instClient.Wait = client.Wait
Expand Down Expand Up @@ -220,6 +221,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.MarkDeprecated("recreate-pods", "functionality will no longer be updated. Consult the documentation for other methods to recreate pods")
f.BoolVar(&client.Force, "force", false, "force resource updates through a replacement strategy")
f.BoolVar(&client.DisableHooks, "no-hooks", false, "disable pre/post upgrade hooks")
f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel")
f.BoolVar(&client.DisableOpenAPIValidation, "disable-openapi-validation", false, "if set, the upgrade process will not validate rendered templates against the Kubernetes OpenAPI Schema")
f.BoolVar(&client.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed when an upgrade is performed with install flag enabled. By default, CRDs are installed if not already present, when an upgrade is performed with install flag enabled")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
Expand Down
10 changes: 9 additions & 1 deletion pkg/action/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -281,3 +281,11 @@ func TestGetVersionSet(t *testing.T) {
t.Error("Non-existent version is reported found.")
}
}

// withSecondHook returns a chartOption that appends one additional template
// file, named "templates/hooks-test", whose contents are hookManifest.
func withSecondHook(hookManifest string) chartOption {
	return func(opts *chartOptions) {
		extra := &chart.File{
			Name: "templates/hooks-test",
			Data: []byte(hookManifest),
		}
		opts.Templates = append(opts.Templates, extra)
	}
}
180 changes: 110 additions & 70 deletions pkg/action/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package action
import (
"bytes"
"sort"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -26,100 +27,139 @@ import (
helmtime "helm.sh/helm/v3/pkg/time"
)

// execHook executes all of the hooks for the given hook event.
func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error {
executingHooks := []*release.Hook{}
// execHookEvent executes all of the hooks for the given hook event.
func (cfg *Configuration) execHookEvent(rl *release.Release, event release.HookEvent, timeout time.Duration, parallelism int) error {
if parallelism < 1 {
parallelism = 1
}

weightedHooks := make(map[int][]*release.Hook)
for _, h := range rl.Hooks {
for _, e := range h.Events {
if e == hook {
executingHooks = append(executingHooks, h)
if e == event {
// Set default delete policy to before-hook-creation
if h.DeletePolicies == nil || len(h.DeletePolicies) == 0 {
// TODO(jlegrone): Only apply before-hook-creation delete policy to run to completion
// resources. For all other resource types update in place if a
// resource with the same name already exists and is owned by the
// current release.
h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation}
}
weightedHooks[h.Weight] = append(weightedHooks[h.Weight], h)
}
}
}

// hooke are pre-ordered by kind, so keep order stable
sort.Stable(hookByWeight(executingHooks))

for _, h := range executingHooks {
// Set default delete policy to before-hook-creation
if h.DeletePolicies == nil || len(h.DeletePolicies) == 0 {
// TODO(jlegrone): Only apply before-hook-creation delete policy to run to completion
// resources. For all other resource types update in place if a
// resource with the same name already exists and is owned by the
// current release.
h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation}
}

if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation); err != nil {
return err
}

resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest), true)
if err != nil {
return errors.Wrapf(err, "unable to build kubernetes object for %s hook %s", hook, h.Path)
weights := make([]int, 0, len(weightedHooks))
for w := range weightedHooks {
weights = append(weights, w)
// sort hooks in each weighted group by name
sort.Slice(weightedHooks[w], func(i, j int) bool {
return weightedHooks[w][i].Name < weightedHooks[w][j].Name
})
}
sort.Ints(weights)

var mut sync.RWMutex
for _, w := range weights {
sem := make(chan struct{}, parallelism)
errsChan := make(chan error)
errs := make([]error, 0)
for _, h := range weightedHooks[w] {
// execute hooks in parallel (with limited parallelism enforced by semaphore)
go func(h *release.Hook) {
sem <- struct{}{}
errsChan <- cfg.execHook(rl, h, &mut, timeout)
<-sem
}(h)
}

// Record the time at which the hook was applied to the cluster
h.LastRun = release.HookExecution{
StartedAt: helmtime.Now(),
Phase: release.HookPhaseRunning,
// collect errors
for range weightedHooks[w] {
if err := <-errsChan; err != nil {
errs = append(errs, err)
}
}
cfg.recordRelease(rl)

// As long as the implementation of WatchUntilReady does not panic, HookPhaseFailed or HookPhaseSucceeded
// should always be set by this function. If we fail to do that for any reason, then HookPhaseUnknown is
// the most appropriate value to surface.
h.LastRun.Phase = release.HookPhaseUnknown

// Create hook resources
if _, err := cfg.KubeClient.Create(resources); err != nil {
h.LastRun.CompletedAt = helmtime.Now()
h.LastRun.Phase = release.HookPhaseFailed
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
if len(errs) > 0 {
return errors.Errorf("%s hook event failed with %d error(s): %s", event, len(errs), joinErrors(errs))
}

// Watch hook resources until they have completed
err = cfg.KubeClient.WatchUntilReady(resources, timeout)
// Note the time of success/failure
h.LastRun.CompletedAt = helmtime.Now()
// Mark hook as succeeded or failed
if err != nil {
h.LastRun.Phase = release.HookPhaseFailed
// If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook
if err := cfg.deleteHookByPolicy(h, release.HookFailed); err != nil {
}
// If all hooks are successful, check the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, w := range weights {
for _, h := range weightedHooks[w] {
if err := cfg.deleteHookByPolicy(h, release.HookSucceeded); err != nil {
return err
}
return err
}
h.LastRun.Phase = release.HookPhaseSucceeded
}
return nil
}

// If all hooks are successful, check the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks {
if err := cfg.deleteHookByPolicy(h, release.HookSucceeded); err != nil {
return err
// execHook executes a single hook and records the outcome in h.LastRun.
//
// In order, it: applies the before-hook-creation delete policy, builds the
// Kubernetes resources from h.Manifest, marks the hook as running and records
// the release, creates the resources, and then waits up to timeout for them
// to become ready.
//
// mut is the lock shared by every hook executed for the same event. Phase
// changes go through updateHookPhase, and the exclusive lock is held while
// the release is recorded so that no hook struct is modified during
// recordRelease.
//
// It returns nil when the hook succeeds. If creating or watching the
// resources panics, the panic is recovered, the hook phase is set to
// HookPhaseUnknown, and an error carrying the recovered value is returned.
func (cfg *Configuration) execHook(rl *release.Release, h *release.Hook, mut *sync.RWMutex, timeout time.Duration) (err error) {
	if err = cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation); err != nil {
		return err
	}

	resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest), true)
	if err != nil {
		return errors.Wrapf(err, "unable to build kubernetes object for applying hook %s", h.Path)
	}

	// Record the time at which the hook was applied to the cluster
	updateHookPhase(h, mut, release.HookPhaseRunning)
	// Thread safety: exclusive lock is necessary to ensure that none of the hook structs are modified during recordRelease
	mut.Lock()
	cfg.recordRelease(rl)
	mut.Unlock()

	// As long as the implementation of WatchUntilReady does not panic, HookPhaseFailed or HookPhaseSucceeded
	// should always be set by this function. If we fail to do that for any reason, then HookPhaseUnknown is
	// the most appropriate value to surface.
	defer func() {
		// The recovered value is bound to r (not "panic") so the builtin is not
		// shadowed, and it is included in the error so the cause is not lost.
		if r := recover(); r != nil {
			updateHookPhase(h, mut, release.HookPhaseUnknown)
			err = errors.Errorf("panicked while executing hook %s: %v", h.Path, r)
		}
	}()

	// Create hook resources
	if _, err = cfg.KubeClient.Create(resources); err != nil {
		updateHookPhase(h, mut, release.HookPhaseFailed)
		return errors.Wrapf(err, "warning: hook %s failed", h.Path)
	}

	// Watch hook resources until they have completed then mark hook as succeeded or failed
	if err = cfg.KubeClient.WatchUntilReady(resources, timeout); err != nil {
		updateHookPhase(h, mut, release.HookPhaseFailed)
		// If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted
		// under failed condition. If so, then clear the corresponding resource object in the hook.
		// Note: when that cleanup itself fails, the cleanup error is returned in place of the watch error.
		if deleteHookErr := cfg.deleteHookByPolicy(h, release.HookFailed); deleteHookErr != nil {
			return deleteHookErr
		}
		return err
	}

	updateHookPhase(h, mut, release.HookPhaseSucceeded)
	return nil
}

// hookByWeight is a sorter for hooks
type hookByWeight []*release.Hook

func (x hookByWeight) Len() int { return len(x) }
func (x hookByWeight) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
func (x hookByWeight) Less(i, j int) bool {
if x[i].Weight == x[j].Weight {
return x[i].Name < x[j].Name
// updateHookPhase updates the phase of a hook in a thread-safe manner.
func updateHookPhase(h *release.Hook, mut *sync.RWMutex, phase release.HookPhase) {
// Thread safety: shared lock is sufficient because each execHook goroutine operates on a different hook
completedAtTime := helmtime.Now()
mut.RLock()
startedAtTime := helmtime.Now()
switch phase {
case release.HookPhaseRunning:
h.LastRun.StartedAt = startedAtTime
case release.HookPhaseSucceeded, release.HookPhaseFailed:
h.LastRun.CompletedAt = completedAtTime
}
return x[i].Weight < x[j].Weight
h.LastRun.Phase = phase
mut.RUnlock()
}

// deleteHookByPolicy deletes a hook if the hook policy instructs it to
// deleteHookByPolicy deletes a hook if the hook policy instructs it to.
func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy) error {
// Never delete CustomResourceDefinitions; this could cause lots of
// cascading garbage collection.
Expand Down
6 changes: 4 additions & 2 deletions pkg/action/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Install struct {
CreateNamespace bool
DryRun bool
DisableHooks bool
HookParallelism int
Replace bool
Wait bool
WaitForJobs bool
Expand Down Expand Up @@ -360,7 +361,7 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t

// pre-install hooks
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil {
if err := i.cfg.execHookEvent(rel, release.HookPreInstall, i.Timeout, i.HookParallelism); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err))
return
}
Expand Down Expand Up @@ -396,7 +397,7 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
}

if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil {
if err := i.cfg.execHookEvent(rel, release.HookPostInstall, i.Timeout, i.HookParallelism); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err))
return
}
Expand Down Expand Up @@ -444,6 +445,7 @@ func (i *Install) failRelease(rel *release.Release, err error) (*release.Release
i.cfg.Log("Install failed and atomic is set, uninstalling release")
uninstall := NewUninstall(i.cfg)
uninstall.DisableHooks = i.DisableHooks
uninstall.HookParallelism = i.HookParallelism
uninstall.KeepHistory = false
uninstall.Timeout = i.Timeout
if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil {
Expand Down
82 changes: 82 additions & 0 deletions pkg/action/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,3 +717,85 @@ func TestNameAndChartGenerateName(t *testing.T) {
})
}
}

// TestInstallRelease_HookParallelism installs a chart with several
// HookParallelism settings and checks that the install completes and that the
// stored release contains the expected hooks.
//
// Each case runs as its own subtest with assertions bound to the subtest's
// *testing.T, so a failure is attributed to the case that produced it.
func TestInstallRelease_HookParallelism(t *testing.T) {
	tests := []struct {
		name        string
		parallelism int
		// secondHook adds a second hook template to the chart via withSecondHook.
		secondHook bool
		wantHooks  int
	}{
		{name: "hook parallelism of 0 defaults to 1", parallelism: 0, wantHooks: 1},
		{name: "hook parallelism greater than number of hooks", parallelism: 10, wantHooks: 1},
		{name: "hook parallelism with multiple hooks", parallelism: 2, secondHook: true, wantHooks: 2},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			is := assert.New(t)

			instAction := installAction(t)
			instAction.HookParallelism = tt.parallelism

			ch := buildChart()
			if tt.secondHook {
				ch = buildChart(withSecondHook(manifestWithHook))
			}

			vals := map[string]interface{}{}
			res, err := instAction.Run(ch, vals)
			if err != nil {
				t.Fatalf("Failed install: %s", err)
			}
			is.Equal(res.Name, "test-install-release", "Expected release name.")
			is.Equal(res.Namespace, "spaced")

			rel, err := instAction.cfg.Releases.Get(res.Name, res.Version)
			if err != nil {
				// rel is not usable without a stored release; stop instead of dereferencing it.
				t.Fatalf("Failed to get release: %s", err)
			}

			is.Len(rel.Hooks, tt.wantHooks)
			for i, h := range rel.Hooks {
				is.Equal(h.Manifest, manifestWithHook, "Unexpected manifest for hook %d", i)
				// Guard the index accesses below so a short event list fails the test rather than panicking.
				if len(h.Events) < 2 {
					t.Fatalf("hook %d: expected at least 2 events, got %d", i, len(h.Events))
				}
				is.Equal(h.Events[0], release.HookPostInstall, "Expected event 0 of hook %d is post-install", i)
				is.Equal(h.Events[1], release.HookPreDelete, "Expected event 1 of hook %d is pre-delete", i)
			}

			is.NotEqual(len(res.Manifest), 0)
			is.NotEqual(len(rel.Manifest), 0)
			is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world")
			is.Equal(rel.Info.Description, "Install complete")
		})
	}
}
Loading

0 comments on commit b1bade4

Please sign in to comment.