diff --git a/api/allocations.go b/api/allocations.go index d622f86b0a5a..80987fb4b451 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -86,6 +86,7 @@ type Allocation struct { EvalID string Name string NodeID string + NodeName string JobID string Job *Job TaskGroup string @@ -149,6 +150,7 @@ type AllocationListStub struct { Name string Namespace string NodeID string + NodeName string JobID string JobType string JobVersion uint64 diff --git a/command/alloc_status.go b/command/alloc_status.go index 44ae2bb0a10f..b656657b8aac 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -233,6 +233,7 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength fmt.Sprintf("Eval ID|%s", limit(alloc.EvalID, uuidLength)), fmt.Sprintf("Name|%s", alloc.Name), fmt.Sprintf("Node ID|%s", limit(alloc.NodeID, uuidLength)), + fmt.Sprintf("Node Name|%s", alloc.NodeName), fmt.Sprintf("Job ID|%s", alloc.JobID), fmt.Sprintf("Job Version|%d", getVersion(alloc.Job)), fmt.Sprintf("Client Status|%s", alloc.ClientStatus), diff --git a/command/alloc_status_test.go b/command/alloc_status_test.go index fbd19942497b..b3c7fef2070e 100644 --- a/command/alloc_status_test.go +++ b/command/alloc_status_test.go @@ -120,9 +120,11 @@ func TestAllocStatusCommand_Run(t *testing.T) { } // get an alloc id allocId1 := "" + nodeName := "" if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { if len(allocs) > 0 { allocId1 = allocs[0].ID + nodeName = allocs[0].NodeName } } if allocId1 == "" { @@ -141,6 +143,9 @@ func TestAllocStatusCommand_Run(t *testing.T) { t.Fatalf("expected to have 'Modified' but saw: %s", out) } + nodeNameRegexpStr := fmt.Sprintf(`\nNode Name\s+= %s\n`, regexp.QuoteMeta(nodeName)) + require.Regexp(t, regexp.MustCompile(nodeNameRegexpStr), out) + ui.OutputWriter.Reset() if code := cmd.Run([]string{"-address=" + url, "-verbose", allocId1}); code != 0 { diff --git a/command/job_status.go b/command/job_status.go index 6ee4cd00abc3..63fd163e2081 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -413,12 +413,13 @@ func formatAllocListStubs(stubs []*api.AllocationListStub, verbose bool, uuidLen allocs := make([]string, len(stubs)+1) if verbose { - allocs[0] = "ID|Eval ID|Node ID|Task Group|Version|Desired|Status|Created|Modified" + allocs[0] = "ID|Eval ID|Node ID|Node Name|Task Group|Version|Desired|Status|Created|Modified" for i, alloc := range stubs { - allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d|%s|%s|%s|%s", + allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%d|%s|%s|%s|%s", limit(alloc.ID, uuidLength), limit(alloc.EvalID, uuidLength), limit(alloc.NodeID, uuidLength), + alloc.NodeName, alloc.TaskGroup, alloc.JobVersion, alloc.DesiredStatus, diff --git a/command/job_status_test.go b/command/job_status_test.go index e0bc40b565b3..745af018e4c9 100644 --- a/command/job_status_test.go +++ b/command/job_status_test.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "regexp" "strings" "testing" "time" @@ -123,6 +124,14 @@ func TestJobStatusCommand_Run(t *testing.T) { if code := cmd.Run([]string{"-address=" + url, "-verbose", "job2_sfx"}); code != 0 { t.Fatalf("expected exit 0, got: %d", code) } + + nodeName := "" + if allocs, _, err := client.Jobs().Allocations("job2_sfx", false, nil); err == nil { + if len(allocs) > 0 { + nodeName = allocs[0].NodeName + } + } + out = ui.OutputWriter.String() if strings.Contains(out, "job1_sfx") || !strings.Contains(out, "job2_sfx") { t.Fatalf("expected only job2_sfx, got: %s", out) @@ -139,6 +148,15 @@ func TestJobStatusCommand_Run(t *testing.T) { if !strings.Contains(out, "Modified") { t.Fatal("should have modified header") } + + // string calculations based on 1-byte chars, not using runes + allocationsTableName := "Allocations\n" + allocationsTableStr := strings.Split(out, allocationsTableName)[1] + nodeNameHeaderStr := "Node Name" + nodeNameHeaderIndex := strings.Index(allocationsTableStr, nodeNameHeaderStr) + nodeNameRegexpStr := fmt.Sprintf(`.*%s.*\n.{%d}%s`, nodeNameHeaderStr, nodeNameHeaderIndex, regexp.QuoteMeta(nodeName)) + require.Regexp(t, regexp.MustCompile(nodeNameRegexpStr), out) + ui.ErrorWriter.Reset() ui.OutputWriter.Reset() diff --git a/nomad/leader.go b/nomad/leader.go index 8e26a56f5142..17c8199962fc 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig { return config } - if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) { + if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) { s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion) return nil } @@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration { if config != nil { return config } - if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) { + if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) { s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion) return nil } diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index fc9edabdd970..c44e14c31c32 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe } // All servers should be at or above 0.8.0 to apply this operatation - if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) { + if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) { return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion) } @@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe } // All servers should be at or above 0.9.0 to apply this operatation - if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) { + if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) { return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion) } // Apply the update diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f40690d00f20..777b5e8cc6cf 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -15,7 +15,7 @@ import ( "github.com/hashicorp/raft" ) -// planner is used to mange the submitted allocation plans that are waiting +// planner is used to manage the submitted allocation plans that are waiting // to be accessed by the leader type planner struct { *Server @@ -149,52 +149,81 @@ func (p *planner) planApply() { // applyPlan is used to apply the plan result and to return the alloc index func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) { - // Determine the minimum number of updates, could be more if there - // are multiple updates per node - minUpdates := len(result.NodeUpdate) - minUpdates += len(result.NodeAllocation) - // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: plan.Job, - Alloc: make([]*structs.Allocation, 0, minUpdates), + Job: plan.Job, }, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, EvalID: plan.EvalID, NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)), } - for _, updateList := range result.NodeUpdate { - req.Alloc = append(req.Alloc, updateList...) - } - for _, allocList := range result.NodeAllocation { - req.Alloc = append(req.Alloc, allocList...) - } - for _, preemptions := range result.NodePreemptions { - req.NodePreemptions = append(req.NodePreemptions, preemptions...) - } - - // Set the time the alloc was applied for the first time. This can be used - // to approximate the scheduling time. + preemptedJobIDs := make(map[structs.NamespacedID]struct{}) now := time.Now().UTC().UnixNano() - for _, alloc := range req.Alloc { - if alloc.CreateTime == 0 { - alloc.CreateTime = now + + if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) { + // Initialize the allocs request using the new optimized log entry format. + // Determine the minimum number of updates, could be more if there + // are multiple updates per node + req.AllocsStopped = make([]*structs.Allocation, 0, len(result.NodeUpdate)) + req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation)) + + for _, updateList := range result.NodeUpdate { + for _, stoppedAlloc := range updateList { + req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, now)) + } } - alloc.ModifyTime = now - } - // Set modify time for preempted allocs if any - // Also gather jobids to create follow up evals - preemptedJobIDs := make(map[structs.NamespacedID]struct{}) - for _, alloc := range req.NodePreemptions { - alloc.ModifyTime = now - id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} - _, ok := preemptedJobIDs[id] - if !ok { - preemptedJobIDs[id] = struct{}{} + for _, allocList := range result.NodeAllocation { + req.AllocsUpdated = append(req.AllocsUpdated, allocList...) + } + + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + updateAllocTimestamps(req.AllocsUpdated, now) + + for _, preemptions := range result.NodePreemptions { + for _, preemptedAlloc := range preemptions { + req.NodePreemptions = append(req.NodePreemptions, normalizePreemptedAlloc(preemptedAlloc, now)) + + // Gather jobids to create follow up evals + appendNamespacedJobID(preemptedJobIDs, preemptedAlloc) + } + } + } else { + // COMPAT 0.11: This branch is deprecated and will only be used to support + // application of older log entries. Expected to be removed in a future version. + + // Determine the minimum number of updates, could be more if there + // are multiple updates per node + minUpdates := len(result.NodeUpdate) + minUpdates += len(result.NodeAllocation) + + // Initialize the allocs request using the older log entry format + req.Alloc = make([]*structs.Allocation, 0, minUpdates) + + for _, updateList := range result.NodeUpdate { + req.Alloc = append(req.Alloc, updateList...) + } + for _, allocList := range result.NodeAllocation { + req.Alloc = append(req.Alloc, allocList...) + } + + for _, preemptions := range result.NodePreemptions { + req.NodePreemptions = append(req.NodePreemptions, preemptions...) + } + + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + updateAllocTimestamps(req.Alloc, now) + + // Set modify time for preempted allocs if any + // Also gather jobids to create follow up evals + for _, alloc := range req.NodePreemptions { + alloc.ModifyTime = now + appendNamespacedJobID(preemptedJobIDs, alloc) } } @@ -232,6 +261,39 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap return future, nil } +func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.Allocation { + return &structs.Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation, + ModifyTime: now, + } +} + +func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.Allocation { + return &structs.Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: stoppedAlloc.DesiredDescription, + ClientStatus: stoppedAlloc.ClientStatus, + ModifyTime: now, + } +} + +func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) { + id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} + if _, ok := jobIDs[id]; !ok { + jobIDs[id] = struct{}{} + } +} + +func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) { + for _, alloc := range allocations { + if alloc.CreateTime == 0 { + alloc.CreateTime = timestamp + } + alloc.ModifyTime = timestamp + } +} + // asyncPlanWait is used to apply and respond to a plan async func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture, result *structs.PlanResult, pending *pendingPlan) { @@ -264,6 +326,17 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture, func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) { defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now()) + // Denormalize without the job + err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil) + if err != nil { + return nil, err + } + // Denormalize without the job + err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil) + if err != nil { + return nil, err + } + // Check if the plan exceeds quota overQuota, err := evaluatePlanQuota(snap, plan) if err != nil { @@ -521,15 +594,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri // Remove any preempted allocs if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 { - for _, allocs := range preempted { - remove = append(remove, allocs) - } + remove = append(remove, preempted...) } if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 { - for _, alloc := range updated { - remove = append(remove, alloc) - } + remove = append(remove, updated...) } proposed := structs.RemoveAllocs(existingAlloc, remove) proposed = append(proposed, plan.NodeAllocation[nodeID]...) diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 4dfa7a43b025..f58f95bc1cfe 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -3,8 +3,9 @@ package nomad import ( "reflect" "testing" + "time" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -62,6 +63,7 @@ func testRegisterJob(t *testing.T, s *Server, j *structs.Job) { } } +// COMPAT 0.11: Tests the older unoptimized code path for applyPlan func TestPlanApply_applyPlan(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -228,6 +230,154 @@ func TestPlanApply_applyPlan(t *testing.T) { assert.Equal(index, evalOut.ModifyIndex) } +func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.Build = "0.9.1" + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Register node + node := mock.Node() + testRegisterNode(t, s1, node) + + // Register a fake deployment + oldDeployment := mock.Deployment() + if err := s1.State().UpsertDeployment(900, oldDeployment); err != nil { + t.Fatalf("UpsertDeployment failed: %v", err) + } + + // Create a deployment + dnew := mock.Deployment() + + // Create a deployment update for the old deployment id + desiredStatus, desiredStatusDescription := "foo", "bar" + updates := []*structs.DeploymentStatusUpdate{ + { + DeploymentID: oldDeployment.ID, + Status: desiredStatus, + StatusDescription: desiredStatusDescription, + }, + } + + // Register allocs, deployment and deployment update + alloc := mock.Alloc() + stoppedAlloc := mock.Alloc() + stoppedAllocDiff := &structs.Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: "Desired Description", + ClientStatus: structs.AllocClientStatusLost, + } + preemptedAlloc := mock.Alloc() + preemptedAllocDiff := &structs.Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: alloc.ID, + } + s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID)) + s1.State().UpsertAllocs(1100, []*structs.Allocation{stoppedAlloc, preemptedAlloc}) + // Create an eval + eval := mock.Eval() + eval.JobID = alloc.JobID + if err := s1.State().UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + timestampBeforeCommit := time.Now().UTC().UnixNano() + planRes := &structs.PlanResult{ + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: {alloc}, + }, + NodeUpdate: map[string][]*structs.Allocation{ + stoppedAlloc.NodeID: {stoppedAllocDiff}, + }, + NodePreemptions: map[string][]*structs.Allocation{ + preemptedAlloc.NodeID: {preemptedAllocDiff}, + }, + Deployment: dnew, + DeploymentUpdates: updates, + } + + // Snapshot the state + snap, err := s1.State().Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the plan with a deployment + plan := &structs.Plan{ + Job: alloc.Job, + Deployment: dnew, + DeploymentUpdates: updates, + EvalID: eval.ID, + } + + // Apply the plan + future, err := s1.applyPlan(plan, planRes, snap) + assert := assert.New(t) + assert.Nil(err) + + // Verify our optimistic snapshot is updated + ws := memdb.NewWatchSet() + allocOut, err := snap.AllocByID(ws, alloc.ID) + assert.Nil(err) + assert.NotNil(allocOut) + + deploymentOut, err := snap.DeploymentByID(ws, plan.Deployment.ID) + assert.Nil(err) + assert.NotNil(deploymentOut) + + // Check plan does apply cleanly + index, err := planWaitFuture(future) + assert.Nil(err) + assert.NotEqual(0, index) + + // Lookup the allocation + fsmState := s1.fsm.State() + allocOut, err = fsmState.AllocByID(ws, alloc.ID) + assert.Nil(err) + assert.NotNil(allocOut) + assert.True(allocOut.CreateTime > 0) + assert.True(allocOut.ModifyTime > 0) + assert.Equal(allocOut.CreateTime, allocOut.ModifyTime) + + // Verify stopped alloc diff applied cleanly + updatedStoppedAlloc, err := fsmState.AllocByID(ws, stoppedAlloc.ID) + assert.Nil(err) + assert.NotNil(updatedStoppedAlloc) + assert.True(updatedStoppedAlloc.ModifyTime > timestampBeforeCommit) + assert.Equal(updatedStoppedAlloc.DesiredDescription, stoppedAllocDiff.DesiredDescription) + assert.Equal(updatedStoppedAlloc.ClientStatus, stoppedAllocDiff.ClientStatus) + assert.Equal(updatedStoppedAlloc.DesiredStatus, structs.AllocDesiredStatusStop) + + // Verify preempted alloc diff applied cleanly + updatedPreemptedAlloc, err := fsmState.AllocByID(ws, preemptedAlloc.ID) + assert.Nil(err) + assert.NotNil(updatedPreemptedAlloc) + assert.True(updatedPreemptedAlloc.ModifyTime > timestampBeforeCommit) + assert.Equal(updatedPreemptedAlloc.DesiredDescription, + "Preempted by alloc ID "+preemptedAllocDiff.PreemptedByAllocation) + assert.Equal(updatedPreemptedAlloc.DesiredStatus, structs.AllocDesiredStatusEvict) + + // Lookup the new deployment + dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID) + assert.Nil(err) + assert.NotNil(dout) + + // Lookup the updated deployment + dout2, err := fsmState.DeploymentByID(ws, oldDeployment.ID) + assert.Nil(err) + assert.NotNil(dout2) + assert.Equal(desiredStatus, dout2.Status) + assert.Equal(desiredStatusDescription, dout2.StatusDescription) + + // Lookup updated eval + evalOut, err := fsmState.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.Equal(index, evalOut.ModifyIndex) +} + func TestPlanApply_EvalPlan_Simple(t *testing.T) { t.Parallel() state := testStateStore(t) diff --git a/nomad/plan_normalization_test.go b/nomad/plan_normalization_test.go new file mode 100644 index 000000000000..ab5f5b5caa5c --- /dev/null +++ b/nomad/plan_normalization_test.go @@ -0,0 +1,63 @@ +package nomad + +import ( + "bytes" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" + "github.com/ugorji/go/codec" +) + +// Whenever this test is changed, care should be taken to ensure the older msgpack size +// is recalculated when new fields are introduced in ApplyPlanResultsRequest +func TestPlanNormalize(t *testing.T) { + // This size was calculated using the older ApplyPlanResultsRequest format, in which allocations + // didn't use OmitEmpty and only the job was normalized in the stopped and preempted allocs. + // The newer format uses OmitEmpty and uses a minimal set of fields for the diff of the + // stopped and preempted allocs. The file for the older format hasn't been checked in, because + // it's not a good idea to check-in a 20mb file to the git repo. + unoptimizedLogSize := 19460168 + + numUpdatedAllocs := 10000 + numStoppedAllocs := 8000 + numPreemptedAllocs := 2000 + mockAlloc := mock.Alloc() + mockAlloc.Job = nil + + mockUpdatedAllocSlice := make([]*structs.Allocation, numUpdatedAllocs) + for i := 0; i < numUpdatedAllocs; i++ { + mockUpdatedAllocSlice = append(mockUpdatedAllocSlice, mockAlloc) + } + + now := time.Now().UTC().UnixNano() + mockStoppedAllocSlice := make([]*structs.Allocation, numStoppedAllocs) + for i := 0; i < numStoppedAllocs; i++ { + mockStoppedAllocSlice = append(mockStoppedAllocSlice, normalizeStoppedAlloc(mockAlloc, now)) + } + + mockPreemptionAllocSlice := make([]*structs.Allocation, numPreemptedAllocs) + for i := 0; i < numPreemptedAllocs; i++ { + mockPreemptionAllocSlice = append(mockPreemptionAllocSlice, normalizePreemptedAlloc(mockAlloc, now)) + } + + // Create a plan result + applyPlanLogEntry := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + AllocsUpdated: mockUpdatedAllocSlice, + AllocsStopped: mockStoppedAllocSlice, + }, + NodePreemptions: mockPreemptionAllocSlice, + } + + handle := structs.MsgpackHandle + var buf bytes.Buffer + if err := codec.NewEncoder(&buf, handle).Encode(applyPlanLogEntry); err != nil { + t.Fatalf("Encoding failed: %v", err) + } + + optimizedLogSize := buf.Len() + assert.True(t, float64(optimizedLogSize)/float64(unoptimizedLogSize) < 0.62) +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 84201cbaaf0e..24b90a2f45b4 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -170,6 +170,21 @@ RUN_QUERY: // UpsertPlanResults is used to upsert the results of a plan. func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error { + snapshot, err := s.Snapshot() + if err != nil { + return err + } + + err = snapshot.DenormalizeAllocationsSlice(results.AllocsStopped, results.Job) + if err != nil { + return err + } + + err = snapshot.DenormalizeAllocationsSlice(results.NodePreemptions, results.Job) + if err != nil { + return err + } + txn := s.db.Txn(true) defer txn.Abort() @@ -185,34 +200,6 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) } - // Attach the job to all the allocations. It is pulled out in the payload to - // avoid the redundancy of encoding, but should be denormalized prior to - // being inserted into MemDB. - structs.DenormalizeAllocationJobs(results.Job, results.Alloc) - - // COMPAT(0.11): Remove in 0.11 - // Calculate the total resources of allocations. It is pulled out in the - // payload to avoid encoding something that can be computed, but should be - // denormalized prior to being inserted into MemDB. - for _, alloc := range results.Alloc { - if alloc.Resources != nil { - continue - } - - alloc.Resources = new(structs.Resources) - for _, task := range alloc.TaskResources { - alloc.Resources.Add(task) - } - - // Add the shared resources - alloc.Resources.Add(alloc.SharedResources) - } - - // Upsert the allocations - if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil { - return err - } - // COMPAT: Nomad versions before 0.7.1 did not include the eval ID when // applying the plan. Thus while we are upgrading, we ignore updating the // modify index of evaluations from older plans. @@ -223,35 +210,33 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } } - // Prepare preempted allocs in the plan results for update - var preemptedAllocs []*structs.Allocation - for _, preemptedAlloc := range results.NodePreemptions { - // Look for existing alloc - existing, err := txn.First("allocs", "id", preemptedAlloc.ID) - if err != nil { - return fmt.Errorf("alloc lookup failed: %v", err) - } + noOfAllocs := len(results.NodePreemptions) - // Nothing to do if this does not exist - if existing == nil { - continue - } - exist := existing.(*structs.Allocation) + if len(results.Alloc) > 0 { + // COMPAT 0.11: This branch will be removed, when Alloc is removed + // Attach the job to all the allocations. It is pulled out in the payload to + // avoid the redundancy of encoding, but should be denormalized prior to + // being inserted into MemDB. + addComputedAllocAttrs(results.Alloc, results.Job) + noOfAllocs += len(results.Alloc) + } else { + // Attach the job to all the allocations. It is pulled out in the payload to + // avoid the redundancy of encoding, but should be denormalized prior to + // being inserted into MemDB. + addComputedAllocAttrs(results.AllocsUpdated, results.Job) + noOfAllocs += len(results.AllocsStopped) + len(results.AllocsUpdated) + } - // Copy everything from the existing allocation - copyAlloc := exist.Copy() + allocsToUpsert := make([]*structs.Allocation, 0, noOfAllocs) - // Only update the fields set by the scheduler - copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus - copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation - copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription - copyAlloc.ModifyTime = preemptedAlloc.ModifyTime - preemptedAllocs = append(preemptedAllocs, copyAlloc) + // COMPAT 0.11: This append should be removed when Alloc is removed + allocsToUpsert = append(allocsToUpsert, results.Alloc...) - } + allocsToUpsert = append(allocsToUpsert, results.AllocsStopped...) + allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...) + allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...) - // Upsert the preempted allocations - if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil { + if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil { return err } @@ -266,6 +251,28 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return nil } +func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) { + structs.DenormalizeAllocationJobs(job, allocs) + + // COMPAT(0.11): Remove in 0.11 + // Calculate the total resources of allocations. It is pulled out in the + // payload to avoid encoding something that can be computed, but should be + // denormalized prior to being inserted into MemDB. + for _, alloc := range allocs { + if alloc.Resources != nil { + continue + } + + alloc.Resources = new(structs.Resources) + for _, task := range alloc.TaskResources { + alloc.Resources.Add(task) + } + + // Add the shared resources + alloc.Resources.Add(alloc.SharedResources) + } +} + // upsertDeploymentUpdates updates the deployments given the passed status // updates. func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error { @@ -4100,6 +4107,67 @@ type StateSnapshot struct { StateStore } +// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the +// Allocation for each of the Allocation diffs and merges the updated attributes with +// the existing Allocation, and attaches the Job provided +func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error { + for _, allocDiffs := range nodeAllocations { + if err := s.DenormalizeAllocationsSlice(allocDiffs, job); err != nil { + return err + } + } + return nil +} + +// DenormalizeAllocationsSlice queries the Allocation for each of the Allocation diffs and merges +// the updated attributes with the existing Allocation, and attaches the Job provided +func (s *StateSnapshot) DenormalizeAllocationsSlice(allocDiffs []*structs.Allocation, job *structs.Job) error { + // Output index for denormalized Allocations + j := 0 + + for _, allocDiff := range allocDiffs { + alloc, err := s.AllocByID(nil, allocDiff.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } + if alloc == nil { + continue + } + + // Merge the updates to the Allocation + allocCopy := alloc.CopySkipJob() + allocCopy.Job = job + + if allocDiff.PreemptedByAllocation != "" { + // If alloc is a preemption + allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation + allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation) + allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict + } else { + // If alloc is a stopped alloc + allocCopy.DesiredDescription = allocDiff.DesiredDescription + allocCopy.DesiredStatus = structs.AllocDesiredStatusStop + if allocDiff.ClientStatus != "" { + allocCopy.ClientStatus = allocDiff.ClientStatus + } + } + if allocDiff.ModifyTime != 0 { + allocCopy.ModifyTime = allocDiff.ModifyTime + } + + // Update the allocDiff in the slice to equal the denormalized alloc + allocDiffs[j] = allocCopy + j++ + } + // Retain only the denormalized Allocations in the slice + allocDiffs = allocDiffs[:j] + return nil +} + +func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string { + return fmt.Sprintf("Preempted by alloc ID %v", PreemptedByAllocID) +} + // StateRestore is used to optimize the performance when // restoring state by only using a single large transaction // instead of thousands of sub transactions diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 3cf3b9776068..0e0539b48d79 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -88,6 +88,7 @@ func TestStateStore_Blocking_MinQuery(t *testing.T) { } } +// COMPAT 0.11: Uses AllocUpdateRequest.Alloc // This test checks that: // 1) The job is denormalized // 2) Allocations are created @@ -140,6 +141,94 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing assert.EqualValues(1000, evalOut.ModifyIndex) } +// This test checks that: +// 1) The job is denormalized +// 2) Allocations are denormalized and updated with the diff +func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + job := alloc.Job + alloc.Job = nil + + stoppedAlloc := mock.Alloc() + stoppedAlloc.Job = job + stoppedAllocDiff := &structs.Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: "desired desc", + ClientStatus: structs.AllocClientStatusLost, + } + preemptedAlloc := mock.Alloc() + preemptedAlloc.Job = job + preemptedAllocDiff := &structs.Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: alloc.ID, + } + + if err := state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := state.UpsertJob(999, job); err != nil { + t.Fatalf("err: %v", err) + } + + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + if err := state.UpsertEvals(1, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a plan result + res := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + AllocsUpdated: []*structs.Allocation{alloc}, + AllocsStopped: []*structs.Allocation{stoppedAllocDiff}, + Job: job, + }, + EvalID: eval.ID, + NodePreemptions: []*structs.Allocation{preemptedAllocDiff}, + } + assert := assert.New(t) + planModifyIndex := uint64(1000) + err := state.UpsertPlanResults(planModifyIndex, &res) + assert.Nil(err) + + ws := memdb.NewWatchSet() + out, err := state.AllocByID(ws, alloc.ID) + assert.Nil(err) + assert.Equal(alloc, out) + + updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID) + assert.Nil(err) + assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription) + assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus) + assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus) + assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) + assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex) + + updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID) + assert.Nil(err) + assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus) + assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation) + assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) + assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex) + + index, err := state.Index("allocs") + assert.Nil(err) + assert.EqualValues(planModifyIndex, index) + + if watchFired(ws) { + t.Fatalf("bad") + } + + evalOut, err := state.EvalByID(ws, eval.ID) + assert.Nil(err) + assert.NotNil(evalOut) + assert.EqualValues(planModifyIndex, evalOut.ModifyIndex) +} + // This test checks that the deployment is created and allocations count towards // the deployment func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { @@ -271,11 +360,9 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { require.NoError(err) minimalPreemptedAlloc := &structs.Allocation{ - ID: preemptedAlloc.ID, - Namespace: preemptedAlloc.Namespace, - DesiredStatus: structs.AllocDesiredStatusEvict, - ModifyTime: time.Now().Unix(), - DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID), + ID: preemptedAlloc.ID, + PreemptedByAllocation: alloc.ID, + ModifyTime: time.Now().Unix(), } // Create eval for preempted job @@ -316,7 +403,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { preempted, err := state.AllocByID(ws, preemptedAlloc.ID) require.NoError(err) require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict) - require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID)) + require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by alloc ID %v", alloc.ID)) // Verify eval for preempted job preemptedJobEval, err := state.EvalByID(ws, eval2.ID) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 88b2db3727c5..76056cff1293 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -28,8 +28,8 @@ import ( "github.com/gorhill/cronexpr" "github.com/hashicorp/consul/api" hcodec "github.com/hashicorp/go-msgpack/codec" - multierror "github.com/hashicorp/go-multierror" - version "github.com/hashicorp/go-version" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/args" @@ -654,9 +654,9 @@ type ApplyPlanResultsRequest struct { // the evaluation itself being updated. EvalID string - // NodePreemptions is a slice of allocations from other lower priority jobs + // NodePreemptions is a slice of allocation diffs from other lower priority jobs // that are preempted. Preempted allocations are marked as evicted. - NodePreemptions []*Allocation + NodePreemptions []*AllocationDiff // PreemptionEvals is a slice of follow up evals for jobs whose allocations // have been preempted to place allocs in this plan @@ -667,9 +667,18 @@ type ApplyPlanResultsRequest struct { // to cause evictions or to assign new allocations. Both can be done // within a single transaction type AllocUpdateRequest struct { + // COMPAT 0.11 // Alloc is the list of new allocations to assign + // Deprecated: Replaced with two separate slices, one containing stopped allocations + // and another containing updated allocations Alloc []*Allocation + // Allocations to stop. Contains only the diff, not the entire allocation + AllocsStopped []*AllocationDiff + + // New or updated allocations + AllocsUpdated []*Allocation + // Evals is the list of new evaluations to create // Evals are valid only when used in the Raft RPC Evals []*Evaluation @@ -7139,6 +7148,9 @@ const ( // Allocation is used to allocate the placement of a task group to a node. type Allocation struct { + // msgpack omit empty fields during serialization + _struct bool `codec:",omitempty"` // nolint: structcheck + // ID of the allocation (UUID) ID string @@ -7154,6 +7166,9 @@ type Allocation struct { // NodeID is the node this is being placed on NodeID string + // NodeName is the name of the node this is being placed on. + NodeName string + // Job is the parent job of the task group being allocated. // This is copied at allocation time to avoid issues if the job // definition is updated. @@ -7250,6 +7265,10 @@ type Allocation struct { ModifyTime int64 } +// AllocationDiff is a type alias for Allocation used to indicate that a diff is +// and not the entire allocation +type AllocationDiff = Allocation + // Index returns the index of the allocation. If the allocation is from a task // group with count greater than 1, there will be multiple allocations for it. func (a *Allocation) Index() uint { @@ -7264,11 +7283,12 @@ func (a *Allocation) Index() uint { return uint(num) } +// Copy provides a copy of the allocation and deep copies the job func (a *Allocation) Copy() *Allocation { return a.copyImpl(true) } -// Copy provides a copy of the allocation but doesn't deep copy the job +// CopySkipJob provides a copy of the allocation but doesn't deep copy the job func (a *Allocation) CopySkipJob() *Allocation { return a.copyImpl(false) } @@ -7615,6 +7635,7 @@ func (a *Allocation) Stub() *AllocListStub { Name: a.Name, Namespace: a.Namespace, NodeID: a.NodeID, + NodeName: a.NodeName, JobID: a.JobID, JobType: a.Job.Type, JobVersion: a.Job.Version, @@ -7642,6 +7663,7 @@ type AllocListStub struct { Name string Namespace string NodeID string + NodeName string JobID string JobType string JobVersion uint64 @@ -7998,6 +8020,9 @@ const ( // potentially taking action (allocation of work) or doing nothing if the state // of the world does not require it. type Evaluation struct { + // msgpack omit empty fields during serialization + _struct bool `codec:",omitempty"` // nolint: structcheck + // ID is a randomly generated UUID used for this evaluation. This // is assigned upon the creation of the evaluation. ID string @@ -8265,6 +8290,9 @@ func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation { // are submitted to the leader which verifies that resources have // not been overcommitted before admitting the plan. type Plan struct { + // msgpack omit empty fields during serialization + _struct bool `codec:",omitempty"` // nolint: structcheck + // EvalID is the evaluation ID this plan is associated with EvalID string @@ -8316,9 +8344,9 @@ type Plan struct { NodePreemptions map[string][]*Allocation } -// AppendUpdate marks the allocation for eviction. The clientStatus of the +// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the // allocation may be optionally set by passing in a non-empty value. -func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) { +func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) { newAlloc := new(Allocation) *newAlloc = *alloc @@ -8334,7 +8362,7 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien // Strip the resources as it can be rebuilt. newAlloc.Resources = nil - newAlloc.DesiredStatus = desiredStatus + newAlloc.DesiredStatus = AllocDesiredStatusStop newAlloc.DesiredDescription = desiredDesc if clientStatus != "" { @@ -8348,12 +8376,12 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien // AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan. // To minimize the size of the plan, this only sets a minimal set of fields in the allocation -func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) { +func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string) { newAlloc := &Allocation{} newAlloc.ID = alloc.ID newAlloc.JobID = alloc.JobID newAlloc.Namespace = alloc.Namespace - newAlloc.DesiredStatus = desiredStatus + newAlloc.DesiredStatus = AllocDesiredStatusEvict newAlloc.PreemptedByAllocation = preemptingAllocID desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID) @@ -8406,6 +8434,27 @@ func (p *Plan) IsNoOp() bool { len(p.DeploymentUpdates) == 0 } +func (p *Plan) NormalizeAllocations() { + for _, allocs := range p.NodeUpdate { + for i, alloc := range allocs { + allocs[i] = &Allocation{ + ID: alloc.ID, + DesiredDescription: alloc.DesiredDescription, + ClientStatus: alloc.ClientStatus, + } + } + } + + for _, allocs := range p.NodePreemptions { + for i, alloc := range allocs { + allocs[i] = &Allocation{ + ID: alloc.ID, + PreemptedByAllocation: alloc.PreemptedByAllocation, + } + } + } +} + // PlanResult is the result of a plan submitted to the leader. type PlanResult struct { // NodeUpdate contains all the updates that were committed. diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 90b33b19827a..e574f51dc582 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/hashicorp/consul/api" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/kr/pretty" "github.com/stretchr/testify/assert" @@ -2842,6 +2842,100 @@ func TestTaskArtifact_Validate_Checksum(t *testing.T) { } } +func TestPlan_NormalizeAllocations(t *testing.T) { + t.Parallel() + plan := &Plan{ + NodeUpdate: make(map[string][]*Allocation), + NodePreemptions: make(map[string][]*Allocation), + } + stoppedAlloc := MockAlloc() + desiredDesc := "Desired desc" + plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost) + preemptedAlloc := MockAlloc() + preemptingAllocID := uuid.Generate() + plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID) + + plan.NormalizeAllocations() + + actualStoppedAlloc := plan.NodeUpdate[stoppedAlloc.NodeID][0] + expectedStoppedAlloc := &Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: desiredDesc, + ClientStatus: AllocClientStatusLost, + } + assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc) + actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0] + expectedPreemptedAlloc := &Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: preemptingAllocID, + } + assert.Equal(t, expectedPreemptedAlloc, actualPreemptedAlloc) +} + +func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) { + t.Parallel() + plan := &Plan{ + NodeUpdate: make(map[string][]*Allocation), + } + alloc := MockAlloc() + desiredDesc := "Desired desc" + + plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost) + + appendedAlloc := plan.NodeUpdate[alloc.NodeID][0] + expectedAlloc := new(Allocation) + *expectedAlloc = *alloc + expectedAlloc.DesiredDescription = desiredDesc + expectedAlloc.DesiredStatus = AllocDesiredStatusStop + expectedAlloc.ClientStatus = AllocClientStatusLost + expectedAlloc.Job = nil + assert.Equal(t, expectedAlloc, appendedAlloc) + assert.Equal(t, alloc.Job, plan.Job) +} + +func TestPlan_AppendPreemptedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) { + t.Parallel() + plan := &Plan{ + NodePreemptions: make(map[string][]*Allocation), + } + alloc := MockAlloc() + preemptingAllocID := uuid.Generate() + + plan.AppendPreemptedAlloc(alloc, preemptingAllocID) + + appendedAlloc := plan.NodePreemptions[alloc.NodeID][0] + expectedAlloc := &Allocation{ + ID: alloc.ID, + PreemptedByAllocation: preemptingAllocID, + JobID: alloc.JobID, + Namespace: alloc.Namespace, + DesiredStatus: AllocDesiredStatusEvict, + DesiredDescription: fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID), + AllocatedResources: alloc.AllocatedResources, + TaskResources: alloc.TaskResources, + SharedResources: alloc.SharedResources, + } + assert.Equal(t, expectedAlloc, appendedAlloc) +} + +func TestAllocation_MsgPackTags(t *testing.T) { + t.Parallel() + planType := reflect.TypeOf(Allocation{}) + + msgPackTags, _ := planType.FieldByName("_struct") + + assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`)) +} + +func TestEvaluation_MsgPackTags(t *testing.T) { + t.Parallel() + planType := reflect.TypeOf(Evaluation{}) + + msgPackTags, _ := planType.FieldByName("_struct") + + assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`)) +} + func TestAllocation_Terminated(t *testing.T) { type desiredState struct { ClientStatus string diff --git a/nomad/util.go b/nomad/util.go index 44b7119242ba..74dc41e04ecd 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -14,6 +14,11 @@ import ( "github.com/hashicorp/serf/serf" ) +// MinVersionPlanNormalization is the minimum version to support the +// normalization of Plan in SubmitPlan, and the denormalization raft log entry committed +// in ApplyPlanResultsRequest +var MinVersionPlanNormalization = version.Must(version.NewVersion("0.9.1")) + // ensurePath is used to make sure a path exists func ensurePath(path string, dir bool) error { if !dir { @@ -145,9 +150,9 @@ func isNomadServer(m serf.Member) (bool, *serverParts) { // ServersMeetMinimumVersion returns whether the given alive servers are at least on the // given Nomad version -func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool { +func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool { for _, member := range members { - if valid, parts := isNomadServer(member); valid && parts.Status == serf.StatusAlive { + if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) { // Check if the versions match - version.LessThan will return true for // 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments()) diff --git a/nomad/util_test.go b/nomad/util_test.go index 4fe670cdc71f..f216d5e4243a 100644 --- a/nomad/util_test.go +++ b/nomad/util_test.go @@ -86,23 +86,8 @@ func TestIsNomadServer(t *testing.T) { } } -func TestServersMeetMinimumVersion(t *testing.T) { +func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) { t.Parallel() - makeMember := func(version string) serf.Member { - return serf.Member{ - Name: "foo", - Addr: net.IP([]byte{127, 0, 0, 1}), - Tags: map[string]string{ - "role": "nomad", - "region": "aws", - "dc": "east-aws", - "port": "10000", - "build": version, - "vsn": "1", - }, - Status: serf.StatusAlive, - } - } cases := []struct { members []serf.Member @@ -112,7 +97,7 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server, meets reqs { members: []serf.Member{ - makeMember("0.7.5"), + makeMember("0.7.5", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -120,7 +105,7 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server in dev, meets reqs { members: []serf.Member{ - makeMember("0.8.5-dev"), + makeMember("0.8.5-dev", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -128,7 +113,7 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server with meta, meets reqs { members: []serf.Member{ - makeMember("0.7.5+ent"), + makeMember("0.7.5+ent", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -136,16 +121,17 @@ func TestServersMeetMinimumVersion(t *testing.T) { // One server, doesn't meet reqs { members: []serf.Member{ - makeMember("0.7.5"), + makeMember("0.7.5", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.8.0")), expected: false, }, - // Multiple servers, meets req version + // Multiple servers, meets req version, includes failed that doesn't meet req { members: []serf.Member{ - makeMember("0.7.5"), - makeMember("0.8.0"), + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), + makeMember("0.7.0", serf.StatusFailed), }, ver: version.Must(version.NewVersion("0.7.5")), expected: true, @@ -153,8 +139,8 @@ func TestServersMeetMinimumVersion(t *testing.T) { // Multiple servers, doesn't meet req version { members: []serf.Member{ - makeMember("0.7.5"), - makeMember("0.8.0"), + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), }, ver: version.Must(version.NewVersion("0.8.0")), expected: false, @@ -162,13 +148,67 @@ func TestServersMeetMinimumVersion(t *testing.T) { } for _, tc := range cases { - result := ServersMeetMinimumVersion(tc.members, tc.ver) + result := ServersMeetMinimumVersion(tc.members, tc.ver, false) if result != tc.expected { t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc) } } } +func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) { + t.Parallel() + + cases := []struct { + members []serf.Member + ver *version.Version + expected bool + }{ + // Multiple servers, meets req version + { + members: []serf.Member{ + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), + makeMember("0.7.5", serf.StatusFailed), + }, + ver: version.Must(version.NewVersion("0.7.5")), + expected: true, + }, + // Multiple servers, doesn't meet req version + { + members: []serf.Member{ + makeMember("0.7.5", serf.StatusAlive), + makeMember("0.8.0", serf.StatusAlive), + makeMember("0.7.0", serf.StatusFailed), + }, + ver: version.Must(version.NewVersion("0.7.5")), + expected: false, + }, + } + + for _, tc := range cases { + result := ServersMeetMinimumVersion(tc.members, tc.ver, true) + if result != tc.expected { + t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc) + } + } +} + +func makeMember(version string, status serf.MemberStatus) serf.Member { + return serf.Member{ + Name: "foo", + Addr: net.IP([]byte{127, 0, 0, 1}), + Tags: map[string]string{ + "role": "nomad", + "region": "aws", + "dc": "east-aws", + "port": "10000", + "build": version, + "vsn": "1", + }, + Status: status, + } +} + func TestShuffleStrings(t *testing.T) { t.Parallel() // Generate input diff --git a/nomad/worker.go b/nomad/worker.go index f0aefb62d547..a50c58e68dd4 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -310,6 +310,12 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler. // Add the evaluation token to the plan plan.EvalToken = w.evalToken + // Normalize stopped and preempted allocs before RPC + normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true) + if normalizePlan { + plan.NormalizeAllocations() + } + // Setup the request req := structs.PlanRequest{ Plan: plan, diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 2f3e31728310..a03e739bfc37 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -8,13 +8,14 @@ import ( "time" log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" ) type NoopScheduler struct { @@ -390,6 +391,57 @@ func TestWorker_SubmitPlan(t *testing.T) { } } +func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) { + t.Parallel() + s1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + c.Build = "0.9.1" + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Register node + node := mock.Node() + testRegisterNode(t, s1, node) + + job := mock.Job() + eval1 := mock.Eval() + eval1.JobID = job.ID + s1.fsm.State().UpsertJob(0, job) + s1.fsm.State().UpsertEvals(0, []*structs.Evaluation{eval1}) + + stoppedAlloc := mock.Alloc() + preemptedAlloc := mock.Alloc() + s1.fsm.State().UpsertAllocs(5, []*structs.Allocation{stoppedAlloc, preemptedAlloc}) + + // Create an allocation plan + plan := &structs.Plan{ + Job: job, + EvalID: eval1.ID, + NodeUpdate: make(map[string][]*structs.Allocation), + NodePreemptions: make(map[string][]*structs.Allocation), + } + desiredDescription := "desired desc" + plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost) + preemptingAllocID := uuid.Generate() + plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID) + + // Attempt to submit a plan + w := &Worker{srv: s1, logger: s1.logger} + w.SubmitPlan(plan) + + assert.Equal(t, &structs.Allocation{ + ID: preemptedAlloc.ID, + PreemptedByAllocation: preemptingAllocID, + }, plan.NodePreemptions[preemptedAlloc.NodeID][0]) + assert.Equal(t, &structs.Allocation{ + ID: stoppedAlloc.ID, + DesiredDescription: desiredDescription, + ClientStatus: structs.AllocClientStatusLost, + }, plan.NodeUpdate[stoppedAlloc.NodeID][0]) +} + func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 03b35b943ae5..bcd8dc81ac5c 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -5,8 +5,8 @@ import ( "time" log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -365,7 +365,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Handle the stop for _, stop := range results.stop { - s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus) + s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus) } // Handle the in-place updates @@ -463,7 +463,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc() prevAllocation := missing.PreviousAllocation() if stopPrevAlloc { - s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "") + s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "") } // Compute penalty nodes for rescheduled allocs @@ -495,12 +495,12 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul TaskGroup: tg.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, + NodeName: option.Node.Name, DeploymentID: deploymentID, TaskResources: resources.OldTaskResources(), AllocatedResources: resources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, - SharedResources: &structs.Resources{ DiskMB: tg.EphemeralDisk.SizeMB, }, diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index e3f4015fbd13..dc0966341ac8 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -4,7 +4,7 @@ import ( "fmt" log "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -210,18 +210,18 @@ func (s *SystemScheduler) computeJobAllocs() error { // Add all the allocs to stop for _, e := range diff.stop { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "") + s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "") } // Add all the allocs to migrate for _, e := range diff.migrate { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted, "") + s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "") } // Lost allocations should be transitioned to desired status stop and client // status lost. for _, e := range diff.lost { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost) + s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost) } // Attempt to do the upgrades in place @@ -331,11 +331,11 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { TaskGroup: missing.TaskGroup.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, + NodeName: option.Node.Name, TaskResources: resources.OldTaskResources(), AllocatedResources: resources, DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, - SharedResources: &structs.Resources{ DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB, }, @@ -351,7 +351,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { if option.PreemptedAllocs != nil { var preemptedAllocIDs []string for _, stop := range option.PreemptedAllocs { - s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID) + s.plan.AppendPreemptedAlloc(stop, alloc.ID) preemptedAllocIDs = append(preemptedAllocIDs, stop.ID) if s.eval.AnnotatePlan && s.plan.Annotations != nil { diff --git a/scheduler/testing.go b/scheduler/testing.go index f0501010246b..876ff101db5c 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -7,7 +7,7 @@ import ( testing "github.com/mitchellh/go-testing-interface" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -53,6 +53,8 @@ type Harness struct { nextIndex uint64 nextIndexLock sync.Mutex + + optimizePlan bool } // NewHarness is used to make a new testing harness @@ -101,22 +103,17 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er result.AllocIndex = index // Flatten evicts and allocs - var allocs []*structs.Allocation + now := time.Now().UTC().UnixNano() + allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate)) for _, updateList := range plan.NodeUpdate { - allocs = append(allocs, updateList...) - } - for _, allocList := range plan.NodeAllocation { - allocs = append(allocs, allocList...) + allocsStopped = append(allocsStopped, updateList...) } - // Set the time the alloc was applied for the first time. This can be used - // to approximate the scheduling time. - now := time.Now().UTC().UnixNano() - for _, alloc := range allocs { - if alloc.CreateTime == 0 { - alloc.CreateTime = now - } + allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation)) + for _, allocList := range plan.NodeAllocation { + allocsUpdated = append(allocsUpdated, allocList...) } + updateCreateTimestamp(allocsUpdated, now) // Set modify time for preempted allocs and flatten them var preemptedAllocs []*structs.Allocation @@ -130,8 +127,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: plan.Job, - Alloc: allocs, + Job: plan.Job, }, Deployment: plan.Deployment, DeploymentUpdates: plan.DeploymentUpdates, @@ -139,11 +135,39 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er NodePreemptions: preemptedAllocs, } + if h.optimizePlan { + req.AllocsStopped = allocsStopped + req.AllocsUpdated = allocsUpdated + } else { + // COMPAT 0.11: Handles unoptimized log format + var allocs []*structs.Allocation + allocs = append(allocs, allocsStopped...) + allocs = append(allocs, allocsUpdated...) + updateCreateTimestamp(allocs, now) + req.Alloc = allocs + } + // Apply the full plan err := h.State.UpsertPlanResults(index, &req) return result, nil, err } +// OptimizePlan is a function used only for Harness to help set the optimzePlan field, +// since Harness doesn't have access to a Server object +func (h *Harness) OptimizePlan(optimize bool) { + h.optimizePlan = optimize +} + +func updateCreateTimestamp(allocations []*structs.Allocation, now int64) { + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + for _, alloc := range allocations { + if alloc.CreateTime == 0 { + alloc.CreateTime = now + } + } +} + func (h *Harness) UpdateEval(eval *structs.Evaluation) error { // Ensure sequential plan application h.planLock.Lock() diff --git a/scheduler/util.go b/scheduler/util.go index c9700538167d..becae99601bd 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -507,8 +507,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // the current allocation is discounted when checking for feasibility. // Otherwise we would be trying to fit the tasks current resources and // updated resources. After select is called we can remove the evict. - ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, - allocInPlace, "") + ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "") // Attempt to match the task group option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions @@ -573,7 +572,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri n := len(allocs) for i := 0; i < n && i < *limit; i++ { a := allocs[i] - ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, "") + ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "") diff.place = append(diff.place, a) } if n <= *limit { @@ -734,7 +733,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc if alloc.DesiredStatus == structs.AllocDesiredStatusStop && (alloc.ClientStatus == structs.AllocClientStatusRunning || alloc.ClientStatus == structs.AllocClientStatusPending) { - plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost) + plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost) } } } @@ -784,7 +783,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy // the current allocation is discounted when checking for feasibility. // Otherwise we would be trying to fit the tasks current resources and // updated resources. After select is called we can remove the evict. - ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "") + ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "") // Attempt to match the task group option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions