Skip to content

Commit

Permalink
refactor: refactor supervisor strategy (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Jul 11, 2024
1 parent cc01002 commit e86f0e4
Show file tree
Hide file tree
Showing 12 changed files with 296 additions and 90 deletions.
10 changes: 5 additions & 5 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ type actorSystem struct {
// The default value is 1s
actorInitTimeout time.Duration
// Specifies the supervisor strategy
supervisorStrategy StrategyDirective
supervisorDirective supervisorDirective
// Specifies the telemetry config
telemetry *telemetry.Telemetry
// Specifies whether remoting is enabled.
Expand Down Expand Up @@ -240,7 +240,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
expireActorAfter: DefaultPassivationTimeout,
askTimeout: DefaultAskTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorStrategy: DefaultSupervisoryStrategy,
supervisorDirective: DefaultSupervisoryStrategy,
telemetry: telemetry.New(),
locker: sync.Mutex{},
shutdownTimeout: DefaultShutdownTimeout,
Expand Down Expand Up @@ -437,7 +437,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorStrategy(x.supervisorStrategy),
withSupervisorDirective(x.supervisorDirective),
withMailboxSize(x.mailboxSize),
withMailbox(mailbox), // nil mailbox is taken care during initiliazation by the newPID
withStash(x.stashCapacity),
Expand Down Expand Up @@ -490,7 +490,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
}

actorID := uuid.NewString()
actor := newFnActor(actorID, receiveFunc, opts...)
actor := newFuncActor(actorID, receiveFunc, opts...)

actorPath := NewPath(actorID, NewAddress(x.name, "", -1))
if x.remotingEnabled.Load() {
Expand All @@ -511,7 +511,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorStrategy(x.supervisorStrategy),
withSupervisorDirective(x.supervisorDirective),
withMailboxSize(x.mailboxSize),
withMailbox(mailbox), // nil mailbox is taken care during initiliazation by the newPID
withStash(x.stashCapacity),
Expand Down
12 changes: 9 additions & 3 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,24 @@ func newSupervised() *supervised {
return &supervised{}
}

func (c *supervised) PreStart(context.Context) error {
func (x *supervised) PreStart(context.Context) error {
return nil
}

func (c *supervised) Receive(ctx ReceiveContext) {
func (x *supervised) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.TestSend:
case *testspb.TestReply:
ctx.Response(new(testspb.Reply))
case *testspb.TestPanic:
panic("panicked")
default:
panic(ErrUnhandled)
}
}

func (c *supervised) PostStop(context.Context) error {
func (x *supervised) PostStop(context.Context) error {
return nil
}

Expand Down Expand Up @@ -491,3 +493,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
// return the cluster startNode
return system, provider
}

type unhandledSupervisorDirective struct{}

func (x unhandledSupervisorDirective) isSupervisorDirective() {}
10 changes: 5 additions & 5 deletions actors/func_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ type funcActor struct {
postStop PostStopFunc
}

// newFnActor creates an instance of funcActor
func newFnActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor {
// newFuncActor creates an instance of funcActor
func newFuncActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor {
// create the actor instance
actor := &funcActor{
receiveFunc: receiveFunc,
Expand All @@ -98,7 +98,7 @@ func newFnActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor {
var _ Actor = (*funcActor)(nil)

// PreStart pre-starts the actor.
func (x funcActor) PreStart(ctx context.Context) error {
func (x *funcActor) PreStart(ctx context.Context) error {
// check whether the pre-start hook is set and call it
preStart := x.preStart
if preStart != nil {
Expand All @@ -108,7 +108,7 @@ func (x funcActor) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (x funcActor) Receive(ctx ReceiveContext) {
func (x *funcActor) Receive(ctx ReceiveContext) {
switch m := ctx.Message().(type) {
case *goaktpb.PostStart:
x.pid = ctx.Self()
Expand All @@ -121,7 +121,7 @@ func (x funcActor) Receive(ctx ReceiveContext) {
}

// PostStop is executed when the actor is shutting down.
func (x funcActor) PostStop(ctx context.Context) error {
func (x *funcActor) PostStop(ctx context.Context) error {
// check whether the pre-start hook is set and call it
postStop := x.postStop
if postStop != nil {
Expand Down
6 changes: 3 additions & 3 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func WithPassivationDisabled() Option {
})
}

// WithSupervisorStrategy sets the supervisor strategy
func WithSupervisorStrategy(strategy StrategyDirective) Option {
// WithSupervisorDirective sets the supervisor strategy directive
func WithSupervisorDirective(directive supervisorDirective) Option {
return OptionFunc(func(a *actorSystem) {
a.supervisorStrategy = strategy
a.supervisorDirective = directive
})
}

Expand Down
7 changes: 4 additions & 3 deletions actors/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
func TestOption(t *testing.T) {
tel := telemetry.New()
mailbox := newReceiveContextBuffer(10)
resumeDirective := NewResumeDirective()
var atomicTrue atomic.Bool
atomicTrue.Store(true)
clusterConfig := NewClusterConfig()
Expand Down Expand Up @@ -74,9 +75,9 @@ func TestOption(t *testing.T) {
expected: actorSystem{expireActorAfter: -1},
},
{
name: "WithSupervisorStrategy",
option: WithSupervisorStrategy(RestartDirective),
expected: actorSystem{supervisorStrategy: RestartDirective},
name: "WithSupervisorDirective",
option: WithSupervisorDirective(resumeDirective),
expected: actorSystem{supervisorDirective: resumeDirective},
},
{
name: "WithRemoting",
Expand Down
45 changes: 3 additions & 42 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type pid struct {
processingTimeLocker *sync.Mutex

// supervisor strategy
supervisorStrategy StrategyDirective
supervisorDirective supervisorDirective

// observability settings
telemetry *telemetry.Telemetry
Expand Down Expand Up @@ -291,7 +291,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
logger: log.DefaultLogger,
mailboxSize: DefaultMailboxSize,
children: newPIDMap(10),
supervisorStrategy: DefaultSupervisoryStrategy,
supervisorDirective: DefaultSupervisoryStrategy,
watchersList: slices.NewConcurrentSlice[*watcher](),
telemetry: telemetry.New(),
actorPath: actorPath,
Expand Down Expand Up @@ -558,7 +558,7 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er
withAskTimeout(x.askTimeout.Load()),
withCustomLogger(x.logger),
withActorSystem(x.system),
withSupervisorStrategy(x.supervisorStrategy),
withSupervisorDirective(x.supervisorDirective),
withMailboxSize(x.mailboxSize),
withStash(x.stashCapacity.Load()),
withMailbox(x.mailbox.Clone()),
Expand Down Expand Up @@ -1326,45 +1326,6 @@ func (x *pid) handleReceived(received ReceiveContext) {
}
}

// supervise watches for child actor's failure and act based upon the supervisory strategy
func (x *pid) supervise(cid PID, watcher *watcher) {
for {
select {
case <-watcher.Done:
x.logger.Debugf("stop watching cid=(%s)", cid.ActorPath().String())
return
case err := <-watcher.ErrChan:
x.logger.Errorf("child actor=(%s) is failing: Err=%v", cid.ActorPath().String(), err)
switch x.supervisorStrategy {
case StopDirective:
x.UnWatch(cid)
x.children.delete(cid.ActorPath())
if err := cid.Shutdown(context.Background()); err != nil {
// this can enter into some infinite loop if we panic
// since we are just shutting down the actor we can just log the error
// TODO: rethink properly about PostStop error handling
x.logger.Error(err)
}
case RestartDirective:
x.UnWatch(cid)
if err := cid.Restart(context.Background()); err != nil {
x.logger.Panic(err)
}
x.Watch(cid)
default:
x.UnWatch(cid)
x.children.delete(cid.ActorPath())
if err := cid.Shutdown(context.Background()); err != nil {
// this can enter into some infinite loop if we panic
// since we are just shutting down the actor we can just log the error
// TODO: rethink properly about PostStop error handling
x.logger.Error(err)
}
}
}
}
}

// passivationListener checks whether the actor is processing public or not.
// when the actor is idle, it automatically shuts down to free resources
func (x *pid) passivationListener() {
Expand Down
6 changes: 3 additions & 3 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func withActorSystem(sys ActorSystem) pidOption {
}
}

// withSupervisorStrategy sets the supervisor strategy to used when dealing
// withSupervisorDirective sets the supervisor strategy to used when dealing
// with child actors
func withSupervisorStrategy(strategy StrategyDirective) pidOption {
func withSupervisorDirective(directive supervisorDirective) pidOption {
return func(pid *pid) {
pid.supervisorStrategy = strategy
pid.supervisorDirective = directive
}
}

Expand Down
5 changes: 3 additions & 2 deletions actors/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

func TestPIDOptions(t *testing.T) {
mailbox := newReceiveContextBuffer(10)
resumeDirective := NewResumeDirective()
var (
atomicDuration atomic.Duration
atomicInt atomic.Int32
Expand Down Expand Up @@ -78,8 +79,8 @@ func TestPIDOptions(t *testing.T) {
},
{
name: "WithSupervisorStrategy",
option: withSupervisorStrategy(RestartDirective),
expected: &pid{supervisorStrategy: RestartDirective},
option: withSupervisorDirective(resumeDirective),
expected: &pid{supervisorDirective: resumeDirective},
},
{
name: "WithShutdownTimeout",
Expand Down
Loading

0 comments on commit e86f0e4

Please sign in to comment.