Skip to content

Commit

Permalink
template: apply splay value on change_mode script
Browse files Browse the repository at this point in the history
Previously, the splay timeout was only applied if a template re-render
caused a restart or a signal action. The `change_mode = "script"` was
running after the `if restart || len(signals) != 0` check, so it was
invoked at all times.

This change refactors the logic so it's easier to notice that new
`change_mode` options should start only after `splay` is applied.
  • Loading branch information
lgfa29 committed Sep 29, 2022
1 parent 50001d1 commit 9d88839
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 39 deletions.
89 changes: 51 additions & 38 deletions client/allocrunner/taskrunner/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,52 +463,65 @@ func (tm *TaskTemplateManager) onTemplateRendered(handledRenders map[string]time
handling = append(handling, id)
}

if restart || len(signals) != 0 {
if splay != 0 {
ns := splay.Nanoseconds()
offset := rand.Int63n(ns)
t := time.Duration(offset)

select {
case <-time.After(t):
case <-tm.shutdownCh:
return
}
}
shouldHandle := restart || len(signals) != 0 || len(scripts) != 0
if !shouldHandle {
return
}

// Apply splay timeout to avoid applying change_mode too frequently.
if splay != 0 {
ns := splay.Nanoseconds()
offset := rand.Int63n(ns)
t := time.Duration(offset)

// Update handle time
for _, id := range handling {
handledRenders[id] = events[id].LastDidRender
select {
case <-time.After(t):
case <-tm.shutdownCh:
return
}
}

if restart {
tm.config.Lifecycle.Restart(context.Background(),
structs.NewTaskEvent(structs.TaskRestartSignal).
SetDisplayMessage("Template with change_mode restart re-rendered"), false)
} else if len(signals) != 0 {
var mErr multierror.Error
for signal := range signals {
s := tm.signals[signal]
event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetDisplayMessage("Template re-rendered")
if err := tm.config.Lifecycle.Signal(event, signal); err != nil {
_ = multierror.Append(&mErr, err)
}
}
// Update handle time
for _, id := range handling {
handledRenders[id] = events[id].LastDidRender
}

if err := mErr.ErrorOrNil(); err != nil {
flat := make([]os.Signal, 0, len(signals))
for signal := range signals {
flat = append(flat, tm.signals[signal])
}
if restart {
tm.config.Lifecycle.Restart(context.Background(),
structs.NewTaskEvent(structs.TaskRestartSignal).
SetDisplayMessage("Template with change_mode restart re-rendered"), false)
} else {
// Handle signals and scripts since the task may have multiple
// templates with mixed change_mode values.
tm.handleChangeModeSignal(signals)
tm.handleChangeModeScript(scripts)
}
}

tm.config.Lifecycle.Kill(context.Background(),
structs.NewTaskEvent(structs.TaskKilling).
SetFailsTask().
SetDisplayMessage(fmt.Sprintf("Template failed to send signals %v: %v", flat, err)))
}
func (tm *TaskTemplateManager) handleChangeModeSignal(signals map[string]struct{}) {
var mErr multierror.Error
for signal := range signals {
s := tm.signals[signal]
event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetDisplayMessage("Template re-rendered")
if err := tm.config.Lifecycle.Signal(event, signal); err != nil {
_ = multierror.Append(&mErr, err)
}
}

if err := mErr.ErrorOrNil(); err != nil {
flat := make([]os.Signal, 0, len(signals))
for signal := range signals {
flat = append(flat, tm.signals[signal])
}

tm.config.Lifecycle.Kill(context.Background(),
structs.NewTaskEvent(structs.TaskKilling).
SetFailsTask().
SetDisplayMessage(fmt.Sprintf("Template failed to send signals %v: %v", flat, err)))
}
}

func (tm *TaskTemplateManager) handleChangeModeScript(scripts []*structs.ChangeScript) {
// process script execution concurrently
var wg sync.WaitGroup
for _, script := range scripts {
Expand Down
128 changes: 127 additions & 1 deletion client/allocrunner/taskrunner/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ BAR={{key "bar"}}
// Update the keys in Consul
harness.consul.SetKV(t, key1, []byte(content1_2))

// Wait for restart
// Wait for script execution
timeout := time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second)
OUTER:
for {
Expand Down Expand Up @@ -1373,6 +1373,132 @@ BAR={{key "bar"}}
require.Contains(harness.mockHooks.KillEvent.DisplayMessage, "task is being killed")
}

func TestTaskTemplateManager_ChangeModeMixed(t *testing.T) {
ci.Parallel(t)

templateRestart := &structs.Template{
EmbeddedTmpl: `
RESTART={{key "restart"}}
COMMON={{key "common"}}
`,
DestPath: "restart",
ChangeMode: structs.TemplateChangeModeRestart,
}
templateSignal := &structs.Template{
EmbeddedTmpl: `
SIGNAL={{key "signal"}}
COMMON={{key "common"}}
`,
DestPath: "signal",
ChangeMode: structs.TemplateChangeModeSignal,
ChangeSignal: "SIGALRM",
}
templateScript := &structs.Template{
EmbeddedTmpl: `
SCRIPT={{key "script"}}
COMMON={{key "common"}}
`,
DestPath: "script",
ChangeMode: structs.TemplateChangeModeScript,
ChangeScript: &structs.ChangeScript{
Command: "/bin/foo",
Args: []string{},
Timeout: 5 * time.Second,
FailOnError: true,
},
}
templates := []*structs.Template{
templateRestart,
templateSignal,
templateScript,
}

me := mockExecutor{DesiredExit: 0, DesiredErr: nil}
harness := newTestHarness(t, templates, true, false)
harness.start(t)
harness.manager.SetDriverHandle(&me)
defer harness.stop()

// Ensure no unblock
select {
case <-harness.mockHooks.UnblockCh:
require.Fail(t, "Task unblock should not have been called")
case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second):
}

// Write the key to Consul
harness.consul.SetKV(t, "common", []byte(fmt.Sprintf("%v", time.Now())))
harness.consul.SetKV(t, "restart", []byte(fmt.Sprintf("%v", time.Now())))
harness.consul.SetKV(t, "signal", []byte(fmt.Sprintf("%v", time.Now())))
harness.consul.SetKV(t, "script", []byte(fmt.Sprintf("%v", time.Now())))

// Wait for the unblock
select {
case <-harness.mockHooks.UnblockCh:
case <-time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second):
require.Fail(t, "Task unblock should have been called")
}

t.Run("restart takes precedence", func(t *testing.T) {
// Update the common Consul key.
harness.consul.SetKV(t, "common", []byte(fmt.Sprintf("%v", time.Now())))

// Collect some events.
timeout := time.After(time.Duration(3*testutil.TestMultiplier()) * time.Second)
events := []*structs.TaskEvent{}
OUTER:
for {
select {
case <-harness.mockHooks.RestartCh:
// Consume restarts so the channel is clean for other tests.
case <-harness.mockHooks.SignalCh:
require.Fail(t, "signal not expected")
case ev := <-harness.mockHooks.EmitEventCh:
events = append(events, ev)
case <-timeout:
break OUTER
}
}

for _, ev := range events {
require.NotContains(t, ev.DisplayMessage, templateScript.ChangeScript.Command)
require.NotContains(t, ev.Type, structs.TaskSignaling)
}
})

t.Run("signal and script", func(t *testing.T) {
// Update the signal and script Consul keys.
harness.consul.SetKV(t, "signal", []byte(fmt.Sprintf("%v", time.Now())))
harness.consul.SetKV(t, "script", []byte(fmt.Sprintf("%v", time.Now())))

// Wait for a events.
var gotSignal, gotScript bool
timeout := time.After(time.Duration(5*testutil.TestMultiplier()) * time.Second)
for {
select {
case <-harness.mockHooks.RestartCh:
require.Fail(t, "restart not expected")
case ev := <-harness.mockHooks.EmitEventCh:
if strings.Contains(ev.DisplayMessage, templateScript.ChangeScript.Command) {
// Make sure we only run script once.
require.False(t, gotScript)
gotScript = true
}
case <-harness.mockHooks.SignalCh:
// Make sure we only signal once.
require.False(t, gotSignal)
gotSignal = true
case <-timeout:
require.Fail(t, "timeout waiting for script and signal")
}

if gotScript && gotSignal {
break
}
}
})
}

// TestTaskTemplateManager_FiltersProcessEnvVars asserts that we only render
// environment variables found in task env-vars and not read the nomad host
// process environment variables. nomad host process environment variables
Expand Down

0 comments on commit 9d88839

Please sign in to comment.