diff --git a/actors/actor_system.go b/actors/actor_system.go index db46ca72..1acac8cb 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -245,7 +245,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { } system := &actorSystem{ - actors: newPIDMap(1_000), // TODO need to check with memory footprint here since we change the map engine + actors: newPIDMap(1_000), actorsChan: make(chan *internalpb.WireActor, 10), name: name, logger: log.DefaultLogger, @@ -1214,7 +1214,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error { x.locker.Unlock() go x.clusterEventsLoop() - go x.clusterReplicationLoop() + go x.replicationLoop() go x.peersStateLoop() go x.redistributionLoop() @@ -1359,8 +1359,8 @@ func (x *actorSystem) registerMetrics() error { return err } -// clusterReplicationLoop publishes newly created actor into the cluster when cluster is enabled -func (x *actorSystem) clusterReplicationLoop() { +// replicationLoop publishes newly created actor into the cluster when cluster is enabled +func (x *actorSystem) replicationLoop() { for actor := range x.actorsChan { if x.InCluster() { ctx := context.Background() diff --git a/actors/context.go b/actors/context.go index 1e022a6b..90cc1764 100644 --- a/actors/context.go +++ b/actors/context.go @@ -143,15 +143,15 @@ type ReceiveContext interface { } type receiveContext struct { - ctx context.Context - message proto.Message - sender PID - remoteSender *goaktpb.Address - response chan proto.Message - recipient PID - mu sync.Mutex - isAsyncMessage bool - sendTime atomic.Time + ctx context.Context + message proto.Message + sender PID + remoteSender *goaktpb.Address + response chan proto.Message + recipient PID + mu sync.Mutex + async bool + sendTime atomic.Time } // force compilation error @@ -167,7 +167,7 @@ func newReceiveContext(ctx context.Context, from, to PID, message proto.Message, context.sender = from context.recipient = to context.message = message - context.isAsyncMessage = async + context.async = async context.mu = sync.Mutex{} context.response = make(chan proto.Message, 1) context.sendTime.Store(time.Now()) @@ -204,7 +204,7 @@ func (c *receiveContext) Response(resp proto.Message) { c.mu.Lock() defer c.mu.Unlock() // only set a response when the message is sync message - if !c.isAsyncMessage { + if !c.async { defer close(c.response) c.response <- resp } @@ -497,12 +497,12 @@ func (c *receiveContext) Forward(to PID) { if to.IsRunning() { ctx := context.WithoutCancel(c.ctx) receiveContext := &receiveContext{ - ctx: ctx, - message: message, - sender: sender, - recipient: to, - mu: sync.Mutex{}, - isAsyncMessage: false, + ctx: ctx, + message: message, + sender: sender, + recipient: to, + mu: sync.Mutex{}, + async: false, } to.doReceive(receiveContext) } diff --git a/actors/context_test.go b/actors/context_test.go index 406feea0..9e1adeb8 100644 --- a/actors/context_test.go +++ b/actors/context_test.go @@ -120,12 +120,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -161,12 +161,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -205,12 +205,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -246,12 +246,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -315,12 +315,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // get the address of the exchanger actor one @@ -378,12 +378,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } op := func() { @@ -447,12 +447,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // get the address of the exchanger actor one @@ -509,12 +509,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // send the message to the exchanger actor one using remote messaging @@ -574,12 +574,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } require.Nil(t, context.RemoteLookup(host, remotingPort, actorName2)) @@ -630,12 +630,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } assert.Panics(t, func() { @@ -667,12 +667,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } assert.NotPanics(t, func() { @@ -695,12 +695,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -733,12 +733,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // stop the actor @@ -765,12 +765,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -806,12 +806,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -847,12 +847,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -888,12 +888,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -931,12 +931,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } time.Sleep(time.Second) @@ -965,12 +965,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } name := "monitored" @@ -1005,12 +1005,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -1049,12 +1049,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: parent, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: parent, + mu: sync.Mutex{}, + async: true, } // create the child actor @@ -1093,12 +1093,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } assert.Panics(t, func() { @@ -1179,12 +1179,12 @@ func TestReceiveContext(t *testing.T) { send := new(testpb.TestSend) // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: send, - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: send, + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // calling unhandled will push the current message to deadletters @@ -1251,12 +1251,12 @@ func TestReceiveContext(t *testing.T) { send := new(testpb.TestSend) // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: send, - sender: pid2, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: send, + sender: pid2, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // calling unhandled will push the current message to deadletters @@ -1318,12 +1318,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(goaktpb.PostStart), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(goaktpb.PostStart), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // calling unhandled will push the current message to deadletters @@ -1369,12 +1369,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -1410,12 +1410,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -1451,12 +1451,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -1495,12 +1495,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -1539,12 +1539,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // create actor2 @@ -1598,11 +1598,11 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - sender: NoSender, - recipient: testerRef, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + sender: NoSender, + recipient: testerRef, + mu: sync.Mutex{}, + async: true, } // get the address of the exchanger actor one @@ -1655,11 +1655,11 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - sender: NoSender, - recipient: testerRef, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + sender: NoSender, + recipient: testerRef, + mu: sync.Mutex{}, + async: true, } // get the address of the exchanger actor one @@ -1717,12 +1717,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } // send the message to the exchanger actor one using remote messaging @@ -1782,12 +1782,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } op := func() { @@ -1847,12 +1847,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } assert.NotPanics(t, func() { @@ -1905,12 +1905,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context context := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } assert.Panics(t, func() { @@ -1955,12 +1955,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context messageContext := &receiveContext{ - ctx: ctx, - message: new(testpb.TaskComplete), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TaskComplete), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } task := make(chan proto.Message) @@ -2020,12 +2020,12 @@ func TestReceiveContext(t *testing.T) { // create an instance of receive context messageContext := &receiveContext{ - ctx: ctx, - message: new(testpb.TaskComplete), - sender: NoSender, - recipient: pid1, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TaskComplete), + sender: NoSender, + recipient: pid1, + mu: sync.Mutex{}, + async: true, } assert.Panics(t, func() { diff --git a/actors/pid.go b/actors/pid.go index 45648ee4..97cd1a68 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -225,7 +225,7 @@ type pid struct { shutdownTimeout atomic.Duration // specifies the actor mailbox - mailbox *queue.Mpsc[ReceiveContext] + mailbox *queue.MpscQueue[ReceiveContext] // receives a shutdown signal. Once the signal is received // the actor is shut down gracefully. @@ -319,7 +319,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption fieldsLocker: &sync.RWMutex{}, stopLocker: &sync.Mutex{}, httpClient: http.NewClient(), - mailbox: queue.NewMpsc[ReceiveContext](), + mailbox: queue.NewMpscQueue[ReceiveContext](), stashBuffer: nil, stashLocker: &sync.Mutex{}, eventsStream: nil, @@ -364,7 +364,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption go p.receive() if p.passivateAfter.Load() > 0 { - go p.passivationListener() + go p.passivationLoop() } if p.metricEnabled.Load() { @@ -556,7 +556,7 @@ func (x *pid) Restart(ctx context.Context) error { } go x.receive() if x.passivateAfter.Load() > 0 { - go x.passivationListener() + go x.passivationLoop() } span.SetStatus(codes.Ok, "Restart") @@ -816,7 +816,7 @@ func (x *pid) BatchAsk(ctx context.Context, to PID, messages ...proto.Message) ( // RemoteLookup look for an actor address on a remote node. func (x *pid) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error) { - remoteClient, err := x.getRemoteServiceClient(host, port) + remoteClient, err := x.remotingClient(host, port) if err != nil { return nil, err } @@ -846,7 +846,7 @@ func (x *pid) RemoteTell(ctx context.Context, to *goaktpb.Address, message proto return err } - remoteService, err := x.getRemoteServiceClient(to.GetHost(), int(to.GetPort())) + remoteService, err := x.remotingClient(to.GetHost(), int(to.GetPort())) if err != nil { return err } @@ -897,7 +897,7 @@ func (x *pid) RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto. return nil, err } - remoteService, err := x.getRemoteServiceClient(to.GetHost(), int(to.GetPort())) + remoteService, err := x.remotingClient(to.GetHost(), int(to.GetPort())) if err != nil { return nil, err } @@ -990,7 +990,7 @@ func (x *pid) RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages }) } - remoteService, err := x.getRemoteServiceClient(to.GetHost(), int(to.GetPort())) + remoteService, err := x.remotingClient(to.GetHost(), int(to.GetPort())) if err != nil { return err } @@ -1046,7 +1046,7 @@ func (x *pid) RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages }) } - remoteService, err := x.getRemoteServiceClient(to.GetHost(), int(to.GetPort())) + remoteService, err := x.remotingClient(to.GetHost(), int(to.GetPort())) if err != nil { return nil, err } @@ -1092,7 +1092,7 @@ func (x *pid) RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages // RemoteStop stops an actor on a remote node func (x *pid) RemoteStop(ctx context.Context, host string, port int, name string) error { - remoteService, err := x.getRemoteServiceClient(host, port) + remoteService, err := x.remotingClient(host, port) if err != nil { return err } @@ -1116,7 +1116,7 @@ func (x *pid) RemoteStop(ctx context.Context, host string, port int, name string // RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem func (x *pid) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error { - remoteService, err := x.getRemoteServiceClient(host, port) + remoteService, err := x.remotingClient(host, port) if err != nil { return err } @@ -1146,7 +1146,7 @@ func (x *pid) RemoteSpawn(ctx context.Context, host string, port int, name, acto // RemoteReSpawn restarts an actor on a remote node. func (x *pid) RemoteReSpawn(ctx context.Context, host string, port int, name string) error { - remoteService, err := x.getRemoteServiceClient(host, port) + remoteService, err := x.remotingClient(host, port) if err != nil { return err } @@ -1253,6 +1253,9 @@ func (x *pid) doReceive(ctx ReceiveContext) { x.logger.Warn(err) x.handleError(ctx, err) } + x.fieldsLocker.Lock() + x.lastProcessingTime.Store(time.Now()) + x.fieldsLocker.Unlock() } // init initializes the given actor and init processing messages @@ -1416,9 +1419,9 @@ func (x *pid) handleReceived(received ReceiveContext) { } } -// passivationListener checks whether the actor is processing public or not. +// passivationLoop checks whether the actor is processing public or not. // when the actor is idle, it automatically shuts down to free resources -func (x *pid) passivationListener() { +func (x *pid) passivationLoop() { x.logger.Info("start the passivation listener...") x.logger.Infof("passivation timeout is (%s)", x.passivateAfter.Load().String()) ticker := time.NewTicker(x.passivateAfter.Load()) @@ -1643,8 +1646,8 @@ func (x *pid) grpcClientOptions() ([]connect.ClientOption, error) { return clientOptions, err } -// getRemoteServiceClient returns an instance of the Remote Service client -func (x *pid) getRemoteServiceClient(host string, port int) (internalpbconnect.RemotingServiceClient, error) { +// remotingClient returns an instance of the Remote Service client +func (x *pid) remotingClient(host string, port int) (internalpbconnect.RemotingServiceClient, error) { clientConnectionOptions, err := x.grpcClientOptions() if err != nil { return nil, err diff --git a/actors/pid_test.go b/actors/pid_test.go index 6450c699..add96452 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -76,12 +76,12 @@ func TestReceive(t *testing.T) { count := 10 for i := 0; i < count; i++ { receiveContext := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid, + mu: sync.Mutex{}, + async: true, } pid.doReceive(receiveContext) @@ -1536,12 +1536,12 @@ func TestRegisterMetrics(t *testing.T) { count := 10 for i := 0; i < count; i++ { receiveContext := &receiveContext{ - ctx: ctx, - message: new(testpb.TestSend), - sender: NoSender, - recipient: pid, - mu: sync.Mutex{}, - isAsyncMessage: true, + ctx: ctx, + message: new(testpb.TestSend), + sender: NoSender, + recipient: pid, + mu: sync.Mutex{}, + async: true, } pid.doReceive(receiveContext) diff --git a/internal/queue/mpsc.go b/internal/queue/mpsc.go index 9ac2f26b..dad5ec86 100644 --- a/internal/queue/mpsc.go +++ b/internal/queue/mpsc.go @@ -36,19 +36,19 @@ type node[T any] struct { next *node[T] } -// Mpsc is a Multi-Producer-Single-Consumer Queue +// MpscQueue is a Multi-Producer-Single-Consumer Queue // reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html -type Mpsc[T any] struct { +type MpscQueue[T any] struct { head *node[T] tail *node[T] length int64 lock sync.Mutex } -// NewMpsc create an instance of Mpsc -func NewMpsc[T any]() *Mpsc[T] { +// NewMpscQueue create an instance of MpscQueue +func NewMpscQueue[T any]() *MpscQueue[T] { item := new(node[T]) - return &Mpsc[T]{ + return &MpscQueue[T]{ head: item, tail: item, length: 0, @@ -57,7 +57,7 @@ func NewMpsc[T any]() *Mpsc[T] { } // Push place the given value in the queue head (FIFO). Returns always true -func (q *Mpsc[T]) Push(value T) bool { +func (q *MpscQueue[T]) Push(value T) bool { tnode := &node[T]{ value: value, } @@ -67,8 +67,9 @@ func (q *Mpsc[T]) Push(value T) bool { return true } -// Pop takes the QueueItem from the queue tail. Returns false if the queue is empty. Can be used in a single consumer (goroutine) only. -func (q *Mpsc[T]) Pop() (T, bool) { +// Pop takes the QueueItem from the queue tail. +// Returns false if the queue is empty. Can be used in a single consumer (goroutine) only. +func (q *MpscQueue[T]) Pop() (T, bool) { var tnil T next := (*node[T])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail.next)))) if next == nil { @@ -85,13 +86,13 @@ func (q *Mpsc[T]) Pop() (T, bool) { } // Len returns queue length -func (q *Mpsc[T]) Len() int64 { +func (q *MpscQueue[T]) Len() int64 { return atomic.LoadInt64(&q.length) } // IsEmpty returns true when the queue is empty // must be called from a single, consumer goroutine -func (q *Mpsc[T]) IsEmpty() bool { +func (q *MpscQueue[T]) IsEmpty() bool { q.lock.Lock() tail := q.tail q.lock.Unlock() diff --git a/internal/queue/mpsc_test.go b/internal/queue/mpsc_test.go index 4fd1e6cb..c799ba68 100644 --- a/internal/queue/mpsc_test.go +++ b/internal/queue/mpsc_test.go @@ -34,7 +34,7 @@ import ( // TODO: add go routine-based tests func TestMpscQueue(t *testing.T) { t.Run("With Push/Pop", func(t *testing.T) { - q := NewMpsc[int]() + q := NewMpscQueue[int]() require.True(t, q.IsEmpty()) for j := 0; j < 100; j++ { if q.Len() != 0 {