Skip to content

Commit

Permalink
refactor: enhance telemetry engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 7, 2024
1 parent 74f3644 commit ee47a22
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 78 deletions.
4 changes: 2 additions & 2 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
var err error
if x.metricEnabled.Load() {
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithMeterProvider(x.telemetry.MeterProvider),
otelconnect.WithMeterProvider(x.telemetry.MeterProvider()),
)
if err != nil {
x.logger.Panic(fmt.Errorf("failed to initialize observability feature: %w", err))
Expand Down Expand Up @@ -1202,7 +1202,7 @@ func (x *actorSystem) janitor() {

// registerMetrics register the PID metrics with OTel instrumentation.
func (x *actorSystem) registerMetrics() error {
meter := x.telemetry.Meter
meter := x.telemetry.Meter()
metrics, err := metric.NewActorSystemMetric(meter)
if err != nil {
return err
Expand Down
86 changes: 22 additions & 64 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
}

if p.metricEnabled.Load() {
metrics, err := metric.NewActorMetric(p.telemetry.Meter)
metrics, err := metric.NewActorMetric(p.telemetry.Meter())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -629,11 +629,7 @@ func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message

// RemoteLookup look for an actor address on a remote node.
func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error) {
remoteClient, err := pid.remotingClient(host, port)
if err != nil {
return nil, err
}

remoteClient := pid.remotingClient(host, port)
request := connect.NewRequest(&internalpb.RemoteLookupRequest{
Host: host,
Port: int32(port),
Expand All @@ -659,10 +655,7 @@ func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message pro
return err
}

remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort()))
if err != nil {
return err
}
remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort()))

sender := &goaktpb.Address{
Host: pid.Address().Host(),
Expand Down Expand Up @@ -710,10 +703,7 @@ func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message prot
return nil, err
}

remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort()))
if err != nil {
return nil, err
}
remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort()))

senderAddress := pid.Address()
sender := &goaktpb.Address{
Expand Down Expand Up @@ -798,10 +788,7 @@ func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messag
})
}

remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort()))
if err != nil {
return err
}
remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort()))

stream := remoteService.RemoteTell(ctx)
for _, request := range requests {
Expand Down Expand Up @@ -851,11 +838,7 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message
})
}

remoteService, err := pid.remotingClient(to.GetHost(), int(to.GetPort()))
if err != nil {
return nil, err
}

remoteService := pid.remotingClient(to.GetHost(), int(to.GetPort()))
stream := remoteService.RemoteAsk(ctx)
errc := make(chan error, 1)

Expand Down Expand Up @@ -897,42 +880,31 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message

// RemoteStop stops an actor on a remote node
func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error {
remoteService, err := pid.remotingClient(host, port)
if err != nil {
return err
}

remoteService := pid.remotingClient(host, port)
request := connect.NewRequest(&internalpb.RemoteStopRequest{
Host: host,
Port: int32(port),
Name: name,
})

if _, err = remoteService.RemoteStop(ctx, request); err != nil {
if _, err := remoteService.RemoteStop(ctx, request); err != nil {
code := connect.CodeOf(err)
if code == connect.CodeNotFound {
return nil
}
return err
}

return nil
}

// RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error {
remoteService, err := pid.remotingClient(host, port)
if err != nil {
return err
}

remoteService := pid.remotingClient(host, port)
request := connect.NewRequest(&internalpb.RemoteSpawnRequest{
Host: host,
Port: int32(port),
ActorName: name,
ActorType: actorType,
})

if _, err := remoteService.RemoteSpawn(ctx, request); err != nil {
code := connect.CodeOf(err)
if code == connect.CodeFailedPrecondition {
Expand All @@ -945,31 +917,24 @@ func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, ac
}
return err
}

return nil
}

// RemoteReSpawn restarts an actor on a remote node.
func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error {
remoteService, err := pid.remotingClient(host, port)
if err != nil {
return err
}

remoteService := pid.remotingClient(host, port)
request := connect.NewRequest(&internalpb.RemoteReSpawnRequest{
Host: host,
Port: int32(port),
Name: name,
})

if _, err = remoteService.RemoteReSpawn(ctx, request); err != nil {
if _, err := remoteService.RemoteReSpawn(ctx, request); err != nil {
code := connect.CodeOf(err)
if code == connect.CodeNotFound {
return nil
}
return err
}

return nil
}

Expand Down Expand Up @@ -1410,7 +1375,7 @@ func (pid *PID) toDeadletterQueue(receiveCtx *ReceiveContext, err error) {

// registerMetrics register the PID metrics with OTel instrumentation.
func (pid *PID) registerMetrics() error {
meter := pid.telemetry.Meter
meter := pid.telemetry.Meter()
metrics := pid.metrics
_, err := meter.RegisterCallback(func(_ context.Context, observer otelmetric.Observer) error {
observer.ObserveInt64(metrics.ChildrenCount(), pid.childrenCount.Load())
Expand All @@ -1427,37 +1392,30 @@ func (pid *PID) registerMetrics() error {
}

// clientOptions returns the gRPC client connections options
func (pid *PID) clientOptions() ([]connect.ClientOption, error) {
func (pid *PID) clientOptions() []connect.ClientOption {
var interceptor *otelconnect.Interceptor
var err error
if pid.metricEnabled.Load() {
interceptor, err = otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(pid.telemetry.TracerProvider),
otelconnect.WithMeterProvider(pid.telemetry.MeterProvider))
if err != nil {
return nil, fmt.Errorf("failed to initialize observability feature: %w", err)
}
// no need to handle the error because a NoOp trace and meter provider will be
// returned by the telemetry engine when none is provided
interceptor, _ = otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(pid.telemetry.TraceProvider()),
otelconnect.WithMeterProvider(pid.telemetry.MeterProvider()))
}

var clientOptions []connect.ClientOption
if interceptor != nil {
clientOptions = append(clientOptions, connect.WithInterceptors(interceptor))
}
return clientOptions, err
return clientOptions
}

// remotingClient returns an instance of the Remote Service client
func (pid *PID) remotingClient(host string, port int) (internalpbconnect.RemotingServiceClient, error) {
clientOptions, err := pid.clientOptions()
if err != nil {
return nil, err
}

func (pid *PID) remotingClient(host string, port int) internalpbconnect.RemotingServiceClient {
return internalpbconnect.NewRemotingServiceClient(
pid.httpClient,
http.URL(host, port),
clientOptions...,
), nil
pid.clientOptions()...,
)
}

// handleCompletion processes a long-running task and pipe the result to
Expand Down
4 changes: 2 additions & 2 deletions telemetry/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (f OptionFunc) Apply(c *Telemetry) {
// If none is specified, the global provider is used.
func WithTracerProvider(provider trace.TracerProvider) Option {
return OptionFunc(func(cfg *Telemetry) {
cfg.TracerProvider = provider
cfg.tracerProvider = provider
})
}

// WithMeterProvider specifies a tracer provider to use for creating a tracer.
// If none is specified, the global provider is used.
func WithMeterProvider(provider metric.MeterProvider) Option {
return OptionFunc(func(cfg *Telemetry) {
cfg.MeterProvider = provider
cfg.meterProvider = provider
})
}
4 changes: 2 additions & 2 deletions telemetry/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func TestOptions(t *testing.T) {
{
name: "WithTracerProvider",
option: WithTracerProvider(tracerProvider),
expectedConfig: Telemetry{TracerProvider: tracerProvider},
expectedConfig: Telemetry{tracerProvider: tracerProvider},
},
{
name: "WithMeterProvider",
option: WithMeterProvider(meterProvider),
expectedConfig: Telemetry{MeterProvider: meterProvider},
expectedConfig: Telemetry{meterProvider: meterProvider},
},
}
for _, tc := range testCases {
Expand Down
36 changes: 28 additions & 8 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ const (

// Telemetry encapsulates some settings for an actor
type Telemetry struct {
TracerProvider trace.TracerProvider
Tracer trace.Tracer
tracerProvider trace.TracerProvider
tracer trace.Tracer

MeterProvider metric.MeterProvider
Meter metric.Meter
meterProvider metric.MeterProvider
meter metric.Meter
}

// New creates an instance of Telemetry
func New(options ...Option) *Telemetry {
// create a config instance
telemetry := &Telemetry{
TracerProvider: otel.GetTracerProvider(),
MeterProvider: otel.GetMeterProvider(),
tracerProvider: otel.GetTracerProvider(),
meterProvider: otel.GetMeterProvider(),
}

// apply the various options
Expand All @@ -57,16 +57,36 @@ func New(options ...Option) *Telemetry {
}

// set the tracer
telemetry.Tracer = telemetry.TracerProvider.Tracer(
telemetry.tracer = telemetry.tracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
)

// set the meter
telemetry.Meter = telemetry.MeterProvider.Meter(
telemetry.meter = telemetry.meterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
)

return telemetry
}

// TraceProvider returns the trace provider
func (t *Telemetry) TraceProvider() trace.TracerProvider {
return t.tracerProvider
}

// MeterProvider returns the meter provider
func (t *Telemetry) MeterProvider() metric.MeterProvider {
return t.meterProvider
}

// Meter returns the meter
func (t *Telemetry) Meter() metric.Meter {
return t.meter
}

// Tracer returns the tracer
func (t *Telemetry) Tracer() trace.Tracer {
return t.tracer
}
53 changes: 53 additions & 0 deletions telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package telemetry

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

func TestTelemetry(t *testing.T) {
tel := New()
assert.NotNil(t, tel)
globalTracer := otel.GetTracerProvider()
globalMeterProvider := otel.GetMeterProvider()

actual := tel.TraceProvider()
assert.NotNil(t, actual)
assert.Equal(t, globalTracer, actual)
assert.Equal(t, globalTracer.Tracer(instrumentationName,
trace.WithInstrumentationVersion(Version())), tel.Tracer())

actualmp := tel.MeterProvider()
assert.NotNil(t, actualmp)
assert.Equal(t, globalMeterProvider, actualmp)
assert.Equal(t, globalMeterProvider.Meter(instrumentationName,
metric.WithInstrumentationVersion(Version())), tel.Meter())
}

0 comments on commit ee47a22

Please sign in to comment.