Skip to content

Commit

Permalink
feat: add router and fix (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Jul 15, 2024
1 parent 65635a6 commit 4c9846b
Show file tree
Hide file tree
Showing 22 changed files with 1,403 additions and 96 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The project adheres to [Semantic Versioning](https://semver.org) and [Convention
- [Actor System](#actor-system)
- [Behaviors](#behaviors)
- [Mailbox](#mailbox)
- [Routers](#routers)
- [Events Stream](#events-stream)
- [Supported events](#supported-events)
- [Messaging](#messaging)
Expand Down Expand Up @@ -123,8 +124,11 @@ The fundamental building blocks of Go-Akt are actors.
- supervise the failure behavior of (child) actors. The supervisory strategy to adopt is set during its creation.
In Go-Akt that each child actor is treated separately. There is no concept of one-for-one and one-for-all strategies.
The following directives are supported:
- [`Restart`](./actors/types.go): to restart the child actor
- [`Stop`](./actors/types.go): to stop the child actor which is the default one
- [`Restart`](./actors/supervisor.go): to restart the child actor. One can control how the restart is done using the following options:
- `maxNumRetries`: defines the maximum of restart attempts
- `timeout`: how to attempt restarting the faulty actor
- [`Stop`](./actors/supervisor.go): to stop the child actor which is the default one
- [`Resume`](./actors/supervisor.go): ignores the failure and process the next message, instead
- remotely lookup for an actor on another node via their process id [`PID`](./actors/pid.go) `RemoteLookup`.
This
allows it to send messages remotely via `RemoteAsk` or `RemoteTell` methods
Expand Down Expand Up @@ -177,6 +181,10 @@ To change the behavior, call the following methods on the [ReceiveContext interf

Once can implement a custom mailbox. See [Mailbox](./actors/mailbox.go). The default mailbox makes use of buffered channels.

### Routers

Go-Akt comes shipped with some routers. See [docs](./router/router.md)

### Events Stream

To receive some system events and act on them for some particular business cases, you just need to call the actor system `Subscribe`.
Expand Down
37 changes: 17 additions & 20 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ type ActorSystem interface {
handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
// handleRemoteTell handles an asynchronous message to an actor
handleRemoteTell(ctx context.Context, to PID, message proto.Message) error
// setActor sets actor in the actor system actors registry
setActor(actor PID)
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -465,16 +467,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
return nil, err
}

x.actors.set(pid)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
ActorName: name,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: types.NameOf(actor),
}
}

x.setActor(pid)
return pid, nil
}

Expand Down Expand Up @@ -539,16 +532,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
return nil, err
}

x.actors.set(pid)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
ActorName: actorID,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: types.NameOf(actor),
}
}

x.setActor(pid)
return pid, nil
}

Expand Down Expand Up @@ -1170,6 +1154,19 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot
return Tell(spanCtx, to, message)
}

// setActor implements ActorSystem.
func (x *actorSystem) setActor(actor PID) {
x.actors.set(actor)
if x.clusterEnabled.Load() {
x.actorsChan <- &internalpb.WireActor{
ActorName: actor.Name(),
ActorAddress: actor.ActorPath().RemoteAddress(),
ActorPath: actor.ActorPath().String(),
ActorType: types.NameOf(actor),
}
}
}

// enableClustering enables clustering. When clustering is enabled remoting is also enabled to facilitate remote
// communication
func (x *actorSystem) enableClustering(ctx context.Context) error {
Expand Down
3 changes: 0 additions & 3 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (x *userActor) Receive(ctx ReceiveContext) {
// Authenticated behavior is executed when the actor receive the TestAuth message
func (x *userActor) Authenticated(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.TestReadiness:
ctx.Response(new(testspb.TestReady))
ctx.UnBecome()
Expand All @@ -206,7 +205,6 @@ func (x *userActor) Authenticated(ctx ReceiveContext) {

func (x *userActor) CreditAccount(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.CreditAccount:
ctx.Response(new(testspb.AccountCredited))
ctx.BecomeStacked(x.DebitAccount)
Expand All @@ -217,7 +215,6 @@ func (x *userActor) CreditAccount(ctx ReceiveContext) {

func (x *userActor) DebitAccount(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.DebitAccount:
ctx.Response(new(testspb.AccountDebited))
ctx.UnBecomeStacked()
Expand Down
6 changes: 4 additions & 2 deletions actors/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ func (c *receiveContext) Self() PID {
// Err is used instead of panicking within a message handler.
// One can also call panic which is not the recommended way
func (c *receiveContext) Err(err error) {
// this will be recovered
panic(err)
if err != nil {
// this will be recovered
panic(err)
}
}

// Response sets the message response
Expand Down
22 changes: 14 additions & 8 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type PID interface {
// push a message to the actor's receiveContextBuffer
doReceive(ctx ReceiveContext)
// watchers returns the list of watchMen
watchers() *slices.ConcurrentSlice[*watcher]
watchers() *slices.ThreadSafe[*watcher]
// setBehavior is a utility function that helps set the actor behavior
setBehavior(behavior Behavior)
// setBehaviorStacked adds a behavior to the actor's behaviors
Expand Down Expand Up @@ -227,7 +227,7 @@ type pid struct {
haltPassivationLnr chan types.Unit

// set of watchersList watching the given actor
watchersList *slices.ConcurrentSlice[*watcher]
watchersList *slices.ThreadSafe[*watcher]

// hold the list of the children
children *pidMap
Expand Down Expand Up @@ -257,7 +257,7 @@ type pid struct {
// http client
httpClient *stdhttp.Client

// specifies the current actor behavior
// specifies the actor behavior stack
behaviorStack *behaviorStack

// stash settings
Expand Down Expand Up @@ -292,7 +292,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
mailboxSize: DefaultMailboxSize,
children: newPIDMap(10),
supervisorDirective: DefaultSupervisoryStrategy,
watchersList: slices.NewConcurrentSlice[*watcher](),
watchersList: slices.NewThreadSafe[*watcher](),
telemetry: telemetry.New(),
actorPath: actorPath,
rwLocker: &sync.RWMutex{},
Expand Down Expand Up @@ -604,6 +604,11 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er
})
}

// set the actor in the given actor system registry
if x.ActorSystem() != nil {
x.ActorSystem().setActor(cid)
}

return cid, nil
}

Expand Down Expand Up @@ -1182,7 +1187,7 @@ func (x *pid) UnWatch(pid PID) {
}

// Watchers return the list of watchersList
func (x *pid) watchers() *slices.ConcurrentSlice[*watcher] {
func (x *pid) watchers() *slices.ThreadSafe[*watcher] {
return x.watchersList
}

Expand Down Expand Up @@ -1247,7 +1252,7 @@ func (x *pid) reset() {
x.lastProcessingDuration.Store(0)
x.initTimeout.Store(DefaultInitTimeout)
x.children = newPIDMap(10)
x.watchersList = slices.NewConcurrentSlice[*watcher]()
x.watchersList = slices.NewThreadSafe[*watcher]()
x.telemetry = telemetry.New()
x.mailbox.Reset()
x.resetBehavior()
Expand All @@ -1265,7 +1270,9 @@ func (x *pid) freeWatchers(ctx context.Context) {
if watchers.Len() > 0 {
for item := range watchers.Iter() {
watcher := item.Value
terminated := &goaktpb.Terminated{}
terminated := &goaktpb.Terminated{
ActorId: x.ID(),
}
if watcher.WatcherID.IsRunning() {
// TODO: handle error and push to some system dead-letters queue
_ = x.Tell(ctx, watcher.WatcherID, terminated)
Expand Down Expand Up @@ -1394,7 +1401,6 @@ func (x *pid) setBehavior(behavior Behavior) {
// resetBehavior is a utility function resets the actor behavior
func (x *pid) resetBehavior() {
x.rwLocker.Lock()
x.behaviorStack.Clear()
x.behaviorStack.Push(x.Receive)
x.rwLocker.Unlock()
}
Expand Down
Loading

0 comments on commit 4c9846b

Please sign in to comment.