Merge pull request #821 from hashicorp/f-client-set-receive
Client stores when it receives a task
dadgar committed Feb 20, 2016
2 parents d132625 + 6427d91 commit c960729
Showing 9 changed files with 90 additions and 44 deletions.
22 changes: 11 additions & 11 deletions api/compose_test.go
@@ -12,18 +12,18 @@ func TestCompose(t *testing.T) {
SetMeta("foo", "bar").
Constrain(NewConstraint("kernel.name", "=", "linux")).
Require(&Resources{
CPU: 1250,
MemoryMB: 1024,
DiskMB: 2048,
IOPS: 500,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "0.0.0.0/0",
MBits: 100,
ReservedPorts: []Port{{"", 80}, {"", 443}},
CPU: 1250,
MemoryMB: 1024,
DiskMB: 2048,
IOPS: 500,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "0.0.0.0/0",
MBits: 100,
ReservedPorts: []Port{{"", 80}, {"", 443}},
},
},
},
})
})

// Compose a task group
grp := NewTaskGroup("grp1", 2).
16 changes: 8 additions & 8 deletions api/util_test.go
@@ -22,15 +22,15 @@ func assertWriteMeta(t *testing.T, wm *WriteMeta) {
func testJob() *Job {
task := NewTask("task1", "exec").
Require(&Resources{
CPU: 100,
MemoryMB: 256,
DiskMB: 25,
IOPS: 10,
}).
CPU: 100,
MemoryMB: 256,
DiskMB: 25,
IOPS: 10,
}).
SetLogConfig(&LogConfig{
MaxFiles: 1,
MaxFileSizeMB: 2,
})
MaxFiles: 1,
MaxFileSizeMB: 2,
})

group := NewTaskGroup("group1", 1).
AddTask(task)
37 changes: 34 additions & 3 deletions client/alloc_runner.go
@@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
@@ -19,6 +18,14 @@ const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second

// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
// send that the task was received to the server because another transition
// to running or failed is likely to occur immediately after, and a single
// update will transfer all past state information. If no other transition
// has occurred by this limit, we will send to the server.
taskReceivedSyncLimit = 30 * time.Second
)

// AllocStateUpdater is used to update the status of an allocation
@@ -45,7 +52,12 @@ type AllocRunner struct {
restored map[string]struct{}
taskLock sync.RWMutex

taskStatusLock sync.RWMutex
// taskReceivedTimer is used to mitigate updates sent to the server because
// we expect that shortly after receiving an alloc it will transition
// state. We use a timer to send the update if this hasn't happened after a
// reasonable time.
taskReceivedTimer *time.Timer
taskStatusLock sync.RWMutex

updateCh chan *structs.Allocation

@@ -126,7 +138,8 @@ func (r *AllocRunner) RestoreState() error {
if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else {
} else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status.
go tr.Run()
}
}
@@ -323,6 +336,24 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.State = state
r.appendTaskEvent(taskState, event)

// We don't immediately mark ourselves as dirty, since in most cases there
// will immediately be another state transition. This reduces traffic to
// the server.
if event != nil && event.Type == structs.TaskReceived {
if r.taskReceivedTimer == nil {
r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
// Send a dirty signal to sync our state.
r.dirtyCh <- struct{}{}
})
}
return
}

// Cancel any existing received state timer.
if r.taskReceivedTimer != nil {
r.taskReceivedTimer.Stop()
}

select {
case r.dirtyCh <- struct{}{}:
default:
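The deferred sync above is a small debounce: the first Received event only arms a timer, while any later transition stops that timer and signals the dirty channel immediately, so the server usually gets one combined update instead of two. Below is a minimal, self-contained sketch of that pattern; the debouncer type and its method names are hypothetical stand-ins rather than the repo's API, and the 30-second delay mirrors taskReceivedSyncLimit.

package main

import (
	"fmt"
	"time"
)

// debouncer mirrors the idea behind taskReceivedTimer: delay the first
// "received" sync so a quick follow-up transition can be folded into a
// single update.
type debouncer struct {
	dirtyCh chan struct{}
	timer   *time.Timer
}

// onReceived arms the timer; the sync fires only if nothing else happens
// within the limit.
func (d *debouncer) onReceived(limit time.Duration) {
	if d.timer == nil {
		d.timer = time.AfterFunc(limit, func() {
			d.dirtyCh <- struct{}{}
		})
	}
}

// onTransition cancels any pending "received" sync and signals immediately.
func (d *debouncer) onTransition() {
	if d.timer != nil {
		d.timer.Stop()
	}
	select {
	case d.dirtyCh <- struct{}{}:
	default: // a sync is already queued
	}
}

func main() {
	d := &debouncer{dirtyCh: make(chan struct{}, 1)}
	d.onReceived(30 * time.Second) // task received
	d.onTransition()               // task started right away
	<-d.dirtyCh                    // one sync covers both events
	fmt.Println("synced once")
}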
3 changes: 3 additions & 0 deletions client/task_runner.go
@@ -95,6 +95,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}

// Set the state to pending.
tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
return tc
}

32 changes: 20 additions & 12 deletions client/task_runner_test.go
@@ -68,20 +68,24 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
t.Fatalf("timeout")
}

if len(upd.events) != 2 {
t.Fatalf("should have 2 updates: %#v", upd.events)
if len(upd.events) != 3 {
t.Fatalf("should have 3 updates: %#v", upd.events)
}

if upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
}

if upd.events[0].Type != structs.TaskStarted {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
if upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
}

if upd.events[1].Type != structs.TaskTerminated {
t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated)
if upd.events[1].Type != structs.TaskStarted {
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
}

if upd.events[2].Type != structs.TaskTerminated {
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
}
}

@@ -107,20 +111,24 @@ func TestTaskRunner_Destroy(t *testing.T) {
t.Fatalf("timeout")
}

if len(upd.events) != 2 {
t.Fatalf("should have 2 updates: %#v", upd.events)
if len(upd.events) != 3 {
t.Fatalf("should have 3 updates: %#v", upd.events)
}

if upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
}

if upd.events[0].Type != structs.TaskStarted {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
if upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
}

if upd.events[1].Type != structs.TaskStarted {
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
}

if upd.events[1].Type != structs.TaskKilled {
t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled)
if upd.events[2].Type != structs.TaskKilled {
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
}

}
14 changes: 7 additions & 7 deletions command/util_test.go
@@ -41,14 +41,14 @@ func testServer(
func testJob(jobID string) *api.Job {
task := api.NewTask("task1", "exec").
Require(&api.Resources{
MemoryMB: 256,
DiskMB: 20,
CPU: 100,
}).
MemoryMB: 256,
DiskMB: 20,
CPU: 100,
}).
SetLogConfig(&api.LogConfig{
MaxFiles: 1,
MaxFileSizeMB: 2,
})
MaxFiles: 1,
MaxFileSizeMB: 2,
})

group := api.NewTaskGroup("group1", 1).
AddTask(task)
4 changes: 2 additions & 2 deletions nomad/leader.go
@@ -102,7 +102,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// evaluation broker
if numWorkers := len(s.workers); numWorkers > 1 {
// Disabling half the workers frees half the CPUs.
for i := 0; i < numWorkers / 2; i++ {
for i := 0; i < numWorkers/2; i++ {
s.workers[i].SetPause(true)
}
}
@@ -366,7 +366,7 @@ func (s *Server) revokeLeadership() error {

// Unpause our worker if we paused previously
if len(s.workers) > 1 {
for i := 0; i < len(s.workers) / 2; i++ {
for i := 0; i < len(s.workers)/2; i++ {
s.workers[i].SetPause(false)
}
}
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
@@ -1687,6 +1687,10 @@ const (
// failure in the driver.
TaskDriverFailure = "Driver Failure"

// Task Received signals that the task has been pulled by the client at the
// given timestamp.
TaskReceived = "Received"

// Task Started signals that the task was started and its timestamp can be
// used to determine the running length of the task.
TaskStarted = "Started"
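Because Received is stamped on the client when the task is pulled and Started marks when it launched, the gap between the two events gives a rough measure of client-side setup latency. The sketch below illustrates that calculation; the event struct is a simplified stand-in for the real structs.TaskEvent, and its field names are assumptions for the example only.

package main

import (
	"fmt"
	"time"
)

// event is a simplified stand-in for a task event: a type string plus a
// unix-nanosecond timestamp.
type event struct {
	Type string
	Time int64
}

// startupLatency returns the delay between the "Received" and "Started"
// events, or false if either is missing.
func startupLatency(events []event) (time.Duration, bool) {
	var received, started int64
	for _, e := range events {
		switch e.Type {
		case "Received":
			received = e.Time
		case "Started":
			started = e.Time
		}
	}
	if received == 0 || started == 0 {
		return 0, false
	}
	return time.Duration(started - received), true
}

func main() {
	now := time.Now().UnixNano()
	evs := []event{
		{Type: "Received", Time: now},
		{Type: "Started", Time: now + int64(2*time.Second)},
	}
	if d, ok := startupLatency(evs); ok {
		fmt.Println("client startup latency:", d) // 2s in this example
	}
}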
2 changes: 1 addition & 1 deletion scheduler/feasible.go
@@ -131,7 +131,7 @@ func (c *DriverChecker) hasDrivers(option *structs.Node) bool {
if err != nil {
c.ctx.Logger().
Printf("[WARN] scheduler.DriverChecker: node %v has invalid driver setting %v: %v",
option.ID, driverStr, value)
option.ID, driverStr, value)
return false
}
