Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client stores when it receives a task #821

Merged
merged 4 commits into from
Feb 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions api/compose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ func TestCompose(t *testing.T) {
SetMeta("foo", "bar").
Constrain(NewConstraint("kernel.name", "=", "linux")).
Require(&Resources{
CPU: 1250,
MemoryMB: 1024,
DiskMB: 2048,
IOPS: 500,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "0.0.0.0/0",
MBits: 100,
ReservedPorts: []Port{{"", 80}, {"", 443}},
CPU: 1250,
MemoryMB: 1024,
DiskMB: 2048,
IOPS: 500,
Networks: []*NetworkResource{
&NetworkResource{
CIDR: "0.0.0.0/0",
MBits: 100,
ReservedPorts: []Port{{"", 80}, {"", 443}},
},
},
},
})
})

// Compose a task group
grp := NewTaskGroup("grp1", 2).
Expand Down
16 changes: 8 additions & 8 deletions api/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ func assertWriteMeta(t *testing.T, wm *WriteMeta) {
func testJob() *Job {
task := NewTask("task1", "exec").
Require(&Resources{
CPU: 100,
MemoryMB: 256,
DiskMB: 25,
IOPS: 10,
}).
CPU: 100,
MemoryMB: 256,
DiskMB: 25,
IOPS: 10,
}).
SetLogConfig(&LogConfig{
MaxFiles: 1,
MaxFileSizeMB: 2,
})
MaxFiles: 1,
MaxFileSizeMB: 2,
})

group := NewTaskGroup("group1", 1).
AddTask(task)
Expand Down
37 changes: 34 additions & 3 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
Expand All @@ -19,6 +18,14 @@ const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second

// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
// send that the task was received to the server because another transition
// to running or failed is likely to occur immediately after and a single
// update will transfer all past state information. If no other transition
// has occurred up to this limit, we will send to the server.
taskReceivedSyncLimit = 30 * time.Second
)

// AllocStateUpdater is used to update the status of an allocation
Expand All @@ -45,7 +52,12 @@ type AllocRunner struct {
restored map[string]struct{}
taskLock sync.RWMutex

taskStatusLock sync.RWMutex
// taskReceivedTimer is used to mitigate updates sent to the server because
// we expect that shortly after receiving an alloc it will transition
// state. We use a timer to send the update if this hasn't happened after a
// reasonable time.
taskReceivedTimer *time.Timer
taskStatusLock sync.RWMutex

updateCh chan *structs.Allocation

Expand Down Expand Up @@ -126,7 +138,8 @@ func (r *AllocRunner) RestoreState() error {
if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else {
} else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status.
go tr.Run()
}
}
Expand Down Expand Up @@ -323,6 +336,24 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
taskState.State = state
r.appendTaskEvent(taskState, event)

// We don't immediately mark ourselves as dirty, since in most cases there
// will immediately be another state transition. This reduces traffic to
// the server.
if event != nil && event.Type == structs.TaskReceived {
if r.taskReceivedTimer == nil {
r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
// Send a dirty signal to sync our state.
r.dirtyCh <- struct{}{}
})
}
return
}

// Cancel any existing received state timer.
if r.taskReceivedTimer != nil {
r.taskReceivedTimer.Stop()
}

select {
case r.dirtyCh <- struct{}{}:
default:
Expand Down
3 changes: 3 additions & 0 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}

// Set the state to pending.
tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
return tc
}

Expand Down
32 changes: 20 additions & 12 deletions client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,24 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
t.Fatalf("timeout")
}

if len(upd.events) != 2 {
t.Fatalf("should have 2 updates: %#v", upd.events)
if len(upd.events) != 3 {
t.Fatalf("should have 3 updates: %#v", upd.events)
}

if upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
}

if upd.events[0].Type != structs.TaskStarted {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
if upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
}

if upd.events[1].Type != structs.TaskTerminated {
t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated)
if upd.events[1].Type != structs.TaskStarted {
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
}

if upd.events[2].Type != structs.TaskTerminated {
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
}
}

Expand All @@ -107,20 +111,24 @@ func TestTaskRunner_Destroy(t *testing.T) {
t.Fatalf("timeout")
}

if len(upd.events) != 2 {
t.Fatalf("should have 2 updates: %#v", upd.events)
if len(upd.events) != 3 {
t.Fatalf("should have 3 updates: %#v", upd.events)
}

if upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
}

if upd.events[0].Type != structs.TaskStarted {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
if upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
}

if upd.events[1].Type != structs.TaskStarted {
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
}

if upd.events[1].Type != structs.TaskKilled {
t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled)
if upd.events[2].Type != structs.TaskKilled {
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
}

}
Expand Down
14 changes: 7 additions & 7 deletions command/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func testServer(
func testJob(jobID string) *api.Job {
task := api.NewTask("task1", "exec").
Require(&api.Resources{
MemoryMB: 256,
DiskMB: 20,
CPU: 100,
}).
MemoryMB: 256,
DiskMB: 20,
CPU: 100,
}).
SetLogConfig(&api.LogConfig{
MaxFiles: 1,
MaxFileSizeMB: 2,
})
MaxFiles: 1,
MaxFileSizeMB: 2,
})

group := api.NewTaskGroup("group1", 1).
AddTask(task)
Expand Down
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// evaluation broker
if numWorkers := len(s.workers); numWorkers > 1 {
// Disabling half the workers frees half the CPUs.
for i := 0; i < numWorkers / 2; i++ {
for i := 0; i < numWorkers/2; i++ {
s.workers[i].SetPause(true)
}
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *Server) revokeLeadership() error {

// Unpause our worker if we paused previously
if len(s.workers) > 1 {
for i := 0; i < len(s.workers) / 2; i++ {
for i := 0; i < len(s.workers)/2; i++ {
s.workers[i].SetPause(false)
}
}
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,10 @@ const (
// failure in the driver.
TaskDriverFailure = "Driver Failure"

// Task Received signals that the task has been pulled by the client at the
// given timestamp.
TaskReceived = "Received"

// Task Started signals that the task was started and its timestamp can be
// used to determine the running length of the task.
TaskStarted = "Started"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *DriverChecker) hasDrivers(option *structs.Node) bool {
if err != nil {
c.ctx.Logger().
Printf("[WARN] scheduler.DriverChecker: node %v has invalid driver setting %v: %v",
option.ID, driverStr, value)
option.ID, driverStr, value)
return false
}

Expand Down