Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timestamps to evals #5881

Merged
merged 10 commits into from
Aug 7, 2019
2 changes: 2 additions & 0 deletions api/evaluations.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type Evaluation struct {
SnapshotIndex uint64
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
ModifyTime int64
}

// EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
Expand Down
2 changes: 2 additions & 0 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved
}

transitionReq := &structs.AllocUpdateDesiredTransitionRequest{
Expand Down
2 changes: 2 additions & 0 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ func (w *deploymentWatcher) getEval() *structs.Evaluation {
JobID: w.j.ID,
DeploymentID: w.deploymentID,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions nomad/drainer/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: job,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
})
}

Expand Down
13 changes: 13 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
JobID: args.Job.ID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
Expand Down Expand Up @@ -580,6 +582,8 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
JobID: job.ID,
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}

// Create a AllocUpdateDesiredTransitionRequest request with the eval and any forced rescheduled allocs
Expand Down Expand Up @@ -659,6 +663,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
Expand Down Expand Up @@ -746,6 +752,8 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: jobNS.ID,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
args.Evals = append(args.Evals, eval)
}
Expand Down Expand Up @@ -1205,6 +1213,9 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
JobModifyIndex: updatedIndex,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
// Timestamps are added for consistency but this eval is never persisted
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}

snap.UpsertEvals(100, []*structs.Evaluation{eval})
Expand Down Expand Up @@ -1424,6 +1435,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
JobID: dispatchJob.ID,
JobModifyIndex: jobCreateIndex,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
Expand Down
36 changes: 36 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func TestJobEndpoint_Register(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("eval CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("eval ModifyTime is unset: %#v", eval)
}
}

func TestJobEndpoint_Register_ACL(t *testing.T) {
Expand Down Expand Up @@ -302,6 +308,12 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("eval CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("eval ModifyTime is unset: %#v", eval)
}

if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1500,6 +1512,12 @@ func TestJobEndpoint_Evaluate(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("eval CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("eval ModifyTime is unset: %#v", eval)
}
}

func TestJobEndpoint_ForceRescheduleEvaluate(t *testing.T) {
Expand Down Expand Up @@ -1569,6 +1587,8 @@ func TestJobEndpoint_ForceRescheduleEvaluate(t *testing.T) {
require.Equal(eval.JobID, job.ID)
require.Equal(eval.JobModifyIndex, resp.JobModifyIndex)
require.Equal(eval.Status, structs.EvalStatusPending)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)

// Lookup the alloc, verify DesiredTransition ForceReschedule
alloc, err = state.AllocByID(ws, alloc.ID)
Expand Down Expand Up @@ -1647,6 +1667,8 @@ func TestJobEndpoint_Evaluate_ACL(t *testing.T) {
require.Equal(eval.JobID, job.ID)
require.Equal(eval.JobModifyIndex, validResp2.JobModifyIndex)
require.Equal(eval.Status, structs.EvalStatusPending)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)
}

func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
Expand Down Expand Up @@ -1790,6 +1812,8 @@ func TestJobEndpoint_Deregister(t *testing.T) {
require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy)
require.Equal(job.ID, eval.JobID)
require.Equal(structs.EvalStatusPending, eval.Status)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)

// Deregister and purge
dereg2 := &structs.JobDeregisterRequest{
Expand Down Expand Up @@ -1820,6 +1844,8 @@ func TestJobEndpoint_Deregister(t *testing.T) {
require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy)
require.Equal(job.ID, eval.JobID)
require.Equal(structs.EvalStatusPending, eval.Status)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)
}

func TestJobEndpoint_Deregister_ACL(t *testing.T) {
Expand Down Expand Up @@ -1899,6 +1925,8 @@ func TestJobEndpoint_Deregister_ACL(t *testing.T) {
require.Equal(eval.JobID, job.ID)
require.Equal(eval.JobModifyIndex, validResp2.JobModifyIndex)
require.Equal(eval.Status, structs.EvalStatusPending)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)
}

func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
Expand Down Expand Up @@ -1959,6 +1987,12 @@ func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("eval CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("eval ModifyTime is unset: %#v", eval)
}
}

func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
Expand Down Expand Up @@ -2165,6 +2199,8 @@ func TestJobEndpoint_BatchDeregister(t *testing.T) {
require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy)
require.Equal(expectedJob.ID, eval.JobID)
require.Equal(structs.EvalStatusPending, eval.Status)
require.NotZero(eval.CreateTime)
require.NotZero(eval.ModifyTime)
}
}

Expand Down
3 changes: 3 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,10 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
// due to the fairly large backoff.
followupEvalWait := s.config.EvalFailedFollowupBaselineDelay +
time.Duration(rand.Int63n(int64(s.config.EvalFailedFollowupDelayRange)))

followupEval := eval.CreateFailedFollowUpEval(followupEvalWait)
updateEval.NextEval = followupEval.ID
updateEval.ModifyTime = time.Now().UTC().UnixNano()
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved

// Update via Raft
req := structs.EvalUpdateRequest{
Expand Down Expand Up @@ -570,6 +572,7 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
newEval := dup.Copy()
newEval.Status = structs.EvalStatusCancelled
newEval.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", newEval.JobID)
newEval.ModifyTime = time.Now().UTC().UnixNano()
cancel[i] = newEval
}

Expand Down
14 changes: 8 additions & 6 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,14 @@ func PeriodicJob() *structs.Job {

func Eval() *structs.Evaluation {
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: structs.DefaultNamespace,
Priority: 50,
Type: structs.JobTypeService,
JobID: uuid.Generate(),
Status: structs.EvalStatusPending,
ID: uuid.Generate(),
Namespace: structs.DefaultNamespace,
Priority: 50,
Type: structs.JobTypeService,
JobID: uuid.Generate(),
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
return eval
}
Expand Down
8 changes: 8 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
Type: job.Type,
Priority: job.Priority,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
evals = append(evals, eval)
}
Expand Down Expand Up @@ -1095,6 +1097,8 @@ func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Alloc
}
_, exists := evalsByJobId[namespacedID]
if !exists {
eval.CreateTime = time.Now().UTC().UnixNano()
eval.ModifyTime = time.Now().UTC().UnixNano()
trimmedEvals = append(trimmedEvals, eval)
evalsByJobId[namespacedID] = struct{}{}
}
Expand Down Expand Up @@ -1261,6 +1265,8 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
NodeID: nodeID,
NodeModifyIndex: nodeIndex,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
evals = append(evals, eval)
evalIDs = append(evalIDs, eval.ID)
Expand All @@ -1285,6 +1291,8 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
NodeID: nodeID,
NodeModifyIndex: nodeIndex,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
evals = append(evals, eval)
evalIDs = append(evalIDs, eval.ID)
Expand Down
12 changes: 12 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2351,6 +2351,12 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
if eval.JobID != expJobID {
t.Fatalf("JobID incorrect on type %v: %#v", schedType, eval)
}
if eval.CreateTime == 0 {
t.Fatalf("CreateTime is unset on type %v: %#v", schedType, eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("ModifyTime is unset on type %v: %#v", schedType, eval)
}
}
}

Expand Down Expand Up @@ -2433,6 +2439,12 @@ func TestClientEndpoint_Evaluate(t *testing.T) {
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
if eval.CreateTime == 0 {
t.Fatalf("CreateTime is unset: %#v", eval)
}
if eval.ModifyTime == 0 {
t.Fatalf("ModifyTime is unset: %#v", eval)
}
}

func TestClientEndpoint_Evaluate_ACL(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
JobID: job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
Expand Down
2 changes: 2 additions & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
Type: job.Type,
Priority: job.Priority,
Status: structs.EvalStatusPending,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
evals = append(evals, eval)
}
Expand Down
10 changes: 10 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8394,6 +8394,9 @@ type Evaluation struct {
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64

CreateTime int64
ModifyTime int64
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved
}

// TerminalStatus returns if the current status is terminal and
Expand Down Expand Up @@ -8504,6 +8507,9 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation {
Status: EvalStatusPending,
Wait: wait,
PreviousEval: e.ID,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
// TODO(@jasmine): is a NextRollingEval technically created now or when original eval was created?
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved
jazzyfresh marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -8527,6 +8533,8 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool,
ClassEligibility: classEligibility,
EscapedComputedClass: escaped,
QuotaLimitReached: quotaReached,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
}

Expand All @@ -8546,6 +8554,8 @@ func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
Status: EvalStatusPending,
Wait: wait,
PreviousEval: e.ID,
CreateTime: time.Now().UTC().UnixNano(),
ModifyTime: time.Now().UTC().UnixNano(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func (w *Worker) UpdateEval(eval *structs.Evaluation) error {

// Store the snapshot index in the eval
eval.SnapshotIndex = w.snapshotIndex
eval.ModifyTime = time.Now().UTC().UnixNano()

// Setup the request
req := structs.EvalUpdateRequest{
Expand Down Expand Up @@ -386,6 +387,8 @@ func (w *Worker) CreateEval(eval *structs.Evaluation) error {

// Store the snapshot index in the eval
eval.SnapshotIndex = w.snapshotIndex
eval.CreateTime = time.Now().UTC().UnixNano()
eval.ModifyTime = time.Now().UTC().UnixNano()

// Setup the request
req := structs.EvalUpdateRequest{
Expand Down Expand Up @@ -447,6 +450,7 @@ func (w *Worker) ReblockEval(eval *structs.Evaluation) error {

// Store the snapshot index in the eval
eval.SnapshotIndex = w.snapshotIndex
eval.ModifyTime = time.Now().UTC().UnixNano()

// Setup the request
req := structs.EvalUpdateRequest{
Expand Down