Skip to content

Commit

Permalink
perf: actor are encoded as byte array into the cluster. refactor Acto…
Browse files Browse the repository at this point in the history
…rOf logic (#307)
  • Loading branch information
Tochemey authored Apr 27, 2024
1 parent 4a57506 commit 197b2a3
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 222 deletions.
25 changes: 17 additions & 8 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"fmt"
"net"
"net/http"
"reflect"
"regexp"
"sync"
"time"
Expand Down Expand Up @@ -441,12 +442,15 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
}

x.actors.Set(pid)
x.registry.Register(actor)

if x.clusterEnabled.Load() {
actorType := reflect.TypeOf(actor).Elem().Name()
x.clusterChan <- &internalpb.WireActor{
ActorName: name,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: actorType,
}
}

Expand Down Expand Up @@ -509,12 +513,15 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc
}

x.actors.Set(pid)
x.registry.Register(actor)

if x.clusterEnabled.Load() {
actorType := reflect.TypeOf(actor).Elem().Name()
x.clusterChan <- &internalpb.WireActor{
ActorName: actorID,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: actorType,
}
}

Expand Down Expand Up @@ -626,6 +633,15 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak
return nil, nil, ErrActorSystemNotStarted
}

// first check whether the actor exist locally
items := x.actors.List()
for _, actorRef := range items {
if actorRef.ActorPath().Name() == actorName {
return actorRef.ActorPath().RemoteAddress(), actorRef, nil
}
}

