diff --git a/api/compose_test.go b/api/compose_test.go
index f4b21fcbd3d8..5f3ca68a534e 100644
--- a/api/compose_test.go
+++ b/api/compose_test.go
@@ -12,18 +12,18 @@ func TestCompose(t *testing.T) {
 		SetMeta("foo", "bar").
 		Constrain(NewConstraint("kernel.name", "=", "linux")).
 		Require(&Resources{
-			CPU:      1250,
-			MemoryMB: 1024,
-			DiskMB:   2048,
-			IOPS:     500,
-			Networks: []*NetworkResource{
-				&NetworkResource{
-					CIDR:          "0.0.0.0/0",
-					MBits:         100,
-					ReservedPorts: []Port{{"", 80}, {"", 443}},
+		CPU:      1250,
+		MemoryMB: 1024,
+		DiskMB:   2048,
+		IOPS:     500,
+		Networks: []*NetworkResource{
+			&NetworkResource{
+				CIDR:          "0.0.0.0/0",
+				MBits:         100,
+				ReservedPorts: []Port{{"", 80}, {"", 443}},
+			},
 			},
-			},
-		})
+		})
 
 	// Compose a task group
 	grp := NewTaskGroup("grp1", 2).
diff --git a/api/util_test.go b/api/util_test.go
index 4ae9f985a554..cc6276b0f651 100644
--- a/api/util_test.go
+++ b/api/util_test.go
@@ -22,15 +22,15 @@ func assertWriteMeta(t *testing.T, wm *WriteMeta) {
 func testJob() *Job {
 	task := NewTask("task1", "exec").
 		Require(&Resources{
-			CPU:      100,
-			MemoryMB: 256,
-			DiskMB:   25,
-			IOPS:     10,
-		}).
+		CPU:      100,
+		MemoryMB: 256,
+		DiskMB:   25,
+		IOPS:     10,
+	}).
 		SetLogConfig(&LogConfig{
-			MaxFiles:      1,
-			MaxFileSizeMB: 2,
-		})
+		MaxFiles:      1,
+		MaxFileSizeMB: 2,
+	})
 
 	group := NewTaskGroup("group1", 1).
 		AddTask(task)
diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index 3e3976264eee..9fda29dccdba 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -8,7 +8,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/driver"
@@ -19,6 +18,14 @@ const (
 	// allocSyncRetryIntv is the interval on which we retry updating
 	// the status of the allocation
 	allocSyncRetryIntv = 15 * time.Second
+
+	// taskReceivedSyncLimit is how long the client will wait before telling
+	// the server that a task was received. The client does not send this
+	// update immediately because another transition to running or failed is
+	// likely to occur right after, and a single update can then carry all of
+	// the accumulated state. If no other transition has occurred by this
+	// limit, the update is sent anyway.
+	taskReceivedSyncLimit = 30 * time.Second
 )
 
 // AllocStateUpdater is used to update the status of an allocation
@@ -45,7 +52,12 @@ type AllocRunner struct {
 	restored map[string]struct{}
 	taskLock sync.RWMutex
 
-	taskStatusLock sync.RWMutex
+	// taskReceivedTimer is used to mitigate updates sent to the server because
+	// we expect that shortly after receiving an alloc it will transition
+	// state. We use a timer to send the update if this hasn't happened after a
+	// reasonable time.
+	taskReceivedTimer *time.Timer
+	taskStatusLock    sync.RWMutex
 
 	updateCh chan *structs.Allocation
@@ -126,7 +138,8 @@ func (r *AllocRunner) RestoreState() error {
 		if err := tr.RestoreState(); err != nil {
 			r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
 			mErr.Errors = append(mErr.Errors, err)
-		} else {
+		} else if !r.alloc.TerminalStatus() {
+			// Only start if the alloc isn't in a terminal status.
 			go tr.Run()
 		}
 	}
@@ -323,6 +336,24 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
 	taskState.State = state
 	r.appendTaskEvent(taskState, event)
 
+	// We don't immediately mark ourselves as dirty, since in most cases there
+	// will immediately be another state transition. This reduces traffic to
+	// the server.
+	if event != nil && event.Type == structs.TaskReceived {
+		if r.taskReceivedTimer == nil {
+			r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() {
+				// Send a dirty signal to sync our state.
+				r.dirtyCh <- struct{}{}
+			})
+		}
+		return
+	}
+
+	// Cancel any existing received state timer.
+	if r.taskReceivedTimer != nil {
+		r.taskReceivedTimer.Stop()
+	}
+
 	select {
 	case r.dirtyCh <- struct{}{}:
 	default:
diff --git a/client/task_runner.go b/client/task_runner.go
index 3d1b6f5dd43a..2a683ed9cc3c 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -95,6 +95,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		destroyCh:      make(chan struct{}),
 		waitCh:         make(chan struct{}),
 	}
+
+	// Set the state to pending.
+	tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
 	return tc
 }
diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index c757c5893bdb..f31348560e63 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -68,20 +68,24 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
 		t.Fatalf("timeout")
 	}
 
-	if len(upd.events) != 2 {
-		t.Fatalf("should have 2 updates: %#v", upd.events)
+	if len(upd.events) != 3 {
+		t.Fatalf("should have 3 updates: %#v", upd.events)
 	}
 
 	if upd.state != structs.TaskStateDead {
 		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
 	}
 
-	if upd.events[0].Type != structs.TaskStarted {
-		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
 	}
 
-	if upd.events[1].Type != structs.TaskTerminated {
-		t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated)
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+	}
+
+	if upd.events[2].Type != structs.TaskTerminated {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
 	}
 }
@@ -107,20 +111,24 @@ func TestTaskRunner_Destroy(t *testing.T) {
 		t.Fatalf("timeout")
 	}
 
-	if len(upd.events) != 2 {
-		t.Fatalf("should have 2 updates: %#v", upd.events)
+	if len(upd.events) != 3 {
+		t.Fatalf("should have 3 updates: %#v", upd.events)
 	}
 
 	if upd.state != structs.TaskStateDead {
 		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
 	}
 
-	if upd.events[0].Type != structs.TaskStarted {
-		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted)
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
+	}
+
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
 	}
 
-	if upd.events[1].Type != structs.TaskKilled {
-		t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled)
+	if upd.events[2].Type != structs.TaskKilled {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
 	}
 }
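The core of the client-side change above is how the new "Received" update is coalesced with the state transition that usually follows it: setTaskState arms a timer instead of marking the runner dirty right away, and any later transition cancels that timer and syncs immediately. What follows is a minimal, standalone sketch of that pattern, not Nomad's own code; the stateNotifier type and its field names are hypothetical, and only the time.AfterFunc/Stop usage and the non-blocking channel send mirror the AllocRunner hunks above.

package main

import (
	"fmt"
	"sync"
	"time"
)

// stateNotifier coalesces a low-value "received" update with the state
// transition that usually follows it. All names here are hypothetical; only
// the timer-based debounce mirrors the AllocRunner change.
type stateNotifier struct {
	mu            sync.Mutex
	receivedTimer *time.Timer
	dirtyCh       chan struct{}
	syncLimit     time.Duration
}

func newStateNotifier(limit time.Duration) *stateNotifier {
	return &stateNotifier{
		dirtyCh:   make(chan struct{}, 1),
		syncLimit: limit,
	}
}

// markReceived defers the sync: if no other transition arrives within
// syncLimit, the timer fires and the update is sent on its own.
func (n *stateNotifier) markReceived() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.receivedTimer == nil {
		n.receivedTimer = time.AfterFunc(n.syncLimit, func() {
			n.signal()
		})
	}
}

// markTransition cancels the pending "received" timer and syncs at once, so
// a single update carries both pieces of state.
func (n *stateNotifier) markTransition() {
	n.mu.Lock()
	if n.receivedTimer != nil {
		n.receivedTimer.Stop()
	}
	n.mu.Unlock()
	n.signal()
}

// signal is a non-blocking send, mirroring the select/default in setTaskState.
func (n *stateNotifier) signal() {
	select {
	case n.dirtyCh <- struct{}{}:
	default:
	}
}

func main() {
	n := newStateNotifier(30 * time.Second)

	n.markReceived()   // no update yet; the timer is armed
	n.markTransition() // "running" follows quickly, so one update covers both

	select {
	case <-n.dirtyCh:
		fmt.Println("synced once, carrying both received and running state")
	case <-time.After(time.Second):
		fmt.Println("no sync (unexpected)")
	}
}

Run as-is, the sketch syncs exactly once, which is the point of the change: the server sees a single update that already carries both the received and the running state.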
diff --git a/command/util_test.go b/command/util_test.go
index 7019b8286a97..67d712b62d0e 100644
--- a/command/util_test.go
+++ b/command/util_test.go
@@ -41,14 +41,14 @@ func testServer(
 func testJob(jobID string) *api.Job {
 	task := api.NewTask("task1", "exec").
 		Require(&api.Resources{
-			MemoryMB: 256,
-			DiskMB:   20,
-			CPU:      100,
-		}).
+		MemoryMB: 256,
+		DiskMB:   20,
+		CPU:      100,
+	}).
 		SetLogConfig(&api.LogConfig{
-			MaxFiles:      1,
-			MaxFileSizeMB: 2,
-		})
+		MaxFiles:      1,
+		MaxFileSizeMB: 2,
+	})
 
 	group := api.NewTaskGroup("group1", 1).
 		AddTask(task)
diff --git a/nomad/leader.go b/nomad/leader.go
index 229a9f2191ef..88a62802f328 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -102,7 +102,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// evaluation broker
 	if numWorkers := len(s.workers); numWorkers > 1 {
 		// Disabling half the workers frees half the CPUs.
-		for i := 0; i < numWorkers / 2; i++ {
+		for i := 0; i < numWorkers/2; i++ {
 			s.workers[i].SetPause(true)
 		}
 	}
@@ -366,7 +366,7 @@ func (s *Server) revokeLeadership() error {
 
 	// Unpause our worker if we paused previously
 	if len(s.workers) > 1 {
-		for i := 0; i < len(s.workers) / 2; i++ {
+		for i := 0; i < len(s.workers)/2; i++ {
 			s.workers[i].SetPause(false)
 		}
 	}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index e2f733e6d674..93f43f57a4af 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1687,6 +1687,10 @@ const (
 	// failure in the driver.
 	TaskDriverFailure = "Driver Failure"
 
+	// Task Received signals that the task has been pulled by the client at the
+	// given timestamp.
+	TaskReceived = "Received"
+
 	// Task Started signals that the task was started and its timestamp can be
 	// used to determine the running length of the task.
 	TaskStarted = "Started"
diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index ffd333099984..cad2e4f486ca 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -131,7 +131,7 @@ func (c *DriverChecker) hasDrivers(option *structs.Node) bool {
 		if err != nil {
 			c.ctx.Logger().
 				Printf("[WARN] scheduler.DriverChecker: node %v has invalid driver setting %v: %v",
-				option.ID, driverStr, value)
+					option.ID, driverStr, value)
 			return false
 		}
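With structs.TaskReceived in place and NewTaskRunner reporting it up front, a TaskStateUpdater now observes three events for a simple run: Received, then Started, then Terminated (or Killed when the task is destroyed), which is what the updated task runner tests assert. Below is a small illustrative sketch of that ordering using a hypothetical in-memory recorder rather than Nomad's client types; only the event names come from this diff.

package main

import "fmt"

// Event names taken from nomad/structs/structs.go in this diff; the recorder
// below is a hypothetical stand-in for a TaskStateUpdater implementation.
const (
	taskReceived   = "Received"
	taskStarted    = "Started"
	taskTerminated = "Terminated"
)

// recorder keeps task events in the order they were reported, much like the
// mock updater used by the task runner tests.
type recorder struct {
	events []string
}

func (r *recorder) update(taskName, state, eventType string) {
	r.events = append(r.events, eventType)
}

func main() {
	r := &recorder{}

	// NewTaskRunner now reports Received with a pending state as soon as the
	// task is pulled, before the driver is started.
	r.update("task1", "pending", taskReceived)

	// Starting and exiting the driver add the remaining events.
	r.update("task1", "running", taskStarted)
	r.update("task1", "dead", taskTerminated)

	fmt.Println(r.events) // [Received Started Terminated]
}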