Skip to content

Commit

Permalink
refactor: remove OpenTelemetry dependency (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 24, 2024
1 parent 10dc734 commit d2f3e41
Show file tree
Hide file tree
Showing 20 changed files with 52 additions and 953 deletions.
19 changes: 6 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,12 @@ Go-Akt offers out of the box features that can help track, monitor and measure t

#### Metrics

One can enable/disable metrics on a Go-Akt actor system to collect the following metrics:

- Actor Metrics:
- Number of children
- Number of messages stashed
- Number of Restarts
- Last message received processing latency in milliseconds
- System Metrics:
- Total Number of Actors

Go-Akt uses under the hood [OpenTelemetry](https://opentelemetry.io/docs/instrumentation/go/) to instrument a system.
One just need to use the `WithMetric` option when instantiating a Go-Akt actor system and use the default [Telemetry](./telemetry/telemetry.go)
engine or set a custom one with `WithTelemetry` option of the actor system.
The following methods have been implemented to help push some metrics to any observability tool:
- Total Number of children at a given point in time [PID](./actors/pid.go)
- Number of messages stashed at a given point in time [PID](./actors/pid.go)
- Number of Restarts at a given point in time [PID](./actors/pid.go)
- Latest message received processing duration in milliseconds [PID](./actors/pid.go)
- Total Number of Actors at a given point in time [ActorSystem](./actors/actor_system.go)

#### Logging

Expand Down
61 changes: 3 additions & 58 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ import (
"time"

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/google/uuid"
otelmetric "go.opentelemetry.io/otel/metric"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand All @@ -54,11 +52,9 @@ import (
"github.com/tochemey/goakt/v2/internal/http"
"github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/internal/internalpb/internalpbconnect"
"github.com/tochemey/goakt/v2/internal/metric"
"github.com/tochemey/goakt/v2/internal/tcp"
"github.com/tochemey/goakt/v2/internal/types"
"github.com/tochemey/goakt/v2/log"
"github.com/tochemey/goakt/v2/telemetry"
)

// ActorSystem defines the contract of an actor system
Expand Down Expand Up @@ -175,8 +171,7 @@ type actorSystem struct {
actorInitTimeout time.Duration
// Specifies the supervisor strategy
supervisorDirective SupervisorDirective
// Specifies the telemetry config
telemetry *telemetry.Telemetry

// Specifies whether remoting is enabled.
// This allows to handle remote messaging
remotingEnabled atomic.Bool
Expand Down Expand Up @@ -210,9 +205,6 @@ type actorSystem struct {
// specifies the message scheduler
scheduler *scheduler

// specifies whether metrics is enabled
metricEnabled atomic.Bool

registry types.Registry
reflection *reflection

Expand Down Expand Up @@ -247,7 +239,6 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
askTimeout: DefaultAskTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorDirective: DefaultSupervisoryStrategy,
telemetry: telemetry.New(),
locker: sync.Mutex{},
shutdownTimeout: DefaultShutdownTimeout,
stashEnabled: false,
Expand All @@ -269,7 +260,6 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
system.started.Store(false)
system.remotingEnabled.Store(false)
system.clusterEnabled.Store(false)
system.metricEnabled.Store(false)

system.reflection = newReflection(system.registry)

Expand All @@ -286,13 +276,6 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}

system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster))

if system.metricEnabled.Load() {
if err := system.registerMetrics(); err != nil {
return nil, err
}
}

return system, nil
}

Expand Down Expand Up @@ -1111,22 +1094,6 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
func (x *actorSystem) enableRemoting(ctx context.Context) {
x.logger.Info("enabling remoting...")

var interceptor *otelconnect.Interceptor
var err error
if x.metricEnabled.Load() {
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithMeterProvider(x.telemetry.MeterProvider()),
)
if err != nil {
x.logger.Panic(fmt.Errorf("failed to initialize observability feature: %w", err))
}
}

var opts []connect.HandlerOption
if interceptor != nil {
opts = append(opts, connect.WithInterceptors(interceptor))
}

remotingHost, remotingPort, err := tcp.GetHostPort(fmt.Sprintf("%s:%d", x.host, x.port))
if err != nil {
x.logger.Panic(fmt.Errorf("failed to resolve remoting TCP address: %w", err))
Expand All @@ -1135,8 +1102,8 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
x.host = remotingHost
x.port = int32(remotingPort)

remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x, opts...)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x, opts...)
remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x)

