From 2cc976bbac9e6595756d17f40780ab1d69783d76 Mon Sep 17 00:00:00 2001 From: Kaveh Shahbazian Date: Tue, 1 Nov 2022 22:57:58 +0100 Subject: [PATCH] push package actor to its own repo --- actor/actor.go | 114 --------- actor/actor_test.go | 237 ------------------ ...backs_spy_test.go => callbacks_spy_test.go | 12 +- go.mod | 11 +- go.sum | 16 +- test/callbacks_spy.go | 88 ------- test/support/support.go | 1 - worker-pool.go | 7 +- worker-pool_test.go | 13 +- 9 files changed, 37 insertions(+), 462 deletions(-) delete mode 100644 actor/actor.go delete mode 100644 actor/actor_test.go rename actor/callbacks_spy_test.go => callbacks_spy_test.go (91%) delete mode 100644 test/callbacks_spy.go delete mode 100644 test/support/support.go diff --git a/actor/actor.go b/actor/actor.go deleted file mode 100644 index 117caa6..0000000 --- a/actor/actor.go +++ /dev/null @@ -1,114 +0,0 @@ -// see LICENSE file - -package actor - -import ( - "context" - "time" -) - -func Start[T any](ctx context.Context, mailbox Mailbox[T], callbacks Callbacks[T], options ...Option) { - opts := applyOptions(options...) - start(ctx, mailbox, callbacks, opts) -} - -func start[T any](ctx context.Context, mailbox Mailbox[T], callbacks Callbacks[T], opts actorOptions) { - go func() { - if started != nil { - started(mailbox) - } - if stopped != nil { - defer stopped(mailbox) - } - - var ( - absoluteTimeout = opts.absoluteTimeout - idleTimeout = opts.idleTimeout - ) - - var absoluteTimeoutSignal, idleTimeoutSignal <-chan time.Time - if absoluteTimeout > 0 { - absoluteTimeoutSignal = time.After(absoluteTimeout) - } - - var requestCount RequestCount - for { - if requestCount > 0 && opts.respawnAfter > 0 && opts.respawnAfter <= requestCount { - start(ctx, mailbox, callbacks, opts) - return - } - - if idleTimeout > 0 { - idleTimeoutSignal = time.After(idleTimeout) - } - - select { - case <-absoluteTimeoutSignal: - callbacks.Stopped() - return - case <-idleTimeoutSignal: - if opts.respawnAfter > 0 { - start(ctx, mailbox, callbacks, opts) - return - } - callbacks.Stopped() - return - case <-ctx.Done(): - callbacks.Stopped() - return - case v, ok := <-mailbox: - if !ok { - callbacks.Stopped() - return - } - callbacks.Received(v) - requestCount++ - } - } - }() -} - -type ( - Callbacks[T any] interface { - Received(T) - Stopped() - } - - Mailbox[T any] <-chan T - - Option func(actorOptions) actorOptions - - actorOptions struct { - absoluteTimeout time.Duration - idleTimeout time.Duration - respawnAfter RequestCount - } - - RequestCount int - MailboxSize int -) - -func WithAbsoluteTimeout(timeout time.Duration) Option { - return func(opts actorOptions) actorOptions { opts.absoluteTimeout = timeout; return opts } -} - -func WithIdleTimeout(timeout time.Duration) Option { - return func(opts actorOptions) actorOptions { opts.idleTimeout = timeout; return opts } -} - -func WithRespawnAfter(respawnAfter RequestCount) Option { - return func(opts actorOptions) actorOptions { opts.respawnAfter = respawnAfter; return opts } -} - -func applyOptions(opts ...Option) actorOptions { - var options actorOptions - for _, fn := range opts { - options = fn(options) - } - return options -} - -var ( - started func(pool interface{}) - stopped func(pool interface{}) -) diff --git a/actor/actor_test.go b/actor/actor_test.go deleted file mode 100644 index e951faf..0000000 --- a/actor/actor_test.go +++ /dev/null @@ -1,237 +0,0 @@ -//go:generate moq -out callbacks_spy_test.go . Callbacks:CallbacksSpy -// see LICENSE file - -// install moq: -// $ go install github.com/matryer/moq@latest - -package actor - -import ( - "context" - "fmt" - "os" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestMain(m *testing.M) { - started = func(box interface{}) { - accessMailboxState.Lock() - defer accessMailboxState.Unlock() - mailboxStateWorkerStartCount[strKey(box)]++ - } - stopped = func(box interface{}) { - accessMailboxState.Lock() - defer accessMailboxState.Unlock() - mailboxStateWorkerStopCount[strKey(box)]++ - } - - exitVal := m.Run() - - os.Exit(exitVal) -} - -func Test_should_call_received_concurrently(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.ReceivedFunc = func(T) {} - - Start[T](context.TODO(), mailbox, callbacks) - - mailbox <- 1 - mailbox <- 2 - mailbox <- 3 - - assert.Eventually(t, func() bool { return len(callbacks.ReceivedCalls()) == 3 }, - time.Millisecond*300, time.Millisecond*20) - - assert.EqualValues(t, 1, callbacks.ReceivedCalls()[0].IfaceVal) - assert.EqualValues(t, 2, callbacks.ReceivedCalls()[1].IfaceVal) - assert.EqualValues(t, 3, callbacks.ReceivedCalls()[2].IfaceVal) -} - -func Test_should_call_stopped_when_context_is_canceled(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - ctx, cancel := context.WithCancel(context.Background()) - callbacks.StoppedFunc = func() {} - - Start[T](ctx, mailbox, callbacks) - cancel() - - assert.Eventually(t, func() bool { return len(callbacks.StoppedCalls()) == 1 }, - time.Millisecond*300, time.Millisecond*20) -} - -func Test_should_not_call_received_when_context_is_canceled(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - ctx, cancel := context.WithCancel(context.Background()) - callbacks.StoppedFunc = func() {} - - Start[T](ctx, mailbox, callbacks) - cancel() - - assert.Eventually(t, func() bool { return len(callbacks.StoppedCalls()) == 1 }, - time.Millisecond*300, time.Millisecond*20) - - sendingStarted := make(chan struct{}) - go func() { - close(sendingStarted) - mailbox <- 1 - }() - <-sendingStarted - - assert.Never(t, func() bool { return len(callbacks.ReceivedCalls()) > 0 }, - time.Millisecond*300, time.Millisecond*20) -} - -func Test_should_stop_after_absolute_timeout(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.StoppedFunc = func() {} - - Start[T](context.Background(), mailbox, callbacks, WithAbsoluteTimeout(time.Millisecond*50)) - - assert.Eventually(t, func() bool { return len(callbacks.StoppedCalls()) == 1 }, - time.Millisecond*300, time.Millisecond*20) -} - -func Test_should_stop_when_mailbox_is_closed(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.StoppedFunc = func() {} - - Start[T](context.Background(), mailbox, callbacks) - close(mailbox) - - assert.Eventually(t, func() bool { return len(callbacks.StoppedCalls()) == 1 }, - time.Millisecond*300, time.Millisecond*20) - assert.Equal(t, 0, len(callbacks.ReceivedCalls())) -} - -func Test_should_stop_after_idle_timeout_elapsed(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.StoppedFunc = func() {} - - Start[T](context.Background(), mailbox, callbacks, WithIdleTimeout(time.Millisecond*100)) - - assert.Never(t, func() bool { return len(callbacks.StoppedCalls()) > 0 }, - time.Millisecond*100, time.Millisecond*20) - - assert.Eventually(t, func() bool { return len(callbacks.StoppedCalls()) == 1 }, - time.Millisecond*300, time.Millisecond*20) -} - -func Test_should_respawn_after_receiving_n_messages(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.StoppedFunc = func() {} - callbacks.ReceivedFunc = func(T) {} - - Start[T](context.Background(), mailbox, callbacks, WithRespawnAfter(10)) - - go func() { - for i := 0; i < 20; i++ { - mailbox <- i - } - }() - - assert.Eventually(t, func() bool { return assert.EqualValues(t, 3, getNumberOfStarts(mailbox)) }, - time.Millisecond*300, time.Millisecond*20) - assert.Eventually(t, func() bool { return assert.EqualValues(t, 2, getNumberOfStops(mailbox)) }, - time.Millisecond*300, time.Millisecond*20) -} - -func Test_should_not_respawn_if_not_provided(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.StoppedFunc = func() {} - callbacks.ReceivedFunc = func(T) {} - - Start[T](context.Background(), mailbox, callbacks) - - go func() { - for i := 0; i < 20; i++ { - mailbox <- i - } - }() - - assert.Eventually(t, func() bool { return assert.EqualValues(t, 1, getNumberOfStarts(mailbox)) }, - time.Millisecond*300, time.Millisecond*20) - assert.Eventually(t, func() bool { return assert.EqualValues(t, 0, getNumberOfStops(mailbox)) }, - time.Millisecond*300, time.Millisecond*20) -} - -func Test_should_respawn_after_idle_timeout_elapsed_if_respawn_count_is_provided(t *testing.T) { - type T = interface{} - var ( - mailbox = make(chan T) - callbacks = &CallbacksSpy[T]{} - ) - callbacks.StoppedFunc = func() {} - - Start[T](context.Background(), mailbox, callbacks, - WithIdleTimeout(time.Millisecond*100), - WithRespawnAfter(100)) - - assert.Eventually(t, func() bool { return getNumberOfStarts(mailbox) == 2 }, - time.Millisecond*300, time.Millisecond*20) - assert.Eventually(t, func() bool { return getNumberOfStops(mailbox) == 1 }, - time.Millisecond*300, time.Millisecond*20) - - assert.Never(t, func() bool { return len(callbacks.StoppedCalls()) > 0 }, - time.Millisecond*100, time.Millisecond*20) -} - -func getNumberOfStarts(box interface{}) int { - accessMailboxState.Lock() - defer accessMailboxState.Unlock() - - return mailboxStateWorkerStartCount[strKey(box)] -} - -func getNumberOfStops(box interface{}) int { - accessMailboxState.Lock() - defer accessMailboxState.Unlock() - - return mailboxStateWorkerStopCount[strKey(box)] -} - -func strKey(key interface{}) string { - return fmt.Sprintf("%v", key) -} - -var ( - mailboxStateWorkerStopCount = make(map[string]int) - mailboxStateWorkerStartCount = make(map[string]int) - accessMailboxState = &sync.Mutex{} -) diff --git a/actor/callbacks_spy_test.go b/callbacks_spy_test.go similarity index 91% rename from actor/callbacks_spy_test.go rename to callbacks_spy_test.go index 42291f7..688c730 100644 --- a/actor/callbacks_spy_test.go +++ b/callbacks_spy_test.go @@ -1,10 +1,12 @@ -package actor +package spool import ( "sync" + + "github.com/dc0d/actor" ) -var _ Callbacks[int] = &CallbacksSpy[int]{} +var _ actor.Callbacks[int] = &CallbacksSpy[int]{} type CallbacksSpy[T any] struct { // ReceivedFunc mocks the Received method. @@ -46,7 +48,8 @@ func (mock *CallbacksSpy[T]) Received(ifaceVal T) { // ReceivedCalls gets all the calls that were made to Received. // Check the length with: -// len(mockedCallbacks.ReceivedCalls()) +// +// len(mockedCallbacks.ReceivedCalls()) func (mock *CallbacksSpy[T]) ReceivedCalls() []struct { IfaceVal T } { @@ -74,7 +77,8 @@ func (mock *CallbacksSpy[T]) Stopped() { // StoppedCalls gets all the calls that were made to Stopped. // Check the length with: -// len(mockedCallbacks.StoppedCalls()) +// +// len(mockedCallbacks.StoppedCalls()) func (mock *CallbacksSpy[T]) StoppedCalls() []struct { } { var calls []struct { diff --git a/go.mod b/go.mod index e6a35ea..3c5d22a 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,14 @@ module github.com/dc0d/spool -go 1.18 +go 1.19 -require github.com/stretchr/testify v1.7.0 +require ( + github.com/dc0d/actor v0.1.1 + github.com/stretchr/testify v1.8.1 +) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index acb88a4..ead6932 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,19 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dc0d/actor v0.1.1 h1:VlUTkvUoPwJow34ZHKXwzGryJ9YHHPo3ZJJy3PJ4aKk= +github.com/dc0d/actor v0.1.1/go.mod h1:7YE3Tv4ANQSDk86acTfch26c+8sauPaR9CYjgUg+BhE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/callbacks_spy.go b/test/callbacks_spy.go deleted file mode 100644 index ed2d019..0000000 --- a/test/callbacks_spy.go +++ /dev/null @@ -1,88 +0,0 @@ -package test - -import ( - "sync" - - "github.com/dc0d/spool/actor" -) - -var _ actor.Callbacks[int] = &CallbacksSpy[int]{} - -type CallbacksSpy[T any] struct { - // ReceivedFunc mocks the Received method. - ReceivedFunc func(ifaceVal T) - - // StoppedFunc mocks the Stopped method. - StoppedFunc func() - - // calls tracks calls to the methods. - calls struct { - // Received holds details about calls to the Received method. - Received []struct { - // IfaceVal is the ifaceVal argument value. - IfaceVal T - } - // Stopped holds details about calls to the Stopped method. - Stopped []struct { - } - } - lockReceived sync.RWMutex - lockStopped sync.RWMutex -} - -// Received calls ReceivedFunc. -func (mock *CallbacksSpy[T]) Received(ifaceVal T) { - if mock.ReceivedFunc == nil { - panic("CallbacksSpy.ReceivedFunc: method is nil but Callbacks.Received was just called") - } - callInfo := struct { - IfaceVal T - }{ - IfaceVal: ifaceVal, - } - mock.lockReceived.Lock() - mock.calls.Received = append(mock.calls.Received, callInfo) - mock.lockReceived.Unlock() - mock.ReceivedFunc(ifaceVal) -} - -// ReceivedCalls gets all the calls that were made to Received. -// Check the length with: -// len(mockedCallbacks.ReceivedCalls()) -func (mock *CallbacksSpy[T]) ReceivedCalls() []struct { - IfaceVal T -} { - var calls []struct { - IfaceVal T - } - mock.lockReceived.RLock() - calls = mock.calls.Received - mock.lockReceived.RUnlock() - return calls -} - -// Stopped calls StoppedFunc. -func (mock *CallbacksSpy[T]) Stopped() { - if mock.StoppedFunc == nil { - panic("CallbacksSpy.StoppedFunc: method is nil but Callbacks.Stopped was just called") - } - callInfo := struct { - }{} - mock.lockStopped.Lock() - mock.calls.Stopped = append(mock.calls.Stopped, callInfo) - mock.lockStopped.Unlock() - mock.StoppedFunc() -} - -// StoppedCalls gets all the calls that were made to Stopped. -// Check the length with: -// len(mockedCallbacks.StoppedCalls()) -func (mock *CallbacksSpy[T]) StoppedCalls() []struct { -} { - var calls []struct { - } - mock.lockStopped.RLock() - calls = mock.calls.Stopped - mock.lockStopped.RUnlock() - return calls -} diff --git a/test/support/support.go b/test/support/support.go deleted file mode 100644 index 17d5ef2..0000000 --- a/test/support/support.go +++ /dev/null @@ -1 +0,0 @@ -package support diff --git a/worker-pool.go b/worker-pool.go index a7e0085..fc32afc 100644 --- a/worker-pool.go +++ b/worker-pool.go @@ -6,13 +6,13 @@ import ( "context" "log" - "github.com/dc0d/spool/actor" + "github.com/dc0d/actor" ) type WorkerPool chan func() // New creates a new WorkerPool without any initial workers. To spawn workers, Grow must be called. -func New(mailboxSize actor.MailboxSize) WorkerPool { +func New(mailboxSize MailboxSize) WorkerPool { if mailboxSize < 0 { mailboxSize = 0 } @@ -76,5 +76,6 @@ func (obj defaultExecutor) Received(fn T) { func (obj defaultExecutor) Stopped() {} type ( - T = func() + T = func() + MailboxSize int ) diff --git a/worker-pool_test.go b/worker-pool_test.go index a372193..f130b4d 100644 --- a/worker-pool_test.go +++ b/worker-pool_test.go @@ -10,8 +10,7 @@ import ( "testing" "time" - "github.com/dc0d/spool/actor" - "github.com/dc0d/spool/test" + "github.com/dc0d/actor" "github.com/stretchr/testify/assert" ) @@ -46,7 +45,7 @@ func Test_WorkerPool_grow_should_spawn_workers_equal_to_growth(t *testing.T) { options []actor.Option ) pool := New(-1) - exec := &test.CallbacksSpy[T]{ + exec := &CallbacksSpy[T]{ StoppedFunc: func() {}, } executorFactory := func() actor.Callbacks[T] { return exec } @@ -153,7 +152,7 @@ func Test_WorkerPool_Grow_should_stop_extra_workers_with_absolute_timeout(t *tes absoluteTimeout := time.Millisecond * 10 pool := New(9) defer pool.Stop() - exec := &test.CallbacksSpy[T]{ + exec := &CallbacksSpy[T]{ StoppedFunc: func() {}, } executorFactory := func() actor.Callbacks[T] { return exec } @@ -171,7 +170,7 @@ func Test_WorkerPool_Grow_should_stop_extra_workers_with_idle_timeout_when_there idleTimeout := time.Millisecond * 50 pool := New(100) defer pool.Stop() - exec := &test.CallbacksSpy[T]{ + exec := &CallbacksSpy[T]{ StoppedFunc: func() {}, ReceivedFunc: func(fn func()) { fn() }, } @@ -209,7 +208,7 @@ func Test_WorkerPool_Grow_should_stop_extra_workers_when_context_is_canceled(t * increased := 10 pool := New(10) defer pool.Stop() - exec := &test.CallbacksSpy[T]{ + exec := &CallbacksSpy[T]{ StoppedFunc: func() {}, ReceivedFunc: func(fn func()) { fn() }, } @@ -239,7 +238,7 @@ func Test_WorkerPool_Stop_should_close_the_pool(t *testing.T) { func Test_WorkerPool_Stop_should_stop_the_workers(t *testing.T) { pool := New(9) increased := 10 - exec := &test.CallbacksSpy[T]{ + exec := &CallbacksSpy[T]{ StoppedFunc: func() {}, ReceivedFunc: func(fn func()) { fn() }, }