Skip to content

Commit

Permalink
make dial queue parameters configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored and anacrolix committed Feb 1, 2019
1 parent 1b1fb7e commit ebcfcd4
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 56 deletions.
137 changes: 88 additions & 49 deletions dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dht

import (
"context"
"fmt"
"math"
"time"

Expand All @@ -10,35 +11,80 @@ import (
)

const (
// DialQueueMinParallelism is the minimum number of worker dial goroutines that will be alive at any time.
DialQueueMinParallelism = 6
// DialQueueMaxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
DialQueueMaxParallelism = 20
// DialQueueMaxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
DialQueueMaxIdle = 5 * time.Second
// DialQueueScalingMutePeriod is the amount of time to ignore further worker pool scaling events, after one is
// processed. Its role is to reduce jitter.
DialQueueScalingMutePeriod = 1 * time.Second
// DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
// be alive at any time.
DefaultDialQueueMinParallelism = 6
// DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
// be alive at any time.
DefaultDialQueueMaxParallelism = 20
// DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
// a worker pool downscaling.
DefaultDialQueueMaxIdle = 5 * time.Second
// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
// scaling events, after one is processed. Its role is to reduce jitter.
DefaultDialQueueScalingMutePeriod = 1 * time.Second
// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
// or divided when upscaling and downscaling events occur, respectively.
DefaultDialQueueScalingFactor = 1.5
)

type dialQueue struct {
ctx context.Context
dialFn func(context.Context, peer.ID) error

nWorkers int
scalingFactor float64
scalingMutePeriod time.Duration
maxIdle time.Duration
*dqParams

in *queue.ChanQueue
out *queue.ChanQueue
nWorkers uint
out *queue.ChanQueue

waitingCh chan waitingCh
dieCh chan struct{}
growCh chan struct{}
shrinkCh chan struct{}
}

type dqParams struct {
ctx context.Context
target string
dialFn func(context.Context, peer.ID) error
in *queue.ChanQueue
config dqConfig
}

type dqConfig struct {
// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
minParallelism uint
// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
maxParallelism uint
// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
// and downscaling events occur, respectively.
scalingFactor float64
// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
// Its role is to reduce jitter.
mutePeriod time.Duration
// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
maxIdle time.Duration
}

// dqDefaultConfig returns the default configuration for dial queues. See const documentation to learn the default values.
func dqDefaultConfig() dqConfig {
return dqConfig{
minParallelism: DefaultDialQueueMinParallelism,
maxParallelism: DefaultDialQueueMaxParallelism,
scalingFactor: DefaultDialQueueScalingFactor,
maxIdle: DefaultDialQueueMaxIdle,
mutePeriod: DefaultDialQueueScalingMutePeriod,
}
}

func (dqc *dqConfig) validate() error {
if dqc.minParallelism > dqc.maxParallelism {
return fmt.Errorf("minParallelism must be below maxParallelism; actual values: min=%d, max=%d",
dqc.minParallelism, dqc.maxParallelism)
}
if dqc.scalingFactor < 1 {
return fmt.Errorf("scalingFactor must be >= 1; actual value: %f", dqc.scalingFactor)
}
return nil
}

type waitingCh struct {
ch chan<- peer.ID
ts time.Time
Expand All @@ -52,7 +98,7 @@ type waitingCh struct {
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with DialQueueMinParallelism number of workers, and scale up and down based on demand and supply of
// We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
Expand All @@ -62,31 +108,23 @@ type waitingCh struct {
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error,
maxIdle, scalingMutePeriod time.Duration,
) *dialQueue {
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
sq := &dialQueue{
ctx: ctx,
dialFn: dialFn,
nWorkers: DialQueueMinParallelism,
scalingFactor: 1.5,
scalingMutePeriod: scalingMutePeriod,
maxIdle: maxIdle,

in: in,
out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

dqParams: params,
nWorkers: params.config.minParallelism,
out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
growCh: make(chan struct{}, 1),
shrinkCh: make(chan struct{}, 1),
waitingCh: make(chan waitingCh),
dieCh: make(chan struct{}, DialQueueMaxParallelism),
dieCh: make(chan struct{}, params.config.maxParallelism),
}
for i := 0; i < DialQueueMinParallelism; i++ {

for i := 0; i < int(params.config.minParallelism); i++ {
go sq.worker()
}
go sq.control()
return sq
return sq, nil
}

func (dq *dialQueue) control() {
Expand Down Expand Up @@ -151,13 +189,13 @@ func (dq *dialQueue) control() {
dialled = nil
}
case <-dq.growCh:
if time.Since(lastScalingEvt) < dq.scalingMutePeriod {
if time.Since(lastScalingEvt) < dq.config.mutePeriod {
continue
}
dq.grow()
lastScalingEvt = time.Now()
case <-dq.shrinkCh:
if time.Since(lastScalingEvt) < dq.scalingMutePeriod {
if time.Since(lastScalingEvt) < dq.config.mutePeriod {
continue
}
dq.shrink()
Expand Down Expand Up @@ -201,19 +239,20 @@ func (dq *dialQueue) Consume() <-chan peer.ID {

func (dq *dialQueue) grow() {
// no mutex needed as this is only called from the (single-threaded) control loop.
defer func(prev int) {
defer func(prev uint) {
if prev == dq.nWorkers {
return
}
log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
}(dq.nWorkers)

if dq.nWorkers == DialQueueMaxParallelism {
if dq.nWorkers == dq.config.maxParallelism {
return
}
target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor))
if target > DialQueueMaxParallelism {
target = DialQueueMinParallelism
// choosing not to worry about uint wrapping beyond max value.
target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
if target > dq.config.maxParallelism {
target = dq.config.maxParallelism
}
for ; dq.nWorkers < target; dq.nWorkers++ {
go dq.worker()
Expand All @@ -222,19 +261,19 @@ func (dq *dialQueue) grow() {

func (dq *dialQueue) shrink() {
// no mutex needed as this is only called from the (single-threaded) control loop.
defer func(prev int) {
defer func(prev uint) {
if prev == dq.nWorkers {
return
}
log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
}(dq.nWorkers)

if dq.nWorkers == DialQueueMinParallelism {
if dq.nWorkers == dq.config.minParallelism {
return
}
target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor))
if target < DialQueueMinParallelism {
target = DialQueueMinParallelism
target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
if target < dq.config.minParallelism {
target = dq.config.minParallelism
}
// send as many die signals as workers we have to prune.
for ; dq.nWorkers > target; dq.nWorkers-- {
Expand Down Expand Up @@ -265,7 +304,7 @@ func (dq *dialQueue) worker() {
case <-idleTimer.C:
default:
}
idleTimer.Reset(dq.maxIdle)
idleTimer.Reset(dq.config.maxIdle)

select {
case <-dq.dieCh:
Expand Down
58 changes: 52 additions & 6 deletions dial_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

func TestDialQueueGrowsOnSlowDials(t *testing.T) {

in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})

Expand All @@ -29,15 +28,27 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
}

// remove the mute period to grow faster.
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)
config := dqDefaultConfig()
config.maxIdle = 10 * time.Minute
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}

for i := 0; i < 4; i++ {
_ = dq.Consume()
time.Sleep(100 * time.Millisecond)
}

for i := 0; i < 20; i++ {
if atomic.LoadInt32(&cnt) > int32(DialQueueMinParallelism) {
if atomic.LoadInt32(&cnt) > int32(DefaultDialQueueMinParallelism) {
return
}
time.Sleep(100 * time.Millisecond)
Expand All @@ -61,7 +72,19 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
return nil
}

dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)
config := dqDefaultConfig()
config.maxIdle = 10 * time.Minute
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}

// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
// and immediately returnable.
Expand Down Expand Up @@ -121,7 +144,19 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
in.EnqChan <- peer.ID(i)
}

dq := newDialQueue(context.Background(), "test", in, dialFn, time.Second, 0)
config := dqDefaultConfig()
config.maxIdle = 1 * time.Second
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}

// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for i := 0; i < 13; i++ {
Expand Down Expand Up @@ -162,7 +197,18 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
in.EnqChan <- peer.ID(i)
}

dq := newDialQueue(context.Background(), "test", in, dialFn, DialQueueMaxIdle, 2*time.Second)
config := dqDefaultConfig()
config.mutePeriod = 2 * time.Second
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}

// pick up three consumers.
for i := 0; i < 3; i++ {
Expand Down
12 changes: 11 additions & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,17 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
peersToQuery: peersToQuery,
proc: proc,
}
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer, DialQueueMaxIdle, DialQueueScalingMutePeriod)
dq, err := newDialQueue(&dqParams{
ctx: ctx,
target: q.key,
in: peersToQuery,
dialFn: r.dialPeer,
config: dqDefaultConfig(),
})
if err != nil {
panic(err)
}
r.peersDialed = dq
return r
}

Expand Down

0 comments on commit ebcfcd4

Please sign in to comment.