Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use ReceiverCreateSettings to configure TracerProvider #3967

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (gss *GRPCServerSettings) ToListener() (net.Listener, error) {
}

// ToServerOption maps configgrpc.GRPCServerSettings to a slice of server options for gRPC.
func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]component.Extension) ([]grpc.ServerOption, error) {
func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]component.Extension, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just file an issue for me to investigate if better to pass the Host not the extensions only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var opts []grpc.ServerOption

if gss.TLSSetting != nil {
Expand Down Expand Up @@ -346,11 +346,11 @@ func (gss *GRPCServerSettings) ToServerOption(ext map[config.ComponentID]compone
// Enable OpenTelemetry observability plugin.
// TODO: Pass construct settings to have access to Tracer.
uInterceptors = append(uInterceptors, otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
otelgrpc.WithTracerProvider(settings.TracerProvider),
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))
sInterceptors = append(sInterceptors, otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
otelgrpc.WithTracerProvider(settings.TracerProvider),
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))

Expand Down
15 changes: 8 additions & 7 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/confignet"
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestAllGrpcClientSettings(t *testing.T) {

func TestDefaultGrpcServerSettings(t *testing.T) {
gss := &GRPCServerSettings{}
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

assert.NoError(t, err)
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

assert.NoError(t, err)
Expand All @@ -126,7 +127,7 @@ func TestGrpcServerAuthSettings(t *testing.T) {
gss := &GRPCServerSettings{}

// sanity check
_, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
_, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

// test
Expand All @@ -136,7 +137,7 @@ func TestGrpcServerAuthSettings(t *testing.T) {
ext := map[config.ComponentID]component.Extension{
config.NewID("mock"): &configauth.MockAuthenticator{},
}
opts, err := gss.ToServerOption(ext)
opts, err := gss.ToServerOption(ext, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

// verify
Expand Down Expand Up @@ -302,7 +303,7 @@ func TestGRPCServerSettingsError(t *testing.T) {
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
opts, err := test.settings.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := test.settings.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

assert.Regexp(t, test.err, err)
Expand Down Expand Up @@ -457,7 +458,7 @@ func TestHttpReception(t *testing.T) {
}
ln, err := gss.ToListener()
assert.NoError(t, err)
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
Expand Down Expand Up @@ -502,7 +503,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
}
ln, err := gss.ToListener()
assert.NoError(t, err)
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{}, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})
Expand Down
4 changes: 2 additions & 2 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func WithErrorHandler(e middleware.ErrorHandler) ToServerOption {
}

// ToServer creates an http.Server from settings object.
func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOption) *http.Server {
func (hss *HTTPServerSettings) ToServer(handler http.Handler, settings component.TelemetrySettings, opts ...ToServerOption) *http.Server {
serverOpts := &toServerOptions{}
for _, o := range opts {
o(serverOpts)
Expand All @@ -218,7 +218,7 @@ func (hss *HTTPServerSettings) ToServer(handler http.Handler, opts ...ToServerOp
handler = otelhttp.NewHandler(
handler,
"",
otelhttp.WithTracerProvider(otel.GetTracerProvider()),
otelhttp.WithTracerProvider(settings.TracerProvider),
otelhttp.WithPropagators(otel.GetTextMapPropagator()),
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
return r.URL.Path
Expand Down
9 changes: 5 additions & 4 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configtls"
Expand Down Expand Up @@ -390,7 +391,7 @@ func TestHttpReception(t *testing.T) {
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, errWrite := fmt.Fprint(w, "test")
assert.NoError(t, errWrite)
}))
}), componenttest.NewNopTelemetrySettings())

go func() {
_ = s.Serve(ln)
Expand Down Expand Up @@ -469,7 +470,7 @@ func TestHttpCors(t *testing.T) {
assert.NoError(t, err)
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
}), componenttest.NewNopTelemetrySettings())
go func() {
_ = s.Serve(ln)
}()
Expand Down Expand Up @@ -505,7 +506,7 @@ func TestHttpCorsInvalidSettings(t *testing.T) {
}

// This effectively does not enable CORS but should also not cause an error
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
s := hss.ToServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), componenttest.NewNopTelemetrySettings())
require.NotNil(t, s)
require.NoError(t, s.Close())
}
Expand Down Expand Up @@ -547,7 +548,7 @@ func ExampleHTTPServerSettings() {
settings := HTTPServerSettings{
Endpoint: ":443",
}
s := settings.ToServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
s := settings.ToServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}), componenttest.NewNopTelemetrySettings())
l, err := settings.ToListener()
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func createTracesReceiver(
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), set.Logger)
return newOtlpReceiver(cfg.(*Config), set)
})

if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(nextConsumer); err != nil {
Expand All @@ -90,7 +90,7 @@ func createMetricsReceiver(
consumer consumer.Metrics,
) (component.MetricsReceiver, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), set.Logger)
return newOtlpReceiver(cfg.(*Config), set)
})

if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(consumer); err != nil {
Expand All @@ -107,7 +107,7 @@ func createLogReceiver(
consumer consumer.Logs,
) (component.LogsReceiver, error) {
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), set.Logger)
return newOtlpReceiver(cfg.(*Config), set)
})

if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(consumer); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/gorilla/mux"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -53,16 +52,16 @@ type otlpReceiver struct {
logReceiver *logs.Receiver
shutdownWG sync.WaitGroup

logger *zap.Logger
settings component.ReceiverCreateSettings
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver {
func newOtlpReceiver(cfg *Config, settings component.ReceiverCreateSettings) *otlpReceiver {
r := &otlpReceiver{
cfg: cfg,
logger: logger,
cfg: cfg,
settings: settings,
}
if cfg.HTTP != nil {
r.httpMux = mux.NewRouter()
Expand All @@ -72,7 +71,7 @@ func newOtlpReceiver(cfg *Config, logger *zap.Logger) *otlpReceiver {
}

func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error {
r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)
r.settings.Logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint)

gln, err := cfg.ToListener()
if err != nil {
Expand All @@ -90,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
}

func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error {
r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
r.settings.Logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint)
var hln net.Listener
hln, err := r.cfg.HTTP.ToListener()
if err != nil {
Expand All @@ -111,7 +110,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
var err error
if r.cfg.GRPC != nil {
var opts []grpc.ServerOption
opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions())
opts, err = r.cfg.GRPC.ToServerOption(host.GetExtensions(), r.settings.TelemetrySettings)
if err != nil {
return err
}
Expand All @@ -137,14 +136,15 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
if r.cfg.HTTP != nil {
r.serverHTTP = r.cfg.HTTP.ToServer(
r.httpMux,
r.settings.TelemetrySettings,
confighttp.WithErrorHandler(errorHandler),
)
err = r.startHTTPServer(r.cfg.HTTP, host)
if err != nil {
return err
}
if r.cfg.HTTP.Endpoint == defaultHTTPEndpoint {
r.logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint)
r.settings.Logger.Info("Setting up a second HTTP listener on legacy endpoint " + legacyHTTPEndpoint)

// Copy the config.
cfgLegacyHTTP := r.cfg.HTTP
Expand Down