diff --git a/actor/combine.go b/actor/combine.go index 27787a9..4a3a038 100644 --- a/actor/combine.go +++ b/actor/combine.go @@ -26,6 +26,7 @@ func (b *CombineBuilder) Build() Actor { a := &combinedActor{ actors: b.actors, onStopFunc: b.options.Combined.OnStopFunc, + onStartFunc: b.options.Combined.OnStartFunc, stopTogether: b.options.Combined.StopTogether, stopping: &atomic.Bool{}, } @@ -44,8 +45,10 @@ func (b *CombineBuilder) WithOptions(opt ...CombinedOption) *CombineBuilder { type combinedActor struct { actors []Actor onStopFunc func() + onStartFunc func(Context) stopTogether bool + ctx *context runningCount atomic.Int32 running bool runningLock sync.Mutex @@ -70,6 +73,11 @@ func (a *combinedActor) onActorStopped() { if a.stopTogether && a.stopping.CompareAndSwap(false, true) { // Run stop in goroutine because wrapped actor // should not wait for other actors to stop. + // + // Also if a.Stop() is called from same gorutine it would + // be recoursive call without exit condition. Therfore + // it is need to call a.Stop() from other goroutine, + // regardless of first invariant. go a.Stop() } } @@ -82,6 +90,12 @@ func (a *combinedActor) Stop() { return } + if ctx := a.ctx; ctx != nil { + ctx.end() + + a.ctx = nil + } + a.running = false a.runningLock.Unlock() @@ -98,11 +112,18 @@ func (a *combinedActor) Start() { return } + ctx := newContext() + a.ctx = ctx + a.stopping.Store(false) a.running = true a.runningLock.Unlock() + if fn := a.onStartFunc; fn != nil { + fn(ctx) + } + for _, actor := range a.actors { a.runningCount.Add(1) actor.Start() diff --git a/actor/combine_test.go b/actor/combine_test.go index 841daf8..080d868 100644 --- a/actor/combine_test.go +++ b/actor/combine_test.go @@ -75,16 +75,17 @@ func Test_Combine_OptStopTogether(t *testing.T) { } } -func Test_Combine_OptOnStop(t *testing.T) { +func Test_Combine_OptOnStopOptOnStart(t *testing.T) { t.Parallel() const actorsCount = 5 + onStatC, onStartOpt := createCombinedOnStartOption(t, 1) onStopC, onStopOpt := createCombinedOnStopOption(t, 1) actors := createActors(actorsCount) a := Combine(actors...). - WithOptions(onStopOpt). + WithOptions(onStopOpt, onStartOpt). Build() a.Start() @@ -94,6 +95,7 @@ func Test_Combine_OptOnStop(t *testing.T) { a.Stop() // should have no effect a.Stop() // should have no effect assert.Equal(t, `🌚`, <-onStopC) + assert.Equal(t, `🌞`, <-onStatC) } func Test_Combine_OptOnStop_AfterActorStops(t *testing.T) { @@ -143,14 +145,29 @@ func createActor(i int, opts ...Option) Actor { func createCombinedOnStopOption(t *testing.T, count int) (<-chan any, CombinedOption) { t.Helper() - onStopC := make(chan any, count) - onStopFunc := func() { + c := make(chan any, count) + fn := func() { select { - case onStopC <- `🌚`: + case c <- `🌚`: default: t.Fatal("onStopFunc should be called only once") } } - return onStopC, OptOnStopCombined(onStopFunc) + return c, OptOnStopCombined(fn) +} + +func createCombinedOnStartOption(t *testing.T, count int) (<-chan any, CombinedOption) { + t.Helper() + + c := make(chan any, count) + fn := func(_ Context) { + select { + case c <- `🌞`: + default: + t.Fatal("onStart should be called only once") + } + } + + return c, OptOnStartCombined(fn) } diff --git a/actor/options.go b/actor/options.go index d1e6a4d..a8a3e0b 100644 --- a/actor/options.go +++ b/actor/options.go @@ -70,13 +70,20 @@ func OptStopTogether() CombinedOption { } } -// OptOnStopCombined will is called after all combined actors are stopped. +// OptOnStopCombined is called after all combined actors are stopped. func OptOnStopCombined(f func()) CombinedOption { return func(o *options) { o.Combined.OnStopFunc = f } } +// OptOnStartCombined is called before all. +func OptOnStartCombined(f func(Context)) CombinedOption { + return func(o *options) { + o.Combined.OnStartFunc = f + } +} + type ( option func(o *options) @@ -99,6 +106,7 @@ type optionsActor struct { type optionsCombined struct { StopTogether bool OnStopFunc func() + OnStartFunc func(Context) } type optionsMailbox struct { diff --git a/actor/options_test.go b/actor/options_test.go index 1b65e4b..44903af 100644 --- a/actor/options_test.go +++ b/actor/options_test.go @@ -116,4 +116,20 @@ func testCombinedOptions(t *testing.T) { assert.Empty(t, opts.Actor) assert.Empty(t, opts.Mailbox) } + + { // Assert that OnStartCombined will be set + opts := NewOptions(OptOnStartCombined(func(Context) {})) + assert.NotNil(t, opts.Combined.OnStartFunc) + + assert.Empty(t, opts.Actor) + assert.Empty(t, opts.Mailbox) + } + + { // Assert that OnStopCombined will be set + opts := NewOptions(OptOnStopCombined(func() {})) + assert.NotNil(t, opts.Combined.OnStopFunc) + + assert.Empty(t, opts.Actor) + assert.Empty(t, opts.Mailbox) + } }