Skip to content

Commit

Permalink
Fix user queue in scheduler that was not thread-safe
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 committed Jul 10, 2024
1 parent ee8d110 commit d505f87
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"math/rand"
"sort"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -37,7 +38,8 @@ type querier struct {
// This struct holds user queues for pending requests. It also keeps track of connected queriers,
// and mapping between users and queriers.
type queues struct {
userQueues map[string]*userQueue
userQueues map[string]*userQueue
userQueuesMx sync.RWMutex

// List of all users with queues, used for iteration when searching for next queue to handle.
// Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink
Expand Down Expand Up @@ -103,6 +105,9 @@ func (q *queues) len() int {
}

func (q *queues) deleteQueue(userID string) {
q.userQueuesMx.Lock()
defer q.userQueuesMx.Unlock()

uq := q.userQueues[userID]
if uq == nil {
return
Expand Down Expand Up @@ -132,6 +137,9 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
maxQueriers = 0
}

q.userQueuesMx.Lock()
defer q.userQueuesMx.Unlock()

uq := q.userQueues[userID]
priorityEnabled := q.limits.QueryPriority(userID).Enabled
maxOutstanding := q.limits.MaxOutstandingPerTenant(userID)
Expand Down Expand Up @@ -237,6 +245,9 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us
continue
}

q.userQueuesMx.RLock()
defer q.userQueuesMx.RUnlock()

uq := q.userQueues[u]

if uq.queriers != nil {
Expand Down
33 changes: 33 additions & 0 deletions pkg/scheduler/queue/user_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"math/rand"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -457,6 +458,38 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) {
}
}

// TestGetOrAddQueueConcurrency calls getOrAddQueue for the same user from many
// goroutines at once. Each goroutine first installs a different MaxOutstanding
// limit and afterwards randomly enqueues to, dequeues from, or leaves alone the
// queue it got back.
//
// The test makes no assertions of its own; it only has value when a concurrent
// misuse of the user queue map surfaces as a panic or as a race detector
// report (go test -race).
func TestGetOrAddQueueConcurrency(t *testing.T) {
	const numGoRoutines = 100
	limits := MockLimits{
		MaxOutstanding: 3,
	}
	q := newUserQueues(0, 0, limits, nil)
	for _, querierID := range []string{"q-1", "q-2", "q-3", "q-4", "q-5"} {
		q.addQuerierConnection(querierID)
	}

	var wg sync.WaitGroup
	wg.Add(numGoRoutines)

	for i := 0; i < numGoRoutines; i++ {
		go func(maxOutstanding int) {
			defer wg.Done()

			// Work on a private copy: mutating the captured `limits` variable from
			// every goroutine would be a data race inside the test itself.
			goroutineLimits := limits
			goroutineLimits.MaxOutstanding = maxOutstanding + 50

			// getOrAddQueue reads q.limits while holding userQueuesMx, so publish
			// the new limits under the same mutex to keep the test race-free.
			// NOTE(review): assumes no code path exercised here reads q.limits
			// without holding userQueuesMx - confirm against user_queues.go.
			q.userQueuesMx.Lock()
			q.limits = goroutineLimits
			q.userQueuesMx.Unlock()

			queue := q.getOrAddQueue("userID", 2)
			if rand.Int()%2 == 0 {
				queue.enqueueRequest(MockRequest{})
			} else if rand.Int()%9 == 0 {
				queue.dequeueRequest(0, false)
			}
		}(i)
	}

	wg.Wait()
}

// generateTenant returns one of five tenant IDs, "tenant-0" through "tenant-4",
// chosen by drawing the next non-negative int from r.
func generateTenant(r *rand.Rand) string {
	tenantIndex := r.Int() % 5
	return fmt.Sprintf("tenant-%d", tenantIndex)
}
Expand Down

0 comments on commit d505f87

Please sign in to comment.