From ee47a22ab8ff2533e9191d9372d3118a0a24d442 Mon Sep 17 00:00:00 2001 From: Tochemey Date: Sat, 7 Sep 2024 16:24:29 +0100 Subject: [PATCH] refactor: enhance telemetry engine --- actors/actor_system.go | 4 +- actors/pid.go | 86 ++++++++++--------------------------- telemetry/option.go | 4 +- telemetry/option_test.go | 4 +- telemetry/telemetry.go | 36 ++++++++++++---- telemetry/telemetry_test.go | 53 +++++++++++++++++++++++ 6 files changed, 109 insertions(+), 78 deletions(-) create mode 100644 telemetry/telemetry_test.go diff --git a/actors/actor_system.go b/actors/actor_system.go index 6f99ea05..1c31e0ea 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -1115,7 +1115,7 @@ func (x *actorSystem) enableRemoting(ctx context.Context) { var err error if x.metricEnabled.Load() { interceptor, err = otelconnect.NewInterceptor( - otelconnect.WithMeterProvider(x.telemetry.MeterProvider), + otelconnect.WithMeterProvider(x.telemetry.MeterProvider()), ) if err != nil { x.logger.Panic(fmt.Errorf("failed to initialize observability feature: %w", err)) @@ -1202,7 +1202,7 @@ func (x *actorSystem) janitor() { // registerMetrics register the PID metrics with OTel instrumentation. func (x *actorSystem) registerMetrics() error { - meter := x.telemetry.Meter + meter := x.telemetry.Meter() metrics, err := metric.NewActorSystemMetric(meter) if err != nil { return err diff --git a/actors/pid.go b/actors/pid.go index 3c4e85ab..a0cd167d 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -236,7 +236,7 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... } if p.metricEnabled.Load() { - metrics, err := metric.NewActorMetric(p.telemetry.Meter) + metrics, err := metric.NewActorMetric(p.telemetry.Meter()) if err != nil { return nil, err } @@ -629,11 +629,7 @@ func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message // RemoteLookup look for an actor address on a remote node. func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error) { - remoteClient, err := pid.remotingClient(host, port) - if err != nil { - return nil, err - } - + remoteClient := pid.remotingClient(host, port) request := connect.NewRequest(&internalpb.RemoteLookupRequest{ Host: host, Port: int32(port), @@ -659,10 +655,7 @@ func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message pro return err } - remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort())) - if err != nil { - return err - } + remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort())) sender := &goaktpb.Address{ Host: pid.Address().Host(), @@ -710,10 +703,7 @@ func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message prot return nil, err } - remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort())) - if err != nil { - return nil, err - } + remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort())) senderAddress := pid.Address() sender := &goaktpb.Address{ @@ -798,10 +788,7 @@ func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messag }) } - remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort())) - if err != nil { - return err - } + remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort())) stream := remoteService.RemoteTell(ctx) for _, request := range requests { @@ -851,11 +838,7 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message }) } - remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort())) - if err != nil { - return nil, err - } - + remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort())) stream := remoteService.RemoteAsk(ctx) errc := make(chan error, 1) @@ -897,42 +880,31 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message // RemoteStop stops an actor on a remote node func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error { - remoteService, err := pid.remotingClient(host, port) - if err != nil { - return err - } - + remoteService := pid.remotingClient(host, port) request := connect.NewRequest(&internalpb.RemoteStopRequest{ Host: host, Port: int32(port), Name: name, }) - - if _, err = remoteService.RemoteStop(ctx, request); err != nil { + if _, err := remoteService.RemoteStop(ctx, request); err != nil { code := connect.CodeOf(err) if code == connect.CodeNotFound { return nil } return err } - return nil } // RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error { - remoteService, err := pid.remotingClient(host, port) - if err != nil { - return err - } - + remoteService := pid.remotingClient(host, port) request := connect.NewRequest(&internalpb.RemoteSpawnRequest{ Host: host, Port: int32(port), ActorName: name, ActorType: actorType, }) - if _, err := remoteService.RemoteSpawn(ctx, request); err != nil { code := connect.CodeOf(err) if code == connect.CodeFailedPrecondition { @@ -945,31 +917,24 @@ func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, ac } return err } - return nil } // RemoteReSpawn restarts an actor on a remote node. func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error { - remoteService, err := pid.remotingClient(host, port) - if err != nil { - return err - } - + remoteService := pid.remotingClient(host, port) request := connect.NewRequest(&internalpb.RemoteReSpawnRequest{ Host: host, Port: int32(port), Name: name, }) - - if _, err = remoteService.RemoteReSpawn(ctx, request); err != nil { + if _, err := remoteService.RemoteReSpawn(ctx, request); err != nil { code := connect.CodeOf(err) if code == connect.CodeNotFound { return nil } return err } - return nil } @@ -1410,7 +1375,7 @@ func (pid *PID) toDeadletterQueue(receiveCtx *ReceiveContext, err error) { // registerMetrics register the PID metrics with OTel instrumentation. func (pid *PID) registerMetrics() error { - meter := pid.telemetry.Meter + meter := pid.telemetry.Meter() metrics := pid.metrics _, err := meter.RegisterCallback(func(_ context.Context, observer otelmetric.Observer) error { observer.ObserveInt64(metrics.ChildrenCount(), pid.childrenCount.Load()) @@ -1427,37 +1392,30 @@ func (pid *PID) registerMetrics() error { } // clientOptions returns the gRPC client connections options -func (pid *PID) clientOptions() ([]connect.ClientOption, error) { +func (pid *PID) clientOptions() []connect.ClientOption { var interceptor *otelconnect.Interceptor - var err error if pid.metricEnabled.Load() { - interceptor, err = otelconnect.NewInterceptor( - otelconnect.WithTracerProvider(pid.telemetry.TracerProvider), - otelconnect.WithMeterProvider(pid.telemetry.MeterProvider)) - if err != nil { - return nil, fmt.Errorf("failed to initialize observability feature: %w", err) - } + // no need to handle the error because a NoOp trace and meter provider will be + // returned by the telemetry engine when none is provided + interceptor, _ = otelconnect.NewInterceptor( + otelconnect.WithTracerProvider(pid.telemetry.TraceProvider()), + otelconnect.WithMeterProvider(pid.telemetry.MeterProvider())) } var clientOptions []connect.ClientOption if interceptor != nil { clientOptions = append(clientOptions, connect.WithInterceptors(interceptor)) } - return clientOptions, err + return clientOptions } // remotingClient returns an instance of the Remote Service client -func (pid *PID) remotingClient(host string, port int) (internalpbconnect.RemotingServiceClient, error) { - clientOptions, err := pid.clientOptions() - if err != nil { - return nil, err - } - +func (pid *PID) remotingClient(host string, port int) internalpbconnect.RemotingServiceClient { return internalpbconnect.NewRemotingServiceClient( pid.httpClient, http.URL(host, port), - clientOptions..., - ), nil + pid.clientOptions()..., + ) } // handleCompletion processes a long-running task and pipe the result to diff --git a/telemetry/option.go b/telemetry/option.go index 37fbe4c7..2f2b2358 100644 --- a/telemetry/option.go +++ b/telemetry/option.go @@ -48,7 +48,7 @@ func (f OptionFunc) Apply(c *Telemetry) { // If none is specified, the global provider is used. func WithTracerProvider(provider trace.TracerProvider) Option { return OptionFunc(func(cfg *Telemetry) { - cfg.TracerProvider = provider + cfg.tracerProvider = provider }) } @@ -56,6 +56,6 @@ func WithTracerProvider(provider trace.TracerProvider) Option { // If none is specified, the global provider is used. func WithMeterProvider(provider metric.MeterProvider) Option { return OptionFunc(func(cfg *Telemetry) { - cfg.MeterProvider = provider + cfg.meterProvider = provider }) } diff --git a/telemetry/option_test.go b/telemetry/option_test.go index 645021fa..5ee0031c 100644 --- a/telemetry/option_test.go +++ b/telemetry/option_test.go @@ -44,12 +44,12 @@ func TestOptions(t *testing.T) { { name: "WithTracerProvider", option: WithTracerProvider(tracerProvider), - expectedConfig: Telemetry{TracerProvider: tracerProvider}, + expectedConfig: Telemetry{tracerProvider: tracerProvider}, }, { name: "WithMeterProvider", option: WithMeterProvider(meterProvider), - expectedConfig: Telemetry{MeterProvider: meterProvider}, + expectedConfig: Telemetry{meterProvider: meterProvider}, }, } for _, tc := range testCases { diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 73129cc6..727839a3 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -36,19 +36,19 @@ const ( // Telemetry encapsulates some settings for an actor type Telemetry struct { - TracerProvider trace.TracerProvider - Tracer trace.Tracer + tracerProvider trace.TracerProvider + tracer trace.Tracer - MeterProvider metric.MeterProvider - Meter metric.Meter + meterProvider metric.MeterProvider + meter metric.Meter } // New creates an instance of Telemetry func New(options ...Option) *Telemetry { // create a config instance telemetry := &Telemetry{ - TracerProvider: otel.GetTracerProvider(), - MeterProvider: otel.GetMeterProvider(), + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), } // apply the various options @@ -57,16 +57,36 @@ func New(options ...Option) *Telemetry { } // set the tracer - telemetry.Tracer = telemetry.TracerProvider.Tracer( + telemetry.tracer = telemetry.tracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), ) // set the meter - telemetry.Meter = telemetry.MeterProvider.Meter( + telemetry.meter = telemetry.meterProvider.Meter( instrumentationName, metric.WithInstrumentationVersion(Version()), ) return telemetry } + +// TraceProvider returns the trace provider +func (t *Telemetry) TraceProvider() trace.TracerProvider { + return t.tracerProvider +} + +// MeterProvider returns the meter provider +func (t *Telemetry) MeterProvider() metric.MeterProvider { + return t.meterProvider +} + +// Meter returns the meter +func (t *Telemetry) Meter() metric.Meter { + return t.meter +} + +// Tracer returns the tracer +func (t *Telemetry) Tracer() trace.Tracer { + return t.tracer +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go new file mode 100644 index 00000000..cb439e6f --- /dev/null +++ b/telemetry/telemetry_test.go @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +func TestTelemetry(t *testing.T) { + tel := New() + assert.NotNil(t, tel) + globalTracer := otel.GetTracerProvider() + globalMeterProvider := otel.GetMeterProvider() + + actual := tel.TraceProvider() + assert.NotNil(t, actual) + assert.Equal(t, globalTracer, actual) + assert.Equal(t, globalTracer.Tracer(instrumentationName, + trace.WithInstrumentationVersion(Version())), tel.Tracer()) + + actualmp := tel.MeterProvider() + assert.NotNil(t, actualmp) + assert.Equal(t, globalMeterProvider, actualmp) + assert.Equal(t, globalMeterProvider.Meter(instrumentationName, + metric.WithInstrumentationVersion(Version())), tel.Meter()) +}