Skip to content

Commit

Permalink
set number of initial workers
Browse files Browse the repository at this point in the history
  • Loading branch information
dc0d committed Oct 7, 2021
1 parent 295e9ad commit 5abf573
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 63 deletions.
20 changes: 5 additions & 15 deletions worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,21 @@ import (

type WorkerPool chan func()

// Init creates a new workerpool which has one default worker (the minimum number of workers is one).
// New creates a new workerpool. If initialPoolSize is zero, no initial workers will be started.
// To have more workers, the Grow method should be used.
// For initial worker absolute timeout and stop signal are ignored.
// Also, idle timeout is ignored, if no respawnAfter is provided.
func Init(mailboxSize MailboxSize, opts ...GrowthOption) WorkerPool {
func New(mailboxSize MailboxSize, initialPoolSize int, opts ...GrowthOption) WorkerPool {
if mailboxSize < 0 {
mailboxSize = 0
}

var pool WorkerPool = make(chan func(), mailboxSize)
pool.start(initialWorkerOptions(opts...))
if initialPoolSize > 0 {
pool.Grow(initialPoolSize, opts...)
}

return pool
}

func initialWorkerOptions(opts ...GrowthOption) growthOptions {
options := applyOptions(opts...)
options.absoluteTimeout = 0
options.stopSignal = nil
if options.respawnAfter == 0 && options.idleTimeout > 0 {
options.idleTimeout = 0
}
return options
}

func (pool WorkerPool) Stop() {
close(pool)
}
Expand Down
98 changes: 50 additions & 48 deletions worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,48 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

func Test_WorkerPool_Blocking_should_set_default_mailbox_size_to_zero(t *testing.T) {
pool := Init(-1)
defer pool.Stop()
func Test_WorkerPool_New(t *testing.T) {
t.Run(`should set default mailbox size to zero`, func(t *testing.T) {
pool := New(-1, 0)
defer pool.Stop()

assert.True(t, len(pool) == 0)
})

t.Run(`should start n initial workers`, func(t *testing.T) {
n := 100 // initial workers
pool := New(-1, n)
defer pool.Stop()

var count int64
stop := make(chan struct{})
defer close(stop)
for i := 0; i < n; i++ {
pool.SemiBlocking(func() {
atomic.AddInt64(&count, 1)
<-stop
})
}

assert.True(t, len(pool) == 0)
assert.Eventually(t, func() bool {
return atomic.LoadInt64(&count) == int64(n)
}, time.Millisecond*300, time.Millisecond*20)

assert.Never(t, func() bool {
select {
case pool <- func() { panic("should not run") }:
return true
default:
}
return false
}, time.Millisecond*300, time.Millisecond*20)
})
}

func Test_WorkerPool_Blocking_should_serialize_the_jobs(t *testing.T) {
const n = 1000

pool := Init(10)
pool := New(10, 1)
defer pool.Stop()

var (
Expand Down Expand Up @@ -60,7 +91,7 @@ func Test_WorkerPool_Blocking_should_serialize_the_jobs(t *testing.T) {

func Test_WorkerPool_Nonblocking_should_just_put_job_in_the_mailbox(t *testing.T) {
const n = 1000
pool := Init(n)
pool := New(n, 1)
defer pool.Stop()

var counter int64 = 0
Expand All @@ -79,7 +110,7 @@ func Test_WorkerPool_Nonblocking_should_just_put_job_in_the_mailbox(t *testing.T
}

func Test_WorkerPool_should_not_stop_because_of_panic(t *testing.T) {
pool := Init(1)
pool := New(1, 1)
defer pool.Stop()

pool.Blocking(func() {
Expand All @@ -98,7 +129,7 @@ func Test_WorkerPool_should_not_stop_because_of_panic(t *testing.T) {

func Test_WorkerPool_Grow_should_spin_up_at_least_one_new_worker(t *testing.T) {
increased := 1
pool := Init(9)
pool := New(9, 1)
defer pool.Stop()

negativeOrZero := 0
Expand All @@ -113,7 +144,7 @@ func Test_WorkerPool_Grow_should_spin_up_at_least_one_new_worker(t *testing.T) {

func Test_WorkerPool_Grow_should_spin_up_multiple_new_workers(t *testing.T) {
increased := 10
pool := Init(9)
pool := New(9, 1)
defer pool.Stop()

pool.Grow(increased)
Expand All @@ -128,7 +159,7 @@ func Test_WorkerPool_Grow_should_spin_up_multiple_new_workers(t *testing.T) {
func Test_WorkerPool_Grow_should_stop_extra_workers_with_absolute_timeout(t *testing.T) {
increased := 10
absoluteTimeout := time.Millisecond * 10
pool := Init(9)
pool := New(9, 1)
defer pool.Stop()

pool.Grow(increased, WithAbsoluteTimeout(absoluteTimeout))
Expand All @@ -144,7 +175,7 @@ func Test_WorkerPool_Grow_should_stop_extra_workers_with_idle_timeout_when_there
const n = 1000
increased := 10
idleTimeout := time.Millisecond * 50
pool := Init(100)
pool := New(100, 1)
defer pool.Stop()

start := make(chan struct{}, n)
Expand Down Expand Up @@ -181,7 +212,7 @@ func Test_WorkerPool_Grow_should_stop_extra_workers_with_idle_timeout_when_there
func Test_WorkerPool_Grow_should_stop_extra_workers_with_explicit_stop_signal(t *testing.T) {
increased := 10
stopSignal := make(chan struct{})
pool := Init(9)
pool := New(9, 1)
defer pool.Stop()

pool.Grow(increased, WithStopSignal(stopSignal))
Expand All @@ -201,7 +232,7 @@ func Test_WorkerPool_Grow_should_stop_extra_workers_with_explicit_stop_signal(t
}

func Test_WorkerPool_Grow_should_respawn_after_a_certain_number_of_requests(t *testing.T) {
pool := Init(9, WithRespawnAfter(10))
pool := New(9, 1, WithRespawnAfter(10))
defer pool.Stop()

expectedNumberOfStarts := 1 // one initial start
Expand All @@ -220,7 +251,7 @@ func Test_WorkerPool_Grow_should_respawn_after_a_certain_number_of_requests(t *t
}

func Test_WorkerPool_Grow_should_respawn_after_a_certain_timespan_if_reapawnAfter_is_provided(t *testing.T) {
pool := Init(9, WithRespawnAfter(1000), WithIdleTimeout(time.Millisecond*50))
pool := New(9, 1, WithRespawnAfter(1000), WithIdleTimeout(time.Millisecond*50))
defer pool.Stop()

time.Sleep(time.Millisecond * 190)
Expand All @@ -231,7 +262,7 @@ func Test_WorkerPool_Grow_should_respawn_after_a_certain_timespan_if_reapawnAfte
//

func Test_WorkerPool_Stop_should_close_the_pool(t *testing.T) {
pool := Init(9)
pool := New(9, 1)
pool.Stop()

assert.Panics(t, func() {
Expand All @@ -240,7 +271,7 @@ func Test_WorkerPool_Stop_should_close_the_pool(t *testing.T) {
}

func Test_WorkerPool_Stop_should_stop_the_workers(t *testing.T) {
pool := Init(9)
pool := New(9, 1)

increased := 10
pool.Grow(increased)
Expand All @@ -266,35 +297,6 @@ func Test_WorkerPool_Stop_should_stop_the_workers(t *testing.T) {

//

func Test_initialWorkerOptions(t *testing.T) {
t.Run(`ignores absolute timeout`, func(t *testing.T) {
options := initialWorkerOptions(WithAbsoluteTimeout(time.Minute))

assert.Equal(t, time.Duration(0), options.absoluteTimeout)
})

t.Run(`ignores stop signal`, func(t *testing.T) {
options := initialWorkerOptions(WithStopSignal(make(chan struct{})))

var zeroStopSignal <-chan struct{}
assert.Equal(t, zeroStopSignal, options.stopSignal)
})

t.Run(`ignores idle timeout if no respawn count is provided`, func(t *testing.T) {
options := initialWorkerOptions(WithIdleTimeout(time.Minute))

assert.Equal(t, time.Duration(0), options.idleTimeout)
})

t.Run(`does not ignore idle timeout if respawn count is provided`, func(t *testing.T) {
options := initialWorkerOptions(WithIdleTimeout(time.Minute), WithRespawnAfter(1000))

assert.Equal(t, time.Minute, options.idleTimeout)
})
}

//

func getNumberOfWorkers(pool WorkerPool) int {
accessWorkerPoolState.RLock()
defer accessWorkerPoolState.RUnlock()
Expand Down Expand Up @@ -331,7 +333,7 @@ var (
)

func ExampleWorkerPool_Blocking() {
pool := Init(1)
pool := New(1, 1)
defer pool.Stop()

var state int64
Expand All @@ -346,7 +348,7 @@ func ExampleWorkerPool_Blocking() {
}

func ExampleWorkerPool_SemiBlocking() {
pool := Init(1)
pool := New(1, 1)
defer pool.Stop()

var state int64
Expand All @@ -367,7 +369,7 @@ func ExampleWorkerPool_SemiBlocking() {

func ExampleWorkerPool_Grow() {
const n = 19
pool := Init(10)
pool := New(10, 1)
defer pool.Stop()

pool.Grow(3) // spin up three new workers
Expand Down

0 comments on commit 5abf573

Please sign in to comment.