Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1571 from endocode/dongsu/fleetd-conflict-global-…
Browse files Browse the repository at this point in the history
…unit

fleetd: support conflict in global unit
  • Loading branch information
Dongsu Park committed Jul 1, 2016
2 parents e28dbf4 + c2e360d commit 6fb1256
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Documentation/unit-files-and-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Note that these requirements are derived directly from systemd, with the only ex
| `MachineOf` | Limit eligible machines to the one that hosts a specific unit. |
| `MachineMetadata` | Limit eligible machines to those with this specific metadata. |
| `Conflicts` | Prevent a unit from being collocated with other units using glob-matching on the other unit names. |
| `Global` | Schedule this unit on all agents in the cluster. A unit is considered invalid if options other than `MachineMetadata` are provided alongside `Global=true`. |
| `Global` | Schedule this unit on those agents in the cluster, which satisfy the conditions of both `MachineMetadata` and `Conflicts` if any of them is also given. A unit is considered invalid if options other than `MachineMetadata` and `Conflicts` are provided alongside `Global=true`. If `MachineMetadata` is provided alongside `Global=true`, only the agents having the metadata can be scheduled on. If `Conflicts` is provided alongside `Global=true`, only the agents not having the conflicting units can be scheduled on. The conflicting units also can not be scheduled on the agents which already have the existing conflicting global unit.|
| `Replaces` | Schedule a specified unit on another machine. A unit is considered invalid if options `Global` or `Conflicts` are provided alongside `Replaces=`. A circular replacement between multiple units is not allowed. |

