query: multiple jobs support #303

Draft: wants to merge 2 commits into master

query/worker.go (17 changes: 15 additions & 2 deletions)

@@ -2,6 +2,7 @@ package query

 import (
     "errors"
+    "sync/atomic"
     "time"

     "github.com/btcsuite/btcd/wire"
@@ -58,6 +59,10 @@ type jobResult struct {
 type worker struct {
     peer Peer

+    // quit indicates that the worker has already quit and is not accepting
+    // any more jobs.
+    quit int32
+
     // nextJob is a channel of queries to be distributed, where the worker
     // will poll new work from.
     nextJob chan *queryJob
@@ -70,7 +75,7 @@ var _ Worker = (*worker)(nil)
 func NewWorker(peer Peer) Worker {
     return &worker{
         peer:    peer,
-        nextJob: make(chan *queryJob),
+        nextJob: make(chan *queryJob, maxJobs),
     }
 }

@@ -88,7 +93,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {

     // Subscribe to messages from the peer.
     msgChan, cancel := peer.SubscribeRecvMsg()
-    defer cancel()
+    defer func() {
+        atomic.AddInt32(&w.quit, 1)
+        cancel()
+    }()

     for {
         log.Tracef("Worker %v waiting for more work", peer.Addr())
@@ -266,5 +274,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
 //
 // NOTE: Part of the Worker interface.
 func (w *worker) NewJob() chan<- *queryJob {
+    // The worker has already quit so don't return the nextJob channel.
+    if atomic.LoadInt32(&w.quit) != 0 {
+        return nil
+    }
+
     return w.nextJob
 }

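The two changes above work together: `nextJob` is now buffered to `maxJobs`, and `NewJob()` returns nil once the worker's `Run` loop has exited. Below is a minimal, self-contained sketch (not part of this PR; `newJob` and `job` are made-up stand-ins) of why handing back a nil channel is a safe way to refuse work: a send on a nil channel blocks forever, so in a `select` the other cases win and an exited worker never receives the job.

```go
package main

import (
    "fmt"
    "time"
)

type job struct{ index uint64 }

// newJob mimics the PR's worker.NewJob: once the worker has quit it
// returns a nil channel instead of the buffered job channel.
func newJob(quit bool, ch chan<- *job) chan<- *job {
    if quit {
        return nil
    }
    return ch
}

func main() {
    // Buffered like nextJob after this PR, so a send can succeed even
    // before the receiver picks the job up.
    ch := make(chan *job, 1)

    for _, quit := range []bool{false, true} {
        select {
        case newJob(quit, ch) <- &job{index: 7}:
            fmt.Println("job accepted")
        case <-time.After(100 * time.Millisecond):
            // Sends on a nil channel block forever, so this case
            // fires once the worker has quit.
            fmt.Println("job refused: worker has quit")
        }
    }
}
```

This is the same pattern the updated tests rely on when they call `ctx.worker.NewJob()` inside a `select` instead of caching the channel once.
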
query/worker_test.go (20 changes: 10 additions & 10 deletions)

@@ -86,7 +86,7 @@ func makeJob() *queryJob {
 }

 type testCtx struct {
-    nextJob    chan<- *queryJob
+    worker     Worker
     jobResults chan *jobResult
     peer       *mockPeer
     workerDone chan struct{}
@@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) {
         subscriptions: make(chan chan wire.Message),
         quit:          make(chan struct{}),
     }
-    results := make(chan *jobResult)
+    results := make(chan *jobResult, maxJobs)
     quit := make(chan struct{})

     wk := NewWorker(peer)
@@ -123,7 +123,7 @@
     peer.responses = sub

     return &testCtx{
-        nextJob:    wk.NewJob(),
+        worker:     wk,
         jobResults: results,
         peer:       peer,
         workerDone: done,
@@ -144,7 +144,7 @@ func TestWorkerIgnoreMsgs(t *testing.T) {
     task := makeJob()

     select {
-    case ctx.nextJob <- task:
+    case ctx.worker.NewJob() <- task:
     case <-time.After(1 * time.Second):
         t.Fatalf("did not pick up job")
     }
@@ -215,7 +215,7 @@ func TestWorkerTimeout(t *testing.T) {

     // Give the worker the new job.
     select {
-    case ctx.nextJob <- task:
+    case ctx.worker.NewJob() <- task:
     case <-time.After(1 * time.Second):
         t.Fatalf("did not pick up job")
     }
@@ -253,7 +253,7 @@

     // It will immediately attempt to fetch another task.
     select {
-    case ctx.nextJob <- task:
+    case ctx.worker.NewJob() <- task:
     case <-time.After(1 * time.Second):
         t.Fatalf("did not pick up job")
     }
@@ -272,7 +272,7 @@ func TestWorkerDisconnect(t *testing.T) {
     // Give the worker a new job.
     task := makeJob()
     select {
-    case ctx.nextJob <- task:
+    case ctx.worker.NewJob() <- task:
     case <-time.After(1 * time.Second):
         t.Fatalf("did not pick up job")
     }
@@ -312,7 +312,7 @@

     // No more jobs should be accepted by the worker after it has exited.
     select {
-    case ctx.nextJob <- task:
+    case ctx.worker.NewJob() <- task:
         t.Fatalf("exited worker did pick up job")
     default:
     }
@@ -342,7 +342,7 @@ func TestWorkerProgress(t *testing.T) {
     task.timeout = taskTimeout

     select {
-    case ctx.nextJob <- task:
+    case ctx.worker.NewJob() <- task:
     case <-time.After(1 * time.Second):
         t.Fatalf("did not pick up job")
     }
@@ -421,7 +421,7 @@ func TestWorkerJobCanceled(t *testing.T) {
     canceled := false
     for i := 0; i < 2; i++ {
         select {
-        case ctx.nextJob <- task:
+        case ctx.worker.NewJob() <- task:
         case <-time.After(1 * time.Second):
             t.Fatalf("did not pick up job")
         }

query/workmanager.go (26 changes: 14 additions & 12 deletions)

@@ -15,6 +15,9 @@ const (

     // maxQueryTimeout is the maximum timeout given to a single query.
     maxQueryTimeout = 32 * time.Second
+
+    // maxJobs is the maximum amount of jobs a single worker can have.
+    maxJobs = 32
 )

 var (
@@ -74,11 +77,10 @@ type PeerRanking interface {

 // activeWorker wraps a Worker that is currently running, together with the job
 // we have given to it.
-// TODO(halseth): support more than one active job at a time.
 type activeWorker struct {
-    w         Worker
-    activeJob *queryJob
-    onExit    chan struct{}
+    w          Worker
+    activeJobs map[uint64]*queryJob
+    onExit     chan struct{}
 }

 // Config holds the configuration options for a new WorkManager.
@@ -126,8 +128,8 @@ var _ WorkManager = (*peerWorkManager)(nil)
 func NewWorkManager(cfg *Config) WorkManager {
     return &peerWorkManager{
         cfg:        cfg,
-        newBatches: make(chan *batch),
-        jobResults: make(chan *jobResult),
+        newBatches: make(chan *batch, maxJobs),
+        jobResults: make(chan *jobResult, maxJobs),
         quit:       make(chan struct{}),
     }
 }
@@ -220,7 +222,7 @@
        for p, r := range workers {
            // Only one active job at a time is currently
            // supported.
-           if r.activeJob != nil {
+           if len(r.activeJobs) >= maxJobs {
                continue
            }

@@ -242,7 +244,7 @@
                log.Tracef("Sent job %v to worker %v",
                    next.Index(), p)
                heap.Pop(work)
-               r.activeJob = next
+               r.activeJobs[next.Index()] = next

                // Go back to start of loop, to check
                // if there are more jobs to
@@ -278,9 +280,9 @@
            // remove it from our set of active workers.
            onExit := make(chan struct{})
            workers[peer.Addr()] = &activeWorker{
-               w:         r,
-               activeJob: nil,
-               onExit:    onExit,
+               w:          r,
+               activeJobs: make(map[uint64]*queryJob),
+               onExit:     onExit,
            }

            w.cfg.Ranking.AddPeer(peer.Addr())
@@ -302,7 +304,7 @@
            // Delete the job from the worker's active job, such
            // that the slot gets opened for more work.
            r := workers[result.peer.Addr()]
-           r.activeJob = nil
+           delete(r.activeJobs, result.job.Index())

            // Get the index of this query's batch, and delete it
            // from the map of current queries, since we don't have

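Taken together, the workmanager.go changes replace the single `activeJob` slot with a per-worker map of in-flight jobs, gated by `maxJobs` and cleaned up when a result comes back. A small standalone sketch of that bookkeeping follows (the `tryAssign` and `finish` helpers are hypothetical names for illustration, not code from this PR):

```go
package main

import "fmt"

const maxJobs = 32

type queryJob struct{ index uint64 }

// activeWorker mirrors the PR's idea: track in-flight jobs by index.
type activeWorker struct {
    activeJobs map[uint64]*queryJob
}

// tryAssign hands the job to the worker unless it is already at capacity,
// matching the len(r.activeJobs) >= maxJobs check in the dispatcher.
func (w *activeWorker) tryAssign(j *queryJob) bool {
    if len(w.activeJobs) >= maxJobs {
        return false
    }
    w.activeJobs[j.index] = j
    return true
}

// finish frees the slot for the job with the given index, matching the
// delete(r.activeJobs, result.job.Index()) call made on a job result.
func (w *activeWorker) finish(index uint64) {
    delete(w.activeJobs, index)
}

func main() {
    w := &activeWorker{activeJobs: make(map[uint64]*queryJob)}

    // Fill the worker to capacity.
    for i := uint64(0); i < maxJobs; i++ {
        w.tryAssign(&queryJob{index: i})
    }
    fmt.Println(w.tryAssign(&queryJob{index: 99})) // false: at capacity

    // Completing a job opens the slot again.
    w.finish(0)
    fmt.Println(w.tryAssign(&queryJob{index: 99})) // true
}
```

One design note: keying the map by `job.Index()` lets the dispatcher release exactly the slot belonging to the result it just received, instead of clearing a single shared field.
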
query/workmanager_test.go (12 changes: 6 additions & 6 deletions)

@@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
        NewWorker: func(peer Peer) Worker {
            m := &mockWorker{
                peer:    peer,
-               nextJob: make(chan *queryJob),
-               results: make(chan *jobResult),
+               nextJob: make(chan *queryJob, maxJobs),
+               results: make(chan *jobResult, maxJobs),
            }
            workerChan <- m
            return m
@@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
    for i := 0; i < numQueries; i++ {
        q := &Request{}
        queries[i] = q
-       scheduledJobs[i] = make(chan sched)
+       scheduledJobs[i] = make(chan sched, maxJobs)
    }

    // For each worker, spin up a goroutine that will forward the job it
@@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) {
 // TestWorkManagerWorkRankingScheduling checks that the work manager schedules
 // jobs among workers according to the peer ranking.
 func TestWorkManagerWorkRankingScheduling(t *testing.T) {
-   const numQueries = 4
+   const numQueries = 4 * maxJobs
    const numWorkers = 8

    workMgr, workers := startWorkManager(t, numWorkers)
@@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
    var jobs []*queryJob
    for i := 0; i < numQueries; i++ {
        select {
-       case job := <-workers[i].nextJob:
+       case job := <-workers[i/maxJobs].nextJob:
            if job.index != uint64(i) {
                t.Fatalf("unexpected job")
            }
@@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
    // Go backwards, and succeed the queries.
    for i := numQueries - 1; i >= 0; i-- {
        select {
-       case workers[i].results <- &jobResult{
+       case workers[i/maxJobs].results <- &jobResult{
            job: jobs[i],
            err: nil,
        }:

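The ranking test now issues `4 * maxJobs` queries across 8 workers, so the best-ranked workers are filled to capacity in order and job `i` is expected on worker `i / maxJobs`. A tiny sketch of that arithmetic (standalone illustration, not test code from the PR):

```go
package main

import "fmt"

const (
    maxJobs    = 32
    numQueries = 4 * maxJobs
)

func main() {
    perWorker := make(map[int]int)
    for i := 0; i < numQueries; i++ {
        // Worker 0 gets jobs 0..31, worker 1 gets 32..63, and so on.
        perWorker[i/maxJobs]++
    }
    for w := 0; w < numQueries/maxJobs; w++ {
        fmt.Printf("worker %d: %d jobs\n", w, perWorker[w])
    }
}
```

With 8 mock workers but only `4 * maxJobs` queries, workers 4 through 7 are expected to receive no jobs at all, which is consistent with the test only reading from `workers[i/maxJobs]`.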