diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 2d04a8a4e2f..1819f515855 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -270,7 +270,7 @@ func (gss *GRPCServerSettings) ToListener() (net.Listener, error) { } // ToServerOption maps configgrpc.GRPCServerSettings to a slice of server options for gRPC. -func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]component.Extension) ([]grpc.ServerOption, error) { +func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]component.Extension, settings component.TelemetrySettings) ([]grpc.ServerOption, error) { var opts []grpc.ServerOption if gss.TLSSetting != nil { @@ -346,11 +346,11 @@ func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]compone // Enable OpenTelemetry observability plugin. // TODO: Pass construct settings to have access to Tracer. uInterceptors = append(uInterceptors, otelgrpc.UnaryServerInterceptor( - otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + otelgrpc.WithTracerProvider(settings.TracerProvider), otelgrpc.WithPropagators(otel.GetTextMapPropagator()), )) sInterceptors = append(sInterceptors, otelgrpc.StreamServerInterceptor( - otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + otelgrpc.WithTracerProvider(settings.TracerProvider), otelgrpc.WithPropagators(otel.GetTextMapPropagator()), )) diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index b3c6f8b6ef5..a1c0a676182 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/confignet" @@ -80,7 +81,7 @@ func TestAllGrpcClientSettings(t *testing.T) { func TestDefaultGrpcServerSettings(t *testing.T) { gss := &GRPCServerSettings{} - opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}) + opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings()) _ = grpc.NewServer(opts...) assert.NoError(t, err) @@ -115,7 +116,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) { }, }, } - opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}) + opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings()) _ = grpc.NewServer(opts...) assert.NoError(t, err) @@ -126,7 +127,7 @@ func TestGrpcServerAuthSettings(t *testing.T) { gss := &GRPCServerSettings{} // sanity check - _, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}) + _, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings()) require.NoError(t, err) // test @@ -136,7 +137,7 @@ func TestGrpcServerAuthSettings(t *testing.T) { ext := map[config.ComponentID]component.Extension{ config.NewID("mock"): &configauth.MockAuthenticator{}, } - opts, err := gss.ToServerOption(ext) + opts, err := gss.ToServerOption(ext, componenttest.NewNopTelemetrySettings()) _ = grpc.NewServer(opts...) // verify @@ -302,7 +303,7 @@ func TestGRPCServerSettingsError(t *testing.T) { } for _, test := range tests { t.Run(test.err, func(t *testing.T) { - opts, err := test.settings.ToServerOption(map[config.ComponentID]component.Extension{}) + opts, err := test.settings.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings()) _ = grpc.NewServer(opts...) assert.Regexp(t, test.err, err) @@ -457,7 +458,7 @@ func TestHttpReception(t *testing.T) { } ln, err := gss.ToListener() assert.NoError(t, err) - opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}) + opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) s := grpc.NewServer(opts...) otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{}) @@ -502,7 +503,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) { } ln, err := gss.ToListener() assert.NoError(t, err) - opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}) + opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) s := grpc.NewServer(opts...) otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{}) diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 039c210043e..e00f161a7bd 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -192,7 +192,7 @@ func WithErrorHandler(e middleware.ErrorHandler) ToServerOption { } // ToServer creates an http.Server from settings object. -func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOption) *http.Server { +func (hss *HTTPServerSettings) ToServer(handler http.Handler, settings component.TelemetrySettings, opts ...ToServerOption) *http.Server { serverOpts := &toServerOptions{} for _, o := range opts { o(serverOpts) @@ -218,7 +218,7 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp handler = otelhttp.NewHandler( handler, "", - otelhttp.WithTracerProvider(otel.GetTracerProvider()), + otelhttp.WithTracerProvider(settings.TracerProvider), otelhttp.WithPropagators(otel.GetTextMapPropagator()), otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string { return r.URL.Path diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index d7d11e21b0c..82c26900a50 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configtls" @@ -390,7 +391,7 @@ func TestHttpReception(t *testing.T) { s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, errWrite := fmt.Fprint(w, "test") assert.NoError(t, errWrite) - })) + }), componenttest.NewNopTelemetrySettings()) go func() { _ = s.Serve(ln) @@ -469,7 +470,7 @@ func TestHttpCors(t *testing.T) { assert.NoError(t, err) s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - })) + }), componenttest.NewNopTelemetrySettings()) go func() { _ = s.Serve(ln) }() @@ -505,7 +506,7 @@ func TestHttpCorsInvalidSettings(t *testing.T) { } // This effectively does not enable CORS but should also not cause an error - s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), componenttest.NewNopTelemetrySettings()) require.NotNil(t, s) require.NoError(t, s.Close()) } @@ -547,7 +548,7 @@ func ExampleHTTPServerSettings() { settings := HTTPServerSettings{ Endpoint: ":443", } - s := settings.ToServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) + s := settings.ToServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}), componenttest.NewNopTelemetrySettings()) l, err := settings.ToListener() if err != nil { panic(err) diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 0618c73d968..af77a700080 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -73,7 +73,7 @@ func createTracesReceiver( nextConsumer consumer.Traces, ) (component.TracesReceiver, error) { r := receivers.GetOrAdd(cfg, func() component.Component { - return newOtlpReceiver(cfg.(*Config), set.Logger) + return newOtlpReceiver(cfg.(*Config), set) }) if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil { @@ -90,7 +90,7 @@ func createMetricsReceiver( consumer consumer.Metrics, ) (component.MetricsReceiver, error) { r := receivers.GetOrAdd(cfg, func() component.Component { - return newOtlpReceiver(cfg.(*Config), set.Logger) + return newOtlpReceiver(cfg.(*Config), set) }) if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil { @@ -107,7 +107,7 @@ func createLogReceiver( consumer consumer.Logs, ) (component.LogsReceiver, error) { r := receivers.GetOrAdd(cfg, func() component.Component { - return newOtlpReceiver(cfg.(*Config), set.Logger) + return newOtlpReceiver(cfg.(*Config), set) }) if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil { diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 2a954c18c31..ccbe9214501 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -21,7 +21,6 @@ import ( "sync" "github.com/gorilla/mux" - "go.uber.org/zap" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" @@ -53,16 +52,16 @@ type otlpReceiver struct { logReceiver *logs.Receiver shutdownWG sync.WaitGroup - logger *zap.Logger + settings component.ReceiverCreateSettings } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver { +func newOtlpReceiver(cfg *Config, settings component.ReceiverCreateSettings) *otlpReceiver { r := &otlpReceiver{ - cfg: cfg, - logger: logger, + cfg: cfg, + settings: settings, } if cfg.HTTP != nil { r.httpMux = mux.NewRouter() @@ -72,7 +71,7 @@ func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver { } func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { - r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint) + r.settings.Logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint) gln, err := cfg.ToListener() if err != nil { @@ -90,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host } func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error { - r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint) + r.settings.Logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint) var hln net.Listener hln, err := r.cfg.HTTP.ToListener() if err != nil { @@ -111,7 +110,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { var err error if r.cfg.GRPC != nil { var opts []grpc.ServerOption - opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions()) + opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions(), r.settings.TelemetrySettings) if err != nil { return err } @@ -137,6 +136,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { if r.cfg.HTTP != nil { r.serverHTTP = r.cfg.HTTP.ToServer( r.httpMux, + r.settings.TelemetrySettings, confighttp.WithErrorHandler(errorHandler), ) err = r.startHTTPServer(r.cfg.HTTP, host) @@ -144,7 +144,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error { return err } if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint { - r.logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint) + r.settings.Logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint) // Copy the config. cfgLegacyHTTP := r.cfg.HTTP