query: support more than 1 job per worker
A worker can now be given more than one job at a time. A new maxJobs
constant limits the number of concurrent jobs per peer.
kcalvinalvin committed Dec 2, 2024
1 parent d4563a2 commit f56d320
Showing 4 changed files with 41 additions and 24 deletions.
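
In short, the change has three parts: the worker's nextJob channel is now buffered with capacity maxJobs, a worker that has exited marks itself through an atomic flag so that NewJob returns a nil channel, and the work manager schedules up to maxJobs outstanding queries per worker instead of one. The nil return relies on a basic Go property, shown in the standalone sketch below (illustrative only, not code from this repository): a send on a nil channel can never proceed, so a select with a default simply skips a worker that has gone away.

package main

import "fmt"

func main() {
    // Stand-in for worker.NewJob() after the worker has quit.
    var dead chan int

    select {
    case dead <- 1:
        // A send on a nil channel can never proceed, so this case is
        // never chosen.
        fmt.Println("unreachable")
    default:
        fmt.Println("dead worker skipped; the job stays queued")
    }
}
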
17 changes: 15 additions & 2 deletions query/worker.go
@@ -2,6 +2,7 @@ package query

import (
"errors"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/wire"
@@ -58,6 +59,10 @@ type jobResult struct {
type worker struct {
peer Peer

// quit indicates that the worker has already quit and is not accepting
// any more jobs.
quit int32

// nextJob is a channel of queries to be distributed, where the worker
// will poll new work from.
nextJob chan *queryJob
@@ -70,7 +75,7 @@ var _ Worker = (*worker)(nil)
func NewWorker(peer Peer) Worker {
return &worker{
peer: peer,
nextJob: make(chan *queryJob),
nextJob: make(chan *queryJob, maxJobs),
}
}

@@ -88,7 +93,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {

// Subscribe to messages from the peer.
msgChan, cancel := peer.SubscribeRecvMsg()
defer cancel()
defer func() {
atomic.AddInt32(&w.quit, 1)
cancel()
}()

for {
log.Tracef("Worker %v waiting for more work", peer.Addr())
@@ -266,5 +274,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
//
// NOTE: Part of the Worker interface.
func (w *worker) NewJob() chan<- *queryJob {
// The worker has already quit so don't return the nextJob channel.
if atomic.LoadInt32(&w.quit) != 0 {
return nil
}

return w.nextJob
}
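
For readers skimming the diff, here is a condensed, self-contained sketch of the worker-side pattern above. miniWorker, job, and the done channel are simplified stand-ins rather than the package's real types; only the shape of the quit flag and the buffered nextJob channel is meant to match.

package main

import (
    "fmt"
    "sync/atomic"
)

const maxJobs = 32 // mirrors the constant added to workmanager.go

type job struct{ id uint64 }

// miniWorker is a simplified stand-in for the worker type above.
type miniWorker struct {
    quit    int32     // flipped once Run has returned
    nextJob chan *job // buffered so up to maxJobs sends can queue
}

func newMiniWorker() *miniWorker {
    return &miniWorker{nextJob: make(chan *job, maxJobs)}
}

// Run drains jobs until done is closed, then marks the worker as quit.
func (w *miniWorker) Run(done <-chan struct{}) {
    defer atomic.AddInt32(&w.quit, 1)
    for {
        select {
        case j := <-w.nextJob:
            _ = j // the real worker hands the job to its peer here
        case <-done:
            return
        }
    }
}

// NewJob hands out the job channel, or nil once the worker has exited so
// that senders selecting on it skip this worker.
func (w *miniWorker) NewJob() chan<- *job {
    if atomic.LoadInt32(&w.quit) != 0 {
        return nil
    }
    return w.nextJob
}

func main() {
    done := make(chan struct{})
    w := newMiniWorker()
    go w.Run(done)

    // The buffer lets the sender queue work without blocking.
    for i := uint64(0); i < 3; i++ {
        w.NewJob() <- &job{id: i}
    }
    close(done)
    fmt.Println("queued 3 jobs without blocking")
}
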
20 changes: 10 additions & 10 deletions query/worker_test.go
@@ -86,7 +86,7 @@ func makeJob() *queryJob {
}

type testCtx struct {
nextJob chan<- *queryJob
worker Worker
jobResults chan *jobResult
peer *mockPeer
workerDone chan struct{}
@@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) {
subscriptions: make(chan chan wire.Message),
quit: make(chan struct{}),
}
results := make(chan *jobResult)
results := make(chan *jobResult, maxJobs)
quit := make(chan struct{})

wk := NewWorker(peer)
@@ -123,7 +123,7 @@
peer.responses = sub

return &testCtx{
nextJob: wk.NewJob(),
worker: wk,
jobResults: results,
peer: peer,
workerDone: done,
@@ -144,7 +144,7 @@ func TestWorkerIgnoreMsgs(t *testing.T) {
task := makeJob()

select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
case <-time.After(1 * time.Second):
t.Fatalf("did not pick up job")
}
@@ -215,7 +215,7 @@ func TestWorkerTimeout(t *testing.T) {

// Give the worker the new job.
select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
case <-time.After(1 * time.Second):
t.Fatalf("did not pick up job")
}
@@ -253,7 +253,7 @@ func TestWorkerTimeout(t *testing.T) {

// It will immediately attempt to fetch another task.
select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
case <-time.After(1 * time.Second):
t.Fatalf("did not pick up job")
}
@@ -272,7 +272,7 @@ func TestWorkerDisconnect(t *testing.T) {
// Give the worker a new job.
task := makeJob()
select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
case <-time.After(1 * time.Second):
t.Fatalf("did not pick up job")
}
@@ -312,7 +312,7 @@ func TestWorkerDisconnect(t *testing.T) {

// No more jobs should be accepted by the worker after it has exited.
select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
t.Fatalf("exited worker did pick up job")
default:
}
@@ -342,7 +342,7 @@ func TestWorkerProgress(t *testing.T) {
task.timeout = taskTimeout

select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
case <-time.After(1 * time.Second):
t.Fatalf("did not pick up job")
}
@@ -421,7 +421,7 @@ func TestWorkerJobCanceled(t *testing.T) {
canceled := false
for i := 0; i < 2; i++ {
select {
case ctx.nextJob <- task:
case ctx.worker.NewJob() <- task:
case <-time.After(1 * time.Second):
t.Fatalf("did not pick up job")
}
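
The test helper now hands back the Worker itself instead of a channel captured once at startup. That matters because nextJob is now buffered: a channel cached before the worker exits would keep accepting sends into its buffer, whereas calling NewJob() on every send observes the nil return. A minimal sketch of that pattern (stoppedWorker is a hypothetical stand-in, not a type from the package):

package main

import "fmt"

type queryJob struct{}

// stoppedWorker is a hypothetical stand-in for a worker whose Run loop has
// already returned: NewJob yields nil, matching the contract added in
// worker.go above.
type stoppedWorker struct{}

func (stoppedWorker) NewJob() chan<- *queryJob { return nil }

func main() {
    w := stoppedWorker{}
    task := &queryJob{}

    // Re-fetching the channel on every send observes the nil return, so
    // the default case fires. A channel cached before the worker exited
    // would instead accept the send into its (now buffered) queue.
    select {
    case w.NewJob() <- task:
        fmt.Println("exited worker picked up the job (should not happen)")
    default:
        fmt.Println("send skipped: worker has exited")
    }
}
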
16 changes: 10 additions & 6 deletions query/workmanager.go
@@ -15,6 +15,9 @@ const (

// maxQueryTimeout is the maximum timeout given to a single query.
maxQueryTimeout = 32 * time.Second

// maxJobs is the maximum number of jobs a single worker can have.
maxJobs = 32
)

var (
@@ -74,7 +77,6 @@ type PeerRanking interface {

// activeWorker wraps a Worker that is currently running, together with the job
// we have given to it.
// TODO(halseth): support more than one active job at a time.
type activeWorker struct {
w Worker
activeJobs map[uint64]*queryJob
@@ -138,8 +140,8 @@ func NewWorkManager(cfg *Config) WorkManager {

return &peerWorkManager{
cfg: cfg,
newBatches: make(chan *batch),
jobResults: make(chan *jobResult),
newBatches: make(chan *batch, maxJobs),
jobResults: make(chan *jobResult, maxJobs),
workQueue: work,
workers: make(map[string]*activeWorker),
currentBatches: make(map[uint64]*batchProgress),
@@ -202,8 +204,10 @@ func (w *peerWorkManager) handleJobResult(result *jobResult) {

// Delete the job from the worker's active job, such
// that the slot gets opened for more work.
r := w.workers[result.peer.Addr()]
delete(r.activeJobs, result.job.Index())
r, found := w.workers[result.peer.Addr()]
if found {
delete(r.activeJobs, result.job.Index())
}

// Get the index of this query's batch, and delete it
// from the map of current queries, since we don't have
@@ -438,7 +442,7 @@
for p, r := range w.workers {
// Only schedule the job if the worker doesn't already
// have the maximum number of active jobs.
if len(r.activeJobs) >= 1 {
if len(r.activeJobs) >= maxJobs {
continue
}

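
Two things to note on the work manager side. The dispatch loop now skips a peer only once it has maxJobs outstanding queries, and handleJobResult guards its map lookup, presumably because with several jobs in flight a result can arrive for a worker that has already been removed; since w.workers holds *activeWorker values, an unguarded lookup would yield a nil pointer and r.activeJobs would panic. A rough sketch of the capacity check, with simplified stand-in types (pickWorker is hypothetical, not the package's API):

package main

import "fmt"

const maxJobs = 32

type queryJob struct{ index uint64 }

// activeWorker mirrors the shape used above: a worker plus its in-flight jobs.
type activeWorker struct {
    activeJobs map[uint64]*queryJob
}

// pickWorker is a hypothetical helper: it returns the address of a worker
// with a free job slot, or "" if every worker is saturated.
func pickWorker(workers map[string]*activeWorker) string {
    for addr, r := range workers {
        if len(r.activeJobs) >= maxJobs {
            continue // this peer is at capacity, try the next one
        }
        return addr
    }
    return ""
}

func main() {
    workers := map[string]*activeWorker{
        "peer-a": {activeJobs: make(map[uint64]*queryJob)},
    }
    for i := uint64(0); i < maxJobs; i++ {
        workers["peer-a"].activeJobs[i] = &queryJob{index: i}
    }
    // peer-a already has maxJobs jobs in flight, so no slot is free.
    fmt.Printf("%q\n", pickWorker(workers))
}
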
12 changes: 6 additions & 6 deletions query/workmanager_test.go
@@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
NewWorker: func(peer Peer) Worker {
m := &mockWorker{
peer: peer,
nextJob: make(chan *queryJob),
results: make(chan *jobResult),
nextJob: make(chan *queryJob, maxJobs),
results: make(chan *jobResult, maxJobs),
}
workerChan <- m
return m
@@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
for i := 0; i < numQueries; i++ {
q := &Request{}
queries[i] = q
scheduledJobs[i] = make(chan sched)
scheduledJobs[i] = make(chan sched, maxJobs)
}

// For each worker, spin up a goroutine that will forward the job it
@@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) {
// TestWorkManagerWorkRankingScheduling checks that the work manager schedules
// jobs among workers according to the peer ranking.
func TestWorkManagerWorkRankingScheduling(t *testing.T) {
const numQueries = 4
const numQueries = 4 * maxJobs
const numWorkers = 8

workMgr, workers := startWorkManager(t, numWorkers)
@@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
var jobs []*queryJob
for i := 0; i < numQueries; i++ {
select {
case job := <-workers[i].nextJob:
case job := <-workers[i/maxJobs].nextJob:
if job.index != uint64(i) {
t.Fatalf("unexpected job")
}
@@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
// Go backwards, and succeed the queries.
for i := numQueries - 1; i >= 0; i-- {
select {
case workers[i].results <- &jobResult{
case workers[i/maxJobs].results <- &jobResult{
job: jobs[i],
err: nil,
}:
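
The ranking test now issues numQueries = 4 * maxJobs queries across 8 workers, so if the scheduler fills the best-ranked peer up to maxJobs jobs before moving on, job i should land on worker i/maxJobs; that integer division is what the updated assertions index with. A quick check of the arithmetic (standalone, illustrative only):

package main

import "fmt"

const maxJobs = 32

func main() {
    const numQueries = 4 * maxJobs

    // Jobs 0..31 go to worker 0, 32..63 to worker 1, and so on.
    for _, i := range []int{0, 31, 32, 95, numQueries - 1} {
        fmt.Printf("job %3d -> worker %d\n", i, i/maxJobs)
    }
}
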
