Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Affinity struct, API and parsing #4512

Merged
merged 8 commits into from
Jul 24, 2018
Merged
9 changes: 9 additions & 0 deletions api/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestCompose(t *testing.T) {
// Compose a task group
grp := NewTaskGroup("grp1", 2).
Constrain(NewConstraint("kernel.name", "=", "linux")).
AddAffinity(NewAffinity("${node.class}", "=", "large", 50)).
SetMeta("foo", "bar").
AddTask(task)

Expand Down Expand Up @@ -72,6 +73,14 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
Affinities: []*Affinity{
{
LTarget: "${node.class}",
RTarget: "large",
Operand: "=",
Weight: 50,
},
},
Tasks: []*Task{
{
Name: "task1",
Expand Down
7 changes: 7 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ type Job struct {
AllAtOnce *bool `mapstructure:"all_at_once"`
Datacenters []string
Constraints []*Constraint
Affinities []*Affinity
TaskGroups []*TaskGroup
Update *UpdateStrategy
Periodic *PeriodicConfig
Expand Down Expand Up @@ -836,6 +837,12 @@ func (j *Job) Constrain(c *Constraint) *Job {
return j
}

// AddAffinity is used to add an affinity to a job.
func (j *Job) AddAffinity(a *Affinity) *Job {
j.Affinities = append(j.Affinities, a)
return j
}

// AddTaskGroup adds a task group to an existing job.
func (j *Job) AddTaskGroup(grp *TaskGroup) *Job {
j.TaskGroups = append(j.TaskGroups, grp)
Expand Down
36 changes: 36 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,42 @@ func TestJobs_Constrain(t *testing.T) {
}
}

func TestJobs_AddAffinity(t *testing.T) {
t.Parallel()
job := &Job{Affinities: nil}

// Create and add an affinity
out := job.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(job.Affinities); n != 1 {
t.Fatalf("expected 1 affinity, got: %d", n)
}

// Check that the job was returned
if job != out {
t.Fatalf("expect: %#v, got: %#v", job, out)
}

// Adding another affinity preserves the original
job.AddAffinity(NewAffinity("${node.datacenter}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(job.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, job.Affinities)
}
}

func TestJobs_Sort(t *testing.T) {
t.Parallel()
jobs := []*JobListStub{
Expand Down
31 changes: 31 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ func (r *ReschedulePolicy) Canonicalize(jobType string) {
}
}

// Affinity is used to serialize task group affinities
type Affinity struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Constraint operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight float64 // Weight applied to nodes that match the affinity. Can be negative
}

func NewAffinity(LTarget string, Operand string, RTarget string, Weight float64) *Affinity {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to change; just fyi: you can skip repeating the type if it's the same between arguments:

func NewAffinity(LTarget, Operand, RTarget string, Weight float64) *Affinity {

return &Affinity{
LTarget: LTarget,
RTarget: RTarget,
Operand: Operand,
Weight: Weight,
}
}

func NewDefaultReschedulePolicy(jobType string) *ReschedulePolicy {
var dp *ReschedulePolicy
switch jobType {
Expand Down Expand Up @@ -413,6 +430,7 @@ type TaskGroup struct {
Name *string
Count *int
Constraints []*Constraint
Affinities []*Affinity
Tasks []*Task
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
Expand Down Expand Up @@ -543,6 +561,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup {
return g
}

// AddAffinity is used to add a new affinity to a task group.
func (g *TaskGroup) AddAffinity(a *Affinity) *TaskGroup {
g.Affinities = append(g.Affinities, a)
return g
}

// RequireDisk adds a ephemeral disk to the task group
func (g *TaskGroup) RequireDisk(disk *EphemeralDisk) *TaskGroup {
g.EphemeralDisk = disk
Expand Down Expand Up @@ -583,6 +607,7 @@ type Task struct {
User string
Config map[string]interface{}
Constraints []*Constraint
Affinities []*Affinity
Env map[string]string
Services []*Service
Resources *Resources
Expand Down Expand Up @@ -771,6 +796,12 @@ func (t *Task) Constrain(c *Constraint) *Task {
return t
}

// AddAffinity adds a new affinity to a single task.
func (t *Task) AddAffinity(a *Affinity) *Task {
t.Affinities = append(t.Affinities, a)
return t
}

// SetLogConfig sets a log config to a task
func (t *Task) SetLogConfig(l *LogConfig) *Task {
t.LogConfig = l
Expand Down
72 changes: 72 additions & 0 deletions api/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,42 @@ func TestTaskGroup_Constrain(t *testing.T) {
}
}

func TestTaskGroup_AddAffinity(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)

// Add an affinity to the group
out := grp.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(grp.Affinities); n != 1 {
t.Fatalf("expected 1 affinity, got: %d", n)
}

// Check that the group was returned
if out != grp {
t.Fatalf("expected: %#v, got: %#v", grp, out)
}

// Add a second affinity
grp.AddAffinity(NewAffinity("${node.affinity}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.affinity}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(grp.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp.Constraints)
}
}

func TestTaskGroup_SetMeta(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
Expand Down Expand Up @@ -232,6 +268,42 @@ func TestTask_Constrain(t *testing.T) {
}
}

func TestTask_AddAffinity(t *testing.T) {
t.Parallel()
task := NewTask("task1", "exec")

// Add an affinity to the task
out := task.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(task.Affinities); n != 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.Fatalf("expected 1 affinity, got: %d", n)
}

// Check that the task was returned
if out != task {
t.Fatalf("expected: %#v, got: %#v", task, out)
}

// Add a second affinity
task.AddAffinity(NewAffinity("${node.datacenter}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(task.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, task.Affinities)
}
}

func TestTask_Artifact(t *testing.T) {
t.Parallel()
a := TaskArtifact{
Expand Down
34 changes: 34 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,15 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
}
}

if l := len(job.Affinities); l != 0 {
j.Affinities = make([]*structs.Affinity, l)
for i, a := range job.Affinities {
aff := &structs.Affinity{}
ApiAffinityToStructs(a, aff)
j.Affinities[i] = aff
}
}

// COMPAT: Remove in 0.7.0. Update has been pushed into the task groups
if job.Update != nil {
j.Update = structs.UpdateStrategy{}
Expand Down Expand Up @@ -675,6 +684,15 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}
}

if l := len(taskGroup.Affinities); l != 0 {
tg.Affinities = make([]*structs.Affinity, l)
for k, affinity := range taskGroup.Affinities {
a := &structs.Affinity{}
ApiAffinityToStructs(affinity, a)
tg.Affinities[k] = a
}
}

tg.RestartPolicy = &structs.RestartPolicy{
Attempts: *taskGroup.RestartPolicy.Attempts,
Interval: *taskGroup.RestartPolicy.Interval,
Expand Down Expand Up @@ -754,6 +772,15 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
}
}

if l := len(apiTask.Affinities); l != 0 {
structsTask.Affinities = make([]*structs.Affinity, l)
for i, a := range apiTask.Affinities {
aff := &structs.Affinity{}
ApiAffinityToStructs(a, aff)
structsTask.Affinities[i] = aff
}
}

if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services {
Expand Down Expand Up @@ -892,3 +919,10 @@ func ApiConstraintToStructs(c1 *api.Constraint, c2 *structs.Constraint) {
c2.RTarget = c1.RTarget
c2.Operand = c1.Operand
}

func ApiAffinityToStructs(a1 *api.Affinity, a2 *structs.Affinity) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not have this return the structs affinity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the pattern in the rest of this file, like we do for constraint above, but happy to fix this one and the upcoming equivalent method for spread.

a2.LTarget = a1.LTarget
a2.Operand = a1.Operand
a2.RTarget = a1.RTarget
a2.Weight = a1.Weight
}
48 changes: 48 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "c",
},
},
Affinities: []*api.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Update: &api.UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
MaxParallel: helper.IntToPtr(5),
Expand Down Expand Up @@ -1248,6 +1256,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*api.Affinity{
{
LTarget: "x",
RTarget: "y",
Operand: "z",
Weight: 100,
},
},
RestartPolicy: &api.RestartPolicy{
Interval: helper.TimeToPtr(1 * time.Second),
Attempts: helper.IntToPtr(5),
Expand Down Expand Up @@ -1303,6 +1319,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*api.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},

Services: []*api.Service{
{
Expand Down Expand Up @@ -1443,6 +1467,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "c",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Update: structs.UpdateStrategy{
Stagger: 1 * time.Second,
MaxParallel: 5,
Expand Down Expand Up @@ -1474,6 +1506,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "x",
RTarget: "y",
Operand: "z",
Weight: 100,
},
},
RestartPolicy: &structs.RestartPolicy{
Interval: 1 * time.Second,
Attempts: 5,
Expand Down Expand Up @@ -1528,6 +1568,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Env: map[string]string{
"hello": "world",
},
Expand Down
Loading