Skip to content

Commit

Permalink
job_hooks: add implicit constraint when using Consul for services. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed Apr 20, 2022
1 parent 4c55339 commit 8eb569f
Show file tree
Hide file tree
Showing 10 changed files with 834 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .changelog/12602.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
consul: Added implicit Consul constraint for task groups utilising Consul service and check registrations
```
2 changes: 2 additions & 0 deletions client/alloc_watcher_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func TestPrevAlloc_StreamAllocDir_TLS(t *testing.T) {
Operand: "=",
},
}
job.TaskGroups[0].Constraints = nil
job.TaskGroups[0].Tasks[0].Services = nil
job.TaskGroups[0].Count = 1
job.TaskGroups[0].EphemeralDisk.Sticky = true
job.TaskGroups[0].EphemeralDisk.Migrate = true
Expand Down
1 change: 1 addition & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
job := mock.Job()
// allow running job on any node including self client, that may not be a Linux box
job.Constraints = nil
job.TaskGroups[0].Constraints = nil
job.TaskGroups[0].Count = 1
task := job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
Expand Down
2 changes: 1 addition & 1 deletion command/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestIntegration_Command_RoundTripJob(t *testing.T) {
defer srv.Shutdown()

{
cmd := exec.Command("nomad", "job", "init")
cmd := exec.Command("nomad", "job", "init", "-short")
cmd.Dir = tmpDir
assert.Nil(cmd.Run())
}
Expand Down
121 changes: 74 additions & 47 deletions nomad/job_endpoint_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ var (
Operand: structs.ConstraintSemver,
}

// consulServiceDiscoveryConstraint is the implicit constraint added to
// task groups which include services utilising the Consul provider. The
// Consul version is pinned to a minimum of that which introduced the
// namespace feature.
consulServiceDiscoveryConstraint = &structs.Constraint{
LTarget: "${attr.consul.version}",
RTarget: ">= 1.7.0",
Operand: structs.ConstraintSemver,
}

// nativeServiceDiscoveryConstraint is the constraint injected into task
// groups that utilise Nomad's native service discovery feature. This is
// needed, as operators can disable the client functionality, and therefore
Expand Down Expand Up @@ -134,77 +144,94 @@ func (jobImpliedConstraints) Mutate(j *structs.Job) (*structs.Job, []error, erro
// Identify which task groups are utilising Nomad native service discovery.
nativeServiceDisco := j.RequiredNativeServiceDiscovery()

// Identify which task groups are utilising Consul service discovery.
consulServiceDisco := j.RequiredConsulServiceDiscovery()

// Hot path
if len(signals) == 0 && len(vaultBlocks) == 0 && len(nativeServiceDisco) == 0 {
if len(signals) == 0 && len(vaultBlocks) == 0 &&
len(nativeServiceDisco) == 0 && len(consulServiceDisco) == 0 {
return j, nil, nil
}

// Add Vault constraints if no Vault constraint exists
// Iterate through all the task groups within the job and add any required
// constraints. When adding new implicit constraints, they should go inside
// this single loop, with a new constraintMatcher if needed.
for _, tg := range j.TaskGroups {
_, ok := vaultBlocks[tg.Name]
if !ok {
// Not requesting Vault
continue

// If the task group utilises Vault, run the mutator.
if _, ok := vaultBlocks[tg.Name]; ok {
mutateConstraint(constraintMatcherLeft, tg, vaultConstraint)
}

found := false
for _, c := range tg.Constraints {
if c.LTarget == vaultConstraintLTarget {
found = true
break
}
// Check whether the task group is using signals. In the case that it
// is, we flatten the signals and build a constraint, then run the
// mutator.
if tgSignals, ok := signals[tg.Name]; ok {
required := helper.MapStringStringSliceValueSet(tgSignals)
sigConstraint := getSignalConstraint(required)
mutateConstraint(constraintMatcherFull, tg, sigConstraint)
}

if !found {
tg.Constraints = append(tg.Constraints, vaultConstraint)
// If the task group utilises Nomad service discovery, run the mutator.
if ok := nativeServiceDisco[tg.Name]; ok {
mutateConstraint(constraintMatcherFull, tg, nativeServiceDiscoveryConstraint)
}
}

// Add signal constraints
for _, tg := range j.TaskGroups {
tgSignals, ok := signals[tg.Name]
if !ok {
// Not requesting signal
continue
// If the task group utilises Consul service discovery, run the mutator.
if ok := consulServiceDisco[tg.Name]; ok {
mutateConstraint(constraintMatcherLeft, tg, consulServiceDiscoveryConstraint)
}
}

// Flatten the signals
required := helper.MapStringStringSliceValueSet(tgSignals)
sigConstraint := getSignalConstraint(required)
return j, nil, nil
}

found := false
for _, c := range tg.Constraints {
if c.Equals(sigConstraint) {
found = true
break
}
}
// constraintMatcher is a custom type which helps control how constraints are
// identified as being present within a task group.
type constraintMatcher uint

if !found {
tg.Constraints = append(tg.Constraints, sigConstraint)
}
}
const (
// constraintMatcherFull ensures that a constraint is only considered found
// when they match totally. This check is performed using the
// structs.Constraint Equals function.
constraintMatcherFull constraintMatcher = iota

// constraintMatcherLeft ensure that a constraint is considered found if
// the constraints LTarget is matched only. This allows an existing
// constraint to override the proposed implicit one.
constraintMatcherLeft
)

// Add the Nomad service discovery constraints.
for _, tg := range j.TaskGroups {
if ok := nativeServiceDisco[tg.Name]; !ok {
continue
}
// mutateConstraint is a generic mutator used to set implicit constraints
// within the task group if they are needed.
func mutateConstraint(matcher constraintMatcher, taskGroup *structs.TaskGroup, constraint *structs.Constraint) {

found := false
for _, c := range tg.Constraints {
if c.Equals(nativeServiceDiscoveryConstraint) {
var found bool

// It's possible to switch on the matcher within the constraint loop to
// reduce repetition. This, however, means switching per constraint,
// therefore we do it here.
switch matcher {
case constraintMatcherFull:
for _, c := range taskGroup.Constraints {
if c.Equals(constraint) {
found = true
break
}
}

if !found {
tg.Constraints = append(tg.Constraints, nativeServiceDiscoveryConstraint)
case constraintMatcherLeft:
for _, c := range taskGroup.Constraints {
if c.LTarget == constraint.LTarget {
found = true
break
}
}
}

return j, nil, nil
// If we didn't find a suitable constraint match, add one.
if !found {
taskGroup.Constraints = append(taskGroup.Constraints, constraint)
}
}

// jobValidate validates a Job and task drivers and returns an error if there is
Expand Down
Loading

0 comments on commit 8eb569f

Please sign in to comment.