Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor supervisor strategy #387

Merged
merged 3 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ type actorSystem struct {
// The default value is 1s
actorInitTimeout time.Duration
// Specifies the supervisor strategy
supervisorStrategy StrategyDirective
supervisorDirective supervisorDirective
// Specifies the telemetry config
telemetry *telemetry.Telemetry
// Specifies whether remoting is enabled.
Expand Down Expand Up @@ -240,7 +240,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
expireActorAfter: DefaultPassivationTimeout,
askTimeout: DefaultAskTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorStrategy: DefaultSupervisoryStrategy,
supervisorDirective: DefaultSupervisoryStrategy,
telemetry: telemetry.New(),
locker: sync.Mutex{},
shutdownTimeout: DefaultShutdownTimeout,
Expand Down Expand Up @@ -437,7 +437,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorStrategy(x.supervisorStrategy),
withSupervisorDirective(x.supervisorDirective),
withMailboxSize(x.mailboxSize),
withMailbox(mailbox), // nil mailbox is taken care during initiliazation by the newPID
withStash(x.stashCapacity),
Expand Down Expand Up @@ -490,7 +490,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
}

actorID := uuid.NewString()
actor := newFnActor(actorID, receiveFunc, opts...)
actor := newFuncActor(actorID, receiveFunc, opts...)