mux := stdhttp.NewServeMux()
mux.Handle(remotingServicePath, remotingServiceHandler)
Expand All @@ -1160,7 +1127,6 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {

// reset the actor system
func (x *actorSystem) reset() {
x.telemetry = nil
x.actors.reset()
x.name = ""
x.cluster = nil
Expand Down Expand Up @@ -1200,22 +1166,6 @@ func (x *actorSystem) janitor() {
x.logger.Info("janitor has stopped...")
}

// registerMetrics register the PID metrics with OTel instrumentation.
func (x *actorSystem) registerMetrics() error {
meter := x.telemetry.Meter()
metrics, err := metric.NewActorSystemMetric(meter)
if err != nil {
return err
}

_, err = meter.RegisterCallback(func(_ context.Context, observer otelmetric.Observer) error {
observer.ObserveInt64(metrics.ActorsCount(), int64(x.NumActors()))
return nil
}, metrics.ActorsCount())

return err
}

// replicationLoop publishes newly created actor into the cluster when cluster is enabled
func (x *actorSystem) replicationLoop() {
for actor := range x.actorsChan {
Expand Down Expand Up @@ -1374,7 +1324,6 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (
withSupervisorDirective(x.supervisorDirective),
withEventsStream(x.eventsStream),
withInitTimeout(x.actorInitTimeout),
withTelemetry(x.telemetry),
}

// enable stash
Expand All @@ -1389,10 +1338,6 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (
pidOpts = append(pidOpts, withPassivationAfter(x.expireActorAfter))
}

if x.metricEnabled.Load() {
pidOpts = append(pidOpts, withMetric())
}

pid, err := newPID(ctx,
addr,
actor,
Expand Down
85 changes: 8 additions & 77 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package actors
import (
"context"
"net"
"sort"
"strconv"
"sync"
"testing"
Expand All @@ -38,9 +37,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/travisjeffery/go-dynaport"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/goakt/v2/address"
Expand All @@ -50,7 +46,6 @@ import (
"github.com/tochemey/goakt/v2/log"
clustermocks "github.com/tochemey/goakt/v2/mocks/cluster"
testkit "github.com/tochemey/goakt/v2/mocks/discovery"
"github.com/tochemey/goakt/v2/telemetry"
"github.com/tochemey/goakt/v2/test/data/testpb"
)

Expand Down Expand Up @@ -1002,69 +997,6 @@ func TestActorSystem(t *testing.T) {
provider.AssertExpectations(t)
})
})
t.Run("With Metric enabled", func(t *testing.T) {
r := sdkmetric.NewManualReader()
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
// create an instance of telemetry
tel := telemetry.New(telemetry.WithMeterProvider(mp))

ctx := context.TODO()
sys, _ := NewActorSystem("testSys",
WithMetric(),
WithTelemetry(tel),
WithStash(),
WithLogger(log.DiscardLogger))

// start the actor system
err := sys.Start(ctx)
assert.NoError(t, err)

actor := newTestActor()
actorRef, err := sys.Spawn(ctx, "Test", actor)
assert.NoError(t, err)
assert.NotNil(t, actorRef)

// create a message to send to the test actor
message := new(testpb.TestSend)
// send the message to the actor
err = Tell(ctx, actorRef, message)
// perform some assertions
require.NoError(t, err)

// Should collect 4 metrics, 3 for the actor and 1 for the actor system
got := &metricdata.ResourceMetrics{}
err = r.Collect(ctx, got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
require.Len(t, got.ScopeMetrics[0].Metrics, 6)

expected := []string{
"actor_child_count",
"actor_stash_count",
"actor_restart_count",
"actors_count",
"actor_processed_count",
"actor_received_duration",
}
// sort the array
sort.Strings(expected)
// get the metrics names
actual := make([]string, len(got.ScopeMetrics[0].Metrics))
for i, metric := range got.ScopeMetrics[0].Metrics {
actual[i] = metric.Name
}
sort.Strings(actual)

assert.ElementsMatch(t, expected, actual)

// stop the actor after some time
lib.Pause(time.Second)

t.Cleanup(func() {
err = sys.Stop(ctx)
assert.NoError(t, err)
})
})
t.Run("With cluster events subscription", func(t *testing.T) {
// create a context
ctx := context.TODO()
Expand Down Expand Up @@ -1397,16 +1329,15 @@ func TestActorSystem(t *testing.T) {
provider.EXPECT().ID().Return("id")

system := &actorSystem{
name: "testSystem",
logger: logger,
cluster: mockedCluster,
clusterEnabled: *atomic.NewBool(true),
telemetry: telemetry.New(),
locker: sync.Mutex{},
scheduler: newScheduler(logger, time.Second, withSchedulerCluster(mockedCluster)),
clusterConfig: NewClusterConfig(),
registry: types.NewRegistry(),
name: "testSystem",
logger: logger,
cluster: mockedCluster,
locker: sync.Mutex{},
scheduler: newScheduler(logger, time.Second, withSchedulerCluster(mockedCluster)),
clusterConfig: NewClusterConfig(),
registry: types.NewRegistry(),
}
system.clusterEnabled.Store(true)

err := system.Start(ctx)
require.Error(t, err)
Expand Down
3 changes: 0 additions & 3 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Durat
// await patiently to receive the response from the actor
select {
case response = <-receiveContext.response:
to.recordLatestReceiveDurationMetric(ctx)
return
case <-time.After(timeout):
to.recordLatestReceiveDurationMetric(ctx)
err = ErrRequestTimeout
to.toDeadletterQueue(receiveContext, err)
return
Expand All @@ -79,7 +77,6 @@ func Tell(ctx context.Context, to *PID, message proto.Message) error {
}

to.doReceive(receiveContext)
to.recordLatestReceiveDurationMetric(ctx)
return nil
}

Expand Down
15 changes: 0 additions & 15 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/tochemey/goakt/v2/discovery"
"github.com/tochemey/goakt/v2/hash"
"github.com/tochemey/goakt/v2/log"
"github.com/tochemey/goakt/v2/telemetry"
)

// Option is the interface that applies a configuration option.
Expand Down Expand Up @@ -93,13 +92,6 @@ func WithSupervisorDirective(directive SupervisorDirective) Option {
})
}

// WithTelemetry sets the custom telemetry
func WithTelemetry(telemetry *telemetry.Telemetry) Option {
return OptionFunc(func(a *actorSystem) {
a.telemetry = telemetry
})
}

// WithRemoting enables remoting on the actor system
func WithRemoting(host string, port int32) Option {
return OptionFunc(func(a *actorSystem) {
Expand Down Expand Up @@ -166,13 +158,6 @@ func WithActorInitTimeout(timeout time.Duration) Option {
})
}

// WithMetric enables metrics
func WithMetric() Option {
return OptionFunc(func(system *actorSystem) {
system.metricEnabled.Store(true)
})
}

// WithPeerStateLoopInterval sets the peer state loop interval
func WithPeerStateLoopInterval(interval time.Duration) Option {
return OptionFunc(func(system *actorSystem) {
Expand Down
12 changes: 0 additions & 12 deletions actors/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ import (

"github.com/tochemey/goakt/v2/hash"
"github.com/tochemey/goakt/v2/log"
"github.com/tochemey/goakt/v2/telemetry"
)

func TestOption(t *testing.T) {
tel := telemetry.New()
resumeDirective := NewResumeDirective()
var atomicTrue atomic.Bool
atomicTrue.Store(true)
Expand Down Expand Up @@ -88,11 +86,6 @@ func TestOption(t *testing.T) {
option: WithShutdownTimeout(2 * time.Second),
expected: actorSystem{shutdownTimeout: 2. * time.Second},
},
{
name: "WithTelemetry",
option: WithTelemetry(tel),
expected: actorSystem{telemetry: tel},
},
{
name: "WithStash",
option: WithStash(),
Expand All @@ -108,11 +101,6 @@ func TestOption(t *testing.T) {
option: WithActorInitTimeout(2 * time.Second),
expected: actorSystem{actorInitTimeout: 2. * time.Second},
},
{
name: "WithMetric",
option: WithMetric(),
expected: actorSystem{metricEnabled: atomicTrue},
},
{
name: "WithCluster",
option: WithCluster(clusterConfig),
Expand Down
Loading

0 comments on commit d2f3e41

Please sign in to comment.