// check in the cluster
if x.cluster != nil || x.clusterEnabled.Load() {
wireActor, err := x.cluster.GetActor(spanCtx, actorName)
if err != nil {
Expand All @@ -652,13 +668,6 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak
return nil, nil, ErrMethodCallNotAllowed
}

items := x.actors.List()
for _, actorRef := range items {
if actorRef.ActorPath().Name() == actorName {
return actorRef.ActorPath().RemoteAddress(), actorRef, nil
}
}

x.logger.Infof("actor=%s not found", actorName)
e := ErrActorNotFound(actorName)
span.SetStatus(codes.Error, "ActorOf")
Expand Down Expand Up @@ -1004,7 +1013,7 @@ func (x *actorSystem) RemoteStop(ctx context.Context, request *connect.Request[i
return connect.NewResponse(new(internalpb.RemoteStopResponse)), nil
}

// RemoteSpawn starts an actor on a remote machine. The given actor must be registered on the remote machine using the actor system Register method
// RemoteSpawn handles the remoteSpawn call
func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[internalpb.RemoteSpawnRequest]) (*connect.Response[internalpb.RemoteSpawnResponse], error) {
logger := x.logger

Expand Down
33 changes: 3 additions & 30 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,9 @@ func TestActorSystem(t *testing.T) {
time.Sleep(time.Second)

// get the actor
addr, pid, err := newActorSystem.ActorOf(ctx, actorName)
addr, _, err := newActorSystem.ActorOf(ctx, actorName)
require.NoError(t, err)
require.NotNil(t, addr)
require.Nil(t, pid)

// use RemoteActor method and compare the results
remoteAddr, err := newActorSystem.RemoteActor(ctx, actorName)
Expand All @@ -246,10 +245,11 @@ func TestActorSystem(t *testing.T) {

// assert actor not found
actorName = "some-actor"
addr, pid, err = newActorSystem.ActorOf(ctx, actorName)
addr, pid, err := newActorSystem.ActorOf(ctx, actorName)
require.Error(t, err)
require.EqualError(t, err, ErrActorNotFound(actorName).Error())
require.Nil(t, addr)
require.Nil(t, pid)

remoteAddr, err = newActorSystem.RemoteActor(ctx, actorName)
require.Error(t, err)
Expand Down Expand Up @@ -289,40 +289,13 @@ func TestActorSystem(t *testing.T) {

// create an actor
actorName := uuid.NewString()
actor := newTestActor()
actorRef, err := newActorSystem.Spawn(ctx, actorName, actor)
assert.NoError(t, err)
assert.NotNil(t, actorRef)

path := NewPath(actorName, &Address{
host: host,
port: remotingPort,
system: newActorSystem.Name(),
protocol: protocol,
})
addr := path.RemoteAddress()

reply, err := RemoteAsk(ctx, addr, new(testpb.TestReply))
require.NoError(t, err)
require.NotNil(t, reply)

actual := new(testpb.Reply)
require.NoError(t, reply.UnmarshalTo(actual))

expected := &testpb.Reply{Content: "received message"}
assert.True(t, proto.Equal(expected, actual))

addr, pid, err := newActorSystem.ActorOf(ctx, actorName)
require.Error(t, err)
require.EqualError(t, err, ErrMethodCallNotAllowed.Error())
require.Nil(t, addr)
require.Nil(t, pid)

// stop the actor after some time
time.Sleep(time.Second)
err = newActorSystem.Kill(ctx, actorName)
require.NoError(t, err)

t.Cleanup(func() {
err = newActorSystem.Stop(ctx)
assert.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1782,14 +1782,15 @@ func TestRemoteSpawn(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, pid)

// create an actor implementation and register it
actorName := uuid.NewString()

// fetching the address of the that actor should return nil address
addr, err := pid.RemoteLookup(ctx, host, remotingPort, actorName)
require.NoError(t, err)
require.Nil(t, addr)

// for the sake of the test
require.NoError(t, sys.Deregister(ctx, &exchanger{}))

// spawn the remote actor
err = pid.RemoteSpawn(ctx, host, remotingPort, actorName, "exchanger")
require.Error(t, err)
Expand Down
113 changes: 46 additions & 67 deletions examples/actor-cluster/dnssd/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/protobuf/proto"

"github.com/tochemey/goakt/actors"
kactors "github.com/tochemey/goakt/examples/actor-cluster/dnssd/actors"
Expand Down Expand Up @@ -99,83 +100,49 @@ func (s *AccountService) CreateAccount(ctx context.Context, c *connect.Request[s

// CreditAccount helps credit a given account
func (s *AccountService) CreditAccount(ctx context.Context, c *connect.Request[samplepb.CreditAccountRequest]) (*connect.Response[samplepb.CreditAccountResponse], error) {
// grab the actual request
req := c.Msg
// grab the account id
accountID := req.GetCreditAccount().GetAccountId()

// check whether the actor system is running in a cluster
if !s.actorSystem.InCluster() {
s.logger.Info("cluster mode not is on....")
// locate the given actor
pid, err := s.actorSystem.LocalActor(accountID)
// handle the error
if err != nil {
// check whether it is not found error
if !errors.Is(err, actors.ErrActorNotFound(accountID)) {
return nil, connect.NewError(connect.CodeInternal, err)
}

// return not found
addr, pid, err := s.actorSystem.ActorOf(ctx, accountID)
if err != nil {
// check whether it is not found error
if !errors.Is(err, actors.ErrActorNotFound(accountID)) {
return nil, connect.NewError(connect.CodeNotFound, err)
}

// defensive programming. making sure pid is defined
if pid == nil {
return nil, connect.NewError(connect.CodeNotFound, err)
}
// return not found
return nil, connect.NewError(connect.CodeNotFound, err)
}

// send the create command to the pid
reply, err := actors.Ask(ctx, pid, &samplepb.CreditAccount{
AccountId: accountID,
Balance: req.GetCreditAccount().GetBalance(),
}, time.Second)
var message proto.Message
command := &samplepb.CreditAccount{
AccountId: accountID,
Balance: req.GetCreditAccount().GetBalance(),
}

// handle the error
if pid != nil {
s.logger.Info("actor is found locally...")
message, err = actors.Ask(ctx, pid, command, time.Second)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
}

// pattern match on the reply
switch x := reply.(type) {
case *samplepb.Account:
// return the appropriate response
return connect.NewResponse(&samplepb.CreditAccountResponse{Account: x}), nil
default:
// create the error message to send
err := fmt.Errorf("invalid reply=%s", reply.ProtoReflect().Descriptor().FullName())
if pid == nil {
s.logger.Info("actor is not found locally...")
reply, err := actors.RemoteAsk(ctx, addr, command)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
}

// here the actor system is running in a cluster
addr, err := s.actorSystem.RemoteActor(ctx, accountID)
// handle the error
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}

// create the command to send to the remote actor
command := &samplepb.CreditAccount{
AccountId: accountID,
Balance: req.GetCreditAccount().GetBalance(),
}
// send a remote message to the actor
reply, err := actors.RemoteAsk(ctx, addr, command)
// handle the error
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
message, _ = reply.UnmarshalNew()
}

message, _ := reply.UnmarshalNew()
// pattern match on the reply
switch x := message.(type) {
case *samplepb.Account:
// return the appropriate response
return connect.NewResponse(&samplepb.CreditAccountResponse{Account: x}), nil
default:
// create the error message to send
err := fmt.Errorf("invalid reply=%s", reply.ProtoReflect().Descriptor().FullName())
err := fmt.Errorf("invalid reply=%s", message.ProtoReflect().Descriptor().FullName())
return nil, connect.NewError(connect.CodeInternal, err)
}
}
Expand All @@ -188,32 +155,41 @@ func (s *AccountService) GetAccount(ctx context.Context, c *connect.Request[samp
accountID := req.GetAccountId()

// locate the given actor
addr, _, err := s.actorSystem.ActorOf(ctx, accountID)
addr, pid, err := s.actorSystem.ActorOf(ctx, accountID)
// handle the error
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}

// send GetAccount message to the actor
var message proto.Message
command := &samplepb.GetAccount{
AccountId: accountID,
}
// send a remote message to the actor
reply, err := actors.RemoteAsk(ctx, addr, command)
// handle the error
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)

if pid != nil {
s.logger.Info("actor is found locally...")
message, err = actors.Ask(ctx, pid, command, time.Second)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
}

if pid == nil {
s.logger.Info("actor is not found locally...")
reply, err := actors.RemoteAsk(ctx, addr, command)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}

message, _ = reply.UnmarshalNew()
}

message, _ := reply.UnmarshalNew()
// pattern match on the reply
switch x := message.(type) {
case *samplepb.Account:
// return the appropriate response
return connect.NewResponse(&samplepb.GetAccountResponse{Account: x}), nil
default:
// create the error message to send
err := fmt.Errorf("invalid reply=%s", reply.ProtoReflect().Descriptor().FullName())
err := fmt.Errorf("invalid reply=%s", message.ProtoReflect().Descriptor().FullName())
return nil, connect.NewError(connect.CodeInternal, err)
}
}
Expand Down Expand Up @@ -261,6 +237,9 @@ func (s *AccountService) listenAndServe() {
s.server = server
// listen and service requests
if err := s.server.ListenAndServe(); err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
s.logger.Panic(errors.Wrap(err, "failed to start remoting service"))
}
}
Loading

0 comments on commit 197b2a3

Please sign in to comment.