Skip to content

Commit

Permalink
modified Job.ScaleStatus to ignore deployments and look directly at the
Browse files Browse the repository at this point in the history
allocations, ignoring canaries
  • Loading branch information
cgbaker committed Apr 27, 2020
1 parent 738f3cb commit d623b4b
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 32 deletions.
52 changes: 38 additions & 14 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,10 +1806,6 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
reply.JobScaleStatus = nil
return nil
}
deployment, err := state.LatestDeploymentByJobID(ws, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}

events, eventsIndex, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID)
if err != nil {
Expand All @@ -1819,6 +1815,13 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
events = make(map[string][]*structs.ScalingEvent)
}

var allocs []*structs.Allocation
var allocsIndex uint64
allocs, err = state.AllocsByJob(ws, job.Namespace, job.ID, false)
if err != nil {
return err
}

// Setup the output
reply.JobScaleStatus = &structs.JobScaleStatus{
JobID: job.ID,
Expand All @@ -1832,24 +1835,45 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
tgScale := &structs.TaskGroupScaleStatus{
Desired: tg.Count,
}
if deployment != nil {
if ds, ok := deployment.TaskGroups[tg.Name]; ok {
tgScale.Placed = ds.PlacedAllocs
tgScale.Healthy = ds.HealthyAllocs
tgScale.Unhealthy = ds.UnhealthyAllocs
}
}
tgScale.Events = events[tg.Name]
reply.JobScaleStatus.TaskGroups[tg.Name] = tgScale
}

maxIndex := job.ModifyIndex
if deployment != nil && deployment.ModifyIndex > maxIndex {
maxIndex = deployment.ModifyIndex
for _, alloc := range allocs {
// TODO: ignore canaries until we figure out what we should do with canaries
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
continue
}
if alloc.TerminalStatus() {
continue
}
tgScale, ok := reply.JobScaleStatus.TaskGroups[alloc.TaskGroup]
if !ok || tgScale == nil {
continue
}
tgScale.Placed++
if alloc.ClientStatus == structs.AllocClientStatusRunning {
tgScale.Running++
}
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.HasHealth() {
if alloc.DeploymentStatus.IsHealthy() {
tgScale.Healthy++
} else if alloc.DeploymentStatus.IsUnhealthy() {
tgScale.Unhealthy++
}
}
if alloc.ModifyIndex > allocsIndex {
allocsIndex = alloc.ModifyIndex
}
}

maxIndex := job.ModifyIndex
if eventsIndex > maxIndex {
maxIndex = eventsIndex
}
if allocsIndex > maxIndex {
maxIndex = allocsIndex
}
reply.Index = maxIndex

// Set the query response
Expand Down
98 changes: 80 additions & 18 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5627,42 +5627,104 @@ func TestJobEndpoint_GetScaleStatus(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()

job := mock.Job()
jobV1 := mock.Job()

// check before job registration
// check before registration
// Fetch the scaling status
get := &structs.JobScaleStatusRequest{
JobID: job.ID,
JobID: jobV1.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: job.Namespace,
Namespace: jobV1.Namespace,
},
}
var resp2 structs.JobScaleStatusResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.ScaleStatus", get, &resp2))
require.Nil(resp2.JobScaleStatus)

// Create the register request
err := state.UpsertJob(1000, job)
require.Nil(err)
// stopped (previous version)
require.NoError(state.UpsertJob(1000, jobV1), "UpsertJob")
a0 := mock.Alloc()
a0.Job = jobV1
a0.Namespace = jobV1.Namespace
a0.JobID = jobV1.ID
a0.ClientStatus = structs.AllocClientStatusComplete
require.NoError(state.UpsertAllocs(1010, []*structs.Allocation{a0}), "UpsertAllocs")

jobV2 := jobV1.Copy()
require.NoError(state.UpsertJob(1100, jobV2), "UpsertJob")
a1 := mock.Alloc()
a1.Job = jobV2
a1.Namespace = jobV2.Namespace
a1.JobID = jobV2.ID
a1.ClientStatus = structs.AllocClientStatusRunning
// healthy
a1.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
a2 := mock.Alloc()
a2.Job = jobV2
a2.Namespace = jobV2.Namespace
a2.JobID = jobV2.ID
a2.ClientStatus = structs.AllocClientStatusPending
// unhealthy
a2.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
a3 := mock.Alloc()
a3.Job = jobV2
a3.Namespace = jobV2.Namespace
a3.JobID = jobV2.ID
a3.ClientStatus = structs.AllocClientStatusRunning
// canary
a3.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
Canary: true,
}
// no health
a4 := mock.Alloc()
a4.Job = jobV2
a4.Namespace = jobV2.Namespace
a4.JobID = jobV2.ID
a4.ClientStatus = structs.AllocClientStatusRunning
// upsert allocations
require.NoError(state.UpsertAllocs(1110, []*structs.Allocation{a1, a2, a3, a4}), "UpsertAllocs")

event := &structs.ScalingEvent{
Time: time.Now().Unix(),
Count: helper.Int64ToPtr(5),
Message: "message",
Error: false,
Meta: map[string]interface{}{
"a": "b",
},
EvalID: nil,
}

require.NoError(state.UpsertScalingEvent(1003, &structs.ScalingEventRequest{
Namespace: jobV2.Namespace,
JobID: jobV2.ID,
TaskGroup: jobV2.TaskGroups[0].Name,
ScalingEvent: event,
}), "UpsertScalingEvent")

// check after job registration
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.ScaleStatus", get, &resp2))
require.NotNil(resp2.JobScaleStatus)

expectedStatus := structs.JobScaleStatus{
JobID: job.ID,
JobCreateIndex: job.CreateIndex,
JobModifyIndex: job.ModifyIndex,
JobStopped: job.Stop,
JobID: jobV2.ID,
JobCreateIndex: jobV2.CreateIndex,
JobModifyIndex: a1.CreateIndex,
JobStopped: jobV2.Stop,
TaskGroups: map[string]*structs.TaskGroupScaleStatus{
job.TaskGroups[0].Name: {
Desired: job.TaskGroups[0].Count,
Placed: 0,
Running: 0,
Healthy: 0,
Unhealthy: 0,
Events: nil,
jobV2.TaskGroups[0].Name: {
Desired: jobV2.TaskGroups[0].Count,
Placed: 3,
Running: 2,
Healthy: 1,
Unhealthy: 1,
Events: []*structs.ScalingEvent{event},
},
},
}
Expand Down

0 comments on commit d623b4b

Please sign in to comment.