diff --git a/actors/actor_system.go b/actors/actor_system.go index 8684e886..1997c5ee 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -163,7 +163,7 @@ type actorSystem struct { // The default value is 1s actorInitTimeout time.Duration // Specifies the supervisor strategy - supervisorStrategy StrategyDirective + supervisorDirective supervisorDirective // Specifies the telemetry config telemetry *telemetry.Telemetry // Specifies whether remoting is enabled. @@ -240,7 +240,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { expireActorAfter: DefaultPassivationTimeout, askTimeout: DefaultAskTimeout, actorInitMaxRetries: DefaultInitMaxRetries, - supervisorStrategy: DefaultSupervisoryStrategy, + supervisorDirective: DefaultSupervisoryStrategy, telemetry: telemetry.New(), locker: sync.Mutex{}, shutdownTimeout: DefaultShutdownTimeout, @@ -437,7 +437,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, withAskTimeout(x.askTimeout), withCustomLogger(x.logger), withActorSystem(x), - withSupervisorStrategy(x.supervisorStrategy), + withSupervisorDirective(x.supervisorDirective), withMailboxSize(x.mailboxSize), withMailbox(mailbox), // nil mailbox is taken care during initiliazation by the newPID withStash(x.stashCapacity), @@ -490,7 +490,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc } actorID := uuid.NewString() - actor := newFnActor(actorID, receiveFunc, opts...) + actor := newFuncActor(actorID, receiveFunc, opts...) actorPath := NewPath(actorID, NewAddress(x.name, "", -1)) if x.remotingEnabled.Load() { @@ -511,7 +511,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc withAskTimeout(x.askTimeout), withCustomLogger(x.logger), withActorSystem(x), - withSupervisorStrategy(x.supervisorStrategy), + withSupervisorDirective(x.supervisorDirective), withMailboxSize(x.mailboxSize), withMailbox(mailbox), // nil mailbox is taken care during initiliazation by the newPID withStash(x.stashCapacity), diff --git a/actors/actor_test.go b/actors/actor_test.go index 18fb6584..be669662 100644 --- a/actors/actor_test.go +++ b/actors/actor_test.go @@ -147,14 +147,16 @@ func newSupervised() *supervised { return &supervised{} } -func (c *supervised) PreStart(context.Context) error { +func (x *supervised) PreStart(context.Context) error { return nil } -func (c *supervised) Receive(ctx ReceiveContext) { +func (x *supervised) Receive(ctx ReceiveContext) { switch ctx.Message().(type) { case *goaktpb.PostStart: case *testspb.TestSend: + case *testspb.TestReply: + ctx.Response(new(testspb.Reply)) case *testspb.TestPanic: panic("panicked") default: @@ -162,7 +164,7 @@ func (c *supervised) Receive(ctx ReceiveContext) { } } -func (c *supervised) PostStop(context.Context) error { +func (x *supervised) PostStop(context.Context) error { return nil } @@ -491,3 +493,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, // return the cluster startNode return system, provider } + +type unhandledSupervisorDirective struct{} + +func (x unhandledSupervisorDirective) isSupervisorDirective() {} diff --git a/actors/func_actor.go b/actors/func_actor.go index 4cbe1803..71cbbf30 100644 --- a/actors/func_actor.go +++ b/actors/func_actor.go @@ -80,8 +80,8 @@ type funcActor struct { postStop PostStopFunc } -// newFnActor creates an instance of funcActor -func newFnActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor { +// newFuncActor creates an instance of funcActor +func newFuncActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor { // create the actor instance actor := &funcActor{ receiveFunc: receiveFunc, @@ -98,7 +98,7 @@ func newFnActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor { var _ Actor = (*funcActor)(nil) // PreStart pre-starts the actor. -func (x funcActor) PreStart(ctx context.Context) error { +func (x *funcActor) PreStart(ctx context.Context) error { // check whether the pre-start hook is set and call it preStart := x.preStart if preStart != nil { @@ -108,7 +108,7 @@ func (x funcActor) PreStart(ctx context.Context) error { } // Receive processes any message dropped into the actor mailbox. -func (x funcActor) Receive(ctx ReceiveContext) { +func (x *funcActor) Receive(ctx ReceiveContext) { switch m := ctx.Message().(type) { case *goaktpb.PostStart: x.pid = ctx.Self() @@ -121,7 +121,7 @@ func (x funcActor) Receive(ctx ReceiveContext) { } // PostStop is executed when the actor is shutting down. -func (x funcActor) PostStop(ctx context.Context) error { +func (x *funcActor) PostStop(ctx context.Context) error { // check whether the pre-start hook is set and call it postStop := x.postStop if postStop != nil { diff --git a/actors/option.go b/actors/option.go index 72ea2a49..bedaf8df 100644 --- a/actors/option.go +++ b/actors/option.go @@ -86,10 +86,10 @@ func WithPassivationDisabled() Option { }) } -// WithSupervisorStrategy sets the supervisor strategy -func WithSupervisorStrategy(strategy StrategyDirective) Option { +// WithSupervisorDirective sets the supervisor strategy directive +func WithSupervisorDirective(directive supervisorDirective) Option { return OptionFunc(func(a *actorSystem) { - a.supervisorStrategy = strategy + a.supervisorDirective = directive }) } diff --git a/actors/option_test.go b/actors/option_test.go index 2ccd6b89..42900d27 100644 --- a/actors/option_test.go +++ b/actors/option_test.go @@ -39,6 +39,7 @@ import ( func TestOption(t *testing.T) { tel := telemetry.New() mailbox := newReceiveContextBuffer(10) + resumeDirective := NewResumeDirective() var atomicTrue atomic.Bool atomicTrue.Store(true) clusterConfig := NewClusterConfig() @@ -74,9 +75,9 @@ func TestOption(t *testing.T) { expected: actorSystem{expireActorAfter: -1}, }, { - name: "WithSupervisorStrategy", - option: WithSupervisorStrategy(RestartDirective), - expected: actorSystem{supervisorStrategy: RestartDirective}, + name: "WithSupervisorDirective", + option: WithSupervisorDirective(resumeDirective), + expected: actorSystem{supervisorDirective: resumeDirective}, }, { name: "WithRemoting", diff --git a/actors/pid.go b/actors/pid.go index e32bba27..8c80fc3f 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -249,7 +249,7 @@ type pid struct { processingTimeLocker *sync.Mutex // supervisor strategy - supervisorStrategy StrategyDirective + supervisorDirective supervisorDirective // observability settings telemetry *telemetry.Telemetry @@ -291,7 +291,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption logger: log.DefaultLogger, mailboxSize: DefaultMailboxSize, children: newPIDMap(10), - supervisorStrategy: DefaultSupervisoryStrategy, + supervisorDirective: DefaultSupervisoryStrategy, watchersList: slices.NewConcurrentSlice[*watcher](), telemetry: telemetry.New(), actorPath: actorPath, @@ -558,7 +558,7 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er withAskTimeout(x.askTimeout.Load()), withCustomLogger(x.logger), withActorSystem(x.system), - withSupervisorStrategy(x.supervisorStrategy), + withSupervisorDirective(x.supervisorDirective), withMailboxSize(x.mailboxSize), withStash(x.stashCapacity.Load()), withMailbox(x.mailbox.Clone()), @@ -1326,45 +1326,6 @@ func (x *pid) handleReceived(received ReceiveContext) { } } -// supervise watches for child actor's failure and act based upon the supervisory strategy -func (x *pid) supervise(cid PID, watcher *watcher) { - for { - select { - case <-watcher.Done: - x.logger.Debugf("stop watching cid=(%s)", cid.ActorPath().String()) - return - case err := <-watcher.ErrChan: - x.logger.Errorf("child actor=(%s) is failing: Err=%v", cid.ActorPath().String(), err) - switch x.supervisorStrategy { - case StopDirective: - x.UnWatch(cid) - x.children.delete(cid.ActorPath()) - if err := cid.Shutdown(context.Background()); err != nil { - // this can enter into some infinite loop if we panic - // since we are just shutting down the actor we can just log the error - // TODO: rethink properly about PostStop error handling - x.logger.Error(err) - } - case RestartDirective: - x.UnWatch(cid) - if err := cid.Restart(context.Background()); err != nil { - x.logger.Panic(err) - } - x.Watch(cid) - default: - x.UnWatch(cid) - x.children.delete(cid.ActorPath()) - if err := cid.Shutdown(context.Background()); err != nil { - // this can enter into some infinite loop if we panic - // since we are just shutting down the actor we can just log the error - // TODO: rethink properly about PostStop error handling - x.logger.Error(err) - } - } - } - } -} - // passivationListener checks whether the actor is processing public or not. // when the actor is idle, it automatically shuts down to free resources func (x *pid) passivationListener() { diff --git a/actors/pid_option.go b/actors/pid_option.go index bbde8971..f95122ce 100644 --- a/actors/pid_option.go +++ b/actors/pid_option.go @@ -71,11 +71,11 @@ func withActorSystem(sys ActorSystem) pidOption { } } -// withSupervisorStrategy sets the supervisor strategy to used when dealing +// withSupervisorDirective sets the supervisor strategy to used when dealing // with child actors -func withSupervisorStrategy(strategy StrategyDirective) pidOption { +func withSupervisorDirective(directive supervisorDirective) pidOption { return func(pid *pid) { - pid.supervisorStrategy = strategy + pid.supervisorDirective = directive } } diff --git a/actors/pid_option_test.go b/actors/pid_option_test.go index c0946212..243a9f29 100644 --- a/actors/pid_option_test.go +++ b/actors/pid_option_test.go @@ -37,6 +37,7 @@ import ( func TestPIDOptions(t *testing.T) { mailbox := newReceiveContextBuffer(10) + resumeDirective := NewResumeDirective() var ( atomicDuration atomic.Duration atomicInt atomic.Int32 @@ -78,8 +79,8 @@ func TestPIDOptions(t *testing.T) { }, { name: "WithSupervisorStrategy", - option: withSupervisorStrategy(RestartDirective), - expected: &pid{supervisorStrategy: RestartDirective}, + option: withSupervisorDirective(resumeDirective), + expected: &pid{supervisorDirective: resumeDirective}, }, { name: "WithShutdownTimeout", diff --git a/actors/pid_test.go b/actors/pid_test.go index a992e6b9..24397076 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -380,7 +380,7 @@ func TestRestart(t *testing.T) { }) } func TestSupervisorStrategy(t *testing.T) { - t.Run("With happy path", func(t *testing.T) { + t.Run("With stop as supervisor directive", func(t *testing.T) { // create a test context ctx := context.TODO() // create the actor path @@ -391,6 +391,7 @@ func TestSupervisorStrategy(t *testing.T) { newSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), + withSupervisorDirective(NewStopDirective()), withAskTimeout(replyTimeout)) require.NoError(t, err) @@ -413,7 +414,7 @@ func TestSupervisorStrategy(t *testing.T) { err = parent.Shutdown(ctx) assert.NoError(t, err) }) - t.Run("With stop as default strategy", func(t *testing.T) { + t.Run("With the default supervisor directive", func(t *testing.T) { // create a test context ctx := context.TODO() // create the actor path @@ -466,7 +467,7 @@ func TestSupervisorStrategy(t *testing.T) { withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), withPassivationDisabled(), - withSupervisorStrategy(-1), // this is a rogue strategy which will default to a Stop + withSupervisorDirective(new(unhandledSupervisorDirective)), // only for test to handle default case withAskTimeout(replyTimeout)) require.NoError(t, err) @@ -494,7 +495,7 @@ func TestSupervisorStrategy(t *testing.T) { err = parent.Shutdown(ctx) assert.NoError(t, err) }) - t.Run("With stop as default strategy with child actor shutdown failure", func(t *testing.T) { + t.Run("With stop as supervisor directive with child actor shutdown failure", func(t *testing.T) { // create a test context ctx := context.TODO() // create the actor path @@ -506,7 +507,7 @@ func TestSupervisorStrategy(t *testing.T) { newSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withSupervisorStrategy(DefaultSupervisoryStrategy), + withSupervisorDirective(DefaultSupervisoryStrategy), withPassivationDisabled(), withAskTimeout(replyTimeout)) @@ -535,7 +536,7 @@ func TestSupervisorStrategy(t *testing.T) { err = parent.Shutdown(ctx) assert.NoError(t, err) }) - t.Run("With restart as default strategy", func(t *testing.T) { + t.Run("With restart as supervisor strategy", func(t *testing.T) { // create a test context ctx := context.TODO() @@ -549,7 +550,7 @@ func TestSupervisorStrategy(t *testing.T) { withInitMaxRetries(1), withCustomLogger(logger), withPassivationDisabled(), - withSupervisorStrategy(RestartDirective), + withSupervisorDirective(NewRestartDirective()), withAskTimeout(replyTimeout)) require.NoError(t, err) @@ -593,9 +594,6 @@ func TestSupervisorStrategy(t *testing.T) { require.NoError(t, err) assert.NotNil(t, parent) - // this is for the sake of the test - parent.supervisorStrategy = StrategyDirective(-1) - // create the child actor child, err := parent.SpawnChild(ctx, "SpawnChild", newSupervised()) assert.NoError(t, err) @@ -612,6 +610,97 @@ func TestSupervisorStrategy(t *testing.T) { assert.False(t, child.IsRunning()) assert.Len(t, parent.Children(), 0) + //stop the actor + err = parent.Shutdown(ctx) + assert.NoError(t, err) + }) + t.Run("With resume as supervisor strategy", func(t *testing.T) { + // create a test context + ctx := context.TODO() + + logger := log.New(log.DebugLevel, os.Stdout) + // create the actor path + actorPath := NewPath("Parent", NewAddress("sys", "host", 1)) + // create the parent actor + parent, err := newPID(ctx, + actorPath, + newSupervisor(), + withInitMaxRetries(1), + withCustomLogger(logger), + withPassivationDisabled(), + withSupervisorDirective(NewResumeDirective()), + withAskTimeout(replyTimeout)) + + require.NoError(t, err) + assert.NotNil(t, parent) + + // create the child actor + child, err := parent.SpawnChild(ctx, "SpawnChild", newSupervised()) + assert.NoError(t, err) + assert.NotNil(t, child) + + assert.Len(t, parent.Children(), 1) + // send a test panic message to the actor + assert.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + + // wait for the child to properly shutdown + time.Sleep(time.Second) + + // assert the actor state + assert.True(t, child.IsRunning()) + require.Len(t, parent.Children(), 1) + + reply, err := Ask(ctx, child, new(testpb.TestReply), time.Minute) + require.NoError(t, err) + require.NotNil(t, reply) + expected := new(testpb.Reply) + assert.True(t, proto.Equal(expected, reply)) + + //stop the actor + err = parent.Shutdown(ctx) + assert.NoError(t, err) + }) + t.Run("With restart with limit as supervisor strategy", func(t *testing.T) { + // create a test context + ctx := context.TODO() + + logger := log.New(log.DebugLevel, os.Stdout) + // create the actor path + actorPath := NewPath("Parent", NewAddress("sys", "host", 1)) + + // create the directive + restart := NewRestartDirective() + restart.WithLimit(2, time.Minute) + + // create the parent actor + parent, err := newPID(ctx, + actorPath, + newSupervisor(), + withInitMaxRetries(1), + withCustomLogger(logger), + withPassivationDisabled(), + withSupervisorDirective(restart), + withAskTimeout(replyTimeout)) + + require.NoError(t, err) + assert.NotNil(t, parent) + + // create the child actor + child, err := parent.SpawnChild(ctx, "SpawnChild", newSupervised()) + assert.NoError(t, err) + assert.NotNil(t, child) + + assert.Len(t, parent.Children(), 1) + // send a test panic message to the actor + assert.NoError(t, Tell(ctx, child, new(testpb.TestPanic))) + + // wait for the child to properly shutdown + time.Sleep(time.Second) + + // assert the actor state + assert.True(t, child.IsRunning()) + require.Len(t, parent.Children(), 1) + //stop the actor err = parent.Shutdown(ctx) assert.NoError(t, err) diff --git a/actors/supervisor.go b/actors/supervisor.go new file mode 100644 index 00000000..b801fc71 --- /dev/null +++ b/actors/supervisor.go @@ -0,0 +1,153 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package actors + +import ( + "context" + "time" + + "github.com/flowchartsman/retry" +) + +// supervisorDirective defines the supervisor directive +type supervisorDirective interface { + isSupervisorDirective() +} + +// StopDirective defines the supervisor stop directive +type StopDirective struct{} + +// NewStopDirective creates an instance of StopDirective +func NewStopDirective() *StopDirective { + return new(StopDirective) +} + +func (*StopDirective) isSupervisorDirective() {} + +// ResumeDirective defines the supervisor resume directive +// This ignores the failure and process the next message, instead +type ResumeDirective struct{} + +// NewResumeDirective creates an instance of ResumeDirective +func NewResumeDirective() *ResumeDirective { + return new(ResumeDirective) +} + +func (*ResumeDirective) isSupervisorDirective() {} + +// RestartDirective defines supervisor restart directive +type RestartDirective struct { + // Specifies the maximum number of retries + // When reaching this number the faulty actor is stopped + maxNumRetries uint32 + // Specifies the time range to restart the faulty actor + timeout time.Duration +} + +// MaxNumRetries returns the max num retries +func (x *RestartDirective) MaxNumRetries() uint32 { + return x.maxNumRetries +} + +// Timeout returns the timeout +func (x *RestartDirective) Timeout() time.Duration { + return x.timeout +} + +// NewRestartDirective creates an instance of RestartDirective +func NewRestartDirective() *RestartDirective { + return &RestartDirective{ + maxNumRetries: 0, + timeout: -1, + } +} + +// WithLimit sets the restart limit +func (x *RestartDirective) WithLimit(maxNumRetries uint32, timeout time.Duration) { + x.maxNumRetries = maxNumRetries + x.timeout = timeout +} + +func (*RestartDirective) isSupervisorDirective() {} + +// supervise watches for child actor's failure and act based upon the supervisory strategy +func (x *pid) supervise(cid PID, watcher *watcher) { + for { + select { + case <-watcher.Done: + x.logger.Debugf("stop watching cid=(%s)", cid.ActorPath().String()) + return + case err := <-watcher.ErrChan: + x.logger.Errorf("child actor=(%s) is failing: Err=%v", cid.ActorPath().String(), err) + switch directive := x.supervisorDirective.(type) { + case *StopDirective: + x.handleStopDirective(cid) + case *RestartDirective: + x.handleRestartDirective(cid, directive.MaxNumRetries(), directive.Timeout()) + case *ResumeDirective: + // pass + default: + x.handleStopDirective(cid) + } + } + } +} + +// handleStopDirective handles the supervisor stop directive +func (x *pid) handleStopDirective(cid PID) { + x.UnWatch(cid) + x.children.delete(cid.ActorPath()) + if err := cid.Shutdown(context.Background()); err != nil { + // this can enter into some infinite loop if we panic + // since we are just shutting down the actor we can just log the error + // TODO: rethink properly about PostStop error handling + x.logger.Error(err) + } +} + +// handleRestartDirective handles the supervisor restart directive +func (x *pid) handleRestartDirective(cid PID, maxRetries uint32, timeout time.Duration) { + x.UnWatch(cid) + ctx := context.Background() + var err error + if maxRetries == 0 || timeout <= 0 { + err = cid.Restart(ctx) + } else { + // TODO: handle the initial delay + retrier := retry.NewRetrier(int(maxRetries), 100*time.Millisecond, timeout) + err = retrier.RunContext(ctx, cid.Restart) + } + + if err != nil { + x.logger.Error(err) + // remove the actor and stop it + x.children.delete(cid.ActorPath()) + if err := cid.Shutdown(ctx); err != nil { + x.logger.Error(err) + } + return + } + x.Watch(cid) +} diff --git a/actors/types.go b/actors/types.go index b7870177..4d2c4605 100644 --- a/actors/types.go +++ b/actors/types.go @@ -30,16 +30,9 @@ import ( "github.com/tochemey/goakt/v2/goaktpb" ) -// StrategyDirective represents the supervisor strategy directive -type StrategyDirective int - const ( // protocol defines the Go-Akt addressing protocol protocol = "goakt" - // RestartDirective defines the restart strategy when handling actors failure - RestartDirective StrategyDirective = iota - // StopDirective defines the stop strategy when handling actors failure - StopDirective // DefaultPassivationTimeout defines the default passivation timeout DefaultPassivationTimeout = 2 * time.Minute @@ -47,8 +40,6 @@ const ( DefaultAskTimeout = 20 * time.Second // DefaultInitMaxRetries defines the default value for retrying actor initialization DefaultInitMaxRetries = 5 - // DefaultSupervisoryStrategy defines the default supervisory strategy - DefaultSupervisoryStrategy = StopDirective // DefaultShutdownTimeout defines the default shutdown timeout DefaultShutdownTimeout = 2 * time.Second // DefaultInitTimeout defines the default init timeout @@ -66,8 +57,11 @@ const ( eventsTopic = "topic.events" ) -// NoSender means that there is no sender -var NoSender PID - -// RemoteNoSender means that there is no sender -var RemoteNoSender = new(goaktpb.Address) +var ( + // NoSender means that there is no sender + NoSender PID + // DefaultSupervisoryStrategy defines the default supervisory strategy + DefaultSupervisoryStrategy = NewStopDirective() + // RemoteNoSender means that there is no sender + RemoteNoSender = new(goaktpb.Address) +) diff --git a/bench/bench_test.go b/bench/bench_test.go index 9d61631c..264c3907 100644 --- a/bench/bench_test.go +++ b/bench/bench_test.go @@ -67,6 +67,7 @@ func BenchmarkActor(b *testing.B) { actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), actors.WithMailboxSize(uint64(b.N)), + actors.WithSupervisorDirective(actors.NewStopDirective()), actors.WithReplyTimeout(receivingTimeout)) // start the actor system