Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add router and fix #393

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The project adheres to [Semantic Versioning](https://semver.org) and [Convention
- [Actor System](#actor-system)
- [Behaviors](#behaviors)
- [Mailbox](#mailbox)
- [Routers](#routers)
- [Events Stream](#events-stream)
- [Supported events](#supported-events)
- [Messaging](#messaging)
Expand Down Expand Up @@ -123,8 +124,11 @@ The fundamental building blocks of Go-Akt are actors.
- supervise the failure behavior of (child) actors. The supervisory strategy to adopt is set during its creation.
In Go-Akt that each child actor is treated separately. There is no concept of one-for-one and one-for-all strategies.
The following directives are supported:
- [`Restart`](./actors/types.go): to restart the child actor
- [`Stop`](./actors/types.go): to stop the child actor which is the default one
- [`Restart`](./actors/supervisor.go): to restart the child actor. One can control how the restart is done using the following options:
- `maxNumRetries`: defines the maximum of restart attempts
- `timeout`: how to attempt restarting the faulty actor
- [`Stop`](./actors/supervisor.go): to stop the child actor which is the default one
- [`Resume`](./actors/supervisor.go): ignores the failure and process the next message, instead
- remotely lookup for an actor on another node via their process id [`PID`](./actors/pid.go) `RemoteLookup`.
This
allows it to send messages remotely via `RemoteAsk` or `RemoteTell` methods
Expand Down Expand Up @@ -177,6 +181,10 @@ To change the behavior, call the following methods on the [ReceiveContext interf

Once can implement a custom mailbox. See [Mailbox](./actors/mailbox.go). The default mailbox makes use of buffered channels.

### Routers

Go-Akt comes shipped with some routers. See [docs](./router/router.md)

### Events Stream

To receive some system events and act on them for some particular business cases, you just need to call the actor system `Subscribe`.
Expand Down
37 changes: 17 additions & 20 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ type ActorSystem interface {
handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to PID, message proto.Message) error
// setActor sets actor in the actor system actors registry
setActor(actor PID)
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -465,16 +467,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
return nil, err
}

x.actors.set(pid)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
ActorName: name,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: types.NameOf(actor),
}
}

x.setActor(pid)
return pid, nil
}

Expand Down Expand Up @@ -539,16 +532,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
return nil, err
}

x.actors.set(pid)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
ActorName: actorID,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: types.NameOf(actor),
}
}

x.setActor(pid)
return pid, nil
}

Expand Down Expand Up @@ -1170,6 +1154,19 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot
return Tell(spanCtx, to, message)
}

// setActor implements ActorSystem.
func (x *actorSystem) setActor(actor PID) {
x.actors.set(actor)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
ActorName: actor.Name(),
ActorAddress: actor.ActorPath().RemoteAddress(),
ActorPath: actor.ActorPath().String(),
ActorType: types.NameOf(actor),
}
}
}

// enableClustering enables clustering. When clustering is enabled remoting is also enabled to facilitate remote
// communication
func (x *actorSystem) enableClustering(ctx context.Context) error {
Expand Down
3 changes: 0 additions & 3 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (x *userActor) Receive(ctx ReceiveContext) {
// Authenticated behavior is executed when the actor receive the TestAuth message
func (x *userActor) Authenticated(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.TestReadiness:
ctx.Response(new(testspb.TestReady))
ctx.UnBecome()
Expand All @@ -206,7 +205,6 @@ func (x *userActor) Authenticated(ctx ReceiveContext) {

func (x *userActor) CreditAccount(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.CreditAccount:
ctx.Response(new(testspb.AccountCredited))
ctx.BecomeStacked(x.DebitAccount)
Expand All @@ -217,7 +215,6 @@ func (x *userActor) CreditAccount(ctx ReceiveContext) {

func (x *userActor) DebitAccount(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.DebitAccount:
ctx.Response(new(testspb.AccountDebited))
ctx.UnBecomeStacked()
Expand Down
6 changes: 4 additions & 2 deletions actors/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ func (c *receiveContext) Self() PID {
// Err is used instead of panicking within a message handler.
// One can also call panic which is not the recommended way
func (c *receiveContext) Err(err error) {
// this will be recovered
panic(err)
if err != nil {
// this will be recovered
panic(err)
}
}

// Response sets the message response
Expand Down
22 changes: 14 additions & 8 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type PID interface {
// push a message to the actor's receiveContextBuffer
doReceive(ctx ReceiveContext)
// watchers returns the list of watchMen
watchers() *slices.ConcurrentSlice[*watcher]
watchers() *slices.ThreadSafe[*watcher]
// setBehavior is a utility function that helps set the actor behavior
setBehavior(behavior Behavior)
// setBehaviorStacked adds a behavior to the actor's behaviors
Expand Down Expand Up @@ -227,7 +227,7 @@ type pid struct {
haltPassivationLnr chan types.Unit

// set of watchersList watching the given actor
watchersList *slices.ConcurrentSlice[*watcher]
watchersList *slices.ThreadSafe[*watcher]

// hold the list of the children
children *pidMap
Expand Down Expand Up @@ -257,7 +257,7 @@ type pid struct {
// http client
httpClient *stdhttp.Client

// specifies the current actor behavior
// specifies the actor behavior stack
behaviorStack *behaviorStack

// stash settings
Expand Down Expand Up @@ -292,7 +292,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
mailboxSize: DefaultMailboxSize,
children: newPIDMap(10),
supervisorDirective: DefaultSupervisoryStrategy,
watchersList: slices.NewConcurrentSlice[*watcher](),
watchersList: slices.NewThreadSafe[*watcher](),
telemetry: telemetry.New(),
actorPath: actorPath,
rwLocker: &sync.RWMutex{},
Expand Down Expand Up @@ -604,6 +604,11 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er
})
}

// set the actor in the given actor system registry
if x.ActorSystem() != nil {
x.ActorSystem().setActor(cid)
}

return cid, nil
}

Expand Down Expand Up @@ -1182,7 +1187,7 @@ func (x *pid) UnWatch(pid PID) {
}

// Watchers return the list of watchersList
func (x *pid) watchers() *slices.ConcurrentSlice[*watcher] {
func (x *pid) watchers() *slices.ThreadSafe[*watcher] {
return x.watchersList
}

Expand Down Expand Up @@ -1247,7 +1252,7 @@ func (x *pid) reset() {
x.lastProcessingDuration.Store(0)
x.initTimeout.Store(DefaultInitTimeout)
x.children = newPIDMap(10)
x.watchersList = slices.NewConcurrentSlice[*watcher]()
x.watchersList = slices.NewThreadSafe[*watcher]()
x.telemetry = telemetry.New()
x.mailbox.Reset()
x.resetBehavior()
Expand All @@ -1265,7 +1270,9 @@ func (x *pid) freeWatchers(ctx context.Context) {
if watchers.Len() > 0 {
for item := range watchers.Iter() {
watcher := item.Value
terminated := &goaktpb.Terminated{}
terminated := &goaktpb.Terminated{
ActorId: x.ID(),
}
if watcher.WatcherID.IsRunning() {
// TODO: handle error and push to some system dead-letters queue
_ = x.Tell(ctx, watcher.WatcherID, terminated)
Expand Down Expand Up @@ -1394,7 +1401,6 @@ func (x *pid) setBehavior(behavior Behavior) {
// resetBehavior is a utility function resets the actor behavior
func (x *pid) resetBehavior() {
x.rwLocker.Lock()
x.behaviorStack.Clear()
x.behaviorStack.Push(x.Receive)
x.rwLocker.Unlock()
}
Expand Down
Loading
Loading