From 1cc062a736dce3639358d7c87222d83d3bcaffc5 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Sat, 18 May 2024 11:05:02 -0700 Subject: [PATCH 1/3] Add implementation of otlploggrpc configuration --- exporters/otlp/otlplog/otlploggrpc/config.go | 102 +++-- exporters/otlp/otlplog/otlploggrpc/go.mod | 2 +- .../otlploggrpc/internal/otlpconf/options.go | 189 ++++++++ .../internal/otlpconf/options_test.go | 402 ++++++++++++++++++ .../otlploggrpc/internal/otlpconf/setting.go | 394 +++++++++++++++++ 5 files changed, 1058 insertions(+), 31 deletions(-) create mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go create mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go create mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go diff --git a/exporters/otlp/otlplog/otlploggrpc/config.go b/exporters/otlp/otlplog/otlploggrpc/config.go index 25635aabdaa..c77e06da433 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config.go +++ b/exporters/otlp/otlplog/otlploggrpc/config.go @@ -4,28 +4,58 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" import ( + "fmt" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" ) +type wrappedOption struct { + otlpconf.Option +} + +func (w wrappedOption) applyOption(cfg config) config { + cfg.Config = w.Option.ApplyOption(cfg.Config) + return cfg +} + +type fnOpt func(config) config + +func (f fnOpt) applyOption(c config) config { return f(c) } + // Option applies an option to the Exporter. type Option interface { - applyHTTPOption(config) config + applyOption(config) config } type config struct { - // TODO: implement. + otlpconf.Config + + // gRPC configurations + gRPCCredentials otlpconf.Setting[credentials.TransportCredentials] + serviceConfig otlpconf.Setting[string] + reconnectionPeriod otlpconf.Setting[time.Duration] + dialOptions otlpconf.Setting[[]grpc.DialOption] + gRPCConn otlpconf.Setting[*grpc.ClientConn] } func newConfig(options []Option) config { var c config for _, opt := range options { - c = opt.applyHTTPOption(c) + c = opt.applyOption(c) } + + c.Config = otlpconf.LoadConfig(c.Config) + + if !c.gRPCCredentials.Set && c.Config.TLSCfg.Set { + c.gRPCCredentials = otlpconf.NewSetting(credentials.NewTLS(c.Config.TLSCfg.Value)) + } + return c } @@ -51,8 +81,7 @@ type RetryConfig retry.Config // // This option has no effect if WithGRPCConn is used. func WithInsecure() Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithInsecure()} } // WithEndpoint sets the target endpoint the Exporter will connect to. @@ -70,8 +99,7 @@ func WithInsecure() Option { // // This option has no effect if WithGRPCConn is used. func WithEndpoint(endpoint string) Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithEndpoint(endpoint)} } // WithEndpointURL sets the target endpoint URL the Exporter will connect to. @@ -91,8 +119,7 @@ func WithEndpoint(endpoint string) Option { // // This option has no effect if WithGRPCConn is used. func WithEndpointURL(u string) Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithEndpointURL(u)} } // WithReconnectionPeriod set the minimum amount of time between connection @@ -100,8 +127,10 @@ func WithEndpointURL(u string) Option { // // This option has no effect if WithGRPCConn is used. func WithReconnectionPeriod(rp time.Duration) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.reconnectionPeriod = otlpconf.NewSetting(rp) + return c + }) } // WithCompressor sets the compressor the gRPC client uses. @@ -118,8 +147,7 @@ func WithReconnectionPeriod(rp time.Duration) Option { // // This option has no effect if WithGRPCConn is used. func WithCompressor(compressor string) Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithCompression(compressorToCompression(compressor))} } // WithHeaders will send the provided headers with each gRPC requests. @@ -134,8 +162,7 @@ func WithCompressor(compressor string) Option { // By default, if an environment variable is not set, and this option is not // passed, no user headers will be set. func WithHeaders(headers map[string]string) Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithHeaders(headers)} } // WithTLSCredentials sets the gRPC connection to use creds. @@ -150,17 +177,21 @@ func WithHeaders(headers map[string]string) Option { // passed, no TLS credentials will be used. // // This option has no effect if WithGRPCConn is used. -func WithTLSCredentials(_ credentials.TransportCredentials) Option { - // TODO: implement. - return nil +func WithTLSCredentials(credential credentials.TransportCredentials) Option { + return fnOpt(func(c config) config { + c.gRPCCredentials = otlpconf.NewSetting(credential) + return c + }) } // WithServiceConfig defines the default gRPC service config used. // // This option has no effect if WithGRPCConn is used. func WithServiceConfig(serviceConfig string) Option { - // TODO: implement. - return nil + return fnOpt(func(c config) config { + c.serviceConfig = otlpconf.NewSetting(serviceConfig) + return c + }) } // WithDialOption sets explicit grpc.DialOptions to use when establishing a @@ -171,9 +202,11 @@ func WithServiceConfig(serviceConfig string) Option { // grpc.DialOptions are ignored. // // This option has no effect if WithGRPCConn is used. -func WithDialOption(_ ...grpc.DialOption) Option { - // TODO: implement. - return nil +func WithDialOption(opts ...grpc.DialOption) Option { + return fnOpt(func(c config) config { + c.dialOptions = otlpconf.NewSetting(opts) + return c + }) } // WithGRPCConn sets conn as the gRPC ClientConn used for all communication. @@ -184,9 +217,11 @@ func WithDialOption(_ ...grpc.DialOption) Option { // // It is the callers responsibility to close the passed conn. The Exporter // Shutdown method will not close this connection. -func WithGRPCConn(_ *grpc.ClientConn) Option { - // TODO: implement. - return nil +func WithGRPCConn(conn *grpc.ClientConn) Option { + return fnOpt(func(c config) config { + c.gRPCConn = otlpconf.NewSetting(conn) + return c + }) } // WithTimeout sets the max amount of time an Exporter will attempt an export. @@ -204,8 +239,7 @@ func WithGRPCConn(_ *grpc.ClientConn) Option { // By default, if an environment variable is not set, and this option is not // passed, a timeout of 10 seconds will be used. func WithTimeout(duration time.Duration) Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithTimeout(duration)} } // WithRetry sets the retry policy for transient retryable errors that are @@ -222,6 +256,14 @@ func WithTimeout(duration time.Duration) Option { // 5 seconds after receiving a retryable error and increase exponentially // after each error for no more than a total time of 1 minute. func WithRetry(settings RetryConfig) Option { - // TODO: implement. - return nil + return wrappedOption{Option: otlpconf.WithRetry(retry.Config(settings))} +} + +func compressorToCompression(compressor string) otlpconf.Compression { + if compressor == "gzip" { + return otlpconf.GzipCompression + } + + otel.Handle(fmt.Errorf("invalid compression type: '%s', using no compression as default", compressor)) + return otlpconf.NoCompression } diff --git a/exporters/otlp/otlplog/otlploggrpc/go.mod b/exporters/otlp/otlplog/otlploggrpc/go.mod index 832392f6b64..5ed240fde0b 100644 --- a/exporters/otlp/otlplog/otlploggrpc/go.mod +++ b/exporters/otlp/otlplog/otlploggrpc/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.26.0 go.opentelemetry.io/otel/sdk/log v0.2.0-alpha google.golang.org/grpc v1.64.0 ) @@ -14,7 +15,6 @@ require ( github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/otel v1.26.0 // indirect go.opentelemetry.io/otel/log v0.2.0-alpha // indirect go.opentelemetry.io/otel/metric v1.26.0 // indirect go.opentelemetry.io/otel/sdk v1.26.0 // indirect diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go new file mode 100644 index 00000000000..7392152251d --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go @@ -0,0 +1,189 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlpconf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" + +import ( + "crypto/tls" + "net/url" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" + "go.opentelemetry.io/otel/internal/global" +) + +// WithInsecure disables client transport security for the Exporter's HTTP +// connection. +// +// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT +// environment variable is set, and this option is not passed, that variable +// value will be used to determine client security. If the endpoint has a +// scheme of "http" or "unix" client security will be disabled. If both are +// set, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, client security will be used. +func WithInsecure() Option { + return fnOpt(func(c Config) Config { + c.Insecure = NewSetting(true) + return c + }) +} + +// WithEndpoint sets the target endpoint the Exporter will connect to. This +// endpoint is specified as a host and optional port, no path or scheme should +// be included (see WithInsecure and WithURLPath). +// +// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT +// environment variable is set, and this option is not passed, that variable +// value will be used. If both are set, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT +// will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, "localhost:4318" will be used. +func WithEndpoint(endpoint string) Option { + return fnOpt(func(c Config) Config { + c.Endpoint = NewSetting(endpoint) + return c + }) +} + +// WithEndpointURL sets the target endpoint URL the Exporter will connect to. +// +// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT +// environment variable is set, and this option is not passed, that variable +// value will be used. If both are set, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT +// will take precedence. +// +// If both this option and WithEndpoint are used, the last used option will +// take precedence. +// +// If an invalid URL is provided, the default value will be kept. +// +// By default, if an environment variable is not set, and this option is not +// passed, "localhost:4318" will be used. +func WithEndpointURL(rawURL string) Option { + u, err := url.Parse(rawURL) + if err != nil { + global.Error(err, "otlpmetric: parse endpoint url", "url", rawURL) + return fnOpt(func(c Config) Config { return c }) + } + return fnOpt(func(c Config) Config { + c.Endpoint = NewSetting(u.Host) + c.Path = NewSetting(u.Path) + if u.Scheme != "https" { + c.Insecure = NewSetting(true) + } else { + c.Insecure = NewSetting(false) + } + return c + }) +} + +// WithCompression sets the compression strategy the Exporter will use to +// compress the HTTP body. +// +// If the OTEL_EXPORTER_OTLP_COMPRESSION or +// OTEL_EXPORTER_OTLP_LOGS_COMPRESSION environment variable is set, and +// this option is not passed, that variable value will be used. That value can +// be either "none" or "gzip". If both are set, +// OTEL_EXPORTER_OTLP_LOGS_COMPRESSION will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, no compression strategy will be used. +func WithCompression(compression Compression) Option { + return fnOpt(func(c Config) Config { + c.Compression = NewSetting(compression) + return c + }) +} + +// WithURLPath sets the URL path the Exporter will send requests to. +// +// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT +// environment variable is set, and this option is not passed, the path +// contained in that variable value will be used. If both are set, +// OTEL_EXPORTER_OTLP_LOGS_ENDPOINT will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, "/v1/logs" will be used. +func WithURLPath(urlPath string) Option { + return fnOpt(func(c Config) Config { + c.Path = NewSetting(urlPath) + return c + }) +} + +// WithTLSClientConfig sets the TLS Configuration the Exporter will use for +// HTTP requests. +// +// If the OTEL_EXPORTER_OTLP_CERTIFICATE or +// OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE environment variable is set, and +// this option is not passed, that variable value will be used. The value will +// be parsed the filepath of the TLS certificate chain to use. If both are +// set, OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, the system default Configuration is used. +func WithTLSClientConfig(tlsCfg *tls.Config) Option { + return fnOpt(func(c Config) Config { + c.TLSCfg = NewSetting(tlsCfg.Clone()) + return c + }) +} + +// WithHeaders will send the provided headers with each HTTP requests. +// +// If the OTEL_EXPORTER_OTLP_HEADERS or OTEL_EXPORTER_OTLP_LOGS_HEADERS +// environment variable is set, and this option is not passed, that variable +// value will be used. The value will be parsed as a list of key value pairs. +// These pairs are expected to be in the W3C Correlation-Context format +// without additional semi-colon delimited metadata (i.e. "k1=v1,k2=v2"). If +// both are set, OTEL_EXPORTER_OTLP_LOGS_HEADERS will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, no user headers will be set. +func WithHeaders(headers map[string]string) Option { + return fnOpt(func(c Config) Config { + c.Headers = NewSetting(headers) + return c + }) +} + +// WithTimeout sets the max amount of time an Exporter will attempt an export. +// +// This takes precedence over any retry settings defined by WithRetry. Once +// this time limit has been reached the export is abandoned and the log data is +// dropped. +// +// If the OTEL_EXPORTER_OTLP_TIMEOUT or OTEL_EXPORTER_OTLP_LOGS_TIMEOUT +// environment variable is set, and this option is not passed, that variable +// value will be used. The value will be parsed as an integer representing the +// timeout in milliseconds. If both are set, +// OTEL_EXPORTER_OTLP_LOGS_TIMEOUT will take precedence. +// +// By default, if an environment variable is not set, and this option is not +// passed, a timeout of 10 seconds will be used. +func WithTimeout(duration time.Duration) Option { + return fnOpt(func(c Config) Config { + c.Timeout = NewSetting(duration) + return c + }) +} + +// WithRetry sets the retry policy for transient retryable errors that are +// returned by the target endpoint. +// +// If the target endpoint responds with not only a retryable error, but +// explicitly returns a backoff time in the response, that time will take +// precedence over these settings. +// +// If unset, the default retry policy will be used. It will retry the export +// 5 seconds after receiving a retryable error and increase exponentially +// after each error for no more than a total time of 1 minute. +func WithRetry(rc retry.Config) Option { + return fnOpt(func(c Config) Config { + c.RetryCfg = NewSetting(rc) + return c + }) +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go new file mode 100644 index 00000000000..57edc93afb6 --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go @@ -0,0 +1,402 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlpconf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" +) + +const ( + weakCertificate = ` +-----BEGIN CERTIFICATE----- +MIIBhzCCASygAwIBAgIRANHpHgAWeTnLZpTSxCKs0ggwCgYIKoZIzj0EAwIwEjEQ +MA4GA1UEChMHb3RlbC1nbzAeFw0yMTA0MDExMzU5MDNaFw0yMTA0MDExNDU5MDNa +MBIxEDAOBgNVBAoTB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS9 +nWSkmPCxShxnp43F+PrOtbGV7sNfkbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0Z +sJCLHGogQsYnWJBXUZOVo2MwYTAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI +KwYBBQUHAwEwDAYDVR0TAQH/BAIwADAsBgNVHREEJTAjgglsb2NhbGhvc3SHEAAA +AAAAAAAAAAAAAAAAAAGHBH8AAAEwCgYIKoZIzj0EAwIDSQAwRgIhANwZVVKvfvQ/ +1HXsTvgH+xTQswOwSSKYJ1cVHQhqK7ZbAiEAus8NxpTRnp5DiTMuyVmhVNPB+bVH +Lhnm4N/QDk5rek0= +-----END CERTIFICATE----- +` + weakPrivateKey = ` +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgN8HEXiXhvByrJ1zK +SFT6Y2l2KqDWwWzKf+t4CyWrNKehRANCAAS9nWSkmPCxShxnp43F+PrOtbGV7sNf +kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV +-----END PRIVATE KEY----- +` +) + +// This is only for testing, as the package that is using this utility has its own newConfig function. +func newConfig(options []Option) Config { + var c Config + for _, opt := range options { + c = opt.ApplyOption(c) + } + + c = LoadConfig(c) + + return c +} + +func newTLSConf(cert, key []byte) (*tls.Config, error) { + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(cert); !ok { + return nil, errors.New("failed to append certificate to the cert pool") + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + crts := []tls.Certificate{crt} + return &tls.Config{RootCAs: cp, Certificates: crts}, nil +} + +func TestNewConfig(t *testing.T) { + orig := readFile + readFile = func() func(name string) ([]byte, error) { + index := map[string][]byte{ + "cert_path": []byte(weakCertificate), + "key_path": []byte(weakPrivateKey), + "invalid_cert": []byte("invalid certificate file."), + "invalid_key": []byte("invalid key file."), + } + return func(name string) ([]byte, error) { + b, ok := index[name] + if !ok { + err := fmt.Errorf("file does not exist: %s", name) + return nil, err + } + return b, nil + } + }() + t.Cleanup(func() { readFile = orig }) + + tlsCfg, err := newTLSConf([]byte(weakCertificate), []byte(weakPrivateKey)) + require.NoError(t, err, "testing TLS config") + + headers := map[string]string{"a": "A"} + rc := retry.Config{} + + testcases := []struct { + name string + options []Option + envars map[string]string + want Config + errs []string + }{ + { + name: "Defaults", + want: Config{ + Endpoint: NewSetting(defaultEndpoint), + Path: NewSetting(defaultPath), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "Options", + options: []Option{ + WithEndpoint("test"), + WithURLPath("/path"), + WithInsecure(), + WithTLSClientConfig(tlsCfg), + WithCompression(GzipCompression), + WithHeaders(headers), + WithTimeout(time.Second), + WithRetry(rc), + }, + want: Config{ + Endpoint: NewSetting("test"), + Path: NewSetting("/path"), + Insecure: NewSetting(true), + TLSCfg: NewSetting(tlsCfg), + Headers: NewSetting(headers), + Compression: NewSetting(GzipCompression), + Timeout: NewSetting(time.Second), + RetryCfg: NewSetting(rc), + }, + }, + { + name: "WithEndpointURL", + options: []Option{ + WithEndpointURL("http://test:8080/path"), + }, + want: Config{ + Endpoint: NewSetting("test:8080"), + Path: NewSetting("/path"), + Insecure: NewSetting(true), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "EndpointPrecidence", + options: []Option{ + WithEndpointURL("https://test:8080/path"), + WithEndpoint("not-test:9090"), + WithURLPath("/alt"), + WithInsecure(), + }, + want: Config{ + Endpoint: NewSetting("not-test:9090"), + Path: NewSetting("/alt"), + Insecure: NewSetting(true), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "EndpointURLPrecidence", + options: []Option{ + WithEndpoint("not-test:9090"), + WithURLPath("/alt"), + WithInsecure(), + WithEndpointURL("https://test:8080/path"), + }, + want: Config{ + Endpoint: NewSetting("test:8080"), + Path: NewSetting("/path"), + Insecure: NewSetting(false), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "LogEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: Config{ + Endpoint: NewSetting("env.endpoint:8080"), + Path: NewSetting("/prefix"), + Insecure: NewSetting(false), + TLSCfg: NewSetting(tlsCfg), + Headers: NewSetting(headers), + Compression: NewSetting(GzipCompression), + Timeout: NewSetting(15 * time.Second), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "LogEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", + }, + want: Config{ + Endpoint: NewSetting("env.endpoint"), + Path: NewSetting("/"), + Insecure: NewSetting(true), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path", + }, + want: Config{ + Endpoint: NewSetting("env.endpoint:8080"), + Path: NewSetting("/prefix/v1/logs"), + Insecure: NewSetting(true), + TLSCfg: NewSetting(tlsCfg), + Headers: NewSetting(headers), + Compression: NewSetting(NoCompression), + Timeout: NewSetting(15 * time.Second), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", + }, + want: Config{ + Endpoint: NewSetting("env.endpoint"), + Path: NewSetting(defaultPath), + Insecure: NewSetting(true), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "EnvironmentVariablesPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/path", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: Config{ + Endpoint: NewSetting("env.endpoint:8080"), + Path: NewSetting("/path"), + Insecure: NewSetting(false), + TLSCfg: NewSetting(tlsCfg), + Headers: NewSetting(headers), + Compression: NewSetting(GzipCompression), + Timeout: NewSetting(15 * time.Second), + RetryCfg: NewSetting(defaultRetryCfg), + }, + }, + { + name: "OptionsPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + options: []Option{ + WithEndpoint("test"), + WithURLPath("/path"), + WithInsecure(), + WithTLSClientConfig(tlsCfg), + WithCompression(GzipCompression), + WithHeaders(headers), + WithTimeout(time.Second), + WithRetry(rc), + }, + want: Config{ + Endpoint: NewSetting("test"), + Path: NewSetting("/path"), + Insecure: NewSetting(true), + TLSCfg: NewSetting(tlsCfg), + Headers: NewSetting(headers), + Compression: NewSetting(GzipCompression), + Timeout: NewSetting(time.Second), + RetryCfg: NewSetting(rc), + }, + }, + { + name: "InvalidEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "%invalid", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a,%ZZ=valid,key=%ZZ", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "xz", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "100 seconds", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", + }, + want: Config{ + Endpoint: NewSetting(defaultEndpoint), + Path: NewSetting(defaultPath), + Timeout: NewSetting(defaultTimeout), + RetryCfg: NewSetting(defaultRetryCfg), + }, + errs: []string{ + `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, + `failed to load TLS:`, + `certificate not added`, + `tls: failed to find any PEM data in certificate input`, + `invalid OTEL_EXPORTER_OTLP_LOGS_HEADERS value a,%ZZ=valid,key=%ZZ:`, + `invalid header: a`, + `invalid header key: %ZZ`, + `invalid header value: %ZZ`, + `invalid OTEL_EXPORTER_OTLP_LOGS_COMPRESSION value xz: unknown compression: xz`, + `invalid OTEL_EXPORTER_OTLP_LOGS_TIMEOUT value 100 seconds: strconv.Atoi: parsing "100 seconds": invalid syntax`, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + for key, value := range tc.envars { + t.Setenv(key, value) + } + + var err error + t.Cleanup(func(orig otel.ErrorHandler) func() { + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(e error) { + err = errors.Join(err, e) + })) + return func() { otel.SetErrorHandler(orig) } + }(otel.GetErrorHandler())) + c := newConfig(tc.options) + + // Do not compare pointer values. + assertTLSConfig(t, tc.want.TLSCfg, c.TLSCfg) + var emptyTLS Setting[*tls.Config] + c.TLSCfg, tc.want.TLSCfg = emptyTLS, emptyTLS + + assert.Equal(t, tc.want, c) + + for _, errMsg := range tc.errs { + assert.ErrorContains(t, err, errMsg) + } + }) + } +} + +func assertTLSConfig(t *testing.T, want, got Setting[*tls.Config]) { + t.Helper() + + assert.Equal(t, want.Set, got.Set, "setting Set") + if !want.Set { + return + } + + if want.Value == nil { + assert.Nil(t, got.Value, "*tls.Config") + return + } + require.NotNil(t, got.Value, "*tls.Config") + + if want.Value.RootCAs == nil { + assert.Nil(t, got.Value.RootCAs, "*tls.Config.RootCAs") + } else { + if assert.NotNil(t, got.Value.RootCAs, "RootCAs") { + assert.True(t, want.Value.RootCAs.Equal(got.Value.RootCAs), "RootCAs equal") + } + } + assert.Equal(t, want.Value.Certificates, got.Value.Certificates, "Certificates") +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go new file mode 100644 index 00000000000..c0d8870c9ac --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go @@ -0,0 +1,394 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlpconf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net/url" + "os" + "strconv" + "strings" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" +) + +// Default values. +var ( + defaultEndpoint = "localhost:4318" + defaultPath = "/v1/logs" + defaultTimeout = 10 * time.Second + defaultRetryCfg = retry.DefaultConfig +) + +// Environment variable keys. +var ( + envEndpoint = []string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + "OTEL_EXPORTER_OTLP_ENDPOINT", + } + envInsecure = envEndpoint + + // Split because these are parsed differently. + envPathSignal = []string{"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"} + envPathOTLP = []string{"OTEL_EXPORTER_OTLP_ENDPOINT"} + + envHeaders = []string{ + "OTEL_EXPORTER_OTLP_LOGS_HEADERS", + "OTEL_EXPORTER_OTLP_HEADERS", + } + + envCompression = []string{ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_COMPRESSION", + } + + envTimeout = []string{ + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT", + "OTEL_EXPORTER_OTLP_TIMEOUT", + } + + envTLSCert = []string{ + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CERTIFICATE", + } + envTLSClient = []struct { + Certificate string + Key string + }{ + { + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY", + }, + { + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CLIENT_KEY", + }, + } +) + +// Setting is a configuration setting value. +type Setting[T any] struct { + Value T + Set bool +} + +// NewSetting returns a new setting with the value set. +func NewSetting[T any](value T) Setting[T] { + return Setting[T]{Value: value, Set: true} +} + +// Resolver returns an updated setting after applying an resolution operation. +type Resolver[T any] func(Setting[T]) Setting[T] + +// Resolve returns a resolved version of s. +// +// It will apply all the passed fn in the order provided, chaining together the +// return setting to the next input. The setting s is used as the initial +// argument to the first fn. +// +// Each fn needs to validate if it should apply given the Set state of the +// setting. This will not perform any checks on the set state when chaining +// function. +func (s Setting[T]) Resolve(fn ...Resolver[T]) Setting[T] { + for _, f := range fn { + s = f(s) + } + return s +} + +// Compression describes the compression used for exported payloads. +type Compression int + +const ( + // NoCompression represents that no compression should be used. + NoCompression Compression = iota + // GzipCompression represents that gzip compression should be used. + GzipCompression +) + +// Option applies an option to the Exporter. +type Option interface { + ApplyOption(Config) Config +} + +type fnOpt func(Config) Config + +func (f fnOpt) ApplyOption(c Config) Config { return f(c) } + +type Config struct { + Endpoint Setting[string] + Path Setting[string] + Insecure Setting[bool] + TLSCfg Setting[*tls.Config] + Headers Setting[map[string]string] + Compression Setting[Compression] + Timeout Setting[time.Duration] + RetryCfg Setting[retry.Config] +} + +func LoadConfig(c Config) Config { + c.Endpoint = c.Endpoint.Resolve( + GetEnv[string](envEndpoint, convEndpoint), + fallback[string](defaultEndpoint), + ) + c.Path = c.Path.Resolve( + GetEnv[string](envPathSignal, convPathExact), + GetEnv[string](envPathOTLP, convPath), + fallback[string](defaultPath), + ) + c.Insecure = c.Insecure.Resolve( + GetEnv[bool](envInsecure, convInsecure), + ) + c.TLSCfg = c.TLSCfg.Resolve( + loadEnvTLS[*tls.Config](), + ) + c.Headers = c.Headers.Resolve( + GetEnv[map[string]string](envHeaders, convHeaders), + ) + c.Compression = c.Compression.Resolve( + GetEnv[Compression](envCompression, convCompression), + ) + c.Timeout = c.Timeout.Resolve( + GetEnv[time.Duration](envTimeout, convDuration), + fallback[time.Duration](defaultTimeout), + ) + c.RetryCfg = c.RetryCfg.Resolve( + fallback[retry.Config](defaultRetryCfg), + ) + + return c +} + +// GetEnv returns a Resolver that will apply an environment variable value +// associated with the first set key to a setting value. The conv function is +// used to convert between the environment variable value and the setting type. +// +// If the input setting to the Resolver is set, the environment variable will +// not be applied. +// +// Any error returned from conv is sent to the OTel ErrorHandler and the +// setting will not be updated. +func GetEnv[T any](keys []string, conv func(string) (T, error)) Resolver[T] { + return func(s Setting[T]) Setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + for _, key := range keys { + if vStr := os.Getenv(key); vStr != "" { + v, err := conv(vStr) + if err == nil { + s.Value = v + s.Set = true + break + } + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) + } + } + return s + } +} + +// convEndpoint converts s from a URL string to an endpoint if s is a valid +// URL. Otherwise, "" and an error are returned. +func convEndpoint(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + return u.Host, nil +} + +// convPathExact converts s from a URL string to the exact path if s is a valid +// URL. Otherwise, "" and an error are returned. +// +// If the path contained in s is empty, "/" is returned. +func convPathExact(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + if u.Path == "" { + return "/", nil + } + return u.Path, nil +} + +// convPath converts s from a URL string to an OTLP endpoint path if s is a +// valid URL. Otherwise, "" and an error are returned. +func convPath(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + return u.Path + "/v1/logs", nil +} + +// convInsecure parses s as a URL string and returns if the connection should +// use client transport security or not. If s is an invalid URL, false and an +// error are returned. +func convInsecure(s string) (bool, error) { + u, err := url.Parse(s) + if err != nil { + return false, err + } + return u.Scheme != "https", nil +} + +// convHeaders converts the OTel environment variable header value s into a +// mapping of header key to value. If s is invalid a partial result and error +// are returned. +func convHeaders(s string) (map[string]string, error) { + out := make(map[string]string) + var err error + for _, header := range strings.Split(s, ",") { + rawKey, rawVal, found := strings.Cut(header, "=") + if !found { + err = errors.Join(err, fmt.Errorf("invalid header: %s", header)) + continue + } + + escKey, e := url.PathUnescape(rawKey) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey)) + continue + } + key := strings.TrimSpace(escKey) + + escVal, e := url.PathUnescape(rawVal) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal)) + continue + } + val := strings.TrimSpace(escVal) + + out[key] = val + } + return out, err +} + +// convCompression returns the parsed compression encoded in s. NoCompression +// and an errors are returned if s is unknown. +func convCompression(s string) (Compression, error) { + switch s { + case "gzip": + return GzipCompression, nil + case "none", "": + return NoCompression, nil + } + return NoCompression, fmt.Errorf("unknown compression: %s", s) +} + +// convDuration converts s into a duration of milliseconds. If s does not +// contain an integer, 0 and an error are returned. +func convDuration(s string) (time.Duration, error) { + d, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + // OTel durations are defined in milliseconds. + return time.Duration(d) * time.Millisecond, nil +} + +// fallback returns a resolve that will set a setting value to val if it is not +// already set. +// +// This is usually passed at the end of a resolver chain to ensure a default is +// applied if the setting has not already been set. +func fallback[T any](val T) Resolver[T] { + return func(s Setting[T]) Setting[T] { + if !s.Set { + s.Value = val + s.Set = true + } + return s + } +} + +// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by +// the OTLP TLS environment variables. This will load both the rootCAs and +// certificates used for mTLS. +// +// If the filepath defined is invalid or does not contain valid TLS files, an +// error is passed to the OTel ErrorHandler and no TLS configuration is +// provided. +func loadEnvTLS[T *tls.Config]() Resolver[T] { + return func(s Setting[T]) Setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + var rootCAs *x509.CertPool + var err error + for _, key := range envTLSCert { + if v := os.Getenv(key); v != "" { + rootCAs, err = loadCertPool(v) + break + } + } + + var certs []tls.Certificate + for _, pair := range envTLSClient { + cert := os.Getenv(pair.Certificate) + key := os.Getenv(pair.Key) + if cert != "" && key != "" { + var e error + certs, e = loadCertificates(cert, key) + err = errors.Join(err, e) + break + } + } + + if err != nil { + err = fmt.Errorf("failed to load TLS: %w", err) + otel.Handle(err) + } else if rootCAs != nil || certs != nil { + s.Set = true + s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs} + } + return s + } +} + +// readFile is used for testing. +var readFile = os.ReadFile + +// loadCertPool loads and returns the *x509.CertPool found at path if it exists +// and is valid. Otherwise, nil and an error is returned. +func loadCertPool(path string) (*x509.CertPool, error) { + b, err := readFile(path) + if err != nil { + return nil, err + } + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(b); !ok { + return nil, errors.New("certificate not added") + } + return cp, nil +} + +// loadCertificates loads and returns the tls.Certificate found at path if it +// exists and is valid. Otherwise, nil and an error is returned. +func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) { + cert, err := readFile(certPath) + if err != nil { + return nil, err + } + key, err := readFile(keyPath) + if err != nil { + return nil, err + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + return []tls.Certificate{crt}, nil +} From b2ece10a347a06ffef231627e5ee60c0c9d30e03 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Thu, 23 May 2024 00:05:24 -0700 Subject: [PATCH 2/3] Only reuse setting struct --- exporters/otlp/otlplog/otlploggrpc/config.go | 350 ++++++++++++++-- .../options_test.go => config_test.go} | 224 +++++----- .../otlploggrpc/internal/conf/setting.go | 87 ++++ .../otlploggrpc/internal/otlpconf/options.go | 189 --------- .../otlploggrpc/internal/otlpconf/setting.go | 394 ------------------ 5 files changed, 504 insertions(+), 740 deletions(-) rename exporters/otlp/otlplog/otlploggrpc/{internal/otlpconf/options_test.go => config_test.go} (66%) create mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go delete mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go delete mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go diff --git a/exporters/otlp/otlplog/otlploggrpc/config.go b/exporters/otlp/otlplog/otlploggrpc/config.go index c77e06da433..debc901d9a8 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config.go +++ b/exporters/otlp/otlplog/otlploggrpc/config.go @@ -4,25 +4,73 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" import ( + "crypto/tls" + "crypto/x509" + "errors" "fmt" + "net/url" + "os" + "strconv" + "strings" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/conf" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" + "go.opentelemetry.io/otel/internal/global" ) -type wrappedOption struct { - otlpconf.Option -} +// Default values. +var ( + defaultEndpoint = "localhost:4317" + defaultTimeout = 10 * time.Second + defaultRetryCfg = retry.DefaultConfig +) -func (w wrappedOption) applyOption(cfg config) config { - cfg.Config = w.Option.ApplyOption(cfg.Config) - return cfg -} +// Environment variable keys. +var ( + envEndpoint = []string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + "OTEL_EXPORTER_OTLP_ENDPOINT", + } + envInsecure = envEndpoint + + envHeaders = []string{ + "OTEL_EXPORTER_OTLP_LOGS_HEADERS", + "OTEL_EXPORTER_OTLP_HEADERS", + } + + envCompression = []string{ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_COMPRESSION", + } + + envTimeout = []string{ + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT", + "OTEL_EXPORTER_OTLP_TIMEOUT", + } + + envTLSCert = []string{ + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CERTIFICATE", + } + envTLSClient = []struct { + Certificate string + Key string + }{ + { + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY", + }, + { + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CLIENT_KEY", + }, + } +) type fnOpt func(config) config @@ -34,14 +82,20 @@ type Option interface { } type config struct { - otlpconf.Config + endpoint conf.Setting[string] + insecure conf.Setting[bool] + tlsCfg conf.Setting[*tls.Config] + headers conf.Setting[map[string]string] + compression conf.Setting[Compression] + timeout conf.Setting[time.Duration] + retryCfg conf.Setting[retry.Config] // gRPC configurations - gRPCCredentials otlpconf.Setting[credentials.TransportCredentials] - serviceConfig otlpconf.Setting[string] - reconnectionPeriod otlpconf.Setting[time.Duration] - dialOptions otlpconf.Setting[[]grpc.DialOption] - gRPCConn otlpconf.Setting[*grpc.ClientConn] + gRPCCredentials conf.Setting[credentials.TransportCredentials] + serviceConfig conf.Setting[string] + reconnectionPeriod conf.Setting[time.Duration] + dialOptions conf.Setting[[]grpc.DialOption] + gRPCConn conf.Setting[*grpc.ClientConn] } func newConfig(options []Option) config { @@ -50,11 +104,30 @@ func newConfig(options []Option) config { c = opt.applyOption(c) } - c.Config = otlpconf.LoadConfig(c.Config) - - if !c.gRPCCredentials.Set && c.Config.TLSCfg.Set { - c.gRPCCredentials = otlpconf.NewSetting(credentials.NewTLS(c.Config.TLSCfg.Value)) - } + // Apply environment value and default value + c.endpoint = c.endpoint.Resolve( + conf.GetEnv[string](envEndpoint, convEndpoint), + conf.Fallback[string](defaultEndpoint), + ) + c.insecure = c.insecure.Resolve( + conf.GetEnv[bool](envInsecure, convInsecure), + ) + c.tlsCfg = c.tlsCfg.Resolve( + loadEnvTLS[*tls.Config](), + ) + c.headers = c.headers.Resolve( + conf.GetEnv[map[string]string](envHeaders, convHeaders), + ) + c.compression = c.compression.Resolve( + conf.GetEnv[Compression](envCompression, convCompression), + ) + c.timeout = c.timeout.Resolve( + conf.GetEnv[time.Duration](envTimeout, convDuration), + conf.Fallback[time.Duration](defaultTimeout), + ) + c.retryCfg = c.retryCfg.Resolve( + conf.Fallback[retry.Config](defaultRetryCfg), + ) return c } @@ -81,7 +154,10 @@ type RetryConfig retry.Config // // This option has no effect if WithGRPCConn is used. func WithInsecure() Option { - return wrappedOption{Option: otlpconf.WithInsecure()} + return fnOpt(func(c config) config { + c.insecure = conf.NewSetting(true) + return c + }) } // WithEndpoint sets the target endpoint the Exporter will connect to. @@ -99,7 +175,10 @@ func WithInsecure() Option { // // This option has no effect if WithGRPCConn is used. func WithEndpoint(endpoint string) Option { - return wrappedOption{Option: otlpconf.WithEndpoint(endpoint)} + return fnOpt(func(c config) config { + c.endpoint = conf.NewSetting(endpoint) + return c + }) } // WithEndpointURL sets the target endpoint URL the Exporter will connect to. @@ -118,8 +197,21 @@ func WithEndpoint(endpoint string) Option { // passed, "localhost:4317" will be used. // // This option has no effect if WithGRPCConn is used. -func WithEndpointURL(u string) Option { - return wrappedOption{Option: otlpconf.WithEndpointURL(u)} +func WithEndpointURL(rawURL string) Option { + u, err := url.Parse(rawURL) + if err != nil { + global.Error(err, "otlplog: parse endpoint url", "url", rawURL) + return fnOpt(func(c config) config { return c }) + } + return fnOpt(func(c config) config { + c.endpoint = conf.NewSetting(u.Host) + if u.Scheme != "https" { + c.insecure = conf.NewSetting(true) + } else { + c.insecure = conf.NewSetting(false) + } + return c + }) } // WithReconnectionPeriod set the minimum amount of time between connection @@ -128,11 +220,21 @@ func WithEndpointURL(u string) Option { // This option has no effect if WithGRPCConn is used. func WithReconnectionPeriod(rp time.Duration) Option { return fnOpt(func(c config) config { - c.reconnectionPeriod = otlpconf.NewSetting(rp) + c.reconnectionPeriod = conf.NewSetting(rp) return c }) } +// Compression describes the compression used for exported payloads. +type Compression int + +const ( + // NoCompression represents that no compression should be used. + NoCompression Compression = iota + // GzipCompression represents that gzip compression should be used. + GzipCompression +) + // WithCompressor sets the compressor the gRPC client uses. // Supported compressor values: "gzip". // @@ -143,11 +245,14 @@ func WithReconnectionPeriod(rp time.Duration) Option { // OTEL_EXPORTER_OTLP_LOGS_COMPRESSION will take precedence. // // By default, if an environment variable is not set, and this option is not -// passed, no compressor will be used. +// passed, no compression strategy will be used. // // This option has no effect if WithGRPCConn is used. func WithCompressor(compressor string) Option { - return wrappedOption{Option: otlpconf.WithCompression(compressorToCompression(compressor))} + return fnOpt(func(c config) config { + c.compression = conf.NewSetting(compressorToCompression(compressor)) + return c + }) } // WithHeaders will send the provided headers with each gRPC requests. @@ -162,7 +267,10 @@ func WithCompressor(compressor string) Option { // By default, if an environment variable is not set, and this option is not // passed, no user headers will be set. func WithHeaders(headers map[string]string) Option { - return wrappedOption{Option: otlpconf.WithHeaders(headers)} + return fnOpt(func(c config) config { + c.headers = conf.NewSetting(headers) + return c + }) } // WithTLSCredentials sets the gRPC connection to use creds. @@ -179,7 +287,7 @@ func WithHeaders(headers map[string]string) Option { // This option has no effect if WithGRPCConn is used. func WithTLSCredentials(credential credentials.TransportCredentials) Option { return fnOpt(func(c config) config { - c.gRPCCredentials = otlpconf.NewSetting(credential) + c.gRPCCredentials = conf.NewSetting(credential) return c }) } @@ -189,7 +297,7 @@ func WithTLSCredentials(credential credentials.TransportCredentials) Option { // This option has no effect if WithGRPCConn is used. func WithServiceConfig(serviceConfig string) Option { return fnOpt(func(c config) config { - c.serviceConfig = otlpconf.NewSetting(serviceConfig) + c.serviceConfig = conf.NewSetting(serviceConfig) return c }) } @@ -204,7 +312,7 @@ func WithServiceConfig(serviceConfig string) Option { // This option has no effect if WithGRPCConn is used. func WithDialOption(opts ...grpc.DialOption) Option { return fnOpt(func(c config) config { - c.dialOptions = otlpconf.NewSetting(opts) + c.dialOptions = conf.NewSetting(opts) return c }) } @@ -219,7 +327,7 @@ func WithDialOption(opts ...grpc.DialOption) Option { // Shutdown method will not close this connection. func WithGRPCConn(conn *grpc.ClientConn) Option { return fnOpt(func(c config) config { - c.gRPCConn = otlpconf.NewSetting(conn) + c.gRPCConn = conf.NewSetting(conn) return c }) } @@ -239,7 +347,10 @@ func WithGRPCConn(conn *grpc.ClientConn) Option { // By default, if an environment variable is not set, and this option is not // passed, a timeout of 10 seconds will be used. func WithTimeout(duration time.Duration) Option { - return wrappedOption{Option: otlpconf.WithTimeout(duration)} + return fnOpt(func(c config) config { + c.timeout = conf.NewSetting(duration) + return c + }) } // WithRetry sets the retry policy for transient retryable errors that are @@ -255,15 +366,176 @@ func WithTimeout(duration time.Duration) Option { // If unset, the default retry policy will be used. It will retry the export // 5 seconds after receiving a retryable error and increase exponentially // after each error for no more than a total time of 1 minute. -func WithRetry(settings RetryConfig) Option { - return wrappedOption{Option: otlpconf.WithRetry(retry.Config(settings))} +func WithRetry(rc RetryConfig) Option { + return fnOpt(func(c config) config { + c.retryCfg = conf.NewSetting(retry.Config(rc)) + return c + }) +} + +// convCompression returns the parsed compression encoded in s. NoCompression +// and an errors are returned if s is unknown. +func convCompression(s string) (Compression, error) { + switch s { + case "gzip": + return GzipCompression, nil + case "none", "": + return NoCompression, nil + } + return NoCompression, fmt.Errorf("unknown compression: %s", s) +} + +// convEndpoint converts s from a URL string to an endpoint if s is a valid +// URL. Otherwise, "" and an error are returned. +func convEndpoint(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + return u.Host, nil +} + +// convInsecure parses s as a URL string and returns if the connection should +// use client transport security or not. If s is an invalid URL, false and an +// error are returned. +func convInsecure(s string) (bool, error) { + u, err := url.Parse(s) + if err != nil { + return false, err + } + return u.Scheme != "https", nil +} + +// convHeaders converts the OTel environment variable header value s into a +// mapping of header key to value. If s is invalid a partial result and error +// are returned. +func convHeaders(s string) (map[string]string, error) { + out := make(map[string]string) + var err error + for _, header := range strings.Split(s, ",") { + rawKey, rawVal, found := strings.Cut(header, "=") + if !found { + err = errors.Join(err, fmt.Errorf("invalid header: %s", header)) + continue + } + + escKey, e := url.PathUnescape(rawKey) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey)) + continue + } + key := strings.TrimSpace(escKey) + + escVal, e := url.PathUnescape(rawVal) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal)) + continue + } + val := strings.TrimSpace(escVal) + + out[key] = val + } + return out, err +} + +// convDuration converts s into a duration of milliseconds. If s does not +// contain an integer, 0 and an error are returned. +func convDuration(s string) (time.Duration, error) { + d, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + // OTel durations are defined in milliseconds. + return time.Duration(d) * time.Millisecond, nil } -func compressorToCompression(compressor string) otlpconf.Compression { - if compressor == "gzip" { - return otlpconf.GzipCompression +// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by +// the OTLP TLS environment variables. This will load both the rootCAs and +// certificates used for mTLS. +// +// If the filepath defined is invalid or does not contain valid TLS files, an +// error is passed to the OTel ErrorHandler and no TLS configuration is +// provided. +func loadEnvTLS[T *tls.Config]() conf.Resolver[T] { + return func(s conf.Setting[T]) conf.Setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + var rootCAs *x509.CertPool + var err error + for _, key := range envTLSCert { + if v := os.Getenv(key); v != "" { + rootCAs, err = loadCertPool(v) + break + } + } + + var certs []tls.Certificate + for _, pair := range envTLSClient { + cert := os.Getenv(pair.Certificate) + key := os.Getenv(pair.Key) + if cert != "" && key != "" { + var e error + certs, e = loadCertificates(cert, key) + err = errors.Join(err, e) + break + } + } + + if err != nil { + err = fmt.Errorf("failed to load TLS: %w", err) + otel.Handle(err) + } else if rootCAs != nil || certs != nil { + s.Set = true + s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs} + } + return s } +} + +// readFile is used for testing. +var readFile = os.ReadFile - otel.Handle(fmt.Errorf("invalid compression type: '%s', using no compression as default", compressor)) - return otlpconf.NoCompression +// loadCertPool loads and returns the *x509.CertPool found at path if it exists +// and is valid. Otherwise, nil and an error is returned. +func loadCertPool(path string) (*x509.CertPool, error) { + b, err := readFile(path) + if err != nil { + return nil, err + } + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(b); !ok { + return nil, errors.New("certificate not added") + } + return cp, nil +} + +// loadCertificates loads and returns the tls.Certificate found at path if it +// exists and is valid. Otherwise, nil and an error is returned. +func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) { + cert, err := readFile(certPath) + if err != nil { + return nil, err + } + key, err := readFile(keyPath) + if err != nil { + return nil, err + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + return []tls.Certificate{crt}, nil +} + +func compressorToCompression(compressor string) Compression { + c, err := convCompression(compressor) + if err != nil { + otel.Handle(fmt.Errorf("%s, using no compression as default", err)) + return NoCompression + } + + return c } diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go b/exporters/otlp/otlplog/otlploggrpc/config_test.go similarity index 66% rename from exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go rename to exporters/otlp/otlplog/otlploggrpc/config_test.go index 57edc93afb6..612b9d9b213 100644 --- a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/config_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package otlpconf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" +package otlploggrpc import ( "crypto/tls" @@ -13,8 +13,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/conf" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" ) @@ -41,18 +44,6 @@ kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV ` ) -// This is only for testing, as the package that is using this utility has its own newConfig function. -func newConfig(options []Option) Config { - var c Config - for _, opt := range options { - c = opt.ApplyOption(c) - } - - c = LoadConfig(c) - - return c -} - func newTLSConf(cert, key []byte) (*tls.Config, error) { cp := x509.NewCertPool() if ok := cp.AppendCertsFromPEM(cert); !ok { @@ -92,43 +83,51 @@ func TestNewConfig(t *testing.T) { headers := map[string]string{"a": "A"} rc := retry.Config{} + dialOptions := []grpc.DialOption{grpc.WithUserAgent("test-agent")} + testcases := []struct { name string options []Option envars map[string]string - want Config + want config errs []string }{ { name: "Defaults", - want: Config{ - Endpoint: NewSetting(defaultEndpoint), - Path: NewSetting(defaultPath), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting(defaultEndpoint), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { name: "Options", options: []Option{ - WithEndpoint("test"), - WithURLPath("/path"), WithInsecure(), - WithTLSClientConfig(tlsCfg), - WithCompression(GzipCompression), + WithEndpoint("test"), + WithEndpointURL("http://test:8080/path"), + WithReconnectionPeriod(time.Second), + WithCompressor("gzip"), WithHeaders(headers), - WithTimeout(time.Second), - WithRetry(rc), + WithTLSCredentials(credentials.NewTLS(tlsCfg)), + WithServiceConfig("{}"), + WithDialOption(dialOptions...), + WithGRPCConn(&grpc.ClientConn{}), + WithTimeout(2 * time.Second), + WithRetry(RetryConfig(rc)), }, - want: Config{ - Endpoint: NewSetting("test"), - Path: NewSetting("/path"), - Insecure: NewSetting(true), - TLSCfg: NewSetting(tlsCfg), - Headers: NewSetting(headers), - Compression: NewSetting(GzipCompression), - Timeout: NewSetting(time.Second), - RetryCfg: NewSetting(rc), + want: config{ + endpoint: conf.NewSetting("test:8080"), + insecure: conf.NewSetting(true), + headers: conf.NewSetting(headers), + compression: conf.NewSetting(GzipCompression), + timeout: conf.NewSetting(2 * time.Second), + retryCfg: conf.NewSetting(rc), + gRPCCredentials: conf.NewSetting(credentials.NewTLS(tlsCfg)), + serviceConfig: conf.NewSetting("{}"), + reconnectionPeriod: conf.NewSetting(time.Second), + gRPCConn: conf.NewSetting(&grpc.ClientConn{}), + dialOptions: conf.NewSetting(dialOptions), }, }, { @@ -136,44 +135,39 @@ func TestNewConfig(t *testing.T) { options: []Option{ WithEndpointURL("http://test:8080/path"), }, - want: Config{ - Endpoint: NewSetting("test:8080"), - Path: NewSetting("/path"), - Insecure: NewSetting(true), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("test:8080"), + insecure: conf.NewSetting(true), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { - name: "EndpointPrecidence", + name: "EndpointPrecedence", options: []Option{ WithEndpointURL("https://test:8080/path"), WithEndpoint("not-test:9090"), - WithURLPath("/alt"), WithInsecure(), }, - want: Config{ - Endpoint: NewSetting("not-test:9090"), - Path: NewSetting("/alt"), - Insecure: NewSetting(true), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("not-test:9090"), + insecure: conf.NewSetting(true), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { - name: "EndpointURLPrecidence", + name: "EndpointURLPrecedence", options: []Option{ WithEndpoint("not-test:9090"), - WithURLPath("/alt"), WithInsecure(), WithEndpointURL("https://test:8080/path"), }, - want: Config{ - Endpoint: NewSetting("test:8080"), - Path: NewSetting("/path"), - Insecure: NewSetting(false), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("test:8080"), + insecure: conf.NewSetting(false), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { @@ -187,15 +181,14 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", }, - want: Config{ - Endpoint: NewSetting("env.endpoint:8080"), - Path: NewSetting("/prefix"), - Insecure: NewSetting(false), - TLSCfg: NewSetting(tlsCfg), - Headers: NewSetting(headers), - Compression: NewSetting(GzipCompression), - Timeout: NewSetting(15 * time.Second), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("env.endpoint:8080"), + insecure: conf.NewSetting(false), + tlsCfg: conf.NewSetting(tlsCfg), + headers: conf.NewSetting(headers), + compression: conf.NewSetting(GzipCompression), + timeout: conf.NewSetting(15 * time.Second), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { @@ -203,12 +196,11 @@ func TestNewConfig(t *testing.T) { envars: map[string]string{ "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", }, - want: Config{ - Endpoint: NewSetting("env.endpoint"), - Path: NewSetting("/"), - Insecure: NewSetting(true), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("env.endpoint"), + insecure: conf.NewSetting(true), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { @@ -222,15 +214,14 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "cert_path", "OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path", }, - want: Config{ - Endpoint: NewSetting("env.endpoint:8080"), - Path: NewSetting("/prefix/v1/logs"), - Insecure: NewSetting(true), - TLSCfg: NewSetting(tlsCfg), - Headers: NewSetting(headers), - Compression: NewSetting(NoCompression), - Timeout: NewSetting(15 * time.Second), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("env.endpoint:8080"), + insecure: conf.NewSetting(true), + tlsCfg: conf.NewSetting(tlsCfg), + headers: conf.NewSetting(headers), + compression: conf.NewSetting(NoCompression), + timeout: conf.NewSetting(15 * time.Second), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { @@ -238,12 +229,11 @@ func TestNewConfig(t *testing.T) { envars: map[string]string{ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", }, - want: Config{ - Endpoint: NewSetting("env.endpoint"), - Path: NewSetting(defaultPath), - Insecure: NewSetting(true), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("env.endpoint"), + insecure: conf.NewSetting(true), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { @@ -265,15 +255,14 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", }, - want: Config{ - Endpoint: NewSetting("env.endpoint:8080"), - Path: NewSetting("/path"), - Insecure: NewSetting(false), - TLSCfg: NewSetting(tlsCfg), - Headers: NewSetting(headers), - Compression: NewSetting(GzipCompression), - Timeout: NewSetting(15 * time.Second), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting("env.endpoint:8080"), + insecure: conf.NewSetting(false), + tlsCfg: conf.NewSetting(tlsCfg), + headers: conf.NewSetting(headers), + compression: conf.NewSetting(GzipCompression), + timeout: conf.NewSetting(15 * time.Second), + retryCfg: conf.NewSetting(defaultRetryCfg), }, }, { @@ -296,24 +285,24 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", }, options: []Option{ - WithEndpoint("test"), - WithURLPath("/path"), + WithEndpoint("foo"), + WithEndpointURL("https://test/path"), WithInsecure(), - WithTLSClientConfig(tlsCfg), - WithCompression(GzipCompression), + WithTLSCredentials(credentials.NewTLS(tlsCfg)), + WithCompressor("gzip"), WithHeaders(headers), WithTimeout(time.Second), - WithRetry(rc), + WithRetry(RetryConfig(rc)), }, - want: Config{ - Endpoint: NewSetting("test"), - Path: NewSetting("/path"), - Insecure: NewSetting(true), - TLSCfg: NewSetting(tlsCfg), - Headers: NewSetting(headers), - Compression: NewSetting(GzipCompression), - Timeout: NewSetting(time.Second), - RetryCfg: NewSetting(rc), + want: config{ + endpoint: conf.NewSetting("test"), + insecure: conf.NewSetting(true), + tlsCfg: conf.NewSetting(tlsCfg), + headers: conf.NewSetting(headers), + compression: conf.NewSetting(GzipCompression), + timeout: conf.NewSetting(time.Second), + retryCfg: conf.NewSetting(rc), + gRPCCredentials: conf.NewSetting(credentials.NewTLS(tlsCfg)), }, }, { @@ -327,11 +316,10 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "invalid_cert", "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", }, - want: Config{ - Endpoint: NewSetting(defaultEndpoint), - Path: NewSetting(defaultPath), - Timeout: NewSetting(defaultTimeout), - RetryCfg: NewSetting(defaultRetryCfg), + want: config{ + endpoint: conf.NewSetting(defaultEndpoint), + timeout: conf.NewSetting(defaultTimeout), + retryCfg: conf.NewSetting(defaultRetryCfg), }, errs: []string{ `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, @@ -364,9 +352,9 @@ func TestNewConfig(t *testing.T) { c := newConfig(tc.options) // Do not compare pointer values. - assertTLSConfig(t, tc.want.TLSCfg, c.TLSCfg) - var emptyTLS Setting[*tls.Config] - c.TLSCfg, tc.want.TLSCfg = emptyTLS, emptyTLS + assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) + var emptyTLS conf.Setting[*tls.Config] + c.tlsCfg, tc.want.tlsCfg = emptyTLS, emptyTLS assert.Equal(t, tc.want, c) @@ -377,7 +365,7 @@ func TestNewConfig(t *testing.T) { } } -func assertTLSConfig(t *testing.T, want, got Setting[*tls.Config]) { +func assertTLSConfig(t *testing.T, want, got conf.Setting[*tls.Config]) { t.Helper() assert.Equal(t, want.Set, got.Set, "setting Set") diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go b/exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go new file mode 100644 index 00000000000..4c14054cdee --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package conf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/conf" + +import ( + "fmt" + "os" + + "go.opentelemetry.io/otel" +) + +// Setting is a configuration setting value. +type Setting[T any] struct { + Value T + Set bool +} + +// NewSetting returns a new setting with the value set. +func NewSetting[T any](value T) Setting[T] { + return Setting[T]{Value: value, Set: true} +} + +// Resolver returns an updated setting after applying an resolution operation. +type Resolver[T any] func(Setting[T]) Setting[T] + +// Resolve returns a resolved version of s. +// +// It will apply all the passed fn in the order provided, chaining together the +// return setting to the next input. The setting s is used as the initial +// argument to the first fn. +// +// Each fn needs to validate if it should apply given the Set state of the +// setting. This will not perform any checks on the set state when chaining +// function. +func (s Setting[T]) Resolve(fn ...Resolver[T]) Setting[T] { + for _, f := range fn { + s = f(s) + } + return s +} + +// GetEnv returns a Resolver that will apply an environment variable value +// associated with the first set key to a setting value. The conv function is +// used to convert between the environment variable value and the setting type. +// +// If the input setting to the Resolver is set, the environment variable will +// not be applied. +// +// Any error returned from conv is sent to the OTel ErrorHandler and the +// setting will not be updated. +func GetEnv[T any](keys []string, conv func(string) (T, error)) Resolver[T] { + return func(s Setting[T]) Setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + for _, key := range keys { + if vStr := os.Getenv(key); vStr != "" { + v, err := conv(vStr) + if err == nil { + s.Value = v + s.Set = true + break + } + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) + } + } + return s + } +} + +// Fallback returns a resolve that will set a setting value to val if it is not +// already set. +// +// This is usually passed at the end of a resolver chain to ensure a default is +// applied if the setting has not already been set. +func Fallback[T any](val T) Resolver[T] { + return func(s Setting[T]) Setting[T] { + if !s.Set { + s.Value = val + s.Set = true + } + return s + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go deleted file mode 100644 index 7392152251d..00000000000 --- a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/options.go +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package otlpconf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" - -import ( - "crypto/tls" - "net/url" - "time" - - "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" - "go.opentelemetry.io/otel/internal/global" -) - -// WithInsecure disables client transport security for the Exporter's HTTP -// connection. -// -// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT -// environment variable is set, and this option is not passed, that variable -// value will be used to determine client security. If the endpoint has a -// scheme of "http" or "unix" client security will be disabled. If both are -// set, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, client security will be used. -func WithInsecure() Option { - return fnOpt(func(c Config) Config { - c.Insecure = NewSetting(true) - return c - }) -} - -// WithEndpoint sets the target endpoint the Exporter will connect to. This -// endpoint is specified as a host and optional port, no path or scheme should -// be included (see WithInsecure and WithURLPath). -// -// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT -// environment variable is set, and this option is not passed, that variable -// value will be used. If both are set, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT -// will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, "localhost:4318" will be used. -func WithEndpoint(endpoint string) Option { - return fnOpt(func(c Config) Config { - c.Endpoint = NewSetting(endpoint) - return c - }) -} - -// WithEndpointURL sets the target endpoint URL the Exporter will connect to. -// -// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT -// environment variable is set, and this option is not passed, that variable -// value will be used. If both are set, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT -// will take precedence. -// -// If both this option and WithEndpoint are used, the last used option will -// take precedence. -// -// If an invalid URL is provided, the default value will be kept. -// -// By default, if an environment variable is not set, and this option is not -// passed, "localhost:4318" will be used. -func WithEndpointURL(rawURL string) Option { - u, err := url.Parse(rawURL) - if err != nil { - global.Error(err, "otlpmetric: parse endpoint url", "url", rawURL) - return fnOpt(func(c Config) Config { return c }) - } - return fnOpt(func(c Config) Config { - c.Endpoint = NewSetting(u.Host) - c.Path = NewSetting(u.Path) - if u.Scheme != "https" { - c.Insecure = NewSetting(true) - } else { - c.Insecure = NewSetting(false) - } - return c - }) -} - -// WithCompression sets the compression strategy the Exporter will use to -// compress the HTTP body. -// -// If the OTEL_EXPORTER_OTLP_COMPRESSION or -// OTEL_EXPORTER_OTLP_LOGS_COMPRESSION environment variable is set, and -// this option is not passed, that variable value will be used. That value can -// be either "none" or "gzip". If both are set, -// OTEL_EXPORTER_OTLP_LOGS_COMPRESSION will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, no compression strategy will be used. -func WithCompression(compression Compression) Option { - return fnOpt(func(c Config) Config { - c.Compression = NewSetting(compression) - return c - }) -} - -// WithURLPath sets the URL path the Exporter will send requests to. -// -// If the OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_LOGS_ENDPOINT -// environment variable is set, and this option is not passed, the path -// contained in that variable value will be used. If both are set, -// OTEL_EXPORTER_OTLP_LOGS_ENDPOINT will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, "/v1/logs" will be used. -func WithURLPath(urlPath string) Option { - return fnOpt(func(c Config) Config { - c.Path = NewSetting(urlPath) - return c - }) -} - -// WithTLSClientConfig sets the TLS Configuration the Exporter will use for -// HTTP requests. -// -// If the OTEL_EXPORTER_OTLP_CERTIFICATE or -// OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE environment variable is set, and -// this option is not passed, that variable value will be used. The value will -// be parsed the filepath of the TLS certificate chain to use. If both are -// set, OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, the system default Configuration is used. -func WithTLSClientConfig(tlsCfg *tls.Config) Option { - return fnOpt(func(c Config) Config { - c.TLSCfg = NewSetting(tlsCfg.Clone()) - return c - }) -} - -// WithHeaders will send the provided headers with each HTTP requests. -// -// If the OTEL_EXPORTER_OTLP_HEADERS or OTEL_EXPORTER_OTLP_LOGS_HEADERS -// environment variable is set, and this option is not passed, that variable -// value will be used. The value will be parsed as a list of key value pairs. -// These pairs are expected to be in the W3C Correlation-Context format -// without additional semi-colon delimited metadata (i.e. "k1=v1,k2=v2"). If -// both are set, OTEL_EXPORTER_OTLP_LOGS_HEADERS will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, no user headers will be set. -func WithHeaders(headers map[string]string) Option { - return fnOpt(func(c Config) Config { - c.Headers = NewSetting(headers) - return c - }) -} - -// WithTimeout sets the max amount of time an Exporter will attempt an export. -// -// This takes precedence over any retry settings defined by WithRetry. Once -// this time limit has been reached the export is abandoned and the log data is -// dropped. -// -// If the OTEL_EXPORTER_OTLP_TIMEOUT or OTEL_EXPORTER_OTLP_LOGS_TIMEOUT -// environment variable is set, and this option is not passed, that variable -// value will be used. The value will be parsed as an integer representing the -// timeout in milliseconds. If both are set, -// OTEL_EXPORTER_OTLP_LOGS_TIMEOUT will take precedence. -// -// By default, if an environment variable is not set, and this option is not -// passed, a timeout of 10 seconds will be used. -func WithTimeout(duration time.Duration) Option { - return fnOpt(func(c Config) Config { - c.Timeout = NewSetting(duration) - return c - }) -} - -// WithRetry sets the retry policy for transient retryable errors that are -// returned by the target endpoint. -// -// If the target endpoint responds with not only a retryable error, but -// explicitly returns a backoff time in the response, that time will take -// precedence over these settings. -// -// If unset, the default retry policy will be used. It will retry the export -// 5 seconds after receiving a retryable error and increase exponentially -// after each error for no more than a total time of 1 minute. -func WithRetry(rc retry.Config) Option { - return fnOpt(func(c Config) Config { - c.RetryCfg = NewSetting(rc) - return c - }) -} diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go b/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go deleted file mode 100644 index c0d8870c9ac..00000000000 --- a/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf/setting.go +++ /dev/null @@ -1,394 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package otlpconf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/otlpconf" - -import ( - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "net/url" - "os" - "strconv" - "strings" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" -) - -// Default values. -var ( - defaultEndpoint = "localhost:4318" - defaultPath = "/v1/logs" - defaultTimeout = 10 * time.Second - defaultRetryCfg = retry.DefaultConfig -) - -// Environment variable keys. -var ( - envEndpoint = []string{ - "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", - "OTEL_EXPORTER_OTLP_ENDPOINT", - } - envInsecure = envEndpoint - - // Split because these are parsed differently. - envPathSignal = []string{"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"} - envPathOTLP = []string{"OTEL_EXPORTER_OTLP_ENDPOINT"} - - envHeaders = []string{ - "OTEL_EXPORTER_OTLP_LOGS_HEADERS", - "OTEL_EXPORTER_OTLP_HEADERS", - } - - envCompression = []string{ - "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", - "OTEL_EXPORTER_OTLP_COMPRESSION", - } - - envTimeout = []string{ - "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT", - "OTEL_EXPORTER_OTLP_TIMEOUT", - } - - envTLSCert = []string{ - "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE", - "OTEL_EXPORTER_OTLP_CERTIFICATE", - } - envTLSClient = []struct { - Certificate string - Key string - }{ - { - "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE", - "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY", - }, - { - "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE", - "OTEL_EXPORTER_OTLP_CLIENT_KEY", - }, - } -) - -// Setting is a configuration setting value. -type Setting[T any] struct { - Value T - Set bool -} - -// NewSetting returns a new setting with the value set. -func NewSetting[T any](value T) Setting[T] { - return Setting[T]{Value: value, Set: true} -} - -// Resolver returns an updated setting after applying an resolution operation. -type Resolver[T any] func(Setting[T]) Setting[T] - -// Resolve returns a resolved version of s. -// -// It will apply all the passed fn in the order provided, chaining together the -// return setting to the next input. The setting s is used as the initial -// argument to the first fn. -// -// Each fn needs to validate if it should apply given the Set state of the -// setting. This will not perform any checks on the set state when chaining -// function. -func (s Setting[T]) Resolve(fn ...Resolver[T]) Setting[T] { - for _, f := range fn { - s = f(s) - } - return s -} - -// Compression describes the compression used for exported payloads. -type Compression int - -const ( - // NoCompression represents that no compression should be used. - NoCompression Compression = iota - // GzipCompression represents that gzip compression should be used. - GzipCompression -) - -// Option applies an option to the Exporter. -type Option interface { - ApplyOption(Config) Config -} - -type fnOpt func(Config) Config - -func (f fnOpt) ApplyOption(c Config) Config { return f(c) } - -type Config struct { - Endpoint Setting[string] - Path Setting[string] - Insecure Setting[bool] - TLSCfg Setting[*tls.Config] - Headers Setting[map[string]string] - Compression Setting[Compression] - Timeout Setting[time.Duration] - RetryCfg Setting[retry.Config] -} - -func LoadConfig(c Config) Config { - c.Endpoint = c.Endpoint.Resolve( - GetEnv[string](envEndpoint, convEndpoint), - fallback[string](defaultEndpoint), - ) - c.Path = c.Path.Resolve( - GetEnv[string](envPathSignal, convPathExact), - GetEnv[string](envPathOTLP, convPath), - fallback[string](defaultPath), - ) - c.Insecure = c.Insecure.Resolve( - GetEnv[bool](envInsecure, convInsecure), - ) - c.TLSCfg = c.TLSCfg.Resolve( - loadEnvTLS[*tls.Config](), - ) - c.Headers = c.Headers.Resolve( - GetEnv[map[string]string](envHeaders, convHeaders), - ) - c.Compression = c.Compression.Resolve( - GetEnv[Compression](envCompression, convCompression), - ) - c.Timeout = c.Timeout.Resolve( - GetEnv[time.Duration](envTimeout, convDuration), - fallback[time.Duration](defaultTimeout), - ) - c.RetryCfg = c.RetryCfg.Resolve( - fallback[retry.Config](defaultRetryCfg), - ) - - return c -} - -// GetEnv returns a Resolver that will apply an environment variable value -// associated with the first set key to a setting value. The conv function is -// used to convert between the environment variable value and the setting type. -// -// If the input setting to the Resolver is set, the environment variable will -// not be applied. -// -// Any error returned from conv is sent to the OTel ErrorHandler and the -// setting will not be updated. -func GetEnv[T any](keys []string, conv func(string) (T, error)) Resolver[T] { - return func(s Setting[T]) Setting[T] { - if s.Set { - // Passed, valid, options have precedence. - return s - } - - for _, key := range keys { - if vStr := os.Getenv(key); vStr != "" { - v, err := conv(vStr) - if err == nil { - s.Value = v - s.Set = true - break - } - otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) - } - } - return s - } -} - -// convEndpoint converts s from a URL string to an endpoint if s is a valid -// URL. Otherwise, "" and an error are returned. -func convEndpoint(s string) (string, error) { - u, err := url.Parse(s) - if err != nil { - return "", err - } - return u.Host, nil -} - -// convPathExact converts s from a URL string to the exact path if s is a valid -// URL. Otherwise, "" and an error are returned. -// -// If the path contained in s is empty, "/" is returned. -func convPathExact(s string) (string, error) { - u, err := url.Parse(s) - if err != nil { - return "", err - } - if u.Path == "" { - return "/", nil - } - return u.Path, nil -} - -// convPath converts s from a URL string to an OTLP endpoint path if s is a -// valid URL. Otherwise, "" and an error are returned. -func convPath(s string) (string, error) { - u, err := url.Parse(s) - if err != nil { - return "", err - } - return u.Path + "/v1/logs", nil -} - -// convInsecure parses s as a URL string and returns if the connection should -// use client transport security or not. If s is an invalid URL, false and an -// error are returned. -func convInsecure(s string) (bool, error) { - u, err := url.Parse(s) - if err != nil { - return false, err - } - return u.Scheme != "https", nil -} - -// convHeaders converts the OTel environment variable header value s into a -// mapping of header key to value. If s is invalid a partial result and error -// are returned. -func convHeaders(s string) (map[string]string, error) { - out := make(map[string]string) - var err error - for _, header := range strings.Split(s, ",") { - rawKey, rawVal, found := strings.Cut(header, "=") - if !found { - err = errors.Join(err, fmt.Errorf("invalid header: %s", header)) - continue - } - - escKey, e := url.PathUnescape(rawKey) - if e != nil { - err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey)) - continue - } - key := strings.TrimSpace(escKey) - - escVal, e := url.PathUnescape(rawVal) - if e != nil { - err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal)) - continue - } - val := strings.TrimSpace(escVal) - - out[key] = val - } - return out, err -} - -// convCompression returns the parsed compression encoded in s. NoCompression -// and an errors are returned if s is unknown. -func convCompression(s string) (Compression, error) { - switch s { - case "gzip": - return GzipCompression, nil - case "none", "": - return NoCompression, nil - } - return NoCompression, fmt.Errorf("unknown compression: %s", s) -} - -// convDuration converts s into a duration of milliseconds. If s does not -// contain an integer, 0 and an error are returned. -func convDuration(s string) (time.Duration, error) { - d, err := strconv.Atoi(s) - if err != nil { - return 0, err - } - // OTel durations are defined in milliseconds. - return time.Duration(d) * time.Millisecond, nil -} - -// fallback returns a resolve that will set a setting value to val if it is not -// already set. -// -// This is usually passed at the end of a resolver chain to ensure a default is -// applied if the setting has not already been set. -func fallback[T any](val T) Resolver[T] { - return func(s Setting[T]) Setting[T] { - if !s.Set { - s.Value = val - s.Set = true - } - return s - } -} - -// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by -// the OTLP TLS environment variables. This will load both the rootCAs and -// certificates used for mTLS. -// -// If the filepath defined is invalid or does not contain valid TLS files, an -// error is passed to the OTel ErrorHandler and no TLS configuration is -// provided. -func loadEnvTLS[T *tls.Config]() Resolver[T] { - return func(s Setting[T]) Setting[T] { - if s.Set { - // Passed, valid, options have precedence. - return s - } - - var rootCAs *x509.CertPool - var err error - for _, key := range envTLSCert { - if v := os.Getenv(key); v != "" { - rootCAs, err = loadCertPool(v) - break - } - } - - var certs []tls.Certificate - for _, pair := range envTLSClient { - cert := os.Getenv(pair.Certificate) - key := os.Getenv(pair.Key) - if cert != "" && key != "" { - var e error - certs, e = loadCertificates(cert, key) - err = errors.Join(err, e) - break - } - } - - if err != nil { - err = fmt.Errorf("failed to load TLS: %w", err) - otel.Handle(err) - } else if rootCAs != nil || certs != nil { - s.Set = true - s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs} - } - return s - } -} - -// readFile is used for testing. -var readFile = os.ReadFile - -// loadCertPool loads and returns the *x509.CertPool found at path if it exists -// and is valid. Otherwise, nil and an error is returned. -func loadCertPool(path string) (*x509.CertPool, error) { - b, err := readFile(path) - if err != nil { - return nil, err - } - cp := x509.NewCertPool() - if ok := cp.AppendCertsFromPEM(b); !ok { - return nil, errors.New("certificate not added") - } - return cp, nil -} - -// loadCertificates loads and returns the tls.Certificate found at path if it -// exists and is valid. Otherwise, nil and an error is returned. -func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) { - cert, err := readFile(certPath) - if err != nil { - return nil, err - } - key, err := readFile(keyPath) - if err != nil { - return nil, err - } - crt, err := tls.X509KeyPair(cert, key) - if err != nil { - return nil, err - } - return []tls.Certificate{crt}, nil -} From bf175b24716f5d923a200d1575d8704663bfd65f Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Wed, 5 Jun 2024 20:51:12 -0700 Subject: [PATCH 3/3] Move setting.go into config.go --- exporters/otlp/otlplog/otlploggrpc/config.go | 149 +++++++++++++----- .../otlp/otlplog/otlploggrpc/config_test.go | 137 ++++++++-------- .../otlploggrpc/internal/conf/setting.go | 87 ---------- 3 files changed, 180 insertions(+), 193 deletions(-) delete mode 100644 exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go diff --git a/exporters/otlp/otlplog/otlploggrpc/config.go b/exporters/otlp/otlplog/otlploggrpc/config.go index debc901d9a8..37220acabfe 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config.go +++ b/exporters/otlp/otlplog/otlploggrpc/config.go @@ -18,7 +18,6 @@ import ( "google.golang.org/grpc/credentials" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/conf" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" "go.opentelemetry.io/otel/internal/global" ) @@ -82,20 +81,20 @@ type Option interface { } type config struct { - endpoint conf.Setting[string] - insecure conf.Setting[bool] - tlsCfg conf.Setting[*tls.Config] - headers conf.Setting[map[string]string] - compression conf.Setting[Compression] - timeout conf.Setting[time.Duration] - retryCfg conf.Setting[retry.Config] + endpoint setting[string] + insecure setting[bool] + tlsCfg setting[*tls.Config] + headers setting[map[string]string] + compression setting[Compression] + timeout setting[time.Duration] + retryCfg setting[retry.Config] // gRPC configurations - gRPCCredentials conf.Setting[credentials.TransportCredentials] - serviceConfig conf.Setting[string] - reconnectionPeriod conf.Setting[time.Duration] - dialOptions conf.Setting[[]grpc.DialOption] - gRPCConn conf.Setting[*grpc.ClientConn] + gRPCCredentials setting[credentials.TransportCredentials] + serviceConfig setting[string] + reconnectionPeriod setting[time.Duration] + dialOptions setting[[]grpc.DialOption] + gRPCConn setting[*grpc.ClientConn] } func newConfig(options []Option) config { @@ -106,27 +105,27 @@ func newConfig(options []Option) config { // Apply environment value and default value c.endpoint = c.endpoint.Resolve( - conf.GetEnv[string](envEndpoint, convEndpoint), - conf.Fallback[string](defaultEndpoint), + getEnv[string](envEndpoint, convEndpoint), + fallback[string](defaultEndpoint), ) c.insecure = c.insecure.Resolve( - conf.GetEnv[bool](envInsecure, convInsecure), + getEnv[bool](envInsecure, convInsecure), ) c.tlsCfg = c.tlsCfg.Resolve( loadEnvTLS[*tls.Config](), ) c.headers = c.headers.Resolve( - conf.GetEnv[map[string]string](envHeaders, convHeaders), + getEnv[map[string]string](envHeaders, convHeaders), ) c.compression = c.compression.Resolve( - conf.GetEnv[Compression](envCompression, convCompression), + getEnv[Compression](envCompression, convCompression), ) c.timeout = c.timeout.Resolve( - conf.GetEnv[time.Duration](envTimeout, convDuration), - conf.Fallback[time.Duration](defaultTimeout), + getEnv[time.Duration](envTimeout, convDuration), + fallback[time.Duration](defaultTimeout), ) c.retryCfg = c.retryCfg.Resolve( - conf.Fallback[retry.Config](defaultRetryCfg), + fallback[retry.Config](defaultRetryCfg), ) return c @@ -155,7 +154,7 @@ type RetryConfig retry.Config // This option has no effect if WithGRPCConn is used. func WithInsecure() Option { return fnOpt(func(c config) config { - c.insecure = conf.NewSetting(true) + c.insecure = newSetting(true) return c }) } @@ -176,7 +175,7 @@ func WithInsecure() Option { // This option has no effect if WithGRPCConn is used. func WithEndpoint(endpoint string) Option { return fnOpt(func(c config) config { - c.endpoint = conf.NewSetting(endpoint) + c.endpoint = newSetting(endpoint) return c }) } @@ -204,11 +203,11 @@ func WithEndpointURL(rawURL string) Option { return fnOpt(func(c config) config { return c }) } return fnOpt(func(c config) config { - c.endpoint = conf.NewSetting(u.Host) + c.endpoint = newSetting(u.Host) if u.Scheme != "https" { - c.insecure = conf.NewSetting(true) + c.insecure = newSetting(true) } else { - c.insecure = conf.NewSetting(false) + c.insecure = newSetting(false) } return c }) @@ -220,7 +219,7 @@ func WithEndpointURL(rawURL string) Option { // This option has no effect if WithGRPCConn is used. func WithReconnectionPeriod(rp time.Duration) Option { return fnOpt(func(c config) config { - c.reconnectionPeriod = conf.NewSetting(rp) + c.reconnectionPeriod = newSetting(rp) return c }) } @@ -250,7 +249,7 @@ const ( // This option has no effect if WithGRPCConn is used. func WithCompressor(compressor string) Option { return fnOpt(func(c config) config { - c.compression = conf.NewSetting(compressorToCompression(compressor)) + c.compression = newSetting(compressorToCompression(compressor)) return c }) } @@ -268,7 +267,7 @@ func WithCompressor(compressor string) Option { // passed, no user headers will be set. func WithHeaders(headers map[string]string) Option { return fnOpt(func(c config) config { - c.headers = conf.NewSetting(headers) + c.headers = newSetting(headers) return c }) } @@ -287,7 +286,7 @@ func WithHeaders(headers map[string]string) Option { // This option has no effect if WithGRPCConn is used. func WithTLSCredentials(credential credentials.TransportCredentials) Option { return fnOpt(func(c config) config { - c.gRPCCredentials = conf.NewSetting(credential) + c.gRPCCredentials = newSetting(credential) return c }) } @@ -297,7 +296,7 @@ func WithTLSCredentials(credential credentials.TransportCredentials) Option { // This option has no effect if WithGRPCConn is used. func WithServiceConfig(serviceConfig string) Option { return fnOpt(func(c config) config { - c.serviceConfig = conf.NewSetting(serviceConfig) + c.serviceConfig = newSetting(serviceConfig) return c }) } @@ -312,7 +311,7 @@ func WithServiceConfig(serviceConfig string) Option { // This option has no effect if WithGRPCConn is used. func WithDialOption(opts ...grpc.DialOption) Option { return fnOpt(func(c config) config { - c.dialOptions = conf.NewSetting(opts) + c.dialOptions = newSetting(opts) return c }) } @@ -327,7 +326,7 @@ func WithDialOption(opts ...grpc.DialOption) Option { // Shutdown method will not close this connection. func WithGRPCConn(conn *grpc.ClientConn) Option { return fnOpt(func(c config) config { - c.gRPCConn = conf.NewSetting(conn) + c.gRPCConn = newSetting(conn) return c }) } @@ -348,7 +347,7 @@ func WithGRPCConn(conn *grpc.ClientConn) Option { // passed, a timeout of 10 seconds will be used. func WithTimeout(duration time.Duration) Option { return fnOpt(func(c config) config { - c.timeout = conf.NewSetting(duration) + c.timeout = newSetting(duration) return c }) } @@ -368,7 +367,7 @@ func WithTimeout(duration time.Duration) Option { // after each error for no more than a total time of 1 minute. func WithRetry(rc RetryConfig) Option { return fnOpt(func(c config) config { - c.retryCfg = conf.NewSetting(retry.Config(rc)) + c.retryCfg = newSetting(retry.Config(rc)) return c }) } @@ -456,8 +455,8 @@ func convDuration(s string) (time.Duration, error) { // If the filepath defined is invalid or does not contain valid TLS files, an // error is passed to the OTel ErrorHandler and no TLS configuration is // provided. -func loadEnvTLS[T *tls.Config]() conf.Resolver[T] { - return func(s conf.Setting[T]) conf.Setting[T] { +func loadEnvTLS[T *tls.Config]() resolver[T] { + return func(s setting[T]) setting[T] { if s.Set { // Passed, valid, options have precedence. return s @@ -539,3 +538,79 @@ func compressorToCompression(compressor string) Compression { return c } + +// setting is a configuration setting value. +type setting[T any] struct { + Value T + Set bool +} + +// newSetting returns a new setting with the value set. +func newSetting[T any](value T) setting[T] { + return setting[T]{Value: value, Set: true} +} + +// resolver returns an updated setting after applying an resolution operation. +type resolver[T any] func(setting[T]) setting[T] + +// Resolve returns a resolved version of s. +// +// It will apply all the passed fn in the order provided, chaining together the +// return setting to the next input. The setting s is used as the initial +// argument to the first fn. +// +// Each fn needs to validate if it should apply given the Set state of the +// setting. This will not perform any checks on the set state when chaining +// function. +func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] { + for _, f := range fn { + s = f(s) + } + return s +} + +// getEnv returns a resolver that will apply an environment variable value +// associated with the first set key to a setting value. The conv function is +// used to convert between the environment variable value and the setting type. +// +// If the input setting to the resolver is set, the environment variable will +// not be applied. +// +// Any error returned from conv is sent to the OTel ErrorHandler and the +// setting will not be updated. +func getEnv[T any](keys []string, conv func(string) (T, error)) resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + for _, key := range keys { + if vStr := os.Getenv(key); vStr != "" { + v, err := conv(vStr) + if err == nil { + s.Value = v + s.Set = true + break + } + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) + } + } + return s + } +} + +// fallback returns a resolve that will set a setting value to val if it is not +// already set. +// +// This is usually passed at the end of a resolver chain to ensure a default is +// applied if the setting has not already been set. +func fallback[T any](val T) resolver[T] { + return func(s setting[T]) setting[T] { + if !s.Set { + s.Value = val + s.Set = true + } + return s + } +} diff --git a/exporters/otlp/otlplog/otlploggrpc/config_test.go b/exporters/otlp/otlplog/otlploggrpc/config_test.go index 612b9d9b213..02817476f5c 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/config_test.go @@ -17,7 +17,6 @@ import ( "google.golang.org/grpc/credentials" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/conf" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" ) @@ -95,9 +94,9 @@ func TestNewConfig(t *testing.T) { { name: "Defaults", want: config{ - endpoint: conf.NewSetting(defaultEndpoint), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting(defaultEndpoint), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -117,17 +116,17 @@ func TestNewConfig(t *testing.T) { WithRetry(RetryConfig(rc)), }, want: config{ - endpoint: conf.NewSetting("test:8080"), - insecure: conf.NewSetting(true), - headers: conf.NewSetting(headers), - compression: conf.NewSetting(GzipCompression), - timeout: conf.NewSetting(2 * time.Second), - retryCfg: conf.NewSetting(rc), - gRPCCredentials: conf.NewSetting(credentials.NewTLS(tlsCfg)), - serviceConfig: conf.NewSetting("{}"), - reconnectionPeriod: conf.NewSetting(time.Second), - gRPCConn: conf.NewSetting(&grpc.ClientConn{}), - dialOptions: conf.NewSetting(dialOptions), + endpoint: newSetting("test:8080"), + insecure: newSetting(true), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(2 * time.Second), + retryCfg: newSetting(rc), + gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)), + serviceConfig: newSetting("{}"), + reconnectionPeriod: newSetting(time.Second), + gRPCConn: newSetting(&grpc.ClientConn{}), + dialOptions: newSetting(dialOptions), }, }, { @@ -136,10 +135,10 @@ func TestNewConfig(t *testing.T) { WithEndpointURL("http://test:8080/path"), }, want: config{ - endpoint: conf.NewSetting("test:8080"), - insecure: conf.NewSetting(true), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("test:8080"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -150,10 +149,10 @@ func TestNewConfig(t *testing.T) { WithInsecure(), }, want: config{ - endpoint: conf.NewSetting("not-test:9090"), - insecure: conf.NewSetting(true), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("not-test:9090"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -164,10 +163,10 @@ func TestNewConfig(t *testing.T) { WithEndpointURL("https://test:8080/path"), }, want: config{ - endpoint: conf.NewSetting("test:8080"), - insecure: conf.NewSetting(false), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("test:8080"), + insecure: newSetting(false), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -182,13 +181,13 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", }, want: config{ - endpoint: conf.NewSetting("env.endpoint:8080"), - insecure: conf.NewSetting(false), - tlsCfg: conf.NewSetting(tlsCfg), - headers: conf.NewSetting(headers), - compression: conf.NewSetting(GzipCompression), - timeout: conf.NewSetting(15 * time.Second), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint:8080"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -197,10 +196,10 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", }, want: config{ - endpoint: conf.NewSetting("env.endpoint"), - insecure: conf.NewSetting(true), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -215,13 +214,13 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path", }, want: config{ - endpoint: conf.NewSetting("env.endpoint:8080"), - insecure: conf.NewSetting(true), - tlsCfg: conf.NewSetting(tlsCfg), - headers: conf.NewSetting(headers), - compression: conf.NewSetting(NoCompression), - timeout: conf.NewSetting(15 * time.Second), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint:8080"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(NoCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -230,10 +229,10 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", }, want: config{ - endpoint: conf.NewSetting("env.endpoint"), - insecure: conf.NewSetting(true), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -256,13 +255,13 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", }, want: config{ - endpoint: conf.NewSetting("env.endpoint:8080"), - insecure: conf.NewSetting(false), - tlsCfg: conf.NewSetting(tlsCfg), - headers: conf.NewSetting(headers), - compression: conf.NewSetting(GzipCompression), - timeout: conf.NewSetting(15 * time.Second), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint:8080"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), }, }, { @@ -295,14 +294,14 @@ func TestNewConfig(t *testing.T) { WithRetry(RetryConfig(rc)), }, want: config{ - endpoint: conf.NewSetting("test"), - insecure: conf.NewSetting(true), - tlsCfg: conf.NewSetting(tlsCfg), - headers: conf.NewSetting(headers), - compression: conf.NewSetting(GzipCompression), - timeout: conf.NewSetting(time.Second), - retryCfg: conf.NewSetting(rc), - gRPCCredentials: conf.NewSetting(credentials.NewTLS(tlsCfg)), + endpoint: newSetting("test"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(time.Second), + retryCfg: newSetting(rc), + gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)), }, }, { @@ -317,9 +316,9 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", }, want: config{ - endpoint: conf.NewSetting(defaultEndpoint), - timeout: conf.NewSetting(defaultTimeout), - retryCfg: conf.NewSetting(defaultRetryCfg), + endpoint: newSetting(defaultEndpoint), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), }, errs: []string{ `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, @@ -353,7 +352,7 @@ func TestNewConfig(t *testing.T) { // Do not compare pointer values. assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) - var emptyTLS conf.Setting[*tls.Config] + var emptyTLS setting[*tls.Config] c.tlsCfg, tc.want.tlsCfg = emptyTLS, emptyTLS assert.Equal(t, tc.want, c) @@ -365,7 +364,7 @@ func TestNewConfig(t *testing.T) { } } -func assertTLSConfig(t *testing.T, want, got conf.Setting[*tls.Config]) { +func assertTLSConfig(t *testing.T, want, got setting[*tls.Config]) { t.Helper() assert.Equal(t, want.Set, got.Set, "setting Set") diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go b/exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go deleted file mode 100644 index 4c14054cdee..00000000000 --- a/exporters/otlp/otlplog/otlploggrpc/internal/conf/setting.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package conf // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/conf" - -import ( - "fmt" - "os" - - "go.opentelemetry.io/otel" -) - -// Setting is a configuration setting value. -type Setting[T any] struct { - Value T - Set bool -} - -// NewSetting returns a new setting with the value set. -func NewSetting[T any](value T) Setting[T] { - return Setting[T]{Value: value, Set: true} -} - -// Resolver returns an updated setting after applying an resolution operation. -type Resolver[T any] func(Setting[T]) Setting[T] - -// Resolve returns a resolved version of s. -// -// It will apply all the passed fn in the order provided, chaining together the -// return setting to the next input. The setting s is used as the initial -// argument to the first fn. -// -// Each fn needs to validate if it should apply given the Set state of the -// setting. This will not perform any checks on the set state when chaining -// function. -func (s Setting[T]) Resolve(fn ...Resolver[T]) Setting[T] { - for _, f := range fn { - s = f(s) - } - return s -} - -// GetEnv returns a Resolver that will apply an environment variable value -// associated with the first set key to a setting value. The conv function is -// used to convert between the environment variable value and the setting type. -// -// If the input setting to the Resolver is set, the environment variable will -// not be applied. -// -// Any error returned from conv is sent to the OTel ErrorHandler and the -// setting will not be updated. -func GetEnv[T any](keys []string, conv func(string) (T, error)) Resolver[T] { - return func(s Setting[T]) Setting[T] { - if s.Set { - // Passed, valid, options have precedence. - return s - } - - for _, key := range keys { - if vStr := os.Getenv(key); vStr != "" { - v, err := conv(vStr) - if err == nil { - s.Value = v - s.Set = true - break - } - otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) - } - } - return s - } -} - -// Fallback returns a resolve that will set a setting value to val if it is not -// already set. -// -// This is usually passed at the end of a resolver chain to ensure a default is -// applied if the setting has not already been set. -func Fallback[T any](val T) Resolver[T] { - return func(s Setting[T]) Setting[T] { - if !s.Set { - s.Value = val - s.Set = true - } - return s - } -}