actorPath := NewPath(actorID, NewAddress(x.name, "", -1))
if x.remotingEnabled.Load() {
Expand All @@ -511,7 +511,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorStrategy(x.supervisorStrategy),
withSupervisorDirective(x.supervisorDirective),
withMailboxSize(x.mailboxSize),
withMailbox(mailbox), // nil mailbox is taken care during initiliazation by the newPID
withStash(x.stashCapacity),
Expand Down
12 changes: 9 additions & 3 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,24 @@ func newSupervised() *supervised {
return &supervised{}
}

func (c *supervised) PreStart(context.Context) error {
func (x *supervised) PreStart(context.Context) error {
return nil
}

func (c *supervised) Receive(ctx ReceiveContext) {
func (x *supervised) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.TestSend:
case *testspb.TestReply:
ctx.Response(new(testspb.Reply))
case *testspb.TestPanic:
panic("panicked")
default:
panic(ErrUnhandled)
}
}

func (c *supervised) PostStop(context.Context) error {
func (x *supervised) PostStop(context.Context) error {
return nil
}

Expand Down Expand Up @@ -491,3 +493,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
// return the cluster startNode
return system, provider
}

type unhandledSupervisorDirective struct{}

func (x unhandledSupervisorDirective) isSupervisorDirective() {}
10 changes: 5 additions & 5 deletions actors/func_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ type funcActor struct {
postStop PostStopFunc
}

// newFnActor creates an instance of funcActor
func newFnActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor {
// newFuncActor creates an instance of funcActor
func newFuncActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor {
// create the actor instance
actor := &funcActor{
receiveFunc: receiveFunc,
Expand All @@ -98,7 +98,7 @@ func newFnActor(id string, receiveFunc ReceiveFunc, opts ...FuncOption) Actor {
var _ Actor = (*funcActor)(nil)

// PreStart pre-starts the actor.
func (x funcActor) PreStart(ctx context.Context) error {
func (x *funcActor) PreStart(ctx context.Context) error {
// check whether the pre-start hook is set and call it
preStart := x.preStart
if preStart != nil {
Expand All @@ -108,7 +108,7 @@ func (x funcActor) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (x funcActor) Receive(ctx ReceiveContext) {
func (x *funcActor) Receive(ctx ReceiveContext) {
switch m := ctx.Message().(type) {
case *goaktpb.PostStart:
x.pid = ctx.Self()
Expand All @@ -121,7 +121,7 @@ func (x funcActor) Receive(ctx ReceiveContext) {
}

// PostStop is executed when the actor is shutting down.
func (x funcActor) PostStop(ctx context.Context) error {
func (x *funcActor) PostStop(ctx context.Context) error {
// check whether the pre-start hook is set and call it
postStop := x.postStop
if postStop != nil {
Expand Down
6 changes: 3 additions & 3 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func WithPassivationDisabled() Option {
})
}

// WithSupervisorStrategy sets the supervisor strategy
func WithSupervisorStrategy(strategy StrategyDirective) Option {
// WithSupervisorDirective sets the supervisor strategy directive
func WithSupervisorDirective(directive supervisorDirective) Option {
return OptionFunc(func(a *actorSystem) {
a.supervisorStrategy = strategy
a.supervisorDirective = directive
})
}

Expand Down
7 changes: 4 additions & 3 deletions actors/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
func TestOption(t *testing.T) {
tel := telemetry.New()
mailbox := newReceiveContextBuffer(10)
resumeDirective := NewResumeDirective()
var atomicTrue atomic.Bool
atomicTrue.Store(true)
clusterConfig := NewClusterConfig()
Expand Down Expand Up @@ -74,9 +75,9 @@ func TestOption(t *testing.T) {
expected: actorSystem{expireActorAfter: -1},
},
{
name: "WithSupervisorStrategy",
option: WithSupervisorStrategy(RestartDirective),
expected: actorSystem{supervisorStrategy: RestartDirective},
name: "WithSupervisorDirective",
option: WithSupervisorDirective(resumeDirective),
expected: actorSystem{supervisorDirective: resumeDirective},
},
{
name: "WithRemoting",
Expand Down
45 changes: 3 additions & 42 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type pid struct {
processingTimeLocker *sync.Mutex

// supervisor strategy
supervisorStrategy StrategyDirective
supervisorDirective supervisorDirective

// observability settings
telemetry *telemetry.Telemetry
Expand Down Expand Up @@ -291,7 +291,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
logger: log.DefaultLogger,
mailboxSize: DefaultMailboxSize,
children: newPIDMap(10),
supervisorStrategy: DefaultSupervisoryStrategy,
supervisorDirective: DefaultSupervisoryStrategy,
watchersList: slices.NewConcurrentSlice[*watcher](),
telemetry: telemetry.New(),
actorPath: actorPath,
Expand Down Expand Up @@ -558,7 +558,7 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er
withAskTimeout(x.askTimeout.Load()),
withCustomLogger(x.logger),
withActorSystem(x.system),
withSupervisorStrategy(x.supervisorStrategy),
withSupervisorDirective(x.supervisorDirective),
withMailboxSize(x.mailboxSize),
withStash(x.stashCapacity.Load()),
withMailbox(x.mailbox.Clone()),
Expand Down Expand Up @@ -1326,45 +1326,6 @@ func (x *pid) handleReceived(received ReceiveContext) {
}
}

// supervise watches for child actor's failure and act based upon the supervisory strategy
func (x *pid) supervise(cid PID, watcher *watcher) {
for {
select {
case <-watcher.Done:
x.logger.Debugf("stop watching cid=(%s)", cid.ActorPath().String())
return
case err := <-watcher.ErrChan:
x.logger.Errorf("child actor=(%s) is failing: Err=%v", cid.ActorPath().String(), err)
switch x.supervisorStrategy {
case StopDirective:
x.UnWatch(cid)
x.children.delete(cid.ActorPath())
if err := cid.Shutdown(context.Background()); err != nil {
// this can enter into some infinite loop if we panic
// since we are just shutting down the actor we can just log the error
// TODO: rethink properly about PostStop error handling
x.logger.Error(err)
}
case RestartDirective:
x.UnWatch(cid)
if err := cid.Restart(context.Background()); err != nil {
x.logger.Panic(err)
}
x.Watch(cid)
default:
x.UnWatch(cid)
x.children.delete(cid.ActorPath())
if err := cid.Shutdown(context.Background()); err != nil {
// this can enter into some infinite loop if we panic
// since we are just shutting down the actor we can just log the error
// TODO: rethink properly about PostStop error handling
x.logger.Error(err)
}
}
}
}
}

// passivationListener checks whether the actor is processing public or not.
// when the actor is idle, it automatically shuts down to free resources
func (x *pid) passivationListener() {
Expand Down
6 changes: 3 additions & 3 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func withActorSystem(sys ActorSystem) pidOption {
}
}

// withSupervisorStrategy sets the supervisor strategy to used when dealing
// withSupervisorDirective sets the supervisor strategy to used when dealing
// with child actors
func withSupervisorStrategy(strategy StrategyDirective) pidOption {
func withSupervisorDirective(directive supervisorDirective) pidOption {
return func(pid *pid) {
pid.supervisorStrategy = strategy
pid.supervisorDirective = directive
}
}

Expand Down
5 changes: 3 additions & 2 deletions actors/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

func TestPIDOptions(t *testing.T) {
mailbox := newReceiveContextBuffer(10)
resumeDirective := NewResumeDirective()
var (
atomicDuration atomic.Duration
atomicInt atomic.Int32
Expand Down Expand Up @@ -78,8 +79,8 @@ func TestPIDOptions(t *testing.T) {
},
{
name: "WithSupervisorStrategy",
option: withSupervisorStrategy(RestartDirective),
expected: &pid{supervisorStrategy: RestartDirective},
option: withSupervisorDirective(resumeDirective),
expected: &pid{supervisorDirective: resumeDirective},
},
{
name: "WithShutdownTimeout",
Expand Down
Loading
Loading