See [more information][unit-scheduling] on these parameters and how they impact scheduling decisions.
Expand Down
15 changes: 12 additions & 3 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,25 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
for _, u := range units {
u := u
md := u.RequiredTargetMetadata()
if u.IsGlobal() && !machine.HasMetadata(&ms, md) {
log.Debugf("Agent unable to run global unit %s: missing required metadata", u.Name)
continue

if u.IsGlobal() {
if !machine.HasMetadata(&ms, md) {
log.Debugf("Agent unable to run global unit %s: missing required metadata", u.Name)
continue
}
}

if !u.IsGlobal() {
sUnit, ok := sUnitMap[u.Name]
if !ok || sUnit.TargetMachineID == "" || sUnit.TargetMachineID != ms.ID {
continue
}
}

if cExists, _ := as.HasConflict(u.Name, u.Conflicts()); cExists {
continue
}

as.Units[u.Name] = &u
}

Expand Down
6 changes: 3 additions & 3 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (as *AgentState) unitScheduled(name string) bool {
return as.Units[name] != nil
}

// hasConflict determines whether there are any known conflicts with the given Unit
func (as *AgentState) hasConflict(pUnitName string, pConflicts []string) (found bool, conflict string) {
// HasConflict determines whether there are any known conflicts with the given Unit
func (as *AgentState) HasConflict(pUnitName string, pConflicts []string) (found bool, conflict string) {
for _, eUnit := range as.Units {
if pUnitName == eUnit.Name {
continue
Expand Down Expand Up @@ -145,7 +145,7 @@ func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
}
}

if cExists, cJobName := as.hasConflict(j.Name, j.Conflicts()); cExists {
if cExists, cJobName := as.HasConflict(j.Name, j.Conflicts()); cExists {
return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
}

Expand Down
2 changes: 1 addition & 1 deletion agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestHasConflicts(t *testing.T) {
}

for i, tt := range tests {
got, conflict := tt.cState.hasConflict(tt.job.Name, tt.job.Conflicts())
got, conflict := tt.cState.HasConflict(tt.job.Name, tt.job.Conflicts())
if got != tt.want {
var msg string
if tt.want == true {
Expand Down
2 changes: 0 additions & 2 deletions api/units.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ func ValidateOptions(opts []*schema.UnitOption) error {
return errors.New("MachineID cannot be used with Replaces")
case isGlobal && hasPeers:
return errors.New("Global cannot be used with Peers")
case isGlobal && hasConflicts:
return errors.New("Global cannot be used with Conflicts")
case isGlobal && hasReplaces:
return errors.New("Global cannot be used with Replaces")
case hasConflicts && hasReplaces:
Expand Down
7 changes: 4 additions & 3 deletions api/units_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func TestValidateOptions(t *testing.T) {
},
true,
},
// Global with Peers/Conflicts no good
// Global with Conflicts is ok
{
[]*schema.UnitOption{
&schema.UnitOption{
Expand All @@ -660,7 +660,7 @@ func TestValidateOptions(t *testing.T) {
},
makeConflictUO("foo.service"),
},
false,
true,
},
{
[]*schema.UnitOption{
Expand All @@ -671,8 +671,9 @@ func TestValidateOptions(t *testing.T) {
},
makeConflictUO("bar.service"),
},
false,
true,
},
// Global with peer no good
{
[]*schema.UnitOption{
&schema.UnitOption{
Expand Down
9 changes: 7 additions & 2 deletions engine/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ func (cs *clusterState) agents() map[string]*agent.AgentState {
for _, gu := range cs.gUnits {
gu := gu
for _, a := range agents {
if machine.HasMetadata(a.MState, gu.RequiredTargetMetadata()) {
a.Units[gu.Name] = gu
if !machine.HasMetadata(a.MState, gu.RequiredTargetMetadata()) {
continue
}

if cExists, _ := a.HasConflict(gu.Name, gu.Conflicts()); cExists {
continue
}
a.Units[gu.Name] = gu
}
}

Expand Down
6 changes: 0 additions & 6 deletions fleetctl/fleetctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,6 @@ MachineOf=zxcvq`),
"foo.service",
newUnitFile(t, `[X-Fleet]
Global=true
Conflicts=bar`),
},
{
"foo.service",
newUnitFile(t, `[X-Fleet]
Global=true
Replaces=bar`),
},
{
Expand Down
9 changes: 9 additions & 0 deletions functional/fixtures/units/conflict-global.0.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Global=true
Conflicts=conflict-global.*.service
9 changes: 9 additions & 0 deletions functional/fixtures/units/conflict-global.1.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Global=true
Conflicts=conflict-global.*.service
80 changes: 80 additions & 0 deletions functional/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,83 @@ func TestScheduleGlobalUnits(t *testing.T) {
}
}
}

// TestScheduleGlobalConflicts starts 2 global units that conflict with each
// other, and check if only the first one can be found.
func TestScheduleGlobalConflicts(t *testing.T) {
// Create a three-member cluster
cluster, err := platform.NewNspawnCluster("smoke")
if err != nil {
t.Fatal(err)
}
defer cluster.Destroy(t)
members, err := platform.CreateNClusterMembers(cluster, 3)
if err != nil {
t.Fatal(err)
}
m0 := members[0]
machines, err := cluster.WaitForNMachines(m0, 3)
if err != nil {
t.Fatal(err)
}

cfGlobal0 := "fixtures/units/conflict-global.0.service"
cfGlobal1 := "fixtures/units/conflict-global.1.service"

// Launch a global unit
stdout, stderr, err := cluster.Fleetctl(m0, "start", "--no-block", cfGlobal0)
if err != nil {
t.Fatalf("Failed starting units: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err)
}

// the global unit should show up active on 3 machines
_, err = cluster.WaitForNActiveUnits(m0, 3)
if err != nil {
t.Fatal(err)
}

// Now add another global unit, which actually should not be started.
stdout, stderr, err = cluster.Fleetctl(m0, "start", "--no-block", cfGlobal1)
if err != nil {
t.Fatalf("Failed starting unit: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err)
}

// Should see only 3 units
states, err := cluster.WaitForNActiveUnits(m0, 3)
if err != nil {
t.Fatal(err)
}

// Each machine should have a single global unit conflict-global.0.service,
// but not conflict-global.1.service.
us0 := states[path.Base(cfGlobal0)]
us1 := states[path.Base(cfGlobal1)]
for _, mach := range machines {
var found bool
for _, state := range us0 {
if state.Machine == mach {
found = true
break
}
}
if !found {
t.Fatalf("Did not find global unit on machine %v", mach)
t.Logf("Found unit states:")
for _, state := range states {
t.Logf("%#v", state)
}
}

found = false
for _, state := range us1 {
if state.Machine == mach {
found = true
break
}
}
if found {
t.Fatalf("Did find global unit %s on machine %v", us1, mach)
t.Logf("Global units were not conflicted as expected.")
}
}
}

0 comments on commit 6fb1256

Please sign in to comment.