From e8748663d34dd0c2af1878eee45395c35c9b58c5 Mon Sep 17 00:00:00 2001 From: Tal Vintrob Date: Wed, 7 Feb 2024 12:56:49 +0200 Subject: [PATCH] [exporter/otlphttp] added support for configurable telemetry encoding (#9276) **Description:** This PR adds support for encoding configuration in the `otlphttp` exporter. **Link to tracking Issue:** #6945 **Testing:** Updated existing tests, and added relevant tests **Documentation:** Updated the `otlphttp` docs to include the new configuration option. --------- Co-authored-by: Yang Song --- .chloggen/otlphttp-json-encoding.yaml | 25 ++++ exporter/otlphttpexporter/README.md | 10 ++ exporter/otlphttpexporter/config.go | 34 +++++ exporter/otlphttpexporter/config_test.go | 55 ++++++++ exporter/otlphttpexporter/factory.go | 1 + exporter/otlphttpexporter/factory_test.go | 15 ++ exporter/otlphttpexporter/otlp.go | 113 ++++++++++++---- exporter/otlphttpexporter/otlp_test.go | 128 +++++++++++++++++- .../testdata/bad_invalid_encoding.yaml | 1 + 9 files changed, 355 insertions(+), 27 deletions(-) create mode 100755 .chloggen/otlphttp-json-encoding.yaml create mode 100644 exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml diff --git a/.chloggen/otlphttp-json-encoding.yaml b/.chloggen/otlphttp-json-encoding.yaml new file mode 100755 index 00000000000..da20ab9df03 --- /dev/null +++ b/.chloggen/otlphttp-json-encoding.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for json content encoding when exporting telemetry + +# One or more tracking issues or pull requests related to the change +issues: [6945] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 75d6824af6c..fb15c0dab7a 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -37,6 +37,7 @@ The following settings can be optionally configured: - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. +- `encoding` (default = proto): The encoding to use for the messages (valid options: `proto`, `json`) Example: @@ -55,5 +56,14 @@ exporters: compression: none ``` +By default `proto` encoding is used, to change the content encoding of the message configure it as follows: + +```yaml +exporters: + otlphttp: + ... + encoding: json +``` + The full list of settings exposed for this exporter are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index c0918e0924e..ef59fc324a0 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -4,7 +4,9 @@ package otlphttpexporter // import "go.opentelemetry.io/collector/exporter/otlphttpexporter" import ( + "encoding" "errors" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" @@ -12,6 +14,35 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" ) +// EncodingType defines the type for content encoding +type EncodingType string + +const ( + EncodingProto EncodingType = "proto" + EncodingJSON EncodingType = "json" +) + +var _ encoding.TextUnmarshaler = (*EncodingType)(nil) + +// UnmarshalText unmarshalls text to an EncodingType. +func (e *EncodingType) UnmarshalText(text []byte) error { + if e == nil { + return errors.New("cannot unmarshal to a nil *EncodingType") + } + + str := string(text) + switch str { + case string(EncodingProto): + *e = EncodingProto + case string(EncodingJSON): + *e = EncodingJSON + default: + return fmt.Errorf("invalid encoding type: %s", str) + } + + return nil +} + // Config defines configuration for OTLP/HTTP exporter. type Config struct { confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. @@ -26,6 +57,9 @@ type Config struct { // The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used. LogsEndpoint string `mapstructure:"logs_endpoint"` + + // The encoding to export telemetry (default: "proto") + Encoding EncodingType `mapstructure:"encoding"` } var _ component.Config = (*Config)(nil) diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 76579f05d21..8981ffb9d0b 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -51,6 +51,7 @@ func TestUnmarshalConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, + Encoding: EncodingProto, ClientConfig: confighttp.ClientConfig{ Headers: map[string]configopaque.String{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", @@ -73,3 +74,57 @@ func TestUnmarshalConfig(t *testing.T) { }, }, cfg) } + +func TestUnmarshalConfigInvalidEncoding(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_invalid_encoding.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.Error(t, component.UnmarshalConfig(cm, cfg)) +} + +func TestUnmarshalEncoding(t *testing.T) { + tests := []struct { + name string + encodingBytes []byte + expected EncodingType + shouldError bool + }{ + { + name: "UnmarshalEncodingProto", + encodingBytes: []byte("proto"), + expected: EncodingProto, + shouldError: false, + }, + { + name: "UnmarshalEncodingJson", + encodingBytes: []byte("json"), + expected: EncodingJSON, + shouldError: false, + }, + { + name: "UnmarshalEmptyEncoding", + encodingBytes: []byte(""), + shouldError: true, + }, + { + name: "UnmarshalInvalidEncoding", + encodingBytes: []byte("invalid"), + shouldError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var encoding EncodingType + err := encoding.UnmarshalText(tt.encodingBytes) + + if tt.shouldError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, encoding) + } + }) + } +} diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 39cf084c612..9ebcc01fba1 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -36,6 +36,7 @@ func createDefaultConfig() component.Config { return &Config{ RetryConfig: configretry.NewDefaultBackOffConfig(), QueueConfig: exporterhelper.NewDefaultQueueSettings(), + Encoding: EncodingProto, ClientConfig: confighttp.ClientConfig{ Endpoint: "", Timeout: 30 * time.Second, diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 04d2027f655..5df472fb91e 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -35,6 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, ocfg.RetryConfig.InitialInterval, 5*time.Second, "default retry InitialInterval") assert.Equal(t, ocfg.RetryConfig.MaxInterval, 30*time.Second, "default retry MaxInterval") assert.Equal(t, ocfg.QueueConfig.Enabled, true, "default sending queue is enabled") + assert.Equal(t, ocfg.Encoding, EncodingProto) assert.Equal(t, ocfg.Compression, configcompression.TypeGzip) } @@ -154,6 +155,20 @@ func TestCreateTracesExporter(t *testing.T) { }, }, }, + { + name: "ProtoEncoding", + config: &Config{ + Encoding: EncodingProto, + ClientConfig: confighttp.ClientConfig{Endpoint: endpoint}, + }, + }, + { + name: "JSONEncoding", + config: &Config{ + Encoding: EncodingJSON, + ClientConfig: confighttp.ClientConfig{Endpoint: endpoint}, + }, + }, } for _, tt := range tests { diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 0dc1fe2e7f0..6ebc50e2cc0 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -48,6 +48,7 @@ const ( headerRetryAfter = "Retry-After" maxHTTPResponseReadBytes = 64 * 1024 + jsonContentType = "application/json" protobufContentType = "application/x-protobuf" ) @@ -87,7 +88,18 @@ func (e *baseExporter) start(_ context.Context, host component.Host) error { func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { tr := ptraceotlp.NewExportRequestFromTraces(td) - request, err := tr.MarshalProto() + + var err error + var request []byte + switch e.config.Encoding { + case EncodingJSON: + request, err = tr.MarshalJSON() + case EncodingProto: + request, err = tr.MarshalProto() + default: + err = fmt.Errorf("invalid encoding: %s", e.config.Encoding) + } + if err != nil { return consumererror.NewPermanent(err) } @@ -97,7 +109,18 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { tr := pmetricotlp.NewExportRequestFromMetrics(md) - request, err := tr.MarshalProto() + + var err error + var request []byte + switch e.config.Encoding { + case EncodingJSON: + request, err = tr.MarshalJSON() + case EncodingProto: + request, err = tr.MarshalProto() + default: + err = fmt.Errorf("invalid encoding: %s", e.config.Encoding) + } + if err != nil { return consumererror.NewPermanent(err) } @@ -106,7 +129,18 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) - request, err := tr.MarshalProto() + + var err error + var request []byte + switch e.config.Encoding { + case EncodingJSON: + request, err = tr.MarshalJSON() + case EncodingProto: + request, err = tr.MarshalProto() + default: + err = fmt.Errorf("invalid encoding: %s", e.config.Encoding) + } + if err != nil { return consumererror.NewPermanent(err) } @@ -120,7 +154,16 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p if err != nil { return consumererror.NewPermanent(err) } - req.Header.Set("Content-Type", protobufContentType) + + switch e.config.Encoding { + case EncodingJSON: + req.Header.Set("Content-Type", jsonContentType) + case EncodingProto: + req.Header.Set("Content-Type", protobufContentType) + default: + return fmt.Errorf("invalid encoding: %s", e.config.Encoding) + } + req.Header.Set("User-Agent", e.userAgent) resp, err := e.client.Do(req) @@ -231,7 +274,6 @@ func readResponseStatus(resp *http.Response) *status.Status { // "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a // Protobuf-encoded Status message that describes the problem." respBytes, err := readResponseBody(resp) - if err != nil { return nil } @@ -249,7 +291,6 @@ func readResponseStatus(resp *http.Response) *status.Status { func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { bodyBytes, err := readResponseBody(resp) - if err != nil { return err } @@ -260,14 +301,22 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par type partialSuccessHandler func(bytes []byte, contentType string) error func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { - if contentType != protobufContentType { - return nil - } exportResponse := ptraceotlp.NewExportResponse() - err := exportResponse.UnmarshalProto(protoBytes) - if err != nil { - return fmt.Errorf("error parsing protobuf response: %w", err) + switch contentType { + case protobufContentType: + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + case jsonContentType: + err := exportResponse.UnmarshalJSON(protoBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } + default: + return nil } + partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { e.logger.Warn("Partial success response", @@ -279,14 +328,22 @@ func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentTyp } func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { - if contentType != protobufContentType { - return nil - } exportResponse := pmetricotlp.NewExportResponse() - err := exportResponse.UnmarshalProto(protoBytes) - if err != nil { - return fmt.Errorf("error parsing protobuf response: %w", err) + switch contentType { + case protobufContentType: + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + case jsonContentType: + err := exportResponse.UnmarshalJSON(protoBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } + default: + return nil } + partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { e.logger.Warn("Partial success response", @@ -298,14 +355,22 @@ func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentTy } func (e *baseExporter) logsPartialSuccessHandler(protoBytes []byte, contentType string) error { - if contentType != protobufContentType { - return nil - } exportResponse := plogotlp.NewExportResponse() - err := exportResponse.UnmarshalProto(protoBytes) - if err != nil { - return fmt.Errorf("error parsing protobuf response: %w", err) + switch contentType { + case protobufContentType: + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + case jsonContentType: + err := exportResponse.UnmarshalJSON(protoBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } + default: + return nil } + partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { e.logger.Warn("Partial success response", diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 2f1216d1ca7..c30450f78a6 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -178,6 +178,7 @@ func TestErrorResponses(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), // Create without QueueSettings and RetryConfig so that ConsumeTraces // returns the errors that we want to check immediately. @@ -252,6 +253,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), ClientConfig: confighttp.ClientConfig{ Headers: test.headers, @@ -285,6 +287,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), ClientConfig: confighttp.ClientConfig{ Headers: test.headers, @@ -318,6 +321,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), ClientConfig: confighttp.ClientConfig{ Headers: test.headers, @@ -382,9 +386,6 @@ func TestPartialSuccessUnsupportedContentType(t *testing.T) { unsupportedContentTypeCases := []struct { contentType string }{ - { - contentType: "application/json", - }, { contentType: "text/plain", }, @@ -433,6 +434,7 @@ func TestPartialSuccess_logs(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), ClientConfig: confighttp.ClientConfig{}, } @@ -603,6 +605,7 @@ func TestPartialSuccess_traces(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), ClientConfig: confighttp.ClientConfig{}, } @@ -642,6 +645,7 @@ func TestPartialSuccess_metrics(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), ClientConfig: confighttp.ClientConfig{}, } @@ -666,6 +670,124 @@ func TestPartialSuccess_metrics(t *testing.T) { require.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success") } +func TestEncoding(t *testing.T) { + set := exportertest.NewNopCreateSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" + + tests := []struct { + name string + encoding EncodingType + expectedEncoding EncodingType + }{ + { + name: "proto_encoding", + encoding: EncodingProto, + expectedEncoding: "application/x-protobuf", + }, + { + name: "json_encoding", + encoding: EncodingJSON, + expectedEncoding: "application/json", + }, + } + + t.Run("traces", func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding) + writer.WriteHeader(200) + }) + defer srv.Close() + + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + Encoding: test.encoding, + } + exp, err := createTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + require.NoError(t, err) + }) + } + }) + + t.Run("metrics", func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding) + writer.WriteHeader(200) + }) + defer srv.Close() + + cfg := &Config{ + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), + Encoding: test.encoding, + } + exp, err := createMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + require.NoError(t, err) + }) + } + }) + + t.Run("logs", func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding) + writer.WriteHeader(200) + }) + defer srv.Close() + + cfg := &Config{ + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), + Encoding: test.encoding, + } + exp, err := createLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + require.NoError(t, err) + + srv.Close() + }) + } + }) +} + func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler) diff --git a/exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml b/exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml new file mode 100644 index 00000000000..593aee269a5 --- /dev/null +++ b/exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml @@ -0,0 +1 @@ +encoding: invalid