Skip to content

Commit

Permalink
fix: fix the actor metric recording (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Aug 10, 2024
1 parent 6828ba2 commit 56af9de
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 129 deletions.
68 changes: 34 additions & 34 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ type actorSystem struct {
traceEnabled atomic.Bool
tracer trace.Tracer

// specifies whether metric is enabled
// specifies whether metrics is enabled
metricEnabled atomic.Bool

registry types.Registry
Expand Down Expand Up @@ -352,33 +352,33 @@ func (x *actorSystem) Register(ctx context.Context, actor Actor) error {
// This will send the given message to the actor after the given interval specified.
// The message will be sent once
func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error {
spanCtx, span := x.tracer.Start(ctx, "ScheduleOnce")
ctx, span := x.tracer.Start(ctx, "ScheduleOnce")
defer span.End()
return x.scheduler.ScheduleOnce(spanCtx, message, pid, interval)
return x.scheduler.ScheduleOnce(ctx, message, pid, interval)
}

// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
// This requires remoting to be enabled on the actor system.
// This will send the given message to the actor after the given interval specified
// The message will be sent once
func (x *actorSystem) RemoteScheduleOnce(ctx context.Context, message proto.Message, address *goaktpb.Address, interval time.Duration) error {
spanCtx, span := x.tracer.Start(ctx, "RemoteScheduleOnce")
ctx, span := x.tracer.Start(ctx, "RemoteScheduleOnce")
defer span.End()
return x.scheduler.RemoteScheduleOnce(spanCtx, message, address, interval)
return x.scheduler.RemoteScheduleOnce(ctx, message, address, interval)
}

// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error {
spanCtx, span := x.tracer.Start(ctx, "ScheduleWithCron")
ctx, span := x.tracer.Start(ctx, "ScheduleWithCron")
defer span.End()
return x.scheduler.ScheduleWithCron(spanCtx, message, pid, cronExpression)
return x.scheduler.ScheduleWithCron(ctx, message, pid, cronExpression)
}

// RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
func (x *actorSystem) RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *goaktpb.Address, cronExpression string) error {
spanCtx, span := x.tracer.Start(ctx, "RemoteScheduleWithCron")
ctx, span := x.tracer.Start(ctx, "RemoteScheduleWithCron")
defer span.End()
return x.scheduler.RemoteScheduleWithCron(spanCtx, message, address, cronExpression)
return x.scheduler.RemoteScheduleWithCron(ctx, message, address, cronExpression)
}

// Subscribe help receive dead letters whenever there are available
Expand Down Expand Up @@ -422,7 +422,7 @@ func (x *actorSystem) NumActors() uint64 {

// Spawn creates or returns the instance of a given actor in the system
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, error) {
spanCtx, span := x.tracer.Start(ctx, "Spawn")
ctx, span := x.tracer.Start(ctx, "Spawn")
defer span.End()

if !x.started.Load() {
Expand All @@ -445,7 +445,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
}
}

pid, err := x.configPID(spanCtx, name, actor)
pid, err := x.configPID(ctx, name, actor)
if err != nil {
span.SetStatus(codes.Error, "Spawn")
span.RecordError(err)
Expand All @@ -460,7 +460,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) {
spanCtx, span := x.tracer.Start(ctx, "SpawnFromFunc")
ctx, span := x.tracer.Start(ctx, "SpawnFromFunc")
defer span.End()

if !x.started.Load() {
Expand All @@ -470,7 +470,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
}

actor := newFuncActor(name, receiveFunc, opts...)
pid, err := x.configPID(spanCtx, name, actor)
pid, err := x.configPID(ctx, name, actor)
if err != nil {
span.SetStatus(codes.Error, "Spawn")
span.RecordError(err)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind

// Kill stops a given actor in the system
func (x *actorSystem) Kill(ctx context.Context, name string) error {
spanCtx, span := x.tracer.Start(ctx, "Kill")
ctx, span := x.tracer.Start(ctx, "Kill")
defer span.End()

if !x.started.Load() {
Expand All @@ -517,7 +517,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {
if exist {
// stop the given actor. No need to record error in the span context
// because the shutdown method is taking care of that
return pid.Shutdown(spanCtx)
return pid.Shutdown(ctx)
}

err := ErrActorNotFound(actorPath.String())
Expand All @@ -528,7 +528,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {

// ReSpawn recreates a given actor in the system
func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) {
spanCtx, span := x.tracer.Start(ctx, "ReSpawn")
ctx, span := x.tracer.Start(ctx, "ReSpawn")
defer span.End()

if !x.started.Load() {
Expand All @@ -545,7 +545,7 @@ func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) {

pid, exist := x.actors.get(actorPath)
if exist {
if err := pid.Restart(spanCtx); err != nil {
if err := pid.Restart(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to restart actor=%s", actorPath.String())
}

Expand Down Expand Up @@ -597,7 +597,7 @@ func (x *actorSystem) PeerAddress() string {
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error) {
spanCtx, span := x.tracer.Start(ctx, "ActorOf")
ctx, span := x.tracer.Start(ctx, "ActorOf")
defer span.End()

x.locker.Lock()
Expand All @@ -619,7 +619,7 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak

// check in the cluster
if x.clusterEnabled.Load() {
actor, err := x.cluster.GetActor(spanCtx, actorName)
actor, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
x.logger.Infof("actor=%s not found", actorName)
Expand Down Expand Up @@ -676,7 +676,7 @@ func (x *actorSystem) LocalActor(actorName string) (PID, error) {
// When the cluster mode is not enabled an actor not found error will be returned
// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *goaktpb.Address, err error) {
spanCtx, span := x.tracer.Start(ctx, "RemoteActor")
ctx, span := x.tracer.Start(ctx, "RemoteActor")
defer span.End()

x.locker.Lock()
Expand All @@ -696,7 +696,7 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *
return nil, e
}

actor, err := x.cluster.GetActor(spanCtx, actorName)
actor, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
x.logger.Infof("actor=%s not found", actorName)
Expand All @@ -717,22 +717,22 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *

// Start starts the actor system
func (x *actorSystem) Start(ctx context.Context) error {
spanCtx, span := x.tracer.Start(ctx, "Start")
ctx, span := x.tracer.Start(ctx, "Start")
defer span.End()

x.started.Store(true)

if x.remotingEnabled.Load() {
x.enableRemoting(spanCtx)
x.enableRemoting(ctx)
}

if x.clusterEnabled.Load() {
if err := x.enableClustering(spanCtx); err != nil {
if err := x.enableClustering(ctx); err != nil {
return err
}
}

x.scheduler.Start(spanCtx)
x.scheduler.Start(ctx)

actorName := x.getSystemActorName(supervisorType)
pid, err := x.configPID(ctx, actorName, newSystemSupervisor(x.logger))
Expand All @@ -751,7 +751,7 @@ func (x *actorSystem) Start(ctx context.Context) error {

// Stop stops the actor system
func (x *actorSystem) Stop(ctx context.Context) error {
spanCtx, span := x.tracer.Start(ctx, "Stop")
ctx, span := x.tracer.Start(ctx, "Stop")
defer span.End()

x.logger.Infof("%s shutting down...", x.name)
Expand All @@ -768,17 +768,17 @@ func (x *actorSystem) Stop(ctx context.Context) error {
x.logger.Infof("%s is shutting down..:)", x.name)

x.started.Store(false)
x.scheduler.Stop(spanCtx)
x.scheduler.Stop(ctx)

if x.eventsStream != nil {
x.eventsStream.Shutdown()
}

ctx, cancel := context.WithTimeout(spanCtx, x.shutdownTimeout)
ctx, cancel := context.WithTimeout(ctx, x.shutdownTimeout)
defer cancel()

if x.remotingEnabled.Load() {
if err := x.remotingServer.Shutdown(spanCtx); err != nil {
if err := x.remotingServer.Shutdown(ctx); err != nil {
span.SetStatus(codes.Error, "Stop")
span.RecordError(err)
return err
Expand All @@ -789,7 +789,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}

if x.clusterEnabled.Load() {
if err := x.cluster.Stop(spanCtx); err != nil {
if err := x.cluster.Stop(ctx); err != nil {
span.SetStatus(codes.Error, "Stop")
span.RecordError(err)
return err
Expand Down Expand Up @@ -1124,16 +1124,16 @@ func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[inter
// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
spanCtx, span := x.tracer.Start(ctx, "HandleRemoteAsk")
ctx, span := x.tracer.Start(ctx, "HandleRemoteAsk")
defer span.End()
return Ask(spanCtx, to, message, timeout)
return Ask(ctx, to, message, timeout)
}

// handleRemoteTell handles an asynchronous message to an actor
func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message proto.Message) error {
spanCtx, span := x.tracer.Start(ctx, "HandleRemoteTell")
ctx, span := x.tracer.Start(ctx, "HandleRemoteTell")
defer span.End()
return Tell(spanCtx, to, message)
return Tell(ctx, to, message)
}

// getSupervisor return the system supervisor
Expand Down
2 changes: 1 addition & 1 deletion actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ func TestActorSystem(t *testing.T) {
}
// sort the array
sort.Strings(expected)
// get the metric names
// get the metrics names
actual := make([]string, len(got.ScopeMetrics[0].Metrics))
for i, metric := range got.ScopeMetrics[0].Metrics {
actual[i] = metric.Name
Expand Down
Loading

0 comments on commit 56af9de

Please sign in to comment.