From c1675c5f8c796b02b7034293e17a441a581d7b22 Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Sun, 7 Jan 2024 21:40:42 +0200 Subject: [PATCH 01/12] feat(otlphttp-json): added encoding parameter to otlphttp exporter --- exporter/otlphttpexporter/README.md | 10 ++++++++++ exporter/otlphttpexporter/config.go | 10 ++++++++++ exporter/otlphttpexporter/config_test.go | 1 + exporter/otlphttpexporter/factory.go | 1 + exporter/otlphttpexporter/factory_test.go | 15 +++++++++++++++ 5 files changed, 37 insertions(+) diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 75d6824af6c..b344a65e2c4 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -37,6 +37,7 @@ The following settings can be optionally configured: - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. +- `encoding` (default = otlp_proto): The encoding to use for the messages (valid options: `otlp_proto`, `otlp_json`) Example: @@ -55,5 +56,14 @@ exporters: compression: none ``` +By default `otlp_json` encoding is used, to change the content encoding of the message configure it as follows: + +```yaml +exporters: + otlphttp: + ... + encoding: otlp_json +``` + The full list of settings exposed for this exporter are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 3fb9d2bdec4..49c8e1eef66 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -12,6 +12,13 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" ) +type EncodingType string + +const ( + EncodingProto EncodingType = "otlp_proto" + EncodingJSON EncodingType = "otlp_json" +) + // Config defines configuration for OTLP/HTTP exporter. type Config struct { confighttp.HTTPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. @@ -26,6 +33,9 @@ type Config struct { // The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used. LogsEndpoint string `mapstructure:"logs_endpoint"` + + // The encoding to export telemetry (default: "otlp_proto") + Encoding EncodingType `mapstructure:"encoding"` } var _ component.Config = (*Config)(nil) diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 0089e13b719..640b6d78dfc 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -51,6 +51,7 @@ func TestUnmarshalConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, + Encoding: "otlp_proto", HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: map[string]configopaque.String{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 3ed71d626d0..4ceec2ca7bf 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -36,6 +36,7 @@ func createDefaultConfig() component.Config { return &Config{ RetryConfig: configretry.NewDefaultBackOffConfig(), QueueConfig: exporterhelper.NewDefaultQueueSettings(), + Encoding: "otlp_proto", HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "", Timeout: 30 * time.Second, diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 69749831c5e..3a7a9a0abc8 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -36,6 +36,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, ocfg.RetryConfig.MaxInterval, 30*time.Second, "default retry MaxInterval") assert.Equal(t, ocfg.QueueConfig.Enabled, true, "default sending queue is enabled") assert.Equal(t, ocfg.Compression, configcompression.Gzip) + assert.Equal(t, ocfg.Encoding, EncodingProto) } func TestCreateMetricsExporter(t *testing.T) { @@ -154,6 +155,20 @@ func TestCreateTracesExporter(t *testing.T) { }, }, }, + { + name: "ProtoEncoding", + config: &Config{ + Encoding: EncodingProto, + HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: endpoint}, + }, + }, + { + name: "JSONEncoding", + config: &Config{ + Encoding: EncodingJSON, + HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: endpoint}, + }, + }, } for _, tt := range tests { From 3b703c72c0e6af42692d975a87da3513da990a6a Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Mon, 8 Jan 2024 12:11:45 +0200 Subject: [PATCH 02/12] feat(otlphttp-json): added encoding logic to the exporter, updated tests --- exporter/otlphttpexporter/otlp.go | 103 +++++++++++++++----- exporter/otlphttpexporter/otlp_test.go | 125 ++++++++++++++++++++++++- 2 files changed, 204 insertions(+), 24 deletions(-) diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index b118b72f2a3..7fbea0365e2 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -48,6 +48,7 @@ const ( headerRetryAfter = "Retry-After" maxHTTPResponseReadBytes = 64 * 1024 + jsonContentType = "application/json" protobufContentType = "application/x-protobuf" ) @@ -87,7 +88,16 @@ func (e *baseExporter) start(_ context.Context, host component.Host) error { func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { tr := ptraceotlp.NewExportRequestFromTraces(td) - request, err := tr.MarshalProto() + + var err error + var request []byte + switch e.config.Encoding { + case EncodingJSON: + request, err = tr.MarshalJSON() + default: + request, err = tr.MarshalProto() + } + if err != nil { return consumererror.NewPermanent(err) } @@ -97,7 +107,16 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { tr := pmetricotlp.NewExportRequestFromMetrics(md) - request, err := tr.MarshalProto() + + var err error + var request []byte + switch e.config.Encoding { + case EncodingJSON: + request, err = tr.MarshalJSON() + default: + request, err = tr.MarshalProto() + } + if err != nil { return consumererror.NewPermanent(err) } @@ -106,7 +125,16 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) - request, err := tr.MarshalProto() + + var err error + var request []byte + switch e.config.Encoding { + case EncodingJSON: + request, err = tr.MarshalJSON() + default: + request, err = tr.MarshalProto() + } + if err != nil { return consumererror.NewPermanent(err) } @@ -120,7 +148,15 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p if err != nil { return consumererror.NewPermanent(err) } - req.Header.Set("Content-Type", protobufContentType) + + switch e.config.Encoding { + case EncodingJSON: + req.Header.Set("Content-Type", jsonContentType) + default: + req.Header.Set("Content-Type", protobufContentType) + + } + req.Header.Set("User-Agent", e.userAgent) resp, err := e.client.Do(req) @@ -231,7 +267,6 @@ func readResponseStatus(resp *http.Response) *status.Status { // "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a // Protobuf-encoded Status message that describes the problem." respBytes, err := readResponseBody(resp) - if err != nil { return nil } @@ -249,7 +284,6 @@ func readResponseStatus(resp *http.Response) *status.Status { func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { bodyBytes, err := readResponseBody(resp) - if err != nil { return err } @@ -259,15 +293,24 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par type partialSuccessHandler func(bytes []byte, contentType string) error -func tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { - if contentType != protobufContentType { +func tracesPartialSuccessHandler(responseBytes []byte, contentType string) error { + if contentType != protobufContentType && contentType != jsonContentType { return nil } exportResponse := ptraceotlp.NewExportResponse() - err := exportResponse.UnmarshalProto(protoBytes) - if err != nil { - return fmt.Errorf("error parsing protobuf response: %w", err) + + if contentType == protobufContentType { + err := exportResponse.UnmarshalProto(responseBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + } else if contentType == jsonContentType { + err := exportResponse.UnmarshalJSON(responseBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } } + partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans())) @@ -275,15 +318,24 @@ func tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { return nil } -func metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { - if contentType != protobufContentType { +func metricsPartialSuccessHandler(responseBytes []byte, contentType string) error { + if contentType != protobufContentType && contentType != jsonContentType { return nil } exportResponse := pmetricotlp.NewExportResponse() - err := exportResponse.UnmarshalProto(protoBytes) - if err != nil { - return fmt.Errorf("error parsing protobuf response: %w", err) + + if contentType == protobufContentType { + err := exportResponse.UnmarshalProto(responseBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + } else if contentType == jsonContentType { + err := exportResponse.UnmarshalJSON(responseBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } } + partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints())) @@ -291,15 +343,24 @@ func metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { return nil } -func logsPartialSuccessHandler(protoBytes []byte, contentType string) error { - if contentType != protobufContentType { +func logsPartialSuccessHandler(responseBytes []byte, contentType string) error { + if contentType != protobufContentType && contentType != jsonContentType { return nil } exportResponse := plogotlp.NewExportResponse() - err := exportResponse.UnmarshalProto(protoBytes) - if err != nil { - return fmt.Errorf("error parsing protobuf response: %w", err) + + if contentType == protobufContentType { + err := exportResponse.UnmarshalProto(responseBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + } else if contentType == jsonContentType { + err := exportResponse.UnmarshalJSON(responseBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } } + partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords())) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 578e6af3ace..ba3d30cdbc1 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -372,9 +372,6 @@ func TestPartialSuccessUnsupportedContentType(t *testing.T) { unsupportedContentTypeCases := []struct { contentType string }{ - { - contentType: "application/json", - }, { contentType: "text/plain", }, @@ -600,6 +597,128 @@ func TestPartialSuccess_metrics(t *testing.T) { require.Error(t, err) } +func TestEncoding(t *testing.T) { + set := exportertest.NewNopCreateSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" + + tests := []struct { + name string + encoding EncodingType + expectedEncoding EncodingType + }{ + { + name: "default_encoding", + expectedEncoding: "application/x-protobuf", + }, + { + name: "explicit_proto_encoding", + encoding: EncodingProto, + expectedEncoding: "application/x-protobuf", + }, + { + name: "json_encoding", + encoding: EncodingJSON, + expectedEncoding: "application/json", + }, + } + + t.Run("traces", func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding) + writer.WriteHeader(200) + }) + defer srv.Close() + + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + Encoding: test.encoding, + } + exp, err := createTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + require.NoError(t, err) + }) + } + }) + + t.Run("metrics", func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding) + writer.WriteHeader(200) + }) + defer srv.Close() + + cfg := &Config{ + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), + Encoding: test.encoding, + } + exp, err := createMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + require.NoError(t, err) + }) + } + }) + + t.Run("logs", func(t *testing.T) { + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + assert.Contains(t, request.Header.Get("content-type"), test.expectedEncoding) + writer.WriteHeader(200) + }) + defer srv.Close() + + cfg := &Config{ + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), + Encoding: test.encoding, + } + exp, err := createLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + require.NoError(t, err) + + srv.Close() + }) + } + }) +} + func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler) From 895feebe890b7dc17cde3cb7e2e004a8a67f94e8 Mon Sep 17 00:00:00 2001 From: Tal Vintrob Date: Fri, 12 Jan 2024 15:06:16 +0200 Subject: [PATCH 03/12] docs: fixed otlphttp example --- exporter/otlphttpexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index b344a65e2c4..6dcbac47836 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -56,7 +56,7 @@ exporters: compression: none ``` -By default `otlp_json` encoding is used, to change the content encoding of the message configure it as follows: +By default `otlp_proto` encoding is used, to change the content encoding of the message configure it as follows: ```yaml exporters: From 1193812ccfe1a89c28b5d0b97dde050678bfa0d8 Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Sat, 13 Jan 2024 17:13:37 +0200 Subject: [PATCH 04/12] added changelog entry, fixed PR comments --- .chloggen/otlphttp-json-encoding.yaml | 25 ++++++++++++++++++++++++ exporter/otlphttpexporter/README.md | 6 +++--- exporter/otlphttpexporter/config.go | 7 ++++--- exporter/otlphttpexporter/config_test.go | 2 +- exporter/otlphttpexporter/factory.go | 2 +- exporter/otlphttpexporter/otlp.go | 9 ++++++++- 6 files changed, 42 insertions(+), 9 deletions(-) create mode 100755 .chloggen/otlphttp-json-encoding.yaml diff --git a/.chloggen/otlphttp-json-encoding.yaml b/.chloggen/otlphttp-json-encoding.yaml new file mode 100755 index 00000000000..da20ab9df03 --- /dev/null +++ b/.chloggen/otlphttp-json-encoding.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for json content encoding when exporting telemetry + +# One or more tracking issues or pull requests related to the change +issues: [6945] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 6dcbac47836..fb15c0dab7a 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -37,7 +37,7 @@ The following settings can be optionally configured: - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. -- `encoding` (default = otlp_proto): The encoding to use for the messages (valid options: `otlp_proto`, `otlp_json`) +- `encoding` (default = proto): The encoding to use for the messages (valid options: `proto`, `json`) Example: @@ -56,13 +56,13 @@ exporters: compression: none ``` -By default `otlp_proto` encoding is used, to change the content encoding of the message configure it as follows: +By default `proto` encoding is used, to change the content encoding of the message configure it as follows: ```yaml exporters: otlphttp: ... - encoding: otlp_json + encoding: json ``` The full list of settings exposed for this exporter are documented [here](./config.go) diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 49c8e1eef66..f48ea9b98e3 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -12,11 +12,12 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" ) +// EncodingType defines the type for content encoding type EncodingType string const ( - EncodingProto EncodingType = "otlp_proto" - EncodingJSON EncodingType = "otlp_json" + EncodingProto EncodingType = "proto" + EncodingJSON EncodingType = "json" ) // Config defines configuration for OTLP/HTTP exporter. @@ -34,7 +35,7 @@ type Config struct { // The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used. LogsEndpoint string `mapstructure:"logs_endpoint"` - // The encoding to export telemetry (default: "otlp_proto") + // The encoding to export telemetry (default: "proto") Encoding EncodingType `mapstructure:"encoding"` } diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 640b6d78dfc..db0f6122bac 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -51,7 +51,7 @@ func TestUnmarshalConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Encoding: "otlp_proto", + Encoding: EncodingProto, HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: map[string]configopaque.String{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 4ceec2ca7bf..cad037267e3 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -36,7 +36,7 @@ func createDefaultConfig() component.Config { return &Config{ RetryConfig: configretry.NewDefaultBackOffConfig(), QueueConfig: exporterhelper.NewDefaultQueueSettings(), - Encoding: "otlp_proto", + Encoding: EncodingProto, HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "", Timeout: 30 * time.Second, diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 7fbea0365e2..75db7c55836 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -94,6 +94,8 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { switch e.config.Encoding { case EncodingJSON: request, err = tr.MarshalJSON() + case EncodingProto: + request, err = tr.MarshalProto() default: request, err = tr.MarshalProto() } @@ -113,6 +115,8 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro switch e.config.Encoding { case EncodingJSON: request, err = tr.MarshalJSON() + case EncodingProto: + request, err = tr.MarshalProto() default: request, err = tr.MarshalProto() } @@ -131,6 +135,8 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { switch e.config.Encoding { case EncodingJSON: request, err = tr.MarshalJSON() + case EncodingProto: + request, err = tr.MarshalProto() default: request, err = tr.MarshalProto() } @@ -152,9 +158,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p switch e.config.Encoding { case EncodingJSON: req.Header.Set("Content-Type", jsonContentType) + case EncodingProto: + req.Header.Set("Content-Type", protobufContentType) default: req.Header.Set("Content-Type", protobufContentType) - } req.Header.Set("User-Agent", e.userAgent) From 6b3c402fc7746cdb65a27218be0ce2b27c2dfc37 Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Thu, 18 Jan 2024 21:31:35 +0200 Subject: [PATCH 05/12] fixed f5cloudexporter test --- exporter/otlphttpexporter/config_test.go | 2 +- exporter/otlphttpexporter/factory.go | 1 - exporter/otlphttpexporter/factory_test.go | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index db0f6122bac..31da4343262 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -51,7 +51,7 @@ func TestUnmarshalConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Encoding: EncodingProto, + Encoding: EncodingType(""), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: map[string]configopaque.String{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index cad037267e3..3ed71d626d0 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -36,7 +36,6 @@ func createDefaultConfig() component.Config { return &Config{ RetryConfig: configretry.NewDefaultBackOffConfig(), QueueConfig: exporterhelper.NewDefaultQueueSettings(), - Encoding: EncodingProto, HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "", Timeout: 30 * time.Second, diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 3a7a9a0abc8..f31c966e33b 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -36,7 +36,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, ocfg.RetryConfig.MaxInterval, 30*time.Second, "default retry MaxInterval") assert.Equal(t, ocfg.QueueConfig.Enabled, true, "default sending queue is enabled") assert.Equal(t, ocfg.Compression, configcompression.Gzip) - assert.Equal(t, ocfg.Encoding, EncodingProto) + assert.Equal(t, ocfg.Encoding, EncodingType("")) } func TestCreateMetricsExporter(t *testing.T) { From bd7b3ea3ef6df1d211697c70147c0399b412d321 Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Wed, 24 Jan 2024 08:38:15 +0200 Subject: [PATCH 06/12] cr comments --- exporter/otlphttpexporter/otlp.go | 33 ++++++++++++++----------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 75db7c55836..3d108a173fe 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -301,21 +301,20 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par type partialSuccessHandler func(bytes []byte, contentType string) error func tracesPartialSuccessHandler(responseBytes []byte, contentType string) error { - if contentType != protobufContentType && contentType != jsonContentType { - return nil - } exportResponse := ptraceotlp.NewExportResponse() - - if contentType == protobufContentType { + switch contentType { + case protobufContentType: err := exportResponse.UnmarshalProto(responseBytes) if err != nil { return fmt.Errorf("error parsing protobuf response: %w", err) } - } else if contentType == jsonContentType { + case jsonContentType: err := exportResponse.UnmarshalJSON(responseBytes) if err != nil { return fmt.Errorf("error parsing json response: %w", err) } + default: + return nil } partialSuccess := exportResponse.PartialSuccess() @@ -326,21 +325,20 @@ func tracesPartialSuccessHandler(responseBytes []byte, contentType string) error } func metricsPartialSuccessHandler(responseBytes []byte, contentType string) error { - if contentType != protobufContentType && contentType != jsonContentType { - return nil - } exportResponse := pmetricotlp.NewExportResponse() - - if contentType == protobufContentType { + switch contentType { + case protobufContentType: err := exportResponse.UnmarshalProto(responseBytes) if err != nil { return fmt.Errorf("error parsing protobuf response: %w", err) } - } else if contentType == jsonContentType { + case jsonContentType: err := exportResponse.UnmarshalJSON(responseBytes) if err != nil { return fmt.Errorf("error parsing json response: %w", err) } + default: + return nil } partialSuccess := exportResponse.PartialSuccess() @@ -351,21 +349,20 @@ func metricsPartialSuccessHandler(responseBytes []byte, contentType string) erro } func logsPartialSuccessHandler(responseBytes []byte, contentType string) error { - if contentType != protobufContentType && contentType != jsonContentType { - return nil - } exportResponse := plogotlp.NewExportResponse() - - if contentType == protobufContentType { + switch contentType { + case protobufContentType: err := exportResponse.UnmarshalProto(responseBytes) if err != nil { return fmt.Errorf("error parsing protobuf response: %w", err) } - } else if contentType == jsonContentType { + case jsonContentType: err := exportResponse.UnmarshalJSON(responseBytes) if err != nil { return fmt.Errorf("error parsing json response: %w", err) } + default: + return nil } partialSuccess := exportResponse.PartialSuccess() From 76b3384987120bc839e62c4fae5201e81691a605 Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Wed, 24 Jan 2024 08:54:00 +0200 Subject: [PATCH 07/12] implemented text marshal/unmarshal for EncodingType --- exporter/otlphttpexporter/config.go | 33 +++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index f48ea9b98e3..18cfbda8cea 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -4,7 +4,9 @@ package otlphttpexporter // import "go.opentelemetry.io/collector/exporter/otlphttpexporter" import ( + "encoding" "errors" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" @@ -20,6 +22,37 @@ const ( EncodingJSON EncodingType = "json" ) +var ( + _ encoding.TextMarshaler = (*EncodingType)(nil) + _ encoding.TextUnmarshaler = (*EncodingType)(nil) +) + +// MarshalText marshals EncodingType to text. +func (e EncodingType) MarshalText() (text []byte, err error) { + return []byte(e), nil +} + +// UnmarshalText unmarshalls text to a Level. +func (e *EncodingType) UnmarshalText(text []byte) error { + if e == nil { + return errors.New("cannot unmarshal to a nil *EncodingType") + } + + str := string(text) + switch str { + case string(EncodingProto): + *e = EncodingProto + case string(EncodingJSON): + *e = EncodingJSON + + // TODO: remove this case when https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30703 is fixed + case "": + *e = EncodingProto + } + + return fmt.Errorf("invalid encoding type: %s", str) +} + // Config defines configuration for OTLP/HTTP exporter. type Config struct { confighttp.HTTPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. From 04057652040f758d245351557b0783129e07df18 Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Fri, 26 Jan 2024 12:49:20 +0200 Subject: [PATCH 08/12] updated config to include default --- exporter/otlphttpexporter/config.go | 18 ++++-------------- exporter/otlphttpexporter/config_test.go | 10 +++++++++- exporter/otlphttpexporter/factory.go | 1 + exporter/otlphttpexporter/factory_test.go | 2 +- exporter/otlphttpexporter/otlp.go | 8 ++++---- exporter/otlphttpexporter/otlp_test.go | 13 ++++++++----- .../testdata/bad_invalid_encoding.yaml | 1 + 7 files changed, 28 insertions(+), 25 deletions(-) create mode 100644 exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 18cfbda8cea..5b0ec8936b9 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -22,15 +22,7 @@ const ( EncodingJSON EncodingType = "json" ) -var ( - _ encoding.TextMarshaler = (*EncodingType)(nil) - _ encoding.TextUnmarshaler = (*EncodingType)(nil) -) - -// MarshalText marshals EncodingType to text. -func (e EncodingType) MarshalText() (text []byte, err error) { - return []byte(e), nil -} +var _ encoding.TextUnmarshaler = (*EncodingType)(nil) // UnmarshalText unmarshalls text to a Level. func (e *EncodingType) UnmarshalText(text []byte) error { @@ -44,13 +36,11 @@ func (e *EncodingType) UnmarshalText(text []byte) error { *e = EncodingProto case string(EncodingJSON): *e = EncodingJSON - - // TODO: remove this case when https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30703 is fixed - case "": - *e = EncodingProto + default: + return fmt.Errorf("invalid encoding type: %s", str) } - return fmt.Errorf("invalid encoding type: %s", str) + return nil } // Config defines configuration for OTLP/HTTP exporter. diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 31da4343262..24adc12d1ac 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -51,7 +51,7 @@ func TestUnmarshalConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Encoding: EncodingType(""), + Encoding: EncodingProto, HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: map[string]configopaque.String{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", @@ -74,3 +74,11 @@ func TestUnmarshalConfig(t *testing.T) { }, }, cfg) } + +func TestUnmarshalConfigInvalidEncoding(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_invalid_encoding.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.Error(t, component.UnmarshalConfig(cm, cfg)) +} diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 3ed71d626d0..cad037267e3 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -36,6 +36,7 @@ func createDefaultConfig() component.Config { return &Config{ RetryConfig: configretry.NewDefaultBackOffConfig(), QueueConfig: exporterhelper.NewDefaultQueueSettings(), + Encoding: EncodingProto, HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "", Timeout: 30 * time.Second, diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index f31c966e33b..3a7a9a0abc8 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -36,7 +36,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, ocfg.RetryConfig.MaxInterval, 30*time.Second, "default retry MaxInterval") assert.Equal(t, ocfg.QueueConfig.Enabled, true, "default sending queue is enabled") assert.Equal(t, ocfg.Compression, configcompression.Gzip) - assert.Equal(t, ocfg.Encoding, EncodingType("")) + assert.Equal(t, ocfg.Encoding, EncodingProto) } func TestCreateMetricsExporter(t *testing.T) { diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 3d108a173fe..4de02661b1d 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -97,7 +97,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { case EncodingProto: request, err = tr.MarshalProto() default: - request, err = tr.MarshalProto() + err = fmt.Errorf("invalid encoding: %s", e.config.Encoding) } if err != nil { @@ -118,7 +118,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro case EncodingProto: request, err = tr.MarshalProto() default: - request, err = tr.MarshalProto() + err = fmt.Errorf("invalid encoding: %s", e.config.Encoding) } if err != nil { @@ -138,7 +138,7 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { case EncodingProto: request, err = tr.MarshalProto() default: - request, err = tr.MarshalProto() + err = fmt.Errorf("invalid encoding: %s", e.config.Encoding) } if err != nil { @@ -161,7 +161,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p case EncodingProto: req.Header.Set("Content-Type", protobufContentType) default: - req.Header.Set("Content-Type", protobufContentType) + return fmt.Errorf("invalid encoding: %s", e.config.Encoding) } req.Header.Set("User-Agent", e.userAgent) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index ba3d30cdbc1..d642b790e7a 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -176,6 +176,7 @@ func TestErrorResponses(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), // Create without QueueSettings and RetryConfig so that ConsumeTraces // returns the errors that we want to check immediately. @@ -250,6 +251,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, @@ -283,6 +285,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, @@ -316,6 +319,7 @@ func TestUserAgent(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, @@ -420,6 +424,7 @@ func TestPartialSuccess_logs(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{}, } @@ -544,6 +549,7 @@ func TestPartialSuccess_traces(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{}, } @@ -578,6 +584,7 @@ func TestPartialSuccess_metrics(t *testing.T) { defer srv.Close() cfg := &Config{ + Encoding: EncodingProto, MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{}, } @@ -608,11 +615,7 @@ func TestEncoding(t *testing.T) { expectedEncoding EncodingType }{ { - name: "default_encoding", - expectedEncoding: "application/x-protobuf", - }, - { - name: "explicit_proto_encoding", + name: "proto_encoding", encoding: EncodingProto, expectedEncoding: "application/x-protobuf", }, diff --git a/exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml b/exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml new file mode 100644 index 00000000000..593aee269a5 --- /dev/null +++ b/exporter/otlphttpexporter/testdata/bad_invalid_encoding.yaml @@ -0,0 +1 @@ +encoding: invalid From 238df10aaa0b338ca7373878f3256cccab7fb561 Mon Sep 17 00:00:00 2001 From: Tal Vintrob Date: Sat, 27 Jan 2024 16:12:54 +0200 Subject: [PATCH 09/12] Update config.go Co-authored-by: Yang Song --- exporter/otlphttpexporter/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/otlphttpexporter/config.go b/exporter/otlphttpexporter/config.go index 5b0ec8936b9..65e73199686 100644 --- a/exporter/otlphttpexporter/config.go +++ b/exporter/otlphttpexporter/config.go @@ -24,7 +24,7 @@ const ( var _ encoding.TextUnmarshaler = (*EncodingType)(nil) -// UnmarshalText unmarshalls text to a Level. +// UnmarshalText unmarshalls text to an EncodingType. func (e *EncodingType) UnmarshalText(text []byte) error { if e == nil { return errors.New("cannot unmarshal to a nil *EncodingType") From af4241a03e2f068042df4f6ce483ec1295c0a50e Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Sat, 27 Jan 2024 19:05:50 +0200 Subject: [PATCH 10/12] added tests around UnmarshalText --- exporter/otlphttpexporter/config_test.go | 46 ++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index 24adc12d1ac..535410555f2 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -82,3 +82,49 @@ func TestUnmarshalConfigInvalidEncoding(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Error(t, component.UnmarshalConfig(cm, cfg)) } + +func TestUnmarshalEncoding(t *testing.T) { + tests := []struct { + name string + encodingBytes []byte + expected EncodingType + shouldError bool + }{ + { + name: "UnmarshalEncodingProto", + encodingBytes: []byte("proto"), + expected: EncodingProto, + shouldError: false, + }, + { + name: "UnmarshalEncodingJson", + encodingBytes: []byte("json"), + expected: EncodingJSON, + shouldError: false, + }, + { + name: "UnmarshalEmptyEncoding", + encodingBytes: []byte(""), + shouldError: true, + }, + { + name: "UnmarshalInvalidEncoding", + encodingBytes: []byte("invalid"), + shouldError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var encoding EncodingType + err := encoding.UnmarshalText(tt.encodingBytes) + + if tt.shouldError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, encoding) + } + }) + } +} From 6addd0584d4f4f95972ec3a6ca76cde57ad2b55b Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Tue, 30 Jan 2024 15:45:35 +0200 Subject: [PATCH 11/12] fixed lint --- exporter/otlphttpexporter/factory_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 1c00ae30184..99ffe6252cf 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -158,15 +158,15 @@ func TestCreateTracesExporter(t *testing.T) { { name: "ProtoEncoding", config: &Config{ - Encoding: EncodingProto, - HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: endpoint}, + Encoding: EncodingProto, + HTTPClientConfig: confighttp.HTTPClientConfig{Endpoint: endpoint}, }, }, { name: "JSONEncoding", config: &Config{ - Encoding: EncodingJSON, - HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: endpoint}, + Encoding: EncodingJSON, + HTTPClientConfig: confighttp.HTTPClientConfig{Endpoint: endpoint}, }, }, } From e5510d5598bdbfeb73de8619e3fd968559567e4e Mon Sep 17 00:00:00 2001 From: tvaintrob Date: Mon, 5 Feb 2024 15:25:03 +0200 Subject: [PATCH 12/12] fixed lint --- exporter/otlphttpexporter/factory_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 263060f1e11..5df472fb91e 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -158,15 +158,15 @@ func TestCreateTracesExporter(t *testing.T) { { name: "ProtoEncoding", config: &Config{ - Encoding: EncodingProto, - HTTPClientConfig: confighttp.HTTPClientConfig{Endpoint: endpoint}, + Encoding: EncodingProto, + ClientConfig: confighttp.ClientConfig{Endpoint: endpoint}, }, }, { name: "JSONEncoding", config: &Config{ - Encoding: EncodingJSON, - HTTPClientConfig: confighttp.HTTPClientConfig{Endpoint: endpoint}, + Encoding: EncodingJSON, + ClientConfig: confighttp.ClientConfig{Endpoint: endpoint}, }, }, }