Fix health checking for ephemeral poststart tasks #11945

Merged

merged 5 commits on Feb 2, 2022
Changes from 2 commits
5 changes: 3 additions & 2 deletions api/tasks.go
@@ -648,8 +648,9 @@ const (
)

type TaskLifecycle struct {
-	Hook    string `mapstructure:"hook" hcl:"hook,optional"`
-	Sidecar bool   `mapstructure:"sidecar" hcl:"sidecar,optional"`
+	Hook                 string `mapstructure:"hook" hcl:"hook,optional"`
+	Sidecar              bool   `mapstructure:"sidecar" hcl:"sidecar,optional"`
+	IgnoreMinHealthyTime bool   `mapstructure:"ignore_min_healthy_time" hcl:"ignore_min_healthy_time,optional"`
}

// Determine if lifecycle has user-input values
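For context, here is how a job submitted through Nomad's Go API could set the new field (a minimal sketch against this PR's branch; the "db-migrate" task name and docker driver are illustrative):

package main

import "github.com/hashicorp/nomad/api"

func main() {
	// Illustrative poststart task that should not be held to the group's
	// min_healthy_time before the deployment can be considered healthy.
	task := api.NewTask("db-migrate", "docker")
	task.Lifecycle = &api.TaskLifecycle{
		Hook:                 "poststart", // matches TaskLifecycleHookPoststart
		Sidecar:              false,
		IgnoreMinHealthyTime: true, // field added by this PR
	}
	// Attach the task to a task group and register the job as usual.
}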
40 changes: 30 additions & 10 deletions client/allochealth/tracker.go
@@ -66,9 +66,9 @@ type Tracker struct {
// not needed
allocStopped chan struct{}

-	// lifecycleTasks is a map of ephemeral tasks and their lifecycle hooks.
+	// lifecycleTasks is a map of ephemeral tasks and their lifecycle configs.
	// These tasks may terminate without affecting alloc health
-	lifecycleTasks map[string]string
+	lifecycleTasks map[string]*structs.TaskLifecycleConfig

// l is used to lock shared fields listed below
l sync.Mutex
@@ -110,15 +110,15 @@ func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.A
consulClient: consulClient,
checkLookupInterval: consulCheckLookupInterval,
logger: logger,
-		lifecycleTasks:      map[string]string{},
+		lifecycleTasks:      map[string]*structs.TaskLifecycleConfig{},
}

t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
for _, task := range t.tg.Tasks {
t.taskHealth[task.Name] = &taskHealthState{task: task}

if task.Lifecycle != nil && !task.Lifecycle.Sidecar {
-			t.lifecycleTasks[task.Name] = task.Lifecycle.Hook
+			t.lifecycleTasks[task.Name] = task.Lifecycle
}

for _, s := range task.Services {
@@ -277,15 +277,35 @@ func (t *Tracker) watchTaskEvents() {
// Detect if the alloc is unhealthy or if all tasks have started yet
latestStartTime := time.Time{}
for taskName, state := range alloc.TaskStates {
-		// If the task is a poststop task we do not want to evaluate it
-		// since it will remain pending until the main task has finished
-		// or exited.
-		if t.lifecycleTasks[taskName] == structs.TaskLifecycleHookPoststop {
-			continue
+		if t.lifecycleTasks[taskName] != nil {
+			// If the task is a poststop task we do not want to evaluate it
+			// since it will remain pending until the main task has finished or
+			// exited.
+			if t.lifecycleTasks[taskName].Hook == structs.TaskLifecycleHookPoststop {
+				continue
+			}
+
+			// If this is a poststart task that has already succeeded, check
+			// two possible skip conditions before evaluating it.
+			if t.lifecycleTasks[taskName].Hook == structs.TaskLifecycleHookPoststart && state.Successful() {
+
+				// If the task was successful and its runtime is at least
+				// t.minHealthyTime, skip evaluation.
+				if state.FinishedAt.Sub(state.StartedAt) >= t.minHealthyTime {
+					continue
+				}
+
+				// If the task was successful and the user set
+				// 'ignore_min_healthy_time' to 'true', skip evaluation.
+				if t.lifecycleTasks[taskName].IgnoreMinHealthyTime {
+					continue
+				}
+			}
		}

		// One of the tasks has failed so we can exit watching
-		if state.Failed || (!state.FinishedAt.IsZero() && t.lifecycleTasks[taskName] != structs.TaskLifecycleHookPrestart) {
+		// Guard the map lookup: tasks without a lifecycle block have no entry,
+		// so dereferencing .Hook on a nil config would panic.
+		if state.Failed || (!state.FinishedAt.IsZero() &&
+			(t.lifecycleTasks[taskName] == nil || t.lifecycleTasks[taskName].Hook != structs.TaskLifecycleHookPrestart)) {
			t.setTaskHealth(false, true)
			return
		}
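To make the skip logic above easier to follow, here is the per-task decision distilled into a standalone sketch (simplified stand-in types, not the actual tracker code):

package main

import (
	"fmt"
	"time"
)

// Stand-ins for the Nomad structs referenced by the tracker.
type lifecycleConfig struct {
	Hook                 string // "prestart", "poststart", or "poststop"
	IgnoreMinHealthyTime bool
}

type taskState struct {
	Failed     bool
	StartedAt  time.Time
	FinishedAt time.Time
}

// successful mirrors TaskState.Successful: the task finished without failing.
func (s taskState) successful() bool {
	return !s.Failed && !s.FinishedAt.IsZero()
}

// skipEvaluation reports whether a task can be ignored when judging alloc
// health: poststop tasks always; successful poststart tasks when they ran
// for at least minHealthyTime or opted out via ignore_min_healthy_time.
func skipEvaluation(lc *lifecycleConfig, s taskState, minHealthyTime time.Duration) bool {
	if lc == nil {
		return false // not an ephemeral lifecycle task
	}
	if lc.Hook == "poststop" {
		return true
	}
	if lc.Hook == "poststart" && s.successful() {
		if s.FinishedAt.Sub(s.StartedAt) >= minHealthyTime {
			return true
		}
		if lc.IgnoreMinHealthyTime {
			return true
		}
	}
	return false
}

func main() {
	now := time.Now()
	brief := taskState{StartedAt: now.Add(-50 * time.Millisecond), FinishedAt: now}
	// Ran for less than minHealthyTime: still evaluated.
	fmt.Println(skipEvaluation(&lifecycleConfig{Hook: "poststart"}, brief, 100*time.Millisecond))
	// Same runtime, but opted out: skipped.
	fmt.Println(skipEvaluation(&lifecycleConfig{Hook: "poststart", IgnoreMinHealthyTime: true}, brief, 100*time.Millisecond))
}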
82 changes: 82 additions & 0 deletions client/allochealth/tracker_test.go
@@ -129,6 +129,88 @@ func TestTracker_Checks_PendingPostStop_Healthy(t *testing.T) {
}
}

func TestTracker_Checks_PendingPostStart_Healthy(t *testing.T) {
t.Parallel()

alloc := mock.LifecycleAllocWithPoststartDeploy(false)
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = time.Millisecond * 100
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
"web": {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
"post": {
State: structs.TaskStateDead,
StartedAt: time.Now(),
FinishedAt: time.Now().Add(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime),
},
}

logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()

consul := consul.NewMockConsulServiceClient(t, logger)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

select {
case <-time.After(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime * 2):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}

func TestTracker_Checks_PendingPostStartIgnoreMinHealthyTime_Healthy(t *testing.T) {
t.Parallel()

alloc := mock.LifecycleAllocWithPoststartDeploy(true)
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = time.Millisecond * 100
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
"web": {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
"post": {
State: structs.TaskStateDead,
StartedAt: time.Now(),
FinishedAt: time.Now().Add(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime / 2),
},
}

logger := testlog.HCLogger(t)
b := cstructs.NewAllocBroadcaster(logger)
defer b.Close()

consul := consul.NewMockConsulServiceClient(t, logger)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

checkInterval := 10 * time.Millisecond
tracker := NewTracker(ctx, logger, alloc, b.Listen(), consul,
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime, true)
tracker.checkLookupInterval = checkInterval
tracker.Start()

select {
case <-time.After(alloc.Job.TaskGroups[0].Migrate.MinHealthyTime * 2):
require.Fail(t, "timed out while waiting for health")
case h := <-tracker.HealthyCh():
require.True(t, h)
}
}

func TestTracker_Checks_Unhealthy(t *testing.T) {
t.Parallel()

5 changes: 3 additions & 2 deletions command/agent/job_endpoint.go
@@ -1172,8 +1172,9 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,

if apiTask.Lifecycle != nil {
structsTask.Lifecycle = &structs.TaskLifecycleConfig{
-			Hook:    apiTask.Lifecycle.Hook,
-			Sidecar: apiTask.Lifecycle.Sidecar,
+			Hook:                 apiTask.Lifecycle.Hook,
+			Sidecar:              apiTask.Lifecycle.Sidecar,
+			IgnoreMinHealthyTime: apiTask.Lifecycle.IgnoreMinHealthyTime,
}
}
}
1 change: 1 addition & 0 deletions jobspec/parse_task.go
@@ -339,6 +339,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
valid := []string{
"hook",
"sidecar",
"ignore_min_healthy_time",
}
if err := checkHCLKeys(lifecycleBlock.Val, valid); err != nil {
return nil, multierror.Prefix(err, "lifecycle ->")
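A quick end-to-end sanity check is to parse a jobspec that uses the new key, assuming this PR's branch and the jobspec package's Parse(path string, r io.Reader) helper (a sketch, not part of the PR):

package main

import (
	"fmt"
	"strings"

	"github.com/hashicorp/nomad/jobspec"
)

func main() {
	// Parse-level example only; a real job would also need a main task.
	src := `
job "example" {
  group "web" {
    task "post" {
      driver = "docker"
      config {
        image = "busybox"
      }
      lifecycle {
        hook                    = "poststart"
        ignore_min_healthy_time = true
      }
    }
  }
}`
	job, err := jobspec.Parse("example.hcl", strings.NewReader(src))
	if err != nil {
		panic(err)
	}
	lc := job.TaskGroups[0].Tasks[0].Lifecycle
	fmt.Println(lc.Hook, lc.IgnoreMinHealthyTime) // poststart true
}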
182 changes: 182 additions & 0 deletions nomad/mock/mock.go
@@ -683,6 +683,112 @@ func LifecycleJobWithPoststopDeploy() *structs.Job {
return job
}

func LifecycleJobWithPoststartDeploy(ignoreMinHealthyTime bool) *structs.Job {
lifecycleConfig := &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPoststart,
IgnoreMinHealthyTime: ignoreMinHealthyTime,
}
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
Name: "my-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeBatch,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{
{
Name: "web",
Count: 1,
Migrate: structs.DefaultMigrateStrategy(),
RestartPolicy: &structs.RestartPolicy{
Attempts: 0,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
Mode: structs.RestartPolicyModeFail,
},
Tasks: []*structs.Task{
{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
{
Name: "side",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
Lifecycle: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
{
Name: "post",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
Lifecycle: lifecycleConfig,
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
{
Name: "init",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
Lifecycle: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: false,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
},
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}

func LifecycleAllocWithPoststopDeploy() *structs.Allocation {
alloc := &structs.Allocation{
ID: uuid.Generate(),
@@ -759,6 +865,82 @@ func LifecycleAllocWithPoststopDeploy() *structs.Allocation {
return alloc
}

func LifecycleAllocWithPoststartDeploy(ignoreMinHealthyTime bool) *structs.Allocation {
alloc := &structs.Allocation{
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: "12345678-abcd-efab-cdef-123456789xyz",
Namespace: structs.DefaultNamespace,
TaskGroup: "web",

// TODO Remove once clientv2 gets merged
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
TaskResources: map[string]*structs.Resources{
"web": {
CPU: 1000,
MemoryMB: 256,
},
"init": {
CPU: 1000,
MemoryMB: 256,
},
"side": {
CPU: 1000,
MemoryMB: 256,
},
"post": {
CPU: 1000,
MemoryMB: 256,
},
},

AllocatedResources: &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
"init": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
"side": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
"post": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
},
},
Job: LifecycleJobWithPoststartDeploy(ignoreMinHealthyTime),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc.JobID = alloc.Job.ID
return alloc
}

func MaxParallelJob() *structs.Job {
update := *structs.DefaultUpdateStrategy
update.MaxParallel = 0