Worker waitForIndex uses StateStore index, not Raft Applied Index #1339

Merged Jun 22, 2016 · 3 commits
33 changes: 12 additions & 21 deletions nomad/core_sched_test.go
@@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobEvalGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobEvalGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobEvalGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobEvalGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobEvalGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobForceGC)
+ gc := s1.coreJobEval(structs.CoreJobForceGC, 1001)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobNodeGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobNodeGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobNodeGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobForceGC)
+ gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobJobGC)
- gc.ModifyIndex = 2000
+ gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
@@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)

// Attempt the GC
- gc := s1.coreJobEval(structs.CoreJobForceGC)
+ gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
37 changes: 32 additions & 5 deletions nomad/leader.go
@@ -251,30 +251,57 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()

+ // getLatest grabs the latest index from the state store. It returns true if
+ // the index was retrieved successfully.
+ getLatest := func() (uint64, bool) {
+     // Snapshot the current state
+     snap, err := s.fsm.State().Snapshot()
+     if err != nil {
+         s.logger.Printf("[ERR] nomad: failed to snapshot state for periodic GC: %v", err)
+         return 0, false
+     }
+
+     // Store the snapshot's index
+     snapshotIndex, err := snap.LatestIndex()
+     if err != nil {
+         s.logger.Printf("[ERR] nomad: failed to determine snapshot's index for periodic GC: %v", err)
+         return 0, false
+     }
+
+     return snapshotIndex, true
+ }

for {

select {
case <-evalGC.C:
- s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
+ if index, ok := getLatest(); ok {
+     s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
+ }
case <-nodeGC.C:
- s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
+ if index, ok := getLatest(); ok {
+     s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
+ }
case <-jobGC.C:
- s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
+ if index, ok := getLatest(); ok {
+     s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
+ }
case <-stopCh:
return
}
}
}

// coreJobEval returns an evaluation for a core job
- func (s *Server) coreJobEval(job string) *structs.Evaluation {
+ func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
return &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerScheduled,
JobID: job,
Status: structs.EvalStatusPending,
- ModifyIndex: s.raft.AppliedIndex(),
+ ModifyIndex: modifyIndex,
}
}

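Taken together with the worker.go change below, the point of the new parameter is that a core-job eval is now stamped with the state store's latest index at the time it is created, rather than Raft's applied index. Presumably the worker that dequeues the eval passes that ModifyIndex to waitForIndex, so it only processes the eval once its own state-store snapshot has caught up to the index the eval was built against.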
16 changes: 15 additions & 1 deletion nomad/system_endpoint.go
@@ -1,6 +1,8 @@
package nomad

import (
+     "fmt"
+
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -16,6 +18,18 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
return err
}

- s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
+ // Snapshot the current state
+ snap, err := s.srv.fsm.State().Snapshot()
+ if err != nil {
+     return fmt.Errorf("failed to snapshot state: %v", err)
+ }
+
+ // Store the snapshot's index
+ snapshotIndex, err := snap.LatestIndex()
+ if err != nil {
+     return fmt.Errorf("failed to determine snapshot's index: %v", err)
+ }
+
+ s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
return nil
}
5 changes: 1 addition & 4 deletions nomad/system_endpoint_test.go
@@ -11,9 +11,6 @@ import (
)

func TestSystemEndpoint_GarbageCollect(t *testing.T) {
- //s1 := testServer(t, func(c *Config) {
- //c.NumSchedulers = 0 // Prevent automatic dequeue
- //})
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
@@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
- if err := state.UpsertJob(0, job); err != nil {
+ if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("UpsertAllocs() failed: %v", err)
}

15 changes: 13 additions & 2 deletions nomad/worker.go
@@ -215,9 +215,20 @@ func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
CHECK:
+ // Snapshot the current state
+ snap, err := w.srv.fsm.State().Snapshot()
Review thread on the line above:

@slackpad (Contributor), Jun 22, 2016:
For this use case, would it be worth being able to query the index without a snapshot, since you don't otherwise use the snapshot here? Or are they cheap enough that you don't care?

@dadgar (Contributor, Author), Jun 22, 2016:
They are cheap enough that I don't care:

func (s *StateStore) Snapshot() (*StateSnapshot, error) {
    snap := &StateSnapshot{
        StateStore: StateStore{
            logger: s.logger,
            db:     s.db.Snapshot(),
            watch:  s.watch,
        },
    }
    return snap, nil
}

But it saves some memory allocations in a hot path, so I will update.

@slackpad (Contributor):
One other thought I had when looking at this: could you use your state store's blocking query watch support for this? That way you get an edge to wake this up when the data arrives, rather than letting the wait expire.

@dadgar (Contributor, Author):
That would be cool! We currently don't have triggers on the index table, so you would have to set up a watch on all the tables. Will add a comment, though.

+ if err != nil {
+     return fmt.Errorf("failed to snapshot state: %v", err)
+ }
+
+ // Store the snapshot's index
Review comment on the line above (Contributor): Kind of a stale comment - you aren't really storing it.
+ snapshotIndex, err := snap.LatestIndex()
+ if err != nil {
+     return fmt.Errorf("failed to determine snapshot's index: %v", err)
+ }

// We only need the FSM state to be as recent as the given index
- appliedIndex := w.srv.raft.AppliedIndex()
- if index <= appliedIndex {
+ if index <= snapshotIndex {
w.backoffReset()
return nil
}
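To make the pattern in this change easier to follow in isolation, here is a minimal, self-contained sketch of polling a state store's latest index until it reaches a target or a timeout expires. It is not the Nomad implementation: the stateIndexer interface, fakeStore, the backoff values, and the error messages are all invented for illustration, and the watch-based wake-up discussed in the review thread above would replace the sleep with a notification channel.

package main

import (
    "fmt"
    "time"
)

// stateIndexer is a stand-in for a state store that can report the highest
// index it has applied. In this PR that role is played by a state snapshot's
// LatestIndex method; the interface here is purely illustrative.
type stateIndexer interface {
    LatestIndex() (uint64, error)
}

// waitForIndex polls the store until its latest index is at least `index`,
// or until `timeout` has elapsed. This mirrors the approach in the diff:
// compare against the state store's index, not the Raft applied index,
// because the eval's ModifyIndex was captured from the state store.
func waitForIndex(store stateIndexer, index uint64, timeout time.Duration) error {
    deadline := time.Now().Add(timeout)
    backoff := 5 * time.Millisecond // illustrative starting backoff

    for {
        latest, err := store.LatestIndex()
        if err != nil {
            return fmt.Errorf("failed to determine latest index: %v", err)
        }
        if latest >= index {
            return nil
        }
        if time.Now().Add(backoff).After(deadline) {
            return fmt.Errorf("timed out waiting for index %d (latest %d)", index, latest)
        }

        // A watch/trigger on the index table, as suggested in the review
        // discussion, would let us block here until a write lands instead
        // of sleeping and re-checking.
        time.Sleep(backoff)
        if backoff < 100*time.Millisecond {
            backoff *= 2
        }
    }
}

// fakeStore advances its index over time so the example is runnable.
type fakeStore struct{ start time.Time }

func (f *fakeStore) LatestIndex() (uint64, error) {
    return uint64(time.Since(f.start) / (10 * time.Millisecond)), nil
}

func main() {
    store := &fakeStore{start: time.Now()}
    if err := waitForIndex(store, 5, time.Second); err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("state store caught up")
}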
5 changes: 4 additions & 1 deletion nomad/worker_test.go
@@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) {
// Cause an increment
go func() {
time.Sleep(10 * time.Millisecond)
- s1.raft.Barrier(0)
+ n := mock.Node()
+ if err := s1.fsm.state.UpsertNode(index+1, n); err != nil {
+     t.Fatalf("failed to upsert node: %v", err)
+ }
}()

// Wait for a future index
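The test change above follows from the same shift: presumably a Raft barrier advances the Raft applied index without writing to any state-store table, so the reworked waitForIndex would never see progress from it. Upserting a mock node at index+1 raises the state store's latest index directly, which is exactly what the new snapshot-based check watches.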