Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support per-task RestartPolicy #7288

Merged
merged 3 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 203 additions & 14 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -222,9 +223,10 @@ func TestJobs_Canonicalize(t *testing.T) {
},
Tasks: []*Task{
{
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
RestartPolicy: defaultBatchJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -316,10 +318,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -363,6 +366,10 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
// inherit other values from TG
Attempts: intToPtr(20),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
Expand Down Expand Up @@ -486,6 +493,12 @@ func TestJobs_Canonicalize(t *testing.T) {
"db": 6379,
}},
},
RestartPolicy: &RestartPolicy{
Interval: timeToPtr(5 * time.Minute),
Attempts: intToPtr(20),
Delay: timeToPtr(25 * time.Second),
Mode: stringToPtr("delay"),
},
Resources: &Resources{
CPU: intToPtr(500),
MemoryMB: intToPtr(256),
Expand Down Expand Up @@ -712,10 +725,11 @@ func TestJobs_Canonicalize(t *testing.T) {
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
Expand Down Expand Up @@ -753,12 +767,187 @@ func TestJobs_Canonicalize(t *testing.T) {
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: defaultServiceJobRestartPolicy(),
},
},
},
},
},
},

{
name: "restart_merge",
input: &Job{
Name: stringToPtr("foo"),
ID: stringToPtr("bar"),
ParentID: stringToPtr("lol"),
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
},
},
},
},
{
Name: stringToPtr("baz"),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
Tasks: []*Task{
{
Name: "task1",
},
},
},
},
},
expected: &Job{
Namespace: stringToPtr(DefaultNamespace),
ID: stringToPtr("bar"),
Name: stringToPtr("foo"),
Region: stringToPtr("global"),
Type: stringToPtr("service"),
ParentID: stringToPtr("lol"),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
ConsulToken: stringToPtr(""),
VaultToken: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
TaskGroups: []*TaskGroup{
{
Name: stringToPtr("bar"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Attempts: intToPtr(5),
Delay: timeToPtr(1 * time.Second),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
{
Name: stringToPtr("baz"),
Count: intToPtr(1),
EphemeralDisk: &EphemeralDisk{
Sticky: boolToPtr(false),
Migrate: boolToPtr(false),
SizeMB: intToPtr(300),
},
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: intToPtr(0),
Interval: timeToPtr(0),
DelayFunction: stringToPtr("exponential"),
Delay: timeToPtr(30 * time.Second),
MaxDelay: timeToPtr(1 * time.Hour),
Unlimited: boolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
Migrate: DefaultMigrateStrategy(),
Tasks: []*Task{
{
Name: "task1",
LogConfig: DefaultLogConfig(),
Resources: DefaultResources(),
KillTimeout: timeToPtr(5 * time.Second),
RestartPolicy: &RestartPolicy{
Delay: timeToPtr(20 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr("fail"),
},
},
},
},
Expand Down
56 changes: 37 additions & 19 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,6 @@ func (g *TaskGroup) Canonicalize(job *Job) {
if g.Scaling != nil {
g.Scaling.Canonicalize(*g.Count)
}
for _, t := range g.Tasks {
t.Canonicalize(g, job)
}
if g.EphemeralDisk == nil {
g.EphemeralDisk = DefaultEphemeralDisk()
} else {
Expand Down Expand Up @@ -515,30 +512,20 @@ func (g *TaskGroup) Canonicalize(job *Job) {
var defaultRestartPolicy *RestartPolicy
switch *job.Type {
case "service", "system":
// These needs to be in sync with DefaultServiceJobRestartPolicy in
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(2),
Interval: timeToPtr(30 * time.Minute),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultServiceJobRestartPolicy()
default:
// These needs to be in sync with DefaultBatchJobRestartPolicy in
// in nomad/structs/structs.go
defaultRestartPolicy = &RestartPolicy{
Delay: timeToPtr(15 * time.Second),
Attempts: intToPtr(3),
Interval: timeToPtr(24 * time.Hour),
Mode: stringToPtr(RestartPolicyModeFail),
}
defaultRestartPolicy = defaultBatchJobRestartPolicy()
}

if g.RestartPolicy != nil {
defaultRestartPolicy.Merge(g.RestartPolicy)
}
g.RestartPolicy = defaultRestartPolicy

for _, t := range g.Tasks {
t.Canonicalize(g, job)
}

for _, spread := range g.Spreads {
spread.Canonicalize()
}
Expand All @@ -553,6 +540,28 @@ func (g *TaskGroup) Canonicalize(job *Job) {
}
}

// defaultServiceJobRestartPolicy returns a newly allocated RestartPolicy
// holding the defaults applied to "service" and "system" jobs: a 15 second
// delay, 2 attempts within a 30 minute interval, and RestartPolicyModeFail.
//
// These values need to be kept in sync with DefaultServiceJobRestartPolicy
// in nomad/structs/structs.go.
func defaultServiceJobRestartPolicy() *RestartPolicy {
	policy := new(RestartPolicy)
	policy.Delay = timeToPtr(15 * time.Second)
	policy.Attempts = intToPtr(2)
	policy.Interval = timeToPtr(30 * time.Minute)
	policy.Mode = stringToPtr(RestartPolicyModeFail)
	return policy
}

// defaultBatchJobRestartPolicy returns a newly allocated RestartPolicy
// holding the defaults applied to jobs whose type is neither "service" nor
// "system": a 15 second delay, 3 attempts within a 24 hour interval, and
// RestartPolicyModeFail.
//
// These values need to be kept in sync with DefaultBatchJobRestartPolicy
// in nomad/structs/structs.go.
func defaultBatchJobRestartPolicy() *RestartPolicy {
	policy := new(RestartPolicy)
	policy.Delay = timeToPtr(15 * time.Second)
	policy.Attempts = intToPtr(3)
	policy.Interval = timeToPtr(24 * time.Hour)
	policy.Mode = stringToPtr(RestartPolicyModeFail)
	return policy
}

// Constrain is used to add a constraint to a task group.
func (g *TaskGroup) Constrain(c *Constraint) *TaskGroup {
g.Constraints = append(g.Constraints, c)
Expand Down Expand Up @@ -645,6 +654,7 @@ type Task struct {
Env map[string]string
Services []*Service
Resources *Resources
RestartPolicy *RestartPolicy
Meta map[string]string
KillTimeout *time.Duration `mapstructure:"kill_timeout"`
LogConfig *LogConfig `mapstructure:"logs"`
Expand Down Expand Up @@ -697,6 +707,14 @@ func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
if t.CSIPluginConfig != nil {
t.CSIPluginConfig.Canonicalize()
}
if t.RestartPolicy == nil {
t.RestartPolicy = tg.RestartPolicy
} else {
tgrp := &RestartPolicy{}
*tgrp = *tg.RestartPolicy
tgrp.Merge(t.RestartPolicy)
t.RestartPolicy = tgrp
}
}

// TaskArtifact is used to download artifacts before running a task.
Expand Down
4 changes: 3 additions & 1 deletion client/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,12 @@ func TestAllocations_GarbageCollect(t *testing.T) {

a := mock.Alloc()
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{
rp := &nstructs.RestartPolicy{
Attempts: 0,
Mode: nstructs.RestartPolicyModeFail,
}
a.Job.TaskGroups[0].RestartPolicy = rp
a.Job.TaskGroups[0].Tasks[0].RestartPolicy = rp
a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10ms",
}
Expand Down
Loading