Skip to content

Commit

Permalink
use ReceiverCreateSettings to configure TracerProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Boten committed Sep 8, 2021
1 parent e8bd3be commit ec7c062
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 28 deletions.
4 changes: 4 additions & 0 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package component

import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
Expand All @@ -26,4 +27,7 @@ type TelemetrySettings struct {

// TracerProvider that the factory can pass to other instrumented third-party libraries.
TracerProvider trace.TracerProvider

// MeterProvider that the factory can pass to other instrumented third-party libraries.
MeterProvider metric.MeterProvider
}
8 changes: 5 additions & 3 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (gss *GRPCServerSettings) ToListener() (net.Listener, error) {
}

// ToServerOption maps configgrpc.GRPCServerSettings to a slice of server options for gRPC.
func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]component.Extension) ([]grpc.ServerOption, error) {
func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]component.Extension, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
var opts []grpc.ServerOption

if gss.TLSSetting != nil {
Expand Down Expand Up @@ -346,11 +346,13 @@ func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]compone
// Enable OpenTelemetry observability plugin.
// TODO: Pass construct settings to have access to Tracer.
uInterceptors = append(uInterceptors, otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
otelgrpc.WithTracerProvider(settings.TracerProvider),
// otelgrpc.WithMeterProvider(settings.MeterProvider),
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))
sInterceptors = append(sInterceptors, otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
otelgrpc.WithTracerProvider(settings.TracerProvider),
// otelgrpc.WithMeterProvider(settings.MeterProvider),
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))

Expand Down
15 changes: 8 additions & 7 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/confignet"
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestAllGrpcClientSettings(t *testing.T) {

func TestDefaultGrpcServerSettings(t *testing.T) {
gss := &GRPCServerSettings{}
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

assert.NoError(t, err)
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

assert.NoError(t, err)
Expand All @@ -126,7 +127,7 @@ func TestGrpcServerAuthSettings(t *testing.T) {
gss := &GRPCServerSettings{}

// sanity check
_, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
_, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

// test
Expand All @@ -136,7 +137,7 @@ func TestGrpcServerAuthSettings(t *testing.T) {
ext := map[config.ComponentID]component.Extension{
config.NewID("mock"): &configauth.MockAuthenticator{},
}
opts, err := gss.ToServerOption(ext)
opts, err := gss.ToServerOption(ext, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

// verify
Expand Down Expand Up @@ -302,7 +303,7 @@ func TestGRPCServerSettingsError(t *testing.T) {
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
opts, err := test.settings.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := test.settings.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

assert.Regexp(t, test.err, err)
Expand Down Expand Up @@ -457,7 +458,7 @@ func TestHttpReception(t *testing.T) {
}
ln, err := gss.ToListener()
assert.NoError(t, err)
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
Expand Down Expand Up @@ -502,7 +503,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
}
ln, err := gss.ToListener()
assert.NoError(t, err)
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
Expand Down
5 changes: 3 additions & 2 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func WithErrorHandler(e middleware.ErrorHandler) ToServerOption {
}

// ToServer creates an http.Server from settings object.
func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOption) *http.Server {
func (hss *HTTPServerSettings) ToServer(handler http.Handler, settings component.TelemetrySettings, opts ...ToServerOption) *http.Server {
serverOpts := &toServerOptions{}
for _, o := range opts {
o(serverOpts)
Expand All @@ -218,7 +218,8 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp
handler = otelhttp.NewHandler(
handler,
"",
otelhttp.WithTracerProvider(otel.GetTracerProvider()),
otelhttp.WithTracerProvider(settings.TracerProvider),
// otelhttp.WithMeterProvider(settings.MeterProvider),
otelhttp.WithPropagators(otel.GetTextMapPropagator()),
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
return r.URL.Path
Expand Down
9 changes: 5 additions & 4 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configtls"
Expand Down Expand Up @@ -390,7 +391,7 @@ func TestHttpReception(t *testing.T) {
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, errWrite := fmt.Fprint(w, "test")
assert.NoError(t, errWrite)
}))
}), componenttest.NewNopTelemetrySettings())

go func() {
_ = s.Serve(ln)
Expand Down Expand Up @@ -469,7 +470,7 @@ func TestHttpCors(t *testing.T) {
assert.NoError(t, err)
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
}), componenttest.NewNopTelemetrySettings())
go func() {
_ = s.Serve(ln)
}()
Expand Down Expand Up @@ -505,7 +506,7 @@ func TestHttpCorsInvalidSettings(t *testing.T) {
}

// This effectively does not enable CORS but should also not cause an error
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), componenttest.NewNopTelemetrySettings())
require.NotNil(t, s)
require.NoError(t, s.Close())
}
Expand Down Expand Up @@ -547,7 +548,7 @@ func ExampleHTTPServerSettings() {
settings := HTTPServerSettings{
Endpoint: ":443",
}
s := settings.ToServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
s := settings.ToServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}), componenttest.NewNopTelemetrySettings())
l, err := settings.ToListener()
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func createTracesReceiver(
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), set.Logger)
return newOtlpReceiver(cfg.(*Config), set)
})

if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil {
Expand All @@ -90,7 +90,7 @@ func createMetricsReceiver(
consumer consumer.Metrics,
) (component.MetricsReceiver, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), set.Logger)
return newOtlpReceiver(cfg.(*Config), set)
})

if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil {
Expand All @@ -107,7 +107,7 @@ func createLogReceiver(
consumer consumer.Logs,
) (component.LogsReceiver, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), set.Logger)
return newOtlpReceiver(cfg.(*Config), set)
})

if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/gorilla/mux"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -53,16 +52,16 @@ type otlpReceiver struct {
logReceiver *logs.Receiver
shutdownWG sync.WaitGroup

logger *zap.Logger
settings component.ReceiverCreateSettings
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver {
func newOtlpReceiver(cfg *Config, settings component.ReceiverCreateSettings) *otlpReceiver {
r := &otlpReceiver{
cfg: cfg,
logger: logger,
cfg: cfg,
settings: settings,
}
if cfg.HTTP != nil {
r.httpMux = mux.NewRouter()
Expand All @@ -72,7 +71,7 @@ func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver {
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)
r.settings.Logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)

gln, err := cfg.ToListener()
if err != nil {
Expand All @@ -90,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
}

func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error {
r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
r.settings.Logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
var hln net.Listener
hln, err := r.cfg.HTTP.ToListener()
if err != nil {
Expand All @@ -111,7 +110,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
var err error
if r.cfg.GRPC != nil {
var opts []grpc.ServerOption
opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions())
opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions(), r.settings.TelemetrySettings)
if err != nil {
return err
}
Expand All @@ -137,14 +136,15 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.httpMux,
r.settings.TelemetrySettings,
confighttp.WithErrorHandler(errorHandler),
)
err = r.startHTTPServer(r.cfg.HTTP, host)
if err != nil {
return err
}
if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint {
r.logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint)
r.settings.Logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint)

// Copy the config.
cfgLegacyHTTP := r.cfg.HTTP
Expand Down

0 comments on commit ec7c062

Please sign in to comment.