Skip to content

Commit

Permalink
Merge pull request #3972 from hashicorp/b-blocked-evals-namespace
Browse files Browse the repository at this point in the history
Fix bug with not including namespace in indexing blocked evals
  • Loading branch information
preetapan committed Mar 13, 2018
2 parents 75209f0 + 918c34d commit 6251e72
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
26 changes: 13 additions & 13 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type EvalBroker struct {
jobEvals map[structs.NamespacedID]string

// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[string]PendingEvaluations
blocked map[structs.NamespacedID]PendingEvaluations

// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[string]PendingEvaluations),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
Expand Down Expand Up @@ -236,17 +236,17 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, queue string) {
}

// Check if there is an evaluation for this JobID pending
tuple := structs.NamespacedID{
namespacedID := structs.NamespacedID{
ID: eval.JobID,
Namespace: eval.Namespace,
}
pendingEval := b.jobEvals[tuple]
pendingEval := b.jobEvals[namespacedID]
if pendingEval == "" {
b.jobEvals[tuple] = eval.ID
b.jobEvals[namespacedID] = eval.ID
} else if pendingEval != eval.ID {
blocked := b.blocked[eval.JobID]
blocked := b.blocked[namespacedID]
heap.Push(&blocked, eval)
b.blocked[eval.JobID] = blocked
b.blocked[namespacedID] = blocked
b.stats.TotalBlocked += 1
return
}
Expand Down Expand Up @@ -519,19 +519,19 @@ func (b *EvalBroker) Ack(evalID, token string) error {
delete(b.unack, evalID)
delete(b.evals, evalID)

tuple := structs.NamespacedID{
namespacedID := structs.NamespacedID{
ID: jobID,
Namespace: unack.Eval.Namespace,
}
delete(b.jobEvals, tuple)
delete(b.jobEvals, namespacedID)

// Check if there are any blocked evaluations
if blocked := b.blocked[jobID]; len(blocked) != 0 {
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)
if len(blocked) > 0 {
b.blocked[jobID] = blocked
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, jobID)
delete(b.blocked, namespacedID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
Expand Down Expand Up @@ -671,7 +671,7 @@ func (b *EvalBroker) Flush() {
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[string]PendingEvaluations)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
Expand Down
43 changes: 43 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -1295,3 +1296,45 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
t.Fatal(e)
})
}

func TestEvalBroker_NamespacedJobs(t *testing.T) {
t.Parallel()
b := testBroker(t, 0)
b.SetEnabled(true)

// Create evals with the same jobid and different namespace
jobId := "test-jobID"

eval1 := mock.Eval()
eval1.JobID = jobId
eval1.Namespace = "n1"
b.Enqueue(eval1)

// This eval should not block
eval2 := mock.Eval()
eval2.JobID = jobId
eval2.Namespace = "default"
b.Enqueue(eval2)

// This eval should block
eval3 := mock.Eval()
eval3.JobID = jobId
eval3.Namespace = "default"
b.Enqueue(eval3)

require := require.New(t)
out1, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
require.Nil(err)
require.Equal(eval1.ID, out1.ID)

out2, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
require.Nil(err)
require.Equal(eval2.ID, out2.ID)

out3, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
require.Nil(err)
require.Nil(out3)

require.Equal(1, len(b.blocked))

}

0 comments on commit 6251e72

Please sign in to comment.