diff --git a/exporter/newrelicexporter/README.md b/exporter/newrelicexporter/README.md index 78c6414cf00c..adfef82a612d 100644 --- a/exporter/newrelicexporter/README.md +++ b/exporter/newrelicexporter/README.md @@ -9,35 +9,45 @@ This exporter supports sending trace and metric data to [New Relic](https://newr ## Configuration -The following configuration options are supported: +The following common configuration options are supported: * One or both of the following are required: * `apikey`: Your New Relic [Insights Insert API Key](https://docs.newrelic.com/docs/insights/insights-data-sources/custom-data/send-custom-events-event-api#register). * `api_key_header`: Request header to read New Relic [Insights Insert API Key](https://docs.newrelic.com/docs/insights/insights-data-sources/custom-data/send-custom-events-event-api#register) from. * `timeout` (Optional): Amount of time spent attempting a request before abandoning and dropping data. Default is 15 seconds. -* `common_attributes` (Optional): Attributes to apply to all metrics sent. -* `metrics_host_override` (Optional): Overrides the endpoint to send metrics. - The endpoint defaults to New Relic's US data centers. For other use cases - refer to - [OpenTelemetry: Advanced configuration](https://docs.newrelic.com/docs/integrations/open-source-telemetry-integrations/opentelemetry/opentelemetry-advanced-configuration#h2-change-endpoints). -* `spans_host_override` (Optional): Overrides the endpoint to send spans. - The endpoint defaults to New Relic's US data centers. For other use cases - refer to - [OpenTelemetry: Advanced configuration](https://docs.newrelic.com/docs/integrations/open-source-telemetry-integrations/opentelemetry/opentelemetry-advanced-configuration#h2-change-endpoints). - -Example: +* `host_override` (Optional): Overrides the host to which data is sent. The URL will be generated in the form: + https://\$host/\$path. Only set the the host portion of the URL. 
The path component **CANNOT** be overridden. +**Basic example:** ```yaml exporters: newrelic: apikey: super-secret-api-key timeout: 30s - common_attributes: - server: prod-server-01 - ready_to_rock: true - volume: 11 ``` +Configuration options can be overridden by telemetry signal (i.e., traces, +metrics, and logs). This is especially important if you need to use the +`host_override` option because the exporter defaults to sending data to New +Relic's US data centers. For other use cases refer to +[OpenTelemetry: Advanced configuration](https://docs.newrelic.com/docs/integrations/open-source-telemetry-integrations/opentelemetry/opentelemetry-advanced-configuration#h2-change-endpoints). + +**Example of overriding options by telemetry signal:** +```yaml +exporters: + newrelic: + apikey: super-secret-api-key + timeout: 30s + + # host_override is set to send data to New Relic's EU data centers. + traces: + host_override: trace-api.eu.newrelic.com + timeout: 20s + metrics: + host_override: metric-api.eu.newrelic.com + logs: + host_override: log-api.eu.newrelic.com +``` ## Find and use your data @@ -45,6 +55,7 @@ Once the exporter is sending data you can start to explore your data in New Reli - Metric data: see [Metric API docs](https://docs.newrelic.com/docs/data-ingest-apis/get-data-new-relic/metric-api/introduction-metric-api#find-data). - Trace/span data: see [Trace API docs](https://docs.newrelic.com/docs/understand-dependencies/distributed-tracing/trace-api/introduction-trace-api#view-data). 
+- Log data: see [Log docs](https://docs.newrelic.com/docs/logs/log-management/ui-data/explore-your-data-log-analytics) For general querying information, see: diff --git a/exporter/newrelicexporter/config.go b/exporter/newrelicexporter/config.go index 6d8c1bff234e..49cd1feb27ed 100644 --- a/exporter/newrelicexporter/config.go +++ b/exporter/newrelicexporter/config.go @@ -17,58 +17,76 @@ package newrelicexporter import ( "time" - "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" "go.opentelemetry.io/collector/config" ) -// Config defines configuration options for the New Relic exporter. -type Config struct { - *config.ExporterSettings `mapstructure:"-"` - +// EndpointConfig defines configuration for a single endpoint in the New Relic exporter. +type EndpointConfig struct { // APIKey is the required authentication credentials for New Relic APIs. This field specifies the default key. APIKey string `mapstructure:"apikey"` // APIKeyHeader may be specified to instruct the exporter to extract the API key from the request context. APIKeyHeader string `mapstructure:"api_key_header"` + // HostOverride overrides the endpoint. + HostOverride string `mapstructure:"host_override"` + // Timeout is the total amount of time spent attempting a request, // including retries, before abandoning and dropping data. Default is 15 // seconds. Timeout time.Duration `mapstructure:"timeout"` - // CommonAttributes are the attributes to be applied to all telemetry - // sent to New Relic. - CommonAttributes map[string]interface{} `mapstructure:"common_attributes"` + // Insecure disables TLS on the endpoint. + insecure bool +} + +// Config defines configuration options for the New Relic exporter. +type Config struct { + *config.ExporterSettings `mapstructure:"-"` + + // CommonConfig stores the base configuration for each endpoint. + CommonConfig EndpointConfig `mapstructure:",squash"` - // MetricsHostOverride overrides the metrics endpoint. 
- MetricsHostOverride string `mapstructure:"metrics_host_override"` + // TracesConfig stores the configuration for the traces endpoint. + TracesConfig EndpointConfig `mapstructure:"traces"` - // SpansHostOverride overrides the spans endpoint. - SpansHostOverride string `mapstructure:"spans_host_override"` + // MetricsConfig stores the configuration for the metrics endpoint. + MetricsConfig EndpointConfig `mapstructure:"metrics"` - // MetricsInsecure disables TLS on the metrics endpoint. - metricsInsecure bool + // LogsConfig stores the configuration for the logs endpoint. + LogsConfig EndpointConfig `mapstructure:"logs"` +} + +// GetTracesConfig merges the common configuration section with the traces specific section. +func (c Config) GetTracesConfig() EndpointConfig { + return mergeConfig(c.CommonConfig, c.TracesConfig) +} - // SpansInsecure disables TLS on the spans endpoint. - spansInsecure bool +// GetMetricsConfig merges the common configuration section with the metrics specific section. +func (c Config) GetMetricsConfig() EndpointConfig { + return mergeConfig(c.CommonConfig, c.MetricsConfig) } -// HarvestOption sets all relevant Config values when instantiating a New -// Relic Harvester. -func (c Config) HarvestOption(cfg *telemetry.Config) { - cfg.APIKey = c.APIKey - cfg.HarvestPeriod = 0 // use collector harvest period. - cfg.HarvestTimeout = c.Timeout - cfg.CommonAttributes = c.CommonAttributes - cfg.Product = product - cfg.ProductVersion = version - var prefix string - if c.MetricsHostOverride != "" { - if c.metricsInsecure { - prefix = "http://" - } else { - prefix = "https://" - } - cfg.MetricsURLOverride = prefix + c.MetricsHostOverride +// GetLogsConfig merges the common configuration section with the logs specific section. 
+func (c Config) GetLogsConfig() EndpointConfig { + return mergeConfig(c.CommonConfig, c.LogsConfig) +} + +func mergeConfig(baseConfig EndpointConfig, config EndpointConfig) EndpointConfig { + if config.APIKey == "" { + config.APIKey = baseConfig.APIKey + } + + if config.APIKeyHeader == "" { + config.APIKeyHeader = baseConfig.APIKeyHeader + } + + if config.HostOverride == "" { + config.HostOverride = baseConfig.HostOverride + } + + if config.Timeout == 0 { + config.Timeout = baseConfig.Timeout } + return config } diff --git a/exporter/newrelicexporter/config_test.go b/exporter/newrelicexporter/config_test.go index 37daa0f8ccfe..52d90ce276f4 100644 --- a/exporter/newrelicexporter/config_test.go +++ b/exporter/newrelicexporter/config_test.go @@ -19,7 +19,6 @@ import ( "testing" "time" - "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -46,43 +45,142 @@ func TestLoadConfig(t *testing.T) { defaultConfig := factory.CreateDefaultConfig().(*Config) assert.Equal(t, r0, defaultConfig) - defaultNrConfig := new(telemetry.Config) - defaultConfig.HarvestOption(defaultNrConfig) - assert.Empty(t, defaultNrConfig.MetricsURLOverride) - assert.Empty(t, defaultNrConfig.SpansURLOverride) - r1 := cfg.Exporters["newrelic/alt"].(*Config) assert.Equal(t, r1, &Config{ ExporterSettings: &config.ExporterSettings{ - TypeVal: config.Type(typeStr), + TypeVal: "newrelic", NameVal: "newrelic/alt", }, - APIKey: "a1b2c3d4", - Timeout: time.Second * 30, - CommonAttributes: map[string]interface{}{ - "server": "test-server", - "prod": true, - "weight": 3, - }, - MetricsHostOverride: "alt.metrics.newrelic.com", - SpansHostOverride: "alt.spans.newrelic.com", - metricsInsecure: false, - spansInsecure: false, + CommonConfig: EndpointConfig{ + APIKey: "a1b2c3d4", + Timeout: time.Second * 30, + }, + MetricsConfig: EndpointConfig{ + HostOverride: 
"alt.metrics.newrelic.com", + insecure: false, + }, + TracesConfig: EndpointConfig{ + HostOverride: "alt.spans.newrelic.com", + insecure: false, + }, + LogsConfig: EndpointConfig{ + HostOverride: "alt.logs.newrelic.com", + insecure: false, + }, }) +} - nrConfig := new(telemetry.Config) - r1.HarvestOption(nrConfig) +func TestEndpointSpecificConfigTakesPrecedence(t *testing.T) { + config := Config{ + CommonConfig: EndpointConfig{ + APIKey: "commonapikey", + APIKeyHeader: "commonapikeyheader", + HostOverride: "commonhost", + Timeout: time.Second * 10, + }, + TracesConfig: EndpointConfig{ + APIKey: "tracesapikey", + APIKeyHeader: "tracesapikeyheader", + HostOverride: "traceshost", + Timeout: time.Second * 20, + }, + MetricsConfig: EndpointConfig{ + APIKey: "metricsapikey", + APIKeyHeader: "metricsapikeyheader", + HostOverride: "metricshost", + Timeout: time.Second * 30, + }, + LogsConfig: EndpointConfig{ + APIKey: "logsapikey", + APIKeyHeader: "logsapikeyheader", + HostOverride: "logshost", + Timeout: time.Second * 40, + }, + } - assert.Equal(t, nrConfig, &telemetry.Config{ - APIKey: "a1b2c3d4", - HarvestTimeout: time.Second * 30, - CommonAttributes: map[string]interface{}{ - "server": "test-server", - "prod": true, - "weight": 3, + assert.Equal(t, config.TracesConfig, config.GetTracesConfig()) + assert.Equal(t, config.MetricsConfig, config.GetMetricsConfig()) + assert.Equal(t, config.LogsConfig, config.GetLogsConfig()) +} + +func TestEndpointSpecificConfigUsedWhenDefined(t *testing.T) { + config := Config{ + CommonConfig: EndpointConfig{ + APIKey: "commonapikey", + APIKeyHeader: "commonapikeyheader", + HostOverride: "commonhost", + Timeout: time.Second * 10, }, - MetricsURLOverride: "https://alt.metrics.newrelic.com", - Product: product, - ProductVersion: version, - }) + TracesConfig: EndpointConfig{ + APIKey: "tracesapikey", + HostOverride: "traceshost", + Timeout: time.Second * 20, + }, + MetricsConfig: EndpointConfig{ + APIKeyHeader: "metricsapikeyheader", + 
HostOverride: "metricshost", + Timeout: time.Second * 30, + }, + LogsConfig: EndpointConfig{ + APIKey: "logsapikey", + APIKeyHeader: "logsapikeyheader", + HostOverride: "logshost", + }, + } + + expectedTraceConfig := EndpointConfig{ + APIKey: "tracesapikey", + APIKeyHeader: "commonapikeyheader", + HostOverride: "traceshost", + Timeout: time.Second * 20, + } + expectedMetricConfig := EndpointConfig{ + APIKey: "commonapikey", + APIKeyHeader: "metricsapikeyheader", + HostOverride: "metricshost", + Timeout: time.Second * 30, + } + expectedLogConfig := EndpointConfig{ + APIKey: "logsapikey", + APIKeyHeader: "logsapikeyheader", + HostOverride: "logshost", + Timeout: time.Second * 10, + } + + assert.Equal(t, expectedTraceConfig, config.GetTracesConfig()) + assert.Equal(t, expectedMetricConfig, config.GetMetricsConfig()) + assert.Equal(t, expectedLogConfig, config.GetLogsConfig()) +} + +func TestCommonConfigValuesUsed(t *testing.T) { + config := Config{ + CommonConfig: EndpointConfig{ + APIKey: "commonapikey", + APIKeyHeader: "commonapikeyheader", + HostOverride: "commonhost", + Timeout: time.Second * 10, + }, + TracesConfig: EndpointConfig{ + APIKey: "", + APIKeyHeader: "", + HostOverride: "", + Timeout: 0, + }, + MetricsConfig: EndpointConfig{ + APIKey: "", + APIKeyHeader: "", + HostOverride: "", + Timeout: 0, + }, + LogsConfig: EndpointConfig{ + APIKey: "", + APIKeyHeader: "", + HostOverride: "", + Timeout: 0, + }, + } + + assert.Equal(t, config.CommonConfig, config.GetTracesConfig()) + assert.Equal(t, config.CommonConfig, config.GetMetricsConfig()) + assert.Equal(t, config.CommonConfig, config.GetLogsConfig()) } diff --git a/exporter/newrelicexporter/factory.go b/exporter/newrelicexporter/factory.go index 917b18606f96..93fc430e6406 100644 --- a/exporter/newrelicexporter/factory.go +++ b/exporter/newrelicexporter/factory.go @@ -16,8 +16,11 @@ package newrelicexporter import ( "context" + "fmt" "time" + "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" + 
"go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -28,17 +31,24 @@ const typeStr = "newrelic" // NewFactory creates a factory for New Relic exporter. func NewFactory() component.ExporterFactory { + view.Register(MetricViews()...) + return exporterhelper.NewFactory( typeStr, createDefaultConfig, exporterhelper.WithTraces(createTracesExporter), - exporterhelper.WithMetrics(createMetricsExporter)) + exporterhelper.WithMetrics(createMetricsExporter), + exporterhelper.WithLogs(createLogsExporter), + ) } func createDefaultConfig() config.Exporter { return &Config{ ExporterSettings: config.NewExporterSettings(typeStr), - Timeout: time.Second * 15, + + CommonConfig: EndpointConfig{ + Timeout: time.Second * 15, + }, } } @@ -48,7 +58,12 @@ func createTracesExporter( params component.ExporterCreateParams, cfg config.Exporter, ) (component.TracesExporter, error) { - exp, err := newTracesExporter(params.Logger, cfg) + nrConfig, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("invalid config: %#v", cfg) + } + traceConfig := nrConfig.GetTracesConfig() + exp, err := newExporter(params.Logger, ¶ms.ApplicationStartInfo, traceConfig, telemetry.NewSpanRequestFactory) if err != nil { return nil, err } @@ -56,9 +71,10 @@ func createTracesExporter( // The logger is only used in a disabled queuedRetrySender, which noisily logs at // the error level when it is disabled and errors occur. 
return exporterhelper.NewTracesExporter(cfg, zap.NewNop(), exp.pushTraceData, - exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: cfg.(*Config).Timeout}), + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: traceConfig.Timeout}), exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}), - exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false})) + exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}), + ) } // CreateMetricsExporter creates a New Relic metrics exporter for this configuration. @@ -67,10 +83,43 @@ func createMetricsExporter( params component.ExporterCreateParams, cfg config.Exporter, ) (component.MetricsExporter, error) { - exp, err := newMetricsExporter(params.Logger, cfg) + nrConfig, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("invalid config: %#v", cfg) + } + + metricsConfig := nrConfig.GetMetricsConfig() + exp, err := newExporter(params.Logger, ¶ms.ApplicationStartInfo, metricsConfig, telemetry.NewMetricRequestFactory) if err != nil { return nil, err } - return exporterhelper.NewMetricsExporter(cfg, params.Logger, exp.pushMetricData, exporterhelper.WithShutdown(exp.Shutdown)) + return exporterhelper.NewMetricsExporter(cfg, zap.NewNop(), exp.pushMetricData, + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: metricsConfig.Timeout}), + exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}), + exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}), + ) +} + +// CreateLogsExporter creates a New Relic logs exporter for this configuration. 
+func createLogsExporter( + _ context.Context, + params component.ExporterCreateParams, + cfg config.Exporter, +) (component.LogsExporter, error) { + nrConfig, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("invalid config: %#v", cfg) + } + + logsConfig := nrConfig.GetLogsConfig() + exp, err := newExporter(params.Logger, ¶ms.ApplicationStartInfo, logsConfig, telemetry.NewLogRequestFactory) + if err != nil { + return nil, err + } + return exporterhelper.NewLogsExporter(cfg, zap.NewNop(), exp.pushLogData, + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: logsConfig.Timeout}), + exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}), + exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}), + ) } diff --git a/exporter/newrelicexporter/factory_test.go b/exporter/newrelicexporter/factory_test.go index cac2ce24d244..b689dd322315 100644 --- a/exporter/newrelicexporter/factory_test.go +++ b/exporter/newrelicexporter/factory_test.go @@ -33,13 +33,13 @@ func TestCreateDefaultConfig(t *testing.T) { nrCfg, ok := cfg.(*Config) require.True(t, ok, "invalid Config: %#v", cfg) - assert.Equal(t, nrCfg.Timeout, time.Second*15) + assert.Equal(t, nrCfg.CommonConfig.Timeout, time.Second*15) } func TestCreateExporterWithAPIKey(t *testing.T) { cfg := createDefaultConfig() nrConfig := cfg.(*Config) - nrConfig.APIKey = "a1b2c3d4" + nrConfig.CommonConfig.APIKey = "a1b2c3d4" params := component.ExporterCreateParams{Logger: zap.NewNop()} te, err := createTracesExporter(context.Background(), params, nrConfig) @@ -49,12 +49,16 @@ func TestCreateExporterWithAPIKey(t *testing.T) { me, err := createMetricsExporter(context.Background(), params, nrConfig) assert.Nil(t, err) assert.NotNil(t, me, "failed to create metrics exporter") + + le, err := createLogsExporter(context.Background(), params, nrConfig) + assert.Nil(t, err) + assert.NotNil(t, le, "failed to create logs exporter") } func TestCreateExporterWithAPIKeyHeader(t *testing.T) { cfg 
:= createDefaultConfig() nrConfig := cfg.(*Config) - nrConfig.APIKeyHeader = "x-nr-key" + nrConfig.CommonConfig.APIKeyHeader = "api-key" params := component.ExporterCreateParams{Logger: zap.NewNop()} te, err := createTracesExporter(context.Background(), params, nrConfig) @@ -62,15 +66,19 @@ func TestCreateExporterWithAPIKeyHeader(t *testing.T) { assert.NotNil(t, te, "failed to create trace exporter") me, err := createMetricsExporter(context.Background(), params, nrConfig) - assert.NotNil(t, err) - assert.Nil(t, me, "created metrics exporter") + assert.Nil(t, err) + assert.NotNil(t, me, "failed to create metrics exporter") + + le, err := createLogsExporter(context.Background(), params, nrConfig) + assert.Nil(t, err) + assert.NotNil(t, le, "failed to create logs exporter") } func TestCreateExporterWithAPIKeyAndAPIKeyHeader(t *testing.T) { cfg := createDefaultConfig() nrConfig := cfg.(*Config) - nrConfig.APIKey = "a1b2c3d4" - nrConfig.APIKeyHeader = "x-nr-key" + nrConfig.CommonConfig.APIKey = "a1b2c3d4" + nrConfig.CommonConfig.APIKeyHeader = "api-key" params := component.ExporterCreateParams{Logger: zap.NewNop()} te, err := createTracesExporter(context.Background(), params, nrConfig) @@ -80,6 +88,10 @@ func TestCreateExporterWithAPIKeyAndAPIKeyHeader(t *testing.T) { me, err := createMetricsExporter(context.Background(), params, nrConfig) assert.Nil(t, err) assert.NotNil(t, me, "failed to create metrics exporter") + + le, err := createLogsExporter(context.Background(), params, nrConfig) + assert.Nil(t, err) + assert.NotNil(t, le, "failed to create logs exporter") } func TestCreateExporterErrorWithoutAPIKeyOrAPIKeyHeader(t *testing.T) { @@ -94,6 +106,10 @@ func TestCreateExporterErrorWithoutAPIKeyOrAPIKeyHeader(t *testing.T) { me, err := createMetricsExporter(context.Background(), params, nrConfig) assert.NotNil(t, err) assert.Nil(t, me) + + le, err := createLogsExporter(context.Background(), params, nrConfig) + assert.NotNil(t, err) + assert.Nil(t, le) } func 
TestCreateTracesExporterError(t *testing.T) { params := component.ExporterCreateParams{Logger: zap.NewNop()} @@ -106,3 +122,9 @@ func TestCreateMetricsExporterError(t *testing.T) { _, err := createMetricsExporter(context.Background(), params, nil) assert.Error(t, err) } + +func TestCreateLogsExporterError(t *testing.T) { + params := component.ExporterCreateParams{Logger: zap.NewNop()} + _, err := createLogsExporter(context.Background(), params, nil) + assert.Error(t, err) +} diff --git a/exporter/newrelicexporter/go.mod b/exporter/newrelicexporter/go.mod index 97fdfb3c088c..77b27368b592 100644 --- a/exporter/newrelicexporter/go.mod +++ b/exporter/newrelicexporter/go.mod @@ -10,7 +10,8 @@ require ( github.com/hashicorp/go-immutable-radix v1.2.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/mattn/go-colorable v0.1.7 // indirect - github.com/newrelic/newrelic-telemetry-sdk-go v0.6.0 + github.com/newrelic/newrelic-telemetry-sdk-go v0.7.0 + go.opencensus.io v0.23.0 github.com/onsi/ginkgo v1.14.1 // indirect github.com/onsi/gomega v1.10.2 // indirect github.com/pelletier/go-toml v1.8.0 // indirect diff --git a/exporter/newrelicexporter/go.sum b/exporter/newrelicexporter/go.sum index b1ceb49a223f..f520771532c1 100644 --- a/exporter/newrelicexporter/go.sum +++ b/exporter/newrelicexporter/go.sum @@ -762,8 +762,8 @@ github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= -github.com/newrelic/newrelic-telemetry-sdk-go v0.6.0 h1:XAzKWY7T2speu5ZwW3pwgNBKaOeERj+pMXq7xmnnlCQ= -github.com/newrelic/newrelic-telemetry-sdk-go v0.6.0/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E= +github.com/newrelic/newrelic-telemetry-sdk-go v0.7.0 
h1:tS/wanndQjrBFTQFeNb3jN1iJeUPSPLi74Y58ZtYAkQ= +github.com/newrelic/newrelic-telemetry-sdk-go v0.7.0/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/exporter/newrelicexporter/metrics.go b/exporter/newrelicexporter/metrics.go new file mode 100644 index 000000000000..30a5bbe687a8 --- /dev/null +++ b/exporter/newrelicexporter/metrics.go @@ -0,0 +1,246 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package newrelicexporter + +import ( + "context" + "strconv" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/pdata" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +var ( + tagGrpcStatusCode, _ = tag.NewKey("grpc_response_code") + tagHTTPStatusCode, _ = tag.NewKey("http_status_code") + tagRequestUserAgent, _ = tag.NewKey("user_agent") + tagAPIKey, _ = tag.NewKey("api_key") + tagDataType, _ = tag.NewKey("data_type") + tagMetricType, _ = tag.NewKey("metric_type") + tagMetricTemporality, _ = tag.NewKey("metric_temporality") + tagHasSpanEvents, _ = tag.NewKey("has_span_events") + tagHasSpanLinks, _ = tag.NewKey("has_span_links") + tagAttributeLocation, _ = tag.NewKey("attribute_location") + tagAttributeValueType, _ = tag.NewKey("attribute_type") + tagKeys = []tag.Key{tagGrpcStatusCode, tagHTTPStatusCode, tagRequestUserAgent, tagAPIKey, tagDataType} + metricMetadataTagKeys = []tag.Key{tagGrpcStatusCode, tagHTTPStatusCode, tagRequestUserAgent, tagAPIKey, tagDataType, tagMetricType, tagMetricTemporality} + spanMetadataTagKeys = []tag.Key{tagGrpcStatusCode, tagHTTPStatusCode, tagRequestUserAgent, tagAPIKey, tagDataType, tagHasSpanEvents, tagHasSpanLinks} + attributeMetadataTagKeys = []tag.Key{tagGrpcStatusCode, tagHTTPStatusCode, tagRequestUserAgent, tagAPIKey, tagDataType, tagAttributeLocation, tagAttributeValueType} + + statRequestCount = stats.Int64("newrelicexporter_request_count", "Number of requests processed", stats.UnitDimensionless) + statOutputDatapointCount = stats.Int64("newrelicexporter_output_datapoint_count", "Number of data points sent to the HTTP API", stats.UnitDimensionless) + statExporterTime = stats.Float64("newrelicexporter_exporter_time", "Wall clock time (seconds) spent in the exporter", stats.UnitSeconds) + statExternalTime = stats.Float64("newrelicexporter_external_time", 
"Wall clock time (seconds) spent sending data to the HTTP API", stats.UnitSeconds) + statMetricMetadata = stats.Int64("newrelicexporter_metric_metadata_count", "Number of metrics processed", stats.UnitDimensionless) + statSpanMetadata = stats.Int64("newrelicexporter_span_metadata_count", "Number of spans processed", stats.UnitDimensionless) + statAttributeMetadata = stats.Int64("newrelicexporter_attribute_metadata_count", "Number of attributes processed", stats.UnitDimensionless) +) + +// MetricViews returns the metric views for the New Relic exporter. +func MetricViews() []*view.View { + return []*view.View{ + buildView(tagKeys, statRequestCount, view.Sum()), + buildView(tagKeys, statOutputDatapointCount, view.Sum()), + buildView(tagKeys, statExporterTime, view.Sum()), + buildView(tagKeys, statExternalTime, view.Sum()), + buildView(metricMetadataTagKeys, statMetricMetadata, view.Sum()), + buildView(spanMetadataTagKeys, statSpanMetadata, view.Sum()), + buildView(attributeMetadataTagKeys, statAttributeMetadata, view.Sum()), + } +} + +func buildView(tagKeys []tag.Key, m stats.Measure, a *view.Aggregation) *view.View { + return &view.View{ + Name: m.Name(), + Measure: m, + Description: m.Description(), + TagKeys: tagKeys, + Aggregation: a, + } +} + +type metricStatsKey struct { + MetricType pdata.MetricDataType + MetricTemporality pdata.AggregationTemporality +} + +type spanStatsKey struct { + hasEvents bool + hasLinks bool +} + +type attributeLocation int + +const ( + attributeLocationResource attributeLocation = iota + attributeLocationSpan + attributeLocationSpanEvent + attributeLocationLog +) + +func (al attributeLocation) String() string { + switch al { + case attributeLocationResource: + return "resource" + case attributeLocationSpan: + return "span" + case attributeLocationSpanEvent: + return "span_event" + case attributeLocationLog: + return "log" + } + return "" +} + +type attributeStatsKey struct { + location attributeLocation + attributeType pdata.AttributeValueType +} 
+ +type exportMetadata struct { + // Metric tags + grpcResponseCode codes.Code // The gRPC response code + httpStatusCode int // The HTTP response status code from the HTTP API + apiKey string // The API key from the request + userAgent string // The User-Agent from the request + dataType string // The type of data being recorded + + // Metric values + dataInputCount int // Number of resource spans in the request + dataOutputCount int // Number of spans sent to the trace API + exporterTime time.Duration // Total time spent in the newrelic exporter + externalDuration time.Duration // Time spent sending to the trace API + metricMetadataCount map[metricStatsKey]int // Number of metrics by type and temporality + spanMetadataCount map[spanStatsKey]int // Number of spans by whether or not they have events or links + attributeMetadataCount map[attributeStatsKey]int // Number of attributes by location and type +} + +func newTraceMetadata(ctx context.Context) exportMetadata { + return initMetadata(ctx, "trace") +} + +func newLogMetadata(ctx context.Context) exportMetadata { + return initMetadata(ctx, "log") +} + +func newMetricMetadata(ctx context.Context) exportMetadata { + return initMetadata(ctx, "metric") +} + +func initMetadata(ctx context.Context, dataType string) exportMetadata { + userAgent := "not_present" + if md, ctxOk := metadata.FromIncomingContext(ctx); ctxOk { + if values, headerOk := md["user-agent"]; headerOk { + userAgent = values[0] + } + } + + return exportMetadata{ + userAgent: userAgent, + apiKey: "not_present", + dataType: dataType, + metricMetadataCount: make(map[metricStatsKey]int, 8*3 /* 8 metric types by 3 temporalities */), + spanMetadataCount: make(map[spanStatsKey]int, 2*2 /* combinations of the 2 bool key values */), + attributeMetadataCount: make(map[attributeStatsKey]int, 3*7 /* spans can have 7 value types in 4 different locations */), + } +} + +func (d exportMetadata) recordMetrics(ctx context.Context) error { + tags := []tag.Mutator{ + 
tag.Insert(tagGrpcStatusCode, d.grpcResponseCode.String()), + tag.Insert(tagHTTPStatusCode, strconv.Itoa(d.httpStatusCode)), + tag.Insert(tagRequestUserAgent, d.userAgent), + tag.Insert(tagAPIKey, d.apiKey), + tag.Insert(tagDataType, d.dataType), + } + + var errors []error + e := stats.RecordWithTags(ctx, tags, + statRequestCount.M(1), + statOutputDatapointCount.M(int64(d.dataOutputCount)), + statExporterTime.M(d.exporterTime.Seconds()), + statExternalTime.M(d.externalDuration.Seconds()), + ) + + if e != nil { + errors = append(errors, e) + } + + if len(d.metricMetadataCount) > 0 { + metricMetadataTagMutators := make([]tag.Mutator, len(tags)+2) + copy(metricMetadataTagMutators, tags) + for k, v := range d.metricMetadataCount { + metricTypeTag := tag.Insert(tagMetricType, k.MetricType.String()) + metricMetadataTagMutators[len(metricMetadataTagMutators)-2] = metricTypeTag + + temporalityTag := tag.Insert(tagMetricTemporality, k.MetricTemporality.String()) + metricMetadataTagMutators[len(metricMetadataTagMutators)-1] = temporalityTag + + e := stats.RecordWithTags(ctx, metricMetadataTagMutators, statMetricMetadata.M(int64(v))) + if e != nil { + errors = append(errors, e) + } + } + } + + if len(d.spanMetadataCount) > 0 { + spanMetadataTagMutators := make([]tag.Mutator, len(tags)+2) + copy(spanMetadataTagMutators, tags) + for k, v := range d.spanMetadataCount { + hasSpanEventsTag := tag.Insert(tagHasSpanEvents, strconv.FormatBool(k.hasEvents)) + spanMetadataTagMutators[len(spanMetadataTagMutators)-2] = hasSpanEventsTag + + hasSpanLinksTag := tag.Insert(tagHasSpanLinks, strconv.FormatBool(k.hasLinks)) + spanMetadataTagMutators[len(spanMetadataTagMutators)-1] = hasSpanLinksTag + + e := stats.RecordWithTags(ctx, spanMetadataTagMutators, statSpanMetadata.M(int64(v))) + if e != nil { + errors = append(errors, e) + } + } + } + + if len(d.attributeMetadataCount) > 0 { + attributeMetadataMutators := make([]tag.Mutator, len(tags)+2) + copy(attributeMetadataMutators, tags) + for 
k, v := range d.attributeMetadataCount { + locationTag := tag.Insert(tagAttributeLocation, k.location.String()) + attributeMetadataMutators[len(attributeMetadataMutators)-2] = locationTag + + typeTag := tag.Insert(tagAttributeValueType, k.attributeType.String()) + attributeMetadataMutators[len(attributeMetadataMutators)-1] = typeTag + + e := stats.RecordWithTags(ctx, attributeMetadataMutators, statAttributeMetadata.M(int64(v))) + if e != nil { + errors = append(errors, e) + } + } + } + + return consumererror.Combine(errors) +} + +func sanitizeAPIKeyForLogging(apiKey string) string { + if len(apiKey) <= 8 { + return apiKey + } + return apiKey[:8] +} diff --git a/exporter/newrelicexporter/metrics_test.go b/exporter/newrelicexporter/metrics_test.go new file mode 100644 index 000000000000..65f631977ff1 --- /dev/null +++ b/exporter/newrelicexporter/metrics_test.go @@ -0,0 +1,424 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package newrelicexporter + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opentelemetry.io/collector/consumer/pdata" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +func TestMetricViews(t *testing.T) { + metricViews := MetricViews() + + assert.True(t, len(metricViews) > 0) + for _, curView := range metricViews { + assert.True(t, strings.HasPrefix(curView.Name, "newrelicexporter_")) + assert.NotNil(t, curView.Aggregation) + assert.NotNil(t, curView.Description) + if curView.Name == "newrelicexporter_metric_metadata_count" { + assert.Equal(t, metricMetadataTagKeys, curView.TagKeys) + } else if curView.Name == "newrelicexporter_span_metadata_count" { + assert.Equal(t, spanMetadataTagKeys, curView.TagKeys) + } else if curView.Name == "newrelicexporter_attribute_metadata_count" { + assert.Equal(t, attributeMetadataTagKeys, curView.TagKeys) + } else { + assert.Equal(t, tagKeys, curView.TagKeys) + } + assert.NotNil(t, curView.Aggregation) + } +} + +func TestRecordMetrics(t *testing.T) { + view.Unregister(MetricViews()...) 
+ if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + details := []exportMetadata{ + // A request that completes normally + { + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "data", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + }, + // A request that receives 403 status code from the HTTP API + { + grpcResponseCode: codes.Unauthenticated, + httpStatusCode: 403, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "data", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + }, + // A request experiences a url.Error while sending to the HTTP API + { + grpcResponseCode: codes.DataLoss, + httpStatusCode: 0, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "data", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + }, + } + + for _, traceDetails := range details { + if err := traceDetails.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + } + + measurements := []stats.Measure{ + statRequestCount, + statOutputDatapointCount, + statExporterTime, + statExternalTime, + } + + for _, measurement := range measurements { + rows, err := view.RetrieveData(measurement.Name()) + if err != nil { + t.Fail() + } + // Check that each measurement has a number of rows corresponding to the tag set produced by the interactions + assert.Equal(t, len(details), len(rows)) + for _, row := range rows { + // Confirm each row has data and has the required tag keys + assert.True(t, row.Data != nil) + assert.Equal(t, len(tagKeys), len(row.Tags)) + for _, rowTag := range row.Tags { + assert.Contains(t, tagKeys, rowTag.Key) + } + } + } +} + +func TestRecordMetricMetadata(t *testing.T) { + view.Unregister(MetricViews()...) 
+ if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + detail := exportMetadata{ + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "metric", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + metricMetadataCount: map[metricStatsKey]int{ + {MetricType: pdata.MetricDataTypeSummary}: 1, + {MetricType: pdata.MetricDataTypeHistogram}: 1, + {MetricType: pdata.MetricDataTypeDoubleSum, MetricTemporality: pdata.AggregationTemporalityDelta}: 2, + {MetricType: pdata.MetricDataTypeDoubleSum, MetricTemporality: pdata.AggregationTemporalityCumulative}: 3, + }, + } + + if err := detail.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + + rows, err := view.RetrieveData(statMetricMetadata.Name()) + if err != nil { + t.Fail() + } + // Check that the measurement has the right number of results recorded + assert.Equal(t, len(detail.metricMetadataCount), len(rows)) + for _, row := range rows { + // Confirm each row has data and has the required tag keys + assert.True(t, row.Data != nil) + assert.Equal(t, len(metricMetadataTagKeys), len(row.Tags)) + for _, rowTag := range row.Tags { + assert.Contains(t, metricMetadataTagKeys, rowTag.Key) + } + } +} + +func TestDoesNotRecordMetricMetadata(t *testing.T) { + view.Unregister(MetricViews()...) 
+ if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + detail := exportMetadata{ + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "metric", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + } + + if err := detail.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + + rows, err := view.RetrieveData(statMetricMetadata.Name()) + if err != nil { + t.Fail() + } + // No results should have been recorded + assert.Equal(t, 0, len(rows)) +} + +func TestRecordSpanMetadata(t *testing.T) { + view.Unregister(MetricViews()...) + if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + detail := exportMetadata{ + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "metric", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + spanMetadataCount: map[spanStatsKey]int{ + {hasEvents: false, hasLinks: false}: 1, + {hasEvents: true, hasLinks: false}: 1, + {hasEvents: false, hasLinks: true}: 2, + {hasEvents: true, hasLinks: true}: 3, + }, + } + + if err := detail.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + + rows, err := view.RetrieveData(statSpanMetadata.Name()) + if err != nil { + t.Fail() + } + // Check that the measurement has the right number of results recorded + assert.Equal(t, len(detail.spanMetadataCount), len(rows)) + for _, row := range rows { + // Confirm each row has data and has the required tag keys + assert.True(t, row.Data != nil) + assert.Equal(t, len(spanMetadataTagKeys), len(row.Tags)) + for _, rowTag := range row.Tags { + assert.Contains(t, spanMetadataTagKeys, rowTag.Key) + } + } +} + +func TestDoesNotRecordSpanMetadata(t *testing.T) { + view.Unregister(MetricViews()...) 
+ if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + detail := exportMetadata{ + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "metric", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + } + + if err := detail.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + + rows, err := view.RetrieveData(statSpanMetadata.Name()) + if err != nil { + t.Fail() + } + // No results should have been recorded + assert.Equal(t, 0, len(rows)) +} + +func TestRecordAttributeMetadata(t *testing.T) { + view.Unregister(MetricViews()...) + if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + detail := exportMetadata{ + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "data", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + attributeMetadataCount: map[attributeStatsKey]int{ + {attributeType: pdata.AttributeValueARRAY, location: attributeLocationResource}: 1, + {attributeType: pdata.AttributeValueBOOL, location: attributeLocationSpan}: 1, + {attributeType: pdata.AttributeValueMAP, location: attributeLocationSpanEvent}: 1, + {attributeType: pdata.AttributeValueDOUBLE, location: attributeLocationLog}: 1, + {attributeType: pdata.AttributeValueINT, location: attributeLocationResource}: 1, + {attributeType: pdata.AttributeValueNULL, location: attributeLocationSpan}: 1, + {attributeType: pdata.AttributeValueSTRING, location: attributeLocationSpanEvent}: 1, + }, + } + + if err := detail.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + + rows, err := view.RetrieveData(statAttributeMetadata.Name()) + if err != nil { + t.Fail() + } + // Check that the measurement has the right number of results recorded + assert.Equal(t, len(detail.attributeMetadataCount), len(rows)) + for _, row := range rows { + // Confirm each row has data and 
has the required tag keys + assert.True(t, row.Data != nil) + assert.Equal(t, len(attributeMetadataTagKeys), len(row.Tags)) + for _, rowTag := range row.Tags { + assert.Contains(t, attributeMetadataTagKeys, rowTag.Key) + } + } +} + +func TestDoesNotRecordAttributeMetadata(t *testing.T) { + view.Unregister(MetricViews()...) + if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + detail := exportMetadata{ + grpcResponseCode: codes.OK, + httpStatusCode: 200, + apiKey: "shhh", + userAgent: "secret agent", + dataType: "metric", + dataInputCount: 2, + exporterTime: 100, + dataOutputCount: 20, + externalDuration: 50, + } + + if err := detail.recordMetrics(context.TODO()); err != nil { + t.Fail() + } + + rows, err := view.RetrieveData(statAttributeMetadata.Name()) + if err != nil { + t.Fail() + } + // No results should have been recorded + assert.Equal(t, 0, len(rows)) +} + +func TestAttributeLocationString(t *testing.T) { + locations := []attributeLocation{ + attributeLocationResource, + attributeLocationSpan, + attributeLocationSpanEvent, + attributeLocationLog, + 99, + } + + expectedStrings := []string{ + "resource", + "span", + "span_event", + "log", + "", + } + + for i := 0; i < len(locations); i++ { + assert.Equal(t, expectedStrings[i], locations[i].String()) + } +} + +func TestSanitizeApiKeyForLogging(t *testing.T) { + assert.Equal(t, "", sanitizeAPIKeyForLogging("")) + assert.Equal(t, "foo", sanitizeAPIKeyForLogging("foo")) + assert.Equal(t, "foobarba", sanitizeAPIKeyForLogging("foobarbazqux")) +} + +func TestMetadataHasDefaultValuesSet(t *testing.T) { + m := initMetadata(context.Background(), "testdatatype") + + assert.Equal(t, "not_present", m.userAgent) + assert.Equal(t, "not_present", m.apiKey) + assert.Equal(t, "testdatatype", m.dataType) + assert.NotNil(t, m.metricMetadataCount) + assert.NotNil(t, m.spanMetadataCount) + assert.NotNil(t, m.attributeMetadataCount) +} + +func TestMetadataHasUserAgentWhenAvailable(t *testing.T) { + ctx := 
metadata.NewIncomingContext(context.Background(), metadata.MD{"user-agent": []string{"testuseragent"}}) + + m := initMetadata(ctx, "testdatatype") + + assert.Equal(t, "testuseragent", m.userAgent) +} + +func TestErrorsAreCombinedIntoSingleError(t *testing.T) { + view.Unregister(MetricViews()...) + if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + + ctx := metadata.NewIncomingContext(context.Background(), metadata.MD{"user-agent": []string{"testuseragent"}}) + + // Tag values with length > 255 will generate an error when recording metrics + b := make([]byte, 300) + for i := 0; i < 300; i++ { + b[i] = 'a' + } + reallyLongDataType := string(b) + m := initMetadata(ctx, reallyLongDataType) + m.metricMetadataCount[metricStatsKey{}]++ + m.spanMetadataCount[spanStatsKey{}]++ + m.attributeMetadataCount[attributeStatsKey{}]++ + + err := m.recordMetrics(ctx) + + require.Error(t, err) + // The bad tag value should result in 4 errors for each metric and there are 4 metrics + assert.Equal(t, 8, len(strings.Split(err.Error(), ";"))) +} diff --git a/exporter/newrelicexporter/mock_test.go b/exporter/newrelicexporter/mock_test.go index 817a1c14523a..a12fccad055e 100644 --- a/exporter/newrelicexporter/mock_test.go +++ b/exporter/newrelicexporter/mock_test.go @@ -22,10 +22,11 @@ import ( "net/http/httptest" ) -type Data struct { +type Batch struct { Common Common `json:"common"` Spans []Span `json:"spans"` Metrics []Metric `json:"metrics"` + Logs []Log `json:"logs"` XXXUnrecognized []byte `json:"-"` } @@ -51,15 +52,23 @@ type Metric struct { XXXUnrecognized []byte `json:"-"` } +type Log struct { + Message string `json:"message"` + Timestamp int64 `json:"timestamp"` + Attributes map[string]interface{} `json:"attributes"` + XXXUnrecognized []byte `json:"-"` +} + // Mock caches decompressed request bodies type Mock struct { - Data []Data + Header http.Header + Batches []Batch StatusCode int } func (c *Mock) Spans() []Span { var spans []Span - for _, data := range 
c.Data { + for _, data := range c.Batches { spans = append(spans, data.Spans...) } return spans @@ -67,7 +76,7 @@ func (c *Mock) Spans() []Span { func (c *Mock) Metrics() []Metric { var metrics []Metric - for _, data := range c.Data { + for _, data := range c.Batches { metrics = append(metrics, data.Metrics...) } return metrics @@ -99,15 +108,17 @@ func (c *Mock) Server() *httptest.Server { return } + c.Header = r.Header + w.WriteHeader(c.StatusCode) })) } func (c *Mock) ParseRequest(b []byte) error { - var data []Data + var data []Batch if err := json.Unmarshal(b, &data); err != nil { return err } - c.Data = append(c.Data, data...) + c.Batches = append(c.Batches, data...) return nil } diff --git a/exporter/newrelicexporter/newrelic.go b/exporter/newrelicexporter/newrelic.go index 3e05b1f1f406..f450ad5fcca8 100644 --- a/exporter/newrelicexporter/newrelic.go +++ b/exporter/newrelicexporter/newrelic.go @@ -16,108 +16,93 @@ package newrelicexporter import ( "context" - "fmt" + "errors" "io" "io/ioutil" "net/http" "strings" + "time" - "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative" "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" - "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/translator/internaldata" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) const ( - name = "opentelemetry-collector" - version = "0.0.0" - product = "NewRelic-Collector-OpenTelemetry" + product = "NewRelic-OpenTelemetry-Collector" ) -var _ io.Writer = logWriter{} - -// logWriter wraps a zap.Logger into an io.Writer. 
-type logWriter struct { - logf func(string, ...zapcore.Field) -} - -// Write implements io.Writer -func (w logWriter) Write(p []byte) (n int, err error) { - w.logf(string(p)) - return len(p), nil -} - -// exporter exporters OpenTelemetry Collector data to New Relic. +// exporter exports OpenTelemetry Collector data to New Relic. type exporter struct { - deltaCalculator *cumulative.DeltaCalculator - harvester *telemetry.Harvester - spanRequestFactory telemetry.RequestFactory - apiKeyHeader string - logger *zap.Logger + startInfo *component.ApplicationStartInfo + requestFactory telemetry.RequestFactory + apiKeyHeader string + logger *zap.Logger } -func newMetricsExporter(l *zap.Logger, c config.Exporter) (*exporter, error) { - nrConfig, ok := c.(*Config) - if !ok { - return nil, fmt.Errorf("invalid config: %#v", c) - } - - opts := []func(*telemetry.Config){ - nrConfig.HarvestOption, - telemetry.ConfigBasicErrorLogger(logWriter{l.Error}), - telemetry.ConfigBasicDebugLogger(logWriter{l.Info}), - telemetry.ConfigBasicAuditLogger(logWriter{l.Debug}), - } +type factoryBuilder func(options ...telemetry.ClientOption) (telemetry.RequestFactory, error) +type batchBuilder func() ([]telemetry.Batch, error) - h, err := telemetry.NewHarvester(opts...) 
- if nil != err { - return nil, err +func clientOptionForAPIKey(apiKey string) telemetry.ClientOption { + if apiKey != "" { + if strings.HasPrefix(apiKey, "NRII-") { + return telemetry.WithInsertKey(apiKey) + } + return telemetry.WithLicenseKey(apiKey) } - - return &exporter{ - deltaCalculator: cumulative.NewDeltaCalculator(), - harvester: h, - }, nil + return nil } -func newTracesExporter(l *zap.Logger, c config.Exporter) (*exporter, error) { - nrConfig, ok := c.(*Config) - if !ok { - return nil, fmt.Errorf("invalid config: %#v", c) +func clientOptions(info *component.ApplicationStartInfo, apiKey string, apiKeyHeader string, hostOverride string, insecure bool) []telemetry.ClientOption { + userAgent := product + if info.Version != "" { + userAgent += "/" + info.Version + } else if info.GitHash != "" { + userAgent += "/" + info.GitHash } - - options := []telemetry.ClientOption{telemetry.WithUserAgent(product + "/" + version)} - if nrConfig.APIKey != "" { - options = append(options, telemetry.WithInsertKey(nrConfig.APIKey)) - } else if nrConfig.APIKeyHeader != "" { + userAgent += " " + info.ExeName + options := []telemetry.ClientOption{telemetry.WithUserAgent(userAgent)} + if apiKey != "" { + options = append(options, clientOptionForAPIKey(apiKey)) + } else if apiKeyHeader != "" { options = append(options, telemetry.WithNoDefaultKey()) } - if nrConfig.SpansHostOverride != "" { - options = append(options, telemetry.WithEndpoint(nrConfig.SpansHostOverride)) + if hostOverride != "" { + options = append(options, telemetry.WithEndpoint(hostOverride)) } - if nrConfig.spansInsecure { + if insecure { options = append(options, telemetry.WithInsecure()) } - s, err := telemetry.NewSpanRequestFactory(options...) 
+ return options +} + +func newExporter(l *zap.Logger, startInfo *component.ApplicationStartInfo, nrConfig EndpointConfig, createFactory factoryBuilder) (exporter, error) { + options := clientOptions( + startInfo, + nrConfig.APIKey, + nrConfig.APIKeyHeader, + nrConfig.HostOverride, + nrConfig.insecure, + ) + f, err := createFactory(options...) if nil != err { - return nil, err + return exporter{}, err } - - return &exporter{ - spanRequestFactory: s, - apiKeyHeader: strings.ToLower(nrConfig.APIKeyHeader), - logger: l, + return exporter{ + startInfo: startInfo, + requestFactory: f, + apiKeyHeader: strings.ToLower(nrConfig.APIKeyHeader), + logger: l, }, nil } -func (e *exporter) extractInsertKeyFromHeader(ctx context.Context) string { +func (e exporter) extractAPIKeyFromHeader(ctx context.Context) string { if e.apiKeyHeader == "" { return "" } @@ -138,108 +123,251 @@ func (e *exporter) extractInsertKeyFromHeader(ctx context.Context) string { return values[0] } -func (e exporter) pushTraceData(ctx context.Context, td pdata.Traces) error { - var ( - errs []error - ) +func (e exporter) pushTraceData(ctx context.Context, td pdata.Traces) (outputErr error) { + details := newTraceMetadata(ctx) + details.dataInputCount = td.SpanCount() + builder := func() ([]telemetry.Batch, error) { return e.buildTraceBatch(&details, td) } + return e.export(ctx, &details, builder) +} + +func (e exporter) buildTraceBatch(details *exportMetadata, td pdata.Traces) ([]telemetry.Batch, error) { + var errs []error - var batch telemetry.SpanBatch + transform := newTransformer(e.startInfo, details) + batches := make([]telemetry.Batch, 0, calcSpanBatches(td)) for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) - resource := rspans.Resource() for j := 0; j < rspans.InstrumentationLibrarySpans().Len(); j++ { ispans := rspans.InstrumentationLibrarySpans().At(j) - transform := newTraceTransformer(resource, ispans.InstrumentationLibrary()) + commonAttributes := 
transform.CommonAttributes(rspans.Resource(), ispans.InstrumentationLibrary()) + spanCommon, err := telemetry.NewSpanCommonBlock(telemetry.WithSpanAttributes(commonAttributes)) + if err != nil { + e.logger.Error("Transform of span common attributes failed.", zap.Error(err)) + errs = append(errs, err) + continue + } spans := make([]telemetry.Span, 0, ispans.Spans().Len()) for k := 0; k < ispans.Spans().Len(); k++ { span := ispans.Spans().At(k) nrSpan, err := transform.Span(span) if err != nil { + e.logger.Debug("Transform of span failed.", zap.Error(err)) errs = append(errs, err) continue } + details.dataOutputCount++ spans = append(spans, nrSpan) } - batch.Spans = append(batch.Spans, spans...) + batches = append(batches, telemetry.Batch{spanCommon, telemetry.NewSpanGroup(spans)}) } } - batches := []telemetry.PayloadEntry{&batch} - insertKey := e.extractInsertKeyFromHeader(ctx) - var req *http.Request - var err error - if insertKey != "" { - req, err = e.spanRequestFactory.BuildRequest(batches, telemetry.WithInsertKey(insertKey)) - } else { - req, err = e.spanRequestFactory.BuildRequest(batches) - } - if err != nil { - e.logger.Error("Failed to build batch", zap.Error(err)) - return err - } + return batches, consumererror.Combine(errs) +} - // Execute the http request and handle the response - response, err := http.DefaultClient.Do(req) - if err != nil { - e.logger.Error("Error making HTTP request.", zap.Error(err)) - return &urlError{Err: err} +func calcSpanBatches(td pdata.Traces) int { + rss := td.ResourceSpans() + batchCount := 0 + for i := 0; i < rss.Len(); i++ { + batchCount += rss.At(i).InstrumentationLibrarySpans().Len() } - defer response.Body.Close() - io.Copy(ioutil.Discard, response.Body) + return batchCount +} - // Check if the http payload has been accepted, if not record an error - if response.StatusCode != http.StatusAccepted { - // Log the error at an appropriate level based on the status code - if response.StatusCode >= 500 { - 
e.logger.Error("Error on HTTP response.", zap.String("Status", response.Status)) - } else { - e.logger.Debug("Error on HTTP response.", zap.String("Status", response.Status)) - } +func (e exporter) pushLogData(ctx context.Context, ld pdata.Logs) (outputErr error) { + details := newLogMetadata(ctx) + details.dataInputCount = ld.LogRecordCount() + builder := func() ([]telemetry.Batch, error) { return e.buildLogBatch(&details, ld) } + return e.export(ctx, &details, builder) +} - return &httpError{Response: response} +func (e exporter) buildLogBatch(details *exportMetadata, ld pdata.Logs) ([]telemetry.Batch, error) { + var errs []error + + transform := newTransformer(e.startInfo, details) + batches := make([]telemetry.Batch, 0, calcLogBatches(ld)) + + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rlogs := ld.ResourceLogs().At(i) + for j := 0; j < rlogs.InstrumentationLibraryLogs().Len(); j++ { + ilogs := rlogs.InstrumentationLibraryLogs().At(j) + commonAttributes := transform.CommonAttributes(rlogs.Resource(), ilogs.InstrumentationLibrary()) + logCommon, err := telemetry.NewLogCommonBlock(telemetry.WithLogAttributes(commonAttributes)) + if err != nil { + e.logger.Error("Transform of log common attributes failed.", zap.Error(err)) + errs = append(errs, err) + continue + } + logs := make([]telemetry.Log, 0, ilogs.Logs().Len()) + for k := 0; k < ilogs.Logs().Len(); k++ { + log := ilogs.Logs().At(k) + nrLog, err := transform.Log(log) + if err != nil { + e.logger.Error("Transform of log failed.", zap.Error(err)) + errs = append(errs, err) + continue + } + + details.dataOutputCount++ + logs = append(logs, nrLog) + } + batches = append(batches, telemetry.Batch{logCommon, telemetry.NewLogGroup(logs)}) + } } - return consumererror.Combine(errs) + return batches, consumererror.Combine(errs) +} +func calcLogBatches(ld pdata.Logs) int { + rss := ld.ResourceLogs() + batchCount := 0 + for i := 0; i < rss.Len(); i++ { + batchCount += rss.At(i).InstrumentationLibraryLogs().Len() + 
} + return batchCount } -func (e exporter) pushMetricData(ctx context.Context, md pdata.Metrics) error { - var errs []error +func (e exporter) pushMetricData(ctx context.Context, md pdata.Metrics) (outputErr error) { + details := newMetricMetadata(ctx) + _, details.dataInputCount = md.MetricAndDataPointCount() + builder := func() ([]telemetry.Batch, error) { return e.buildMetricBatch(&details, md) } + return e.export(ctx, &details, builder) +} - ocmds := internaldata.MetricsToOC(md) - for _, ocmd := range ocmds { - var srv string - if ocmd.Node != nil && ocmd.Node.ServiceInfo != nil { - srv = ocmd.Node.ServiceInfo.Name - } +func (e exporter) buildMetricBatch(details *exportMetadata, md pdata.Metrics) ([]telemetry.Batch, error) { + var errs []error - transform := &metricTransformer{ - DeltaCalculator: e.deltaCalculator, - ServiceName: srv, - Resource: ocmd.Resource, - } + transform := newTransformer(e.startInfo, details) + batches := make([]telemetry.Batch, 0, calcMetricBatches(md)) - for _, metric := range ocmd.Metrics { - nrMetrics, err := transform.Metric(metric) + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rmetrics := md.ResourceMetrics().At(i) + for j := 0; j < rmetrics.InstrumentationLibraryMetrics().Len(); j++ { + imetrics := rmetrics.InstrumentationLibraryMetrics().At(j) + commonAttributes := transform.CommonAttributes(rmetrics.Resource(), imetrics.InstrumentationLibrary()) + metricCommon, err := telemetry.NewMetricCommonBlock(telemetry.WithMetricAttributes(commonAttributes)) if err != nil { + e.logger.Error("Transform of metric common attributes failed.", zap.Error(err)) errs = append(errs, err) continue } - // TODO: optimize this, RecordMetric locks each call. 
- for _, m := range nrMetrics { - e.harvester.RecordMetric(m) + metricSlices := make([][]telemetry.Metric, 0, imetrics.Metrics().Len()) + for k := 0; k < imetrics.Metrics().Len(); k++ { + metric := imetrics.Metrics().At(k) + nrMetrics, err := transform.Metric(metric) + if err != nil { + { + var unsupportedErr *errUnsupportedMetricType + if ok := errors.As(err, &unsupportedErr); ok { + // Treat invalid metrics as a success + details.dataOutputCount += unsupportedErr.numDataPoints + } + } + e.logger.Debug("Transform of metric failed.", zap.Error(err)) + errs = append(errs, err) + continue + } + details.dataOutputCount += len(nrMetrics) + metricSlices = append(metricSlices, nrMetrics) } + metrics := combineMetricSlices(metricSlices) + batches = append(batches, telemetry.Batch{metricCommon, telemetry.NewMetricGroup(metrics)}) + } + } + + return batches, consumererror.Combine(errs) +} + +func calcMetricBatches(md pdata.Metrics) int { + rss := md.ResourceMetrics() + batchCount := 0 + for i := 0; i < rss.Len(); i++ { + batchCount += rss.At(i).InstrumentationLibraryMetrics().Len() + } + return batchCount +} + +func combineMetricSlices(groups [][]telemetry.Metric) []telemetry.Metric { + var totalLen int + for _, group := range groups { + totalLen += len(group) + } + metrics := make([]telemetry.Metric, totalLen) + var i int + for _, group := range groups { + i += copy(metrics[i:], group) + } + return metrics +} + +func (e exporter) export( + ctx context.Context, + details *exportMetadata, + buildBatches batchBuilder, +) (outputErr error) { + startTime := time.Now() + apiKey := e.extractAPIKeyFromHeader(ctx) + defer func() { + details.apiKey = sanitizeAPIKeyForLogging(apiKey) + details.exporterTime = time.Since(startTime) + details.grpcResponseCode = status.Code(outputErr) + err := details.recordMetrics(ctx) + if err != nil { + e.logger.Error("An error occurred recording metrics.", zap.Error(err)) } + }() + + batches, mapEntryErrors := buildBatches() + + var options 
[]telemetry.ClientOption + if option := clientOptionForAPIKey(apiKey); option != nil { + options = append(options, option) + } + + req, err := e.requestFactory.BuildRequest(batches, options...) + if err != nil { + e.logger.Error("Failed to build data map", zap.Error(err)) + return err } - e.harvester.HarvestNow(ctx) + if err := e.doRequest(details, req); err != nil { + return err + } - return consumererror.Combine(errs) + return mapEntryErrors } -func (e exporter) Shutdown(ctx context.Context) error { - e.harvester.HarvestNow(ctx) +func (e exporter) doRequest(details *exportMetadata, req *http.Request) error { + startTime := time.Now() + defer func() { details.externalDuration = time.Since(startTime) }() + // Execute the http request and handle the response + response, err := http.DefaultClient.Do(req) + if err != nil { + e.logger.Error("Error making HTTP request.", zap.Error(err)) + return &urlError{Err: err} + } + defer response.Body.Close() + io.Copy(ioutil.Discard, response.Body) + details.httpStatusCode = response.StatusCode + + // Check if the http payload has been accepted, if not record an error + if response.StatusCode != http.StatusAccepted { + // Log the error at an appropriate level based on the status code + if response.StatusCode >= 500 { + // The data has been lost, but it is due to a server side error + e.logger.Warn("Server HTTP error", zap.String("Status", response.Status)) + } else if response.StatusCode == http.StatusForbidden { + // The data has been lost, but it is due to an invalid api key + e.logger.Debug("HTTP Forbidden response", zap.String("Status", response.Status)) + } else { + // The data has been lost due to an error in our payload + details.dataOutputCount = 0 + e.logger.Error("Client HTTP error.", zap.String("Status", response.Status)) + } + + return &httpError{Response: response} + } return nil } diff --git a/exporter/newrelicexporter/newrelic_test.go b/exporter/newrelicexporter/newrelic_test.go index 8c60032afdc3..950a9cbe6767 
100644 --- a/exporter/newrelicexporter/newrelic_test.go +++ b/exporter/newrelicexporter/newrelic_test.go @@ -16,6 +16,8 @@ package newrelicexporter import ( "context" + "math" + "net/http" "net/url" "strings" "testing" @@ -27,31 +29,19 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/internaldata" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/timestamppb" ) -func TestLogWriter(t *testing.T) { - var messages []string - l := logWriter{func(s string, _ ...zapcore.Field) { - messages = append(messages, s) - }} - - n, err := l.Write([]byte("one")) - require.NoError(t, err) - assert.Equal(t, 3, n) - assert.Len(t, messages, 1) - - n, err = l.Write([]byte("two")) - require.NoError(t, err) - assert.Equal(t, 3, n) - assert.Len(t, messages, 2) -} +const ( + testCollectorName = "TestCollector" + testCollectorVersion = "v1.2.3" +) type mockConfig struct { useAPIKeyHeader bool @@ -59,12 +49,12 @@ type mockConfig struct { statusCode int } -func runMock(initialContext context.Context, ptrace pdata.Traces, cfg mockConfig) (*Mock, error) { +func runTraceMock(initialContext context.Context, ptrace pdata.Traces, cfg mockConfig) (*Mock, error) { ctx, cancel := context.WithCancel(initialContext) defer cancel() m := &Mock{ - Data: make([]Data, 0, 1), + Batches: make([]Batch, 0, 1), StatusCode: 202, } @@ -84,12 +74,15 @@ func runMock(initialContext context.Context, ptrace pdata.Traces, cfg mockConfig u, _ := url.Parse(urlString) if cfg.useAPIKeyHeader { - c.APIKeyHeader = "x-nr-key" + c.CommonConfig.APIKeyHeader = "api-key" } else { - c.APIKey = "1" + c.CommonConfig.APIKey = "NRII-1" } - c.spansInsecure, c.SpansHostOverride = true, u.Host - 
params := component.ExporterCreateParams{Logger: zap.NewNop()} + c.TracesConfig.insecure, c.TracesConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} exp, err := f.CreateTracesExporter(context.Background(), params, c) if err != nil { return m, err @@ -103,15 +96,141 @@ func runMock(initialContext context.Context, ptrace pdata.Traces, cfg mockConfig return m, nil } -func testTraceData(t *testing.T, expected []Span, resource *resourcepb.Resource, spans []*tracepb.Span, useAPIKeyHeader bool) { +func runMetricMock(initialContext context.Context, pmetrics pdata.Metrics, cfg mockConfig) (*Mock, error) { + ctx, cancel := context.WithCancel(initialContext) + defer cancel() + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + if cfg.statusCode > 0 { + m.StatusCode = cfg.statusCode + } + + srv := m.Server() + defer srv.Close() + + f := NewFactory() + c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.MetricsConfig.insecure, c.MetricsConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} + exp, err := f.CreateMetricsExporter(context.Background(), params, c) + if err != nil { + return m, err + } + if err := exp.ConsumeMetrics(ctx, pmetrics); err != nil { + return m, err + } + if err := exp.Shutdown(ctx); err != nil { + return m, err + } + return m, nil +} + +func runLogMock(initialContext context.Context, plogs pdata.Logs, cfg mockConfig) (*Mock, error) { + ctx, cancel := 
context.WithCancel(initialContext) + defer cancel() + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + if cfg.statusCode > 0 { + m.StatusCode = cfg.statusCode + } + + srv := m.Server() + defer srv.Close() + + f := NewFactory() + c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.LogsConfig.insecure, c.LogsConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} + exp, err := f.CreateLogsExporter(context.Background(), params, c) + if err != nil { + return m, err + } + if err := exp.ConsumeLogs(ctx, plogs); err != nil { + return m, err + } + if err := exp.Shutdown(ctx); err != nil { + return m, err + } + return m, nil +} + +func testTraceData(t *testing.T, expected []Batch, resource *resourcepb.Resource, spans []*tracepb.Span, apiKey string) { ctx := context.Background() + useAPIKeyHeader := apiKey != "" if useAPIKeyHeader { - ctx = metadata.NewIncomingContext(ctx, metadata.MD{"x-nr-key": []string{"a1b2c3d4"}}) + ctx = metadata.NewIncomingContext(ctx, metadata.MD{"api-key": []string{apiKey}}) } - m, err := runMock(ctx, internaldata.OCToTraces(nil, resource, spans), mockConfig{useAPIKeyHeader: useAPIKeyHeader}) + m, err := runTraceMock(ctx, internaldata.OCToTraces(nil, resource, spans), mockConfig{useAPIKeyHeader: useAPIKeyHeader}) require.NoError(t, err) - assert.Equal(t, expected, m.Spans()) + assert.Equal(t, expected, m.Batches) + if !useAPIKeyHeader { + assert.Equal(t, []string{"NRII-1"}, m.Header[http.CanonicalHeaderKey("api-key")]) + } else if strings.HasPrefix(apiKey, "NRII-") { + assert.Equal(t, []string{apiKey}, 
m.Header[http.CanonicalHeaderKey("api-key")]) + } else { + assert.Equal(t, []string{apiKey}, m.Header[http.CanonicalHeaderKey("x-license-key")]) + } +} + +func testMetricData(t *testing.T, expected []Batch, md internaldata.MetricsData, apiKey string) { + ctx := context.Background() + useAPIKeyHeader := apiKey != "" + if useAPIKeyHeader { + ctx = metadata.NewIncomingContext(ctx, metadata.MD{"api-key": []string{apiKey}}) + } + + m, err := runMetricMock(ctx, internaldata.OCToMetrics(md), mockConfig{useAPIKeyHeader: useAPIKeyHeader}) + require.NoError(t, err) + assert.Equal(t, expected, m.Batches) +} + +func testLogData(t *testing.T, expected []Batch, logs pdata.Logs, apiKey string) { + ctx := context.Background() + useAPIKeyHeader := apiKey != "" + if useAPIKeyHeader { + ctx = metadata.NewIncomingContext(ctx, metadata.MD{"api-key": []string{apiKey}}) + } + + l, err := runLogMock(ctx, logs, mockConfig{useAPIKeyHeader: useAPIKeyHeader}) + require.NoError(t, err) + assert.Equal(t, expected, l.Batches) } func TestExportTraceWithBadURL(t *testing.T) { @@ -123,7 +242,7 @@ func TestExportTraceWithBadURL(t *testing.T) { }, }) - _, err := runMock(context.Background(), ptrace, mockConfig{serverURL: "http://badurl"}) + _, err := runTraceMock(context.Background(), ptrace, mockConfig{serverURL: "http://badurl"}) require.Error(t, err) } @@ -136,7 +255,7 @@ func TestExportTraceWithErrorStatusCode(t *testing.T) { }, }) - _, err := runMock(context.Background(), ptrace, mockConfig{statusCode: 500}) + _, err := runTraceMock(context.Background(), ptrace, mockConfig{statusCode: 500}) require.Error(t, err) } @@ -149,7 +268,20 @@ func TestExportTraceWithNot202StatusCode(t *testing.T) { }, }) - _, err := runMock(context.Background(), ptrace, mockConfig{statusCode: 403}) + _, err := runTraceMock(context.Background(), ptrace, mockConfig{statusCode: 403}) + require.Error(t, err) +} + +func TestExportTraceWithBadPayload(t *testing.T) { + ptrace := internaldata.OCToTraces(nil, nil, + 
[]*tracepb.Span{ + { + SpanId: []byte{0, 0, 0, 0, 0, 0, 0, 1}, + Name: &tracepb.TruncatableString{Value: "a"}, + }, + }) + + _, err := runTraceMock(context.Background(), ptrace, mockConfig{statusCode: 400}) require.Error(t, err) } @@ -162,7 +294,7 @@ func TestExportTraceWithInvalidMetadata(t *testing.T) { }, }) - _, err := runMock(context.Background(), ptrace, mockConfig{useAPIKeyHeader: true}) + _, err := runTraceMock(context.Background(), ptrace, mockConfig{useAPIKeyHeader: true}) require.Error(t, err) } @@ -176,7 +308,7 @@ func TestExportTraceWithNoAPIKeyInMetadata(t *testing.T) { }) ctx := metadata.NewIncomingContext(context.Background(), metadata.MD{}) - _, err := runMock(ctx, ptrace, mockConfig{useAPIKeyHeader: true}) + _, err := runTraceMock(ctx, ptrace, mockConfig{useAPIKeyHeader: true}) require.Error(t, err) } @@ -193,7 +325,7 @@ func TestExportTracePartialData(t *testing.T) { }, }) - _, err := runMock(context.Background(), ptrace, mockConfig{useAPIKeyHeader: false}) + _, err := runTraceMock(context.Background(), ptrace, mockConfig{useAPIKeyHeader: false}) require.Error(t, err) assert.True(t, strings.Contains(err.Error(), errInvalidSpanID.Error())) assert.True(t, strings.Contains(err.Error(), errInvalidTraceID.Error())) @@ -208,27 +340,35 @@ func TestExportTraceDataMinimum(t *testing.T) { }, } - expected := []Span{ + expected := []Batch{ { - ID: "0000000000000001", - TraceID: "01010101010101010101010101010101", - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "name": "root", - "instrumentation.provider": "opentelemetry", + Common: Common{ + Attributes: map[string]string{ + "collector.name": testCollectorName, + "collector.version": testCollectorVersion, + }, + }, + Spans: []Span{ + { + ID: "0000000000000001", + TraceID: "01010101010101010101010101010101", + Attributes: map[string]interface{}{ + "name": "root", + }, + }, }, }, } - testTraceData(t, expected, nil, spans, false) - testTraceData(t, expected, nil, 
spans, true) + testTraceData(t, expected, nil, spans, "") + testTraceData(t, expected, nil, spans, "api-key") + testTraceData(t, expected, nil, spans, "NRII-api-key") } func TestExportTraceDataFullTrace(t *testing.T) { resource := &resourcepb.Resource{ Labels: map[string]string{ - serviceNameKey: "test-service", + "service.name": "test-service", "resource": "R1", }, } @@ -256,72 +396,68 @@ func TestExportTraceDataFullTrace(t *testing.T) { }, } - expected := []Span{ + expected := []Batch{ { - ID: "0000000000000001", - TraceID: "01010101010101010101010101010101", - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "name": "root", - "resource": "R1", - "service.name": "test-service", - "instrumentation.provider": "opentelemetry", - }, - }, - { - ID: "0000000000000002", - TraceID: "01010101010101010101010101010101", - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "name": "client", - "parent.id": "0000000000000001", - "resource": "R1", - "service.name": "test-service", - "instrumentation.provider": "opentelemetry", + Common: Common{ + Attributes: map[string]string{ + "collector.name": testCollectorName, + "collector.version": testCollectorVersion, + "resource": "R1", + "service.name": "test-service", + }, }, - }, - { - ID: "0000000000000003", - TraceID: "01010101010101010101010101010101", - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "name": "server", - "parent.id": "0000000000000002", - "resource": "R1", - "service.name": "test-service", - "instrumentation.provider": "opentelemetry", + Spans: []Span{ + { + ID: "0000000000000001", + TraceID: "01010101010101010101010101010101", + Attributes: map[string]interface{}{ + "name": "root", + }, + }, + { + ID: "0000000000000002", + TraceID: "01010101010101010101010101010101", + Attributes: map[string]interface{}{ + "name": "client", + "parent.id": "0000000000000001", + }, + }, + { + 
ID: "0000000000000003", + TraceID: "01010101010101010101010101010101", + Attributes: map[string]interface{}{ + "name": "server", + "parent.id": "0000000000000002", + }, + }, }, }, } - testTraceData(t, expected, resource, spans, false) - testTraceData(t, expected, resource, spans, true) + testTraceData(t, expected, resource, spans, "") + testTraceData(t, expected, resource, spans, "api-key") + testTraceData(t, expected, resource, spans, "NRII-api-key") } -func testExportMetricData(t *testing.T, expected []Metric, md internaldata.MetricsData) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestExportMetricUnsupported(t *testing.T) { + m := pdata.NewMetric() + m.SetDataType(pdata.MetricDataTypeHistogram) + dp := pdata.NewHistogramDataPoint() + dp.SetCount(1) + dp.SetSum(1) + dp.SetTimestamp(pdata.TimestampFromTime(time.Now())) + m.Histogram().DataPoints().Append(dp) - m := &Mock{ - Data: make([]Data, 0, 3), - StatusCode: 202, - } - srv := m.Server() - defer srv.Close() + ms := pdata.NewMetrics() + rm := pdata.NewResourceMetrics() + ilm := pdata.NewInstrumentationLibraryMetrics() + ilm.Metrics().Append(m) + rm.InstrumentationLibraryMetrics().Append(ilm) + ms.ResourceMetrics().Append(rm) - f := NewFactory() - c := f.CreateDefaultConfig().(*Config) - u, _ := url.Parse(srv.URL) - c.APIKey, c.metricsInsecure, c.MetricsHostOverride = "1", true, u.Host - params := component.ExporterCreateParams{Logger: zap.NewNop()} - exp, err := f.CreateMetricsExporter(context.Background(), params, c) - require.NoError(t, err) - require.NoError(t, exp.ConsumeMetrics(ctx, internaldata.OCToMetrics(md))) - require.NoError(t, exp.Shutdown(ctx)) - assert.Equal(t, expected, m.Metrics()) + _, err := runMetricMock(context.Background(), ms, mockConfig{useAPIKeyHeader: false}) + var unsupportedErr *errUnsupportedMetricType + assert.ErrorAs(t, err, &unsupportedErr, "error was not the expected unsupported metric type error") } func TestExportMetricDataMinimal(t 
*testing.T) { @@ -362,24 +498,34 @@ func TestExportMetricDataMinimal(t *testing.T) { }, } - expected := []Metric{ + expected := []Batch{ { - Name: "temperature", - Type: "gauge", - Value: 293.15, - Timestamp: int64(100 * time.Microsecond), - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "description": desc, - "unit": unit, - "location": "Portland", - "elevation": "0", + Common: Common{ + Attributes: map[string]string{ + "collector.name": testCollectorName, + "collector.version": testCollectorVersion, + }, + }, + Metrics: []Metric{ + { + Name: "temperature", + Type: "gauge", + Value: 293.15, + Timestamp: int64(100 * time.Microsecond), + Attributes: map[string]interface{}{ + "description": desc, + "unit": unit, + "location": "Portland", + "elevation": "0", + }, + }, }, }, } - testExportMetricData(t, expected, md) + testMetricData(t, expected, md, "NRII-api-key") + testMetricData(t, expected, md, "api-key") + testMetricData(t, expected, md, "") } func TestExportMetricDataFull(t *testing.T) { @@ -468,88 +614,412 @@ func TestExportMetricDataFull(t *testing.T) { }, } - expected := []Metric{ + expected := []Batch{ { - Name: "temperature", - Type: "gauge", - Value: 293.15, - Timestamp: int64(100 * time.Microsecond), - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "description": desc, - "unit": unit, - "resource": "R1", - "service.name": "test-service", - "location": "Portland", - "elevation": "0", - }, - }, - { - Name: "temperature", - Type: "gauge", - Value: 293.15, - Timestamp: int64(101 * time.Microsecond), - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "description": desc, - "unit": unit, - "resource": "R1", - "service.name": "test-service", - "location": "Portland", - "elevation": "0", + Common: Common{ + Attributes: map[string]string{ + "collector.name": testCollectorName, + "collector.version": testCollectorVersion, + 
"resource": "R1", + "service.name": "test-service", + }, }, - }, - { - Name: "temperature", - Type: "gauge", - Value: 293.45, - Timestamp: int64(102 * time.Microsecond), - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "description": desc, - "unit": unit, - "resource": "R1", - "service.name": "test-service", - "location": "Portland", - "elevation": "0", + Metrics: []Metric{ + { + Name: "temperature", + Type: "gauge", + Value: 293.15, + Timestamp: int64(100 * time.Microsecond), + Attributes: map[string]interface{}{ + "description": desc, + "unit": unit, + "location": "Portland", + "elevation": "0", + }, + }, + { + Name: "temperature", + Type: "gauge", + Value: 293.15, + Timestamp: int64(101 * time.Microsecond), + Attributes: map[string]interface{}{ + "description": desc, + "unit": unit, + "location": "Portland", + "elevation": "0", + }, + }, + { + Name: "temperature", + Type: "gauge", + Value: 293.45, + Timestamp: int64(102 * time.Microsecond), + Attributes: map[string]interface{}{ + "description": desc, + "unit": unit, + "location": "Portland", + "elevation": "0", + }, + }, + { + Name: "temperature", + Type: "gauge", + Value: 290.05, + Timestamp: int64(99 * time.Microsecond), + Attributes: map[string]interface{}{ + "description": desc, + "unit": unit, + "location": "Denver", + "elevation": "5280", + }, + }, + { + Name: "temperature", + Type: "gauge", + Value: 293.15, + Timestamp: int64(106 * time.Microsecond), + Attributes: map[string]interface{}{ + "description": desc, + "unit": unit, + "location": "Denver", + "elevation": "5280", + }, + }, }, }, + } + + testMetricData(t, expected, md, "") + testMetricData(t, expected, md, "api-key") + testMetricData(t, expected, md, "NRII-api-key") +} + +func TestExportLogs(t *testing.T) { + timestamp := time.Now() + l := pdata.NewLogRecord() + l.SetName("logname") + l.SetTimestamp(pdata.TimestampFromTime(timestamp)) + l.Body().SetStringVal("log body") + 
l.Attributes().InsertString("foo", "bar") + + ilog := pdata.NewInstrumentationLibraryLogs() + ilog.Logs().Append(l) + rlog := pdata.NewResourceLogs() + rlog.InstrumentationLibraryLogs().Append(ilog) + rlog.Resource().Attributes().InsertString("resource", "R1") + rlog.Resource().Attributes().InsertString("service.name", "test-service") + logs := pdata.NewLogs() + logs.ResourceLogs().Append(rlog) + + expected := []Batch{ { - Name: "temperature", - Type: "gauge", - Value: 290.05, - Timestamp: int64(99 * time.Microsecond), - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "description": desc, - "unit": unit, - "resource": "R1", - "service.name": "test-service", - "location": "Denver", - "elevation": "5280", + Common: Common{ + Attributes: map[string]string{ + "collector.name": testCollectorName, + "collector.version": testCollectorVersion, + "resource": "R1", + "service.name": "test-service", + }, }, - }, - { - Name: "temperature", - Type: "gauge", - Value: 293.15, - Timestamp: int64(106 * time.Microsecond), - Attributes: map[string]interface{}{ - "collector.name": name, - "collector.version": version, - "description": desc, - "unit": unit, - "resource": "R1", - "service.name": "test-service", - "location": "Denver", - "elevation": "5280", + Logs: []Log{ + { + Message: "log body", + Timestamp: timestamp.UnixNano() / (1000 * 1000), + Attributes: map[string]interface{}{ + "foo": "bar", + "name": "logname", + }, + }, }, }, } - testExportMetricData(t, expected, md) + testLogData(t, expected, logs, "") + testLogData(t, expected, logs, "api-key") + testLogData(t, expected, logs, "NRII-api-key") +} + +func TestCreatesClientOptionWithVersionInUserAgent(t *testing.T) { + testUserAgentContainsCollectorInfo(t, testCollectorVersion, "githash", testCollectorName, "NewRelic-OpenTelemetry-Collector/v1.2.3 TestCollector") + testUserAgentContainsCollectorInfo(t, "", "githash", testCollectorName, "NewRelic-OpenTelemetry-Collector/githash 
TestCollector") +} + +func testUserAgentContainsCollectorInfo(t *testing.T, version string, gitHash string, exeName string, expectedUserAgentSubstring string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + cfg := mockConfig{useAPIKeyHeader: false} + + srv := m.Server() + defer srv.Close() + + f := NewFactory() + c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.TracesConfig.insecure, c.TracesConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: exeName, + Version: version, + GitHash: gitHash, + }} + exp, err := f.CreateTracesExporter(context.Background(), params, c) + require.NoError(t, err) + + s := pdata.NewSpan() + s.SetName("root") + s.SetTraceID(pdata.NewTraceID([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1})) + ils := pdata.NewInstrumentationLibrarySpans() + ils.Spans().Append(s) + rs := pdata.NewResourceSpans() + rs.InstrumentationLibrarySpans().Append(ils) + ptrace := pdata.NewTraces() + ptrace.ResourceSpans().Append(rs) + + err = exp.ConsumeTraces(ctx, ptrace) + require.NoError(t, err) + err = exp.Shutdown(ctx) + require.NoError(t, err) + + assert.Contains(t, m.Header[http.CanonicalHeaderKey("user-agent")][0], expectedUserAgentSubstring) +} + +func TestBadSpanResourceGeneratesError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + cfg := mockConfig{useAPIKeyHeader: false} + + srv := m.Server() + defer srv.Close() + + f := NewFactory() 
+ c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.TracesConfig.insecure, c.TracesConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} + exp, err := f.CreateTracesExporter(context.Background(), params, c) + require.NoError(t, err) + + s := pdata.NewSpan() + s.SetName("root") + s.SetTraceID(pdata.NewTraceID([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1})) + ils := pdata.NewInstrumentationLibrarySpans() + ils.Spans().Append(s) + rs := pdata.NewResourceSpans() + rs.InstrumentationLibrarySpans().Append(ils) + rs.Resource().Attributes().InsertDouble("badattribute", math.Inf(1)) + ptrace := pdata.NewTraces() + ptrace.ResourceSpans().Append(rs) + + errorFromConsumeTraces := exp.ConsumeTraces(ctx, ptrace) + + err = exp.Shutdown(ctx) + require.NoError(t, err) + + require.Error(t, errorFromConsumeTraces) +} + +func TestBadMetricResourceGeneratesError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + cfg := mockConfig{useAPIKeyHeader: false} + + srv := m.Server() + defer srv.Close() + + f := NewFactory() + c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.TracesConfig.insecure, c.TracesConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: 
zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} + exp, err := f.CreateMetricsExporter(context.Background(), params, c) + require.NoError(t, err) + + metric := pdata.NewMetric() + metric.SetName("testmetric") + ilm := pdata.NewInstrumentationLibraryMetrics() + ilm.Metrics().Append(metric) + rm := pdata.NewResourceMetrics() + rm.InstrumentationLibraryMetrics().Append(ilm) + rm.Resource().Attributes().InsertDouble("badattribute", math.Inf(1)) + md := pdata.NewMetrics() + md.ResourceMetrics().Append(rm) + + errorFromConsumeMetrics := exp.ConsumeMetrics(ctx, md) + + err = exp.Shutdown(ctx) + require.NoError(t, err) + + require.Error(t, errorFromConsumeMetrics) +} + +func TestBadLogResourceGeneratesError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + cfg := mockConfig{useAPIKeyHeader: false} + + srv := m.Server() + defer srv.Close() + + f := NewFactory() + c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.TracesConfig.insecure, c.TracesConfig.HostOverride = true, u.Host + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} + exp, err := f.CreateLogsExporter(context.Background(), params, c) + require.NoError(t, err) + + log := pdata.NewLogRecord() + ill := pdata.NewInstrumentationLibraryLogs() + ill.Logs().Append(log) + rl := pdata.NewResourceLogs() + rl.InstrumentationLibraryLogs().Append(ill) + rl.Resource().Attributes().InsertDouble("badattribute", math.Inf(1)) + ld := pdata.NewLogs() + 
ld.ResourceLogs().Append(rl) + + errorFromConsumeLogs := exp.ConsumeLogs(ctx, ld) + + err = exp.Shutdown(ctx) + require.NoError(t, err) + + require.Error(t, errorFromConsumeLogs) +} + +func TestFailureToRecordMetricsDoesNotAffectExportingData(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := view.Register(MetricViews()...); err != nil { + t.Fail() + } + defer view.Unregister(MetricViews()...) + + m := &Mock{ + Batches: make([]Batch, 0, 1), + StatusCode: 202, + } + + cfg := mockConfig{useAPIKeyHeader: false} + + srv := m.Server() + defer srv.Close() + + f := NewFactory() + c := f.CreateDefaultConfig().(*Config) + urlString := srv.URL + if cfg.serverURL != "" { + urlString = cfg.serverURL + } + u, _ := url.Parse(urlString) + + if cfg.useAPIKeyHeader { + c.CommonConfig.APIKeyHeader = "api-key" + } else { + c.CommonConfig.APIKey = "NRII-1" + } + c.TracesConfig.insecure, c.TracesConfig.HostOverride = true, u.Host + + params := component.ExporterCreateParams{Logger: zap.NewNop(), ApplicationStartInfo: component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }} + exp, err := f.CreateTracesExporter(context.Background(), params, c) + require.NoError(t, err) + + s := pdata.NewSpan() + s.SetName("root") + s.SetTraceID(pdata.NewTraceID([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1})) + ils := pdata.NewInstrumentationLibrarySpans() + ils.Spans().Append(s) + rs := pdata.NewResourceSpans() + rs.InstrumentationLibrarySpans().Append(ils) + ptrace := pdata.NewTraces() + ptrace.ResourceSpans().Append(rs) + + // Create a long string so that the user-agent will be too long and cause RecordMetric to fail + b := make([]byte, 300) + for i := 0; i < 300; i++ { + b[i] = 'a' + } + consumeCtx := metadata.NewIncomingContext(context.Background(), metadata.MD{"user-agent": []string{string(b)}}) + err = exp.ConsumeTraces(consumeCtx, 
ptrace) + require.NoError(t, err) + err = exp.Shutdown(ctx) + require.NoError(t, err) + + assert.Contains(t, m.Header[http.CanonicalHeaderKey("user-agent")][0], testCollectorName) } diff --git a/exporter/newrelicexporter/testdata/config.yaml b/exporter/newrelicexporter/testdata/config.yaml index 0ac4c095fad9..02c957ea5b1f 100644 --- a/exporter/newrelicexporter/testdata/config.yaml +++ b/exporter/newrelicexporter/testdata/config.yaml @@ -9,12 +9,12 @@ exporters: newrelic/alt: apikey: a1b2c3d4 timeout: 30s - common_attributes: - server: test-server - prod: true - weight: 3 - metrics_host_override: alt.metrics.newrelic.com - spans_host_override: alt.spans.newrelic.com + metrics: + host_override: alt.metrics.newrelic.com + traces: + host_override: alt.spans.newrelic.com + logs: + host_override: alt.logs.newrelic.com service: pipelines: @@ -22,4 +22,11 @@ service: receivers: [nop] processors: [nop] exporters: [newrelic] - + metrics: + receivers: [nop] + processors: [nop] + exporters: [newrelic] + logs: + receivers: [nop] + processors: [nop] + exporters: [newrelic] diff --git a/exporter/newrelicexporter/transformer.go b/exporter/newrelicexporter/transformer.go index 854460dce3a0..37f3976a9d3a 100644 --- a/exporter/newrelicexporter/transformer.go +++ b/exporter/newrelicexporter/transformer.go @@ -17,59 +17,66 @@ package newrelicexporter import ( "errors" "fmt" + "math" "strings" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative" "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" - "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" tracetranslator "go.opentelemetry.io/collector/translator/trace" - "google.golang.org/protobuf/types/known/timestamppb" ) const ( - unitAttrKey = "unit" - 
descriptionAttrKey = "description" - collectorNameKey = "collector.name" - collectorVersionKey = "collector.version" - instrumentationNameKey = "instrumentation.name" - instrumentationVersionKey = "instrumentation.version" - instrumentationProviderAttrKey = "instrumentation.provider" - statusCodeKey = "otel.status_code" - statusDescriptionKey = "otel.status_description" - spanKindKey = "span.kind" - serviceNameKey = "service.name" + unitAttrKey = "unit" + descriptionAttrKey = "description" + collectorNameKey = "collector.name" + collectorVersionKey = "collector.version" + instrumentationNameKey = "instrumentation.name" + instrumentationVersionKey = "instrumentation.version" + statusCodeKey = "otel.status_code" + statusDescriptionKey = "otel.status_description" + spanKindKey = "span.kind" + spanIDKey = "span.id" + traceIDKey = "trace.id" + logSeverityTextKey = "log.level" + logSeverityNumKey = "log.levelNum" ) -// TODO (MrAlias): unify this with the traceTransformer when the metric data -// export moves to using pdata and away from the OC proto. 
-type metricTransformer struct { - DeltaCalculator *cumulative.DeltaCalculator - ServiceName string - Resource *resourcepb.Resource +type transformer struct { + OverrideAttributes map[string]interface{} + details *exportMetadata } -type traceTransformer struct { - ResourceAttributes map[string]interface{} +func newTransformer(startInfo *component.ApplicationStartInfo, details *exportMetadata) *transformer { + overrideAttributes := make(map[string]interface{}) + if startInfo != nil { + overrideAttributes[collectorNameKey] = startInfo.ExeName + if startInfo.Version != "" { + overrideAttributes[collectorVersionKey] = startInfo.Version + } + } + + return &transformer{OverrideAttributes: overrideAttributes, details: details} } -func newTraceTransformer(resource pdata.Resource, lib pdata.InstrumentationLibrary) *traceTransformer { - t := &traceTransformer{ - ResourceAttributes: tracetranslator.AttributeMapToMap( - resource.Attributes(), - ), - } +func (t *transformer) CommonAttributes(resource pdata.Resource, lib pdata.InstrumentationLibrary) map[string]interface{} { + resourceAttrs := resource.Attributes() + commonAttrs := tracetranslator.AttributeMapToMap(resourceAttrs) + t.TrackAttributes(attributeLocationResource, resourceAttrs) if n := lib.Name(); n != "" { - t.ResourceAttributes[instrumentationNameKey] = n + commonAttrs[instrumentationNameKey] = n if v := lib.Version(); v != "" { - t.ResourceAttributes[instrumentationVersionKey] = v + commonAttrs[instrumentationVersionKey] = v } } - return t + + for k, v := range t.OverrideAttributes { + commonAttrs[k] = v + } + + return commonAttrs } var ( @@ -77,7 +84,7 @@ var ( errInvalidTraceID = errors.New("TraceID is invalid") ) -func (t *traceTransformer) Span(span pdata.Span) (telemetry.Span, error) { +func (t *transformer) Span(span pdata.Span) (telemetry.Span, error) { startTime := span.StartTimestamp().AsTime() sp := telemetry.Span{ // HexString validates the IDs, it will be an empty string if invalid. 
@@ -91,6 +98,12 @@ func (t *traceTransformer) Span(span pdata.Span) (telemetry.Span, error) { Events: t.SpanEvents(span), } + spanMetadataKey := spanStatsKey{ + hasEvents: sp.Events != nil, + hasLinks: span.Links().Len() > 0, + } + t.details.spanMetadataCount[spanMetadataKey]++ + if sp.ID == "" { return sp, errInvalidSpanID } @@ -101,9 +114,53 @@ func (t *traceTransformer) Span(span pdata.Span) (telemetry.Span, error) { return sp, nil } -func (t *traceTransformer) SpanAttributes(span pdata.Span) map[string]interface{} { +func (t *transformer) Log(log pdata.LogRecord) (telemetry.Log, error) { + var message string + + if bodyString := log.Body().StringVal(); bodyString != "" { + message = bodyString + } else { + message = log.Name() + } - length := 2 + len(t.ResourceAttributes) + span.Attributes().Len() + logAttrs := log.Attributes() + attrs := make(map[string]interface{}, logAttrs.Len()+5) + + for k, v := range tracetranslator.AttributeMapToMap(logAttrs) { + // Only include attribute if not an override attribute + if _, isOverrideKey := t.OverrideAttributes[k]; !isOverrideKey { + attrs[k] = v + } + } + t.TrackAttributes(attributeLocationLog, logAttrs) + + attrs["name"] = log.Name() + if !log.TraceID().IsEmpty() { + attrs[traceIDKey] = log.TraceID().HexString() + } + + if !log.SpanID().IsEmpty() { + attrs[spanIDKey] = log.SpanID().HexString() + } + + if log.SeverityText() != "" { + attrs[logSeverityTextKey] = log.SeverityText() + } + + if log.SeverityNumber() != 0 { + attrs[logSeverityNumKey] = int32(log.SeverityNumber()) + } + + return telemetry.Log{ + Message: message, + Timestamp: log.Timestamp().AsTime(), + Attributes: attrs, + }, nil +} + +func (t *transformer) SpanAttributes(span pdata.Span) map[string]interface{} { + spanAttrs := span.Attributes() + length := spanAttrs.Len() var hasStatusCode, hasStatusDesc bool s := span.Status() @@ -137,25 +194,19 @@ func (t *traceTransformer) SpanAttributes(span pdata.Span) map[string]interface{ attrs[spanKindKey] = 
strings.ToLower(kind) } - for k, v := range t.ResourceAttributes { - attrs[k] = v - } - - for k, v := range tracetranslator.AttributeMapToMap(span.Attributes()) { - attrs[k] = v + for k, v := range tracetranslator.AttributeMapToMap(spanAttrs) { + // Only include attribute if not an override attribute + if _, isOverrideKey := t.OverrideAttributes[k]; !isOverrideKey { + attrs[k] = v + } } - - // Default attributes to tell New Relic about this collector. - // (overrides any existing) - attrs[collectorNameKey] = name - attrs[collectorVersionKey] = version - attrs[instrumentationProviderAttrKey] = "opentelemetry" + t.TrackAttributes(attributeLocationSpan, spanAttrs) return attrs } // SpanEvents transforms the recorded events of span into New Relic tracing events. -func (t *traceTransformer) SpanEvents(span pdata.Span) []telemetry.Event { +func (t *transformer) SpanEvents(span pdata.Span) []telemetry.Event { length := span.Events().Len() if length == 0 { return nil @@ -165,236 +216,219 @@ func (t *traceTransformer) SpanEvents(span pdata.Span) []telemetry.Event { for i := 0; i < length; i++ { event := span.Events().At(i) + eventAttrs := event.Attributes() events[i] = telemetry.Event{ EventType: event.Name(), Timestamp: event.Timestamp().AsTime(), - Attributes: tracetranslator.AttributeMapToMap(event.Attributes()), + Attributes: tracetranslator.AttributeMapToMap(eventAttrs), } + t.TrackAttributes(attributeLocationSpanEvent, eventAttrs) } return events } -func (t *metricTransformer) Timestamp(ts *timestamppb.Timestamp) time.Time { - if ts == nil { - return time.Time{} - } - return time.Unix(ts.Seconds, int64(ts.Nanos)) +type errUnsupportedMetricType struct { + metricType string + metricName string + numDataPoints int } -func (t *metricTransformer) Metric(metric *metricspb.Metric) ([]telemetry.Metric, error) { - if metric == nil || metric.MetricDescriptor == nil { - return nil, errors.New("empty metric") - } +func (e errUnsupportedMetricType) Error() string { + return 
fmt.Sprintf("unsupported metric %v (%v)", e.metricName, e.metricType) +} - var errs []error - md := metric.MetricDescriptor - baseAttrs := t.MetricAttributes(metric) +func (t *transformer) Metric(m pdata.Metric) ([]telemetry.Metric, error) { + var output []telemetry.Metric + baseAttributes := t.BaseMetricAttributes(m) + + dataType := m.DataType() + k := metricStatsKey{MetricType: dataType} + + switch dataType { + case pdata.MetricDataTypeIntGauge: + t.details.metricMetadataCount[k]++ + // "StartTimestampUnixNano" is ignored for all data points. + gauge := m.IntGauge() + points := gauge.DataPoints() + output = make([]telemetry.Metric, 0, points.Len()) + for l := 0; l < points.Len(); l++ { + point := points.At(l) + attributes := t.MetricAttributes(baseAttributes, point.LabelsMap()) + + nrMetric := telemetry.Gauge{ + Name: m.Name(), + Attributes: attributes, + Value: float64(point.Value()), + Timestamp: point.Timestamp().AsTime(), + } + output = append(output, nrMetric) + } + case pdata.MetricDataTypeDoubleGauge: + t.details.metricMetadataCount[k]++ + // "StartTimestampUnixNano" is ignored for all data points. + gauge := m.DoubleGauge() + points := gauge.DataPoints() + output = make([]telemetry.Metric, 0, points.Len()) + for l := 0; l < points.Len(); l++ { + point := points.At(l) + attributes := t.MetricAttributes(baseAttributes, point.LabelsMap()) + + nrMetric := telemetry.Gauge{ + Name: m.Name(), + Attributes: attributes, + Value: point.Value(), + Timestamp: point.Timestamp().AsTime(), + } + output = append(output, nrMetric) + } + case pdata.MetricDataTypeIntSum: + // aggregation_temporality describes if the aggregator reports delta changes + // since last report time, or cumulative changes since a fixed start time. 
+ sum := m.IntSum() + temporality := sum.AggregationTemporality() + k.MetricTemporality = temporality + t.details.metricMetadataCount[k]++ + + if temporality != pdata.AggregationTemporalityDelta { + return nil, &errUnsupportedMetricType{metricType: k.MetricType.String(), metricName: m.Name(), numDataPoints: sum.DataPoints().Len()} + } - var metrics []telemetry.Metric - for _, ts := range metric.Timeseries { - startTime := t.Timestamp(ts.StartTimestamp) + points := sum.DataPoints() + output = make([]telemetry.Metric, 0, points.Len()) + for l := 0; l < points.Len(); l++ { + point := points.At(l) + attributes := t.MetricAttributes(baseAttributes, point.LabelsMap()) + + nrMetric := telemetry.Count{ + Name: m.Name(), + Attributes: attributes, + Value: float64(point.Value()), + Timestamp: point.StartTimestamp().AsTime(), + Interval: time.Duration(point.Timestamp() - point.StartTimestamp()), + } + output = append(output, nrMetric) + } + case pdata.MetricDataTypeDoubleSum: + sum := m.DoubleSum() + temporality := sum.AggregationTemporality() + k.MetricTemporality = temporality + t.details.metricMetadataCount[k]++ + + if temporality != pdata.AggregationTemporalityDelta { + return nil, &errUnsupportedMetricType{metricType: k.MetricType.String(), metricName: m.Name(), numDataPoints: sum.DataPoints().Len()} + } - attr, err := t.MergeAttributes(baseAttrs, md.LabelKeys, ts.LabelValues) - if err != nil { - errs = append(errs, err) - continue + points := sum.DataPoints() + output = make([]telemetry.Metric, 0, points.Len()) + for l := 0; l < points.Len(); l++ { + point := points.At(l) + attributes := t.MetricAttributes(baseAttributes, point.LabelsMap()) + + nrMetric := telemetry.Count{ + Name: m.Name(), + Attributes: attributes, + Value: point.Value(), + Timestamp: point.StartTimestamp().AsTime(), + Interval: time.Duration(point.Timestamp() - point.StartTimestamp()), + } + output = append(output, nrMetric) } + case pdata.MetricDataTypeIntHistogram: + 
t.details.metricMetadataCount[k]++ + hist := m.IntHistogram() + return nil, &errUnsupportedMetricType{metricType: k.MetricType.String(), metricName: m.Name(), numDataPoints: hist.DataPoints().Len()} + case pdata.MetricDataTypeHistogram: + t.details.metricMetadataCount[k]++ + hist := m.Histogram() + return nil, &errUnsupportedMetricType{metricType: k.MetricType.String(), metricName: m.Name(), numDataPoints: hist.DataPoints().Len()} + case pdata.MetricDataTypeSummary: + t.details.metricMetadataCount[k]++ + summary := m.Summary() + points := summary.DataPoints() + output = make([]telemetry.Metric, 0, points.Len()) + name := m.Name() + for l := 0; l < points.Len(); l++ { + point := points.At(l) + quantiles := point.QuantileValues() + minQuantile := math.NaN() + maxQuantile := math.NaN() + + if quantiles.Len() > 0 { + quantileA := quantiles.At(0) + if quantileA.Quantile() == 0 { + minQuantile = quantileA.Value() + } + if quantiles.Len() > 1 { + quantileB := quantiles.At(quantiles.Len() - 1) + if quantileB.Quantile() == 1 { + maxQuantile = quantileB.Value() + } + } else if quantileA.Quantile() == 1 { + maxQuantile = quantileA.Value() + } + } - for _, point := range ts.Points { - switch md.Type { - case - metricspb.MetricDescriptor_GAUGE_INT64, - metricspb.MetricDescriptor_GAUGE_DOUBLE: - metrics = append(metrics, t.Gauge(md.Name, attr, point)) - case - metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, - metricspb.MetricDescriptor_SUMMARY: - metrics = append(metrics, t.DeltaSummary(md.Name, attr, startTime, point)) - case - metricspb.MetricDescriptor_CUMULATIVE_INT64, - metricspb.MetricDescriptor_CUMULATIVE_DOUBLE: - metrics = append(metrics, t.CumulativeCount(md.Name, attr, startTime, point)) - case - metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: - metrics = append(metrics, t.CumulativeSummary(md.Name, attr, startTime, point)) - default: - errs = append(errs, fmt.Errorf("unsupported metric type: %s", md.Type.String())) + attributes := 
t.MetricAttributes(baseAttributes, point.LabelsMap()) + nrMetric := telemetry.Summary{ + Name: name, + Attributes: attributes, + Count: float64(point.Count()), + Sum: point.Sum(), + Min: minQuantile, + Max: maxQuantile, + Timestamp: point.StartTimestamp().AsTime(), + Interval: time.Duration(point.Timestamp() - point.StartTimestamp()), } + + output = append(output, nrMetric) } + default: + t.details.metricMetadataCount[k]++ } - return metrics, consumererror.Combine(errs) + return output, nil } -func (t *metricTransformer) MetricAttributes(metric *metricspb.Metric) map[string]interface{} { - length := 3 +func (t *transformer) BaseMetricAttributes(metric pdata.Metric) map[string]interface{} { + length := 0 - if t.Resource != nil { - length += len(t.Resource.Labels) - } - - if metric.MetricDescriptor.Unit != "" { + if metric.Unit() != "" { length++ } - if metric.MetricDescriptor.Description != "" { + if metric.Description() != "" { length++ } attrs := make(map[string]interface{}, length) - if t.Resource != nil { - for k, v := range t.Resource.Labels { - attrs[k] = v - } - } - - if metric.MetricDescriptor.Unit != "" { - attrs[unitAttrKey] = metric.MetricDescriptor.Unit + if metric.Unit() != "" { + attrs[unitAttrKey] = metric.Unit() } - if metric.MetricDescriptor.Description != "" { - attrs[descriptionAttrKey] = metric.MetricDescriptor.Description + if metric.Description() != "" { + attrs[descriptionAttrKey] = metric.Description() } - - attrs[collectorNameKey] = name - attrs[collectorVersionKey] = version - if t.ServiceName != "" { - attrs[serviceNameKey] = t.ServiceName - } - return attrs } -var errIncompatibleLabels = errors.New("label keys and values do not match") - -func (t *metricTransformer) MergeAttributes(base map[string]interface{}, lk []*metricspb.LabelKey, lv []*metricspb.LabelValue) (map[string]interface{}, error) { - if len(lk) != len(lv) { - return nil, fmt.Errorf("%w: number of label keys (%d) different than label values (%d)", errIncompatibleLabels, 
len(lk), len(lv)) - } - - attrs := make(map[string]interface{}, len(base)+len(lk)) - - for k, v := range base { - attrs[k] = v - } - for i, k := range lk { - v := lv[i] - if v.Value != "" || v.HasValue { - attrs[k.Key] = v.Value - } - } - return attrs, nil -} - -func (t *metricTransformer) Gauge(name string, attrs map[string]interface{}, point *metricspb.Point) telemetry.Metric { - now := time.Now() - if point.Timestamp != nil { - now = t.Timestamp(point.Timestamp) - } - - m := telemetry.Gauge{ - Name: name, - Attributes: attrs, - Timestamp: now, - } - - switch t := point.Value.(type) { - case *metricspb.Point_Int64Value: - m.Value = float64(t.Int64Value) - case *metricspb.Point_DoubleValue: - m.Value = t.DoubleValue - } - - return m -} - -func (t *metricTransformer) DeltaSummary(name string, attrs map[string]interface{}, start time.Time, point *metricspb.Point) telemetry.Metric { - now := time.Now() - if point.Timestamp != nil { - now = t.Timestamp(point.Timestamp) +func (t *transformer) MetricAttributes(baseAttributes map[string]interface{}, attrMap pdata.StringMap) map[string]interface{} { + rawMap := make(map[string]interface{}, len(baseAttributes)+attrMap.Len()) + for k, v := range baseAttributes { + rawMap[k] = v } - - m := telemetry.Summary{ - Name: name, - Attributes: attrs, - Timestamp: start, - Interval: now.Sub(start), - } - - switch t := point.Value.(type) { - case *metricspb.Point_DistributionValue: - m.Count = float64(t.DistributionValue.Count) - m.Sum = t.DistributionValue.Sum - case *metricspb.Point_SummaryValue: - m.Count = float64(t.SummaryValue.Count.Value) - m.Sum = t.SummaryValue.Sum.Value - } - - return m -} - -func (t *metricTransformer) CumulativeCount(name string, attrs map[string]interface{}, start time.Time, point *metricspb.Point) telemetry.Metric { - var value float64 - switch t := point.Value.(type) { - case *metricspb.Point_Int64Value: - value = float64(t.Int64Value) - case *metricspb.Point_DoubleValue: - value = t.DoubleValue - } - - 
now := time.Now() - if point.Timestamp != nil { - now = t.Timestamp(point.Timestamp) - } - - count, valid := t.DeltaCalculator.CountMetric(name, attrs, value, now) - - // This is the first measurement or a reset happened. - if !valid { - count = telemetry.Count{ - Name: name, - Attributes: attrs, - Value: value, - Timestamp: start, - Interval: now.Sub(start), + attrMap.ForEach(func(k string, v string) { + // Only include attribute if not an override attribute + if _, isOverrideKey := t.OverrideAttributes[k]; !isOverrideKey { + rawMap[k] = v } - } + }) - return count + return rawMap } -func (t *metricTransformer) CumulativeSummary(name string, attrs map[string]interface{}, start time.Time, point *metricspb.Point) telemetry.Metric { - var sum, count float64 - switch t := point.Value.(type) { - case *metricspb.Point_DistributionValue: - sum = t.DistributionValue.Sum - count = float64(t.DistributionValue.Count) - } - - now := time.Now() - if point.Timestamp != nil { - now = t.Timestamp(point.Timestamp) - } - - cCount, cValid := t.DeltaCalculator.CountMetric(name+".count", attrs, count, now) - sCount, sValid := t.DeltaCalculator.CountMetric(name+".sum", attrs, sum, now) - - summary := telemetry.Summary{ - Name: name, - Attributes: attrs, - Count: cCount.Value, - Sum: sCount.Value, - Timestamp: sCount.Timestamp, - Interval: sCount.Interval, - } - - // This is the first measurement or a reset happened. 
- if !cValid || !sValid { - summary.Count = count - summary.Sum = sum - summary.Timestamp = start - summary.Interval = now.Sub(start) - } - - return summary +func (t *transformer) TrackAttributes(location attributeLocation, attributeMap pdata.AttributeMap) { + attributeMap.ForEach(func(k string, v pdata.AttributeValue) { + statsKey := attributeStatsKey{location: location, attributeType: v.Type()} + t.details.attributeMetadataCount[statsKey]++ + }) } diff --git a/exporter/newrelicexporter/transformer_test.go b/exporter/newrelicexporter/transformer_test.go index 3d5c6213f7fd..29351829882a 100644 --- a/exporter/newrelicexporter/transformer_test.go +++ b/exporter/newrelicexporter/transformer_test.go @@ -15,59 +15,185 @@ package newrelicexporter import ( - "encoding/json" + "context" "errors" + "math" "testing" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative" "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/internaldata" - "google.golang.org/protobuf/types/known/timestamppb" ) -func TestNewTraceTransformerInstrumentation(t *testing.T) { +func TestCommonAttributes(t *testing.T) { + startInfo := &component.ApplicationStartInfo{ + ExeName: "the-collector", + Version: "0.0.1", + } + + resource := pdata.NewResource() + resource.Attributes().InsertString("resource", "R1") + ilm := pdata.NewInstrumentationLibrary() ilm.SetName("test name") ilm.SetVersion("test version") - transform := newTraceTransformer(pdata.NewResource(), ilm) - require.Contains(t, transform.ResourceAttributes, instrumentationNameKey) 
- require.Contains(t, transform.ResourceAttributes, instrumentationVersionKey) - assert.Equal(t, transform.ResourceAttributes[instrumentationNameKey], "test name") - assert.Equal(t, transform.ResourceAttributes[instrumentationVersionKey], "test version") + details := newTraceMetadata(context.TODO()) + commonAttrs := newTransformer(startInfo, &details).CommonAttributes(resource, ilm) + assert.Equal(t, "the-collector", commonAttrs[collectorNameKey]) + assert.Equal(t, "0.0.1", commonAttrs[collectorVersionKey]) + assert.Equal(t, "R1", commonAttrs["resource"]) + assert.Equal(t, "test name", commonAttrs[instrumentationNameKey]) + assert.Equal(t, "test version", commonAttrs[instrumentationVersionKey]) + + assert.Equal(t, 1, len(details.attributeMetadataCount)) + assert.Equal(t, 1, details.attributeMetadataCount[attributeStatsKey{location: attributeLocationResource, attributeType: pdata.AttributeValueSTRING}]) } -func defaultAttrFunc(res map[string]interface{}) func(map[string]interface{}) map[string]interface{} { - return func(add map[string]interface{}) map[string]interface{} { - full := make(map[string]interface{}, 2+len(res)+len(add)) - full[collectorNameKey] = name - full[collectorVersionKey] = version - full[instrumentationProviderAttrKey] = "opentelemetry" - for k, v := range res { - full[k] = v - } - for k, v := range add { - full[k] = v - } - return full +func TestDoesNotCaptureResourceAttributeMetadata(t *testing.T) { + startInfo := &component.ApplicationStartInfo{ + ExeName: "the-collector", + Version: "0.0.1", } + + resource := pdata.NewResource() + + ilm := pdata.NewInstrumentationLibrary() + ilm.SetName("test name") + ilm.SetVersion("test version") + + details := newTraceMetadata(context.TODO()) + commonAttrs := newTransformer(startInfo, &details).CommonAttributes(resource, ilm) + + assert.Greater(t, len(commonAttrs), 0) + assert.Equal(t, 0, len(details.attributeMetadataCount)) +} + +func TestCaptureSpanMetadata(t *testing.T) { + details := 
newTraceMetadata(context.TODO()) + transform := newTransformer(nil, &details) + + tests := []struct { + name string + err error + spanFunc func() pdata.Span + wantKey spanStatsKey + }{ + { + name: "no events or links", + spanFunc: func() pdata.Span { + s := pdata.NewSpan() + s.SetSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 1})) + s.SetName("no events or links") + return s + }, + err: errInvalidTraceID, + wantKey: spanStatsKey{hasEvents: false, hasLinks: false}, + }, + { + name: "has events but no links", + spanFunc: func() pdata.Span { + s := pdata.NewSpan() + s.SetTraceID(pdata.NewTraceID([...]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetName("invalid SpanID") + s.Events().Append(pdata.NewSpanEvent()) + return s + }, + err: errInvalidSpanID, + wantKey: spanStatsKey{hasEvents: true, hasLinks: false}, + }, + { + name: "no events but has links", + spanFunc: func() pdata.Span { + s := pdata.NewSpan() + s.SetTraceID(pdata.NewTraceID([...]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 1})) + s.SetName("no events but has links") + s.Links().Append(pdata.NewSpanLink()) + return s + }, + wantKey: spanStatsKey{hasEvents: false, hasLinks: true}, + }, + { + name: "has events and links", + spanFunc: func() pdata.Span { + s := pdata.NewSpan() + s.SetTraceID(pdata.NewTraceID([...]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 2})) + s.SetParentSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 1})) + s.SetName("has events and links") + s.Events().Append(pdata.NewSpanEvent()) + s.Links().Append(pdata.NewSpanLink()) + return s + }, + wantKey: spanStatsKey{hasEvents: true, hasLinks: true}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, err := transform.Span(test.spanFunc()) + if test.err != nil { + assert.True(t, errors.Is(err, test.err)) + } else { + require.NoError(t, err) + 
} + assert.Equal(t, 1, details.spanMetadataCount[test.wantKey]) + }) + } +} + +func TestCaptureSpanAttributeMetadata(t *testing.T) { + details := newTraceMetadata(context.TODO()) + transform := newTransformer(nil, &details) + + se := pdata.NewSpanEvent() + se.Attributes().InsertBool("testattr", true) + + s := pdata.NewSpan() + s.SetTraceID(pdata.NewTraceID([...]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 2})) + s.SetParentSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 1})) + s.SetName("test span") + s.Events().Append(se) + s.Attributes().InsertInt("spanattr", 42) + + _, err := transform.Span(s) + + require.NoError(t, err) + assert.Equal(t, 2, len(details.attributeMetadataCount)) + assert.Equal(t, 1, details.attributeMetadataCount[attributeStatsKey{location: attributeLocationSpan, attributeType: pdata.AttributeValueINT}]) + assert.Equal(t, 1, details.attributeMetadataCount[attributeStatsKey{location: attributeLocationSpanEvent, attributeType: pdata.AttributeValueBOOL}]) +} + +func TestDoesNotCaptureSpanAttributeMetadata(t *testing.T) { + details := newTraceMetadata(context.TODO()) + transform := newTransformer(nil, &details) + + se := pdata.NewSpanEvent() + + s := pdata.NewSpan() + s.SetTraceID(pdata.NewTraceID([...]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + s.SetSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 2})) + s.SetParentSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 1})) + s.SetName("test span") + s.Events().Append(se) + + _, err := transform.Span(s) + + require.NoError(t, err) + assert.Equal(t, 0, len(details.attributeMetadataCount)) } func TestTransformSpan(t *testing.T) { now := time.Unix(100, 0) - rattr := map[string]interface{}{ - serviceNameKey: "test-service", - "resource": "R1", - } - transform := &traceTransformer{ResourceAttributes: rattr} - withDefaults := defaultAttrFunc(rattr) + details := newTraceMetadata(context.TODO()) + transform := 
newTransformer(nil, &details) tests := []struct { name string @@ -88,7 +214,7 @@ func TestTransformSpan(t *testing.T) { ID: "0000000000000001", Name: "invalid TraceID", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(nil), + Attributes: map[string]interface{}{}, }, }, { @@ -104,7 +230,7 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "invalid SpanID", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(nil), + Attributes: map[string]interface{}{}, }, }, { @@ -121,7 +247,7 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "root", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(nil), + Attributes: map[string]interface{}{}, Events: nil, }, }, @@ -141,7 +267,7 @@ func TestTransformSpan(t *testing.T) { Name: "client", ParentID: "0000000000000001", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(nil), + Attributes: map[string]interface{}{}, Events: nil, }, }, @@ -164,9 +290,9 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "error code", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(map[string]interface{}{ + Attributes: map[string]interface{}{ statusCodeKey: "ERROR", - }), + }, Events: nil, }, }, @@ -189,17 +315,17 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "error message", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(map[string]interface{}{ + Attributes: map[string]interface{}{ statusCodeKey: "ERROR", statusDescriptionKey: "error message", - }), + }, Events: nil, }, }, { name: "attributes", spanFunc: func() pdata.Span { - // There is no setter method for Attributes so convert instead. + // There is no setter method for attributes so convert instead. 
return internaldata.OCToTraces( nil, nil, []*tracepb.Span{ { @@ -239,12 +365,12 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "attrs", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(map[string]interface{}{ + Attributes: map[string]interface{}{ "prod": true, "weight": int64(10), "score": 99.8, "user": "alice", - }), + }, Events: nil, }, }, @@ -265,7 +391,7 @@ func TestTransformSpan(t *testing.T) { Name: "with time", Timestamp: now.UTC(), Duration: time.Second * 5, - Attributes: withDefaults(nil), + Attributes: map[string]interface{}{}, Events: nil, }, }, @@ -284,9 +410,9 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "span kind server", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(map[string]interface{}{ + Attributes: map[string]interface{}{ spanKindKey: "server", - }), + }, Events: nil, }, }, @@ -311,7 +437,7 @@ func TestTransformSpan(t *testing.T) { TraceID: "01010101010101010101010101010101", Name: "with events", Timestamp: time.Unix(0, 0).UTC(), - Attributes: withDefaults(nil), + Attributes: map[string]interface{}{}, Events: []telemetry.Event{ { EventType: "this is the event name", @@ -336,316 +462,440 @@ func TestTransformSpan(t *testing.T) { } } -func TestMergeAttributesIncompatibleLenghts(t *testing.T) { - transform := &metricTransformer{} - lk := make([]*metricspb.LabelKey, 2) - lv := make([]*metricspb.LabelValue, 3) - _, err := transform.MergeAttributes(nil, lk, lv) - require.Error(t, err) - assert.True(t, errors.Is(err, errIncompatibleLabels)) +func testTransformMetric(t *testing.T, metric pdata.Metric, want []telemetry.Metric) { + comparer := func(t *testing.T, want []telemetry.Metric, got []telemetry.Metric) { + assert.Equal(t, want, got) + } + testTransformMetricWithComparer(t, metric, want, comparer) } -func TestTransformEmptyMetric(t *testing.T) { - transform := &metricTransformer{} - _, err := transform.Metric(nil) - 
assert.Error(t, err, "nil metric should return an error") - _, err = transform.Metric(&metricspb.Metric{}) - assert.Error(t, err, "nil metric descriptor should return an error") +func testTransformMetricWithComparer(t *testing.T, metric pdata.Metric, want []telemetry.Metric, compare func(t *testing.T, want []telemetry.Metric, got []telemetry.Metric)) { + details := newMetricMetadata(context.Background()) + transform := newTransformer(&component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }, &details) + got, err := transform.Metric(metric) + require.NoError(t, err) + compare(t, want, got) + + assert.Equal(t, len(details.metricMetadataCount), 1) + for k, v := range details.metricMetadataCount { + assert.Equal(t, metric.DataType(), k.MetricType) + assert.Equal(t, 1, v) + } } -func testTransformMetric(t *testing.T, metric *metricspb.Metric, want []telemetry.Metric) { - transform := &metricTransformer{ - DeltaCalculator: cumulative.NewDeltaCalculator(), - ServiceName: "test-service", - Resource: &resourcepb.Resource{ - Labels: map[string]string{ - "resource": "R1", - }, - }, +func testTransformMetricWithError(t *testing.T, metric pdata.Metric, expectedErrorType interface{}) { + details := newMetricMetadata(context.Background()) + transform := newTransformer(&component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }, &details) + _, err := transform.Metric(metric) + assert.IsType(t, expectedErrorType, err) + + assert.Equal(t, len(details.metricMetadataCount), 1) + for k, v := range details.metricMetadataCount { + assert.Equal(t, metric.DataType(), k.MetricType) + assert.Equal(t, 1, v) } - got, err := transform.Metric(metric) - require.NoError(t, err) - assert.Equal(t, want, got) } -func TestTransformGuage(t *testing.T) { - ts := ×tamppb.Timestamp{Seconds: 1} +func TestTransformGauge(t *testing.T) { + ts := pdata.TimestampFromTime(time.Unix(1, 0)) expected := []telemetry.Metric{ 
telemetry.Gauge{ Name: "gauge", Value: 42.0, - Timestamp: time.Unix(1, 0), + Timestamp: ts.AsTime(), Attributes: map[string]interface{}{ - collectorNameKey: name, - collectorVersionKey: version, - "resource": "R1", - "service.name": "test-service", + "unit": "1", + "description": "description", }, }, } + { + m := pdata.NewMetric() + m.SetName("gauge") + m.SetDescription("description") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeDoubleGauge) + gd := m.DoubleGauge() + dp := pdata.NewDoubleDataPoint() + dp.SetTimestamp(ts) + dp.SetValue(42.0) + gd.DataPoints().Append(dp) + t.Run("Double", func(t *testing.T) { testTransformMetric(t, m, expected) }) + } + { + m := pdata.NewMetric() + m.SetName("gauge") + m.SetDescription("description") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeIntGauge) + gi := m.IntGauge() + dp := pdata.NewIntDataPoint() + dp.SetTimestamp(ts) + dp.SetValue(42) + gi.DataPoints().Append(dp) + t.Run("Int64", func(t *testing.T) { testTransformMetric(t, m, expected) }) + } +} - gd := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "gauge", - Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, - }, - Timeseries: []*metricspb.TimeSeries{ - { - Points: []*metricspb.Point{ - { - Timestamp: ts, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 42.0, - }, - }, - }, +func TestTransformSum(t *testing.T) { + start := pdata.TimestampFromTime(time.Unix(1, 0)) + end := pdata.TimestampFromTime(time.Unix(3, 0)) + + expected := []telemetry.Metric{ + telemetry.Count{ + Name: "sum", + Value: 42.0, + Timestamp: start.AsTime(), + Interval: time.Second * 2, + Attributes: map[string]interface{}{ + "unit": "1", + "description": "description", }, }, } - t.Run("Double", func(t *testing.T) { testTransformMetric(t, gd, expected) }) - gi := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "gauge", - Type: metricspb.MetricDescriptor_GAUGE_INT64, - }, - Timeseries: []*metricspb.TimeSeries{ - { - Points: 
[]*metricspb.Point{ - { - Timestamp: ts, - Value: &metricspb.Point_Int64Value{ - Int64Value: 42, - }, - }, - }, - }, - }, + { + m := pdata.NewMetric() + m.SetName("sum") + m.SetDescription("description") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeDoubleSum) + d := m.DoubleSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dp := pdata.NewDoubleDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetValue(42.0) + d.DataPoints().Append(dp) + t.Run("DoubleSum-Delta", func(t *testing.T) { testTransformMetric(t, m, expected) }) + } + { + m := pdata.NewMetric() + m.SetName("sum") + m.SetDescription("description") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeDoubleSum) + d := m.DoubleSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dp := pdata.NewDoubleDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetValue(42.0) + d.DataPoints().Append(dp) + t.Run("DoubleSum-Cumulative", func(t *testing.T) { testTransformMetricWithError(t, m, &errUnsupportedMetricType{}) }) + } + { + m := pdata.NewMetric() + m.SetName("sum") + m.SetDescription("description") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeIntSum) + d := m.IntSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dp := pdata.NewIntDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetValue(42.0) + d.DataPoints().Append(dp) + t.Run("IntSum-Delta", func(t *testing.T) { testTransformMetric(t, m, expected) }) + } + { + m := pdata.NewMetric() + m.SetName("sum") + m.SetDescription("description") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeIntSum) + d := m.IntSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dp := pdata.NewIntDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetValue(42.0) + d.DataPoints().Append(dp) + t.Run("IntSum-Cumulative", func(t *testing.T) { testTransformMetricWithError(t, m, 
&errUnsupportedMetricType{}) }) } - t.Run("Int64", func(t *testing.T) { testTransformMetric(t, gi, expected) }) } func TestTransformDeltaSummary(t *testing.T) { - start := ×tamppb.Timestamp{Seconds: 1} - ts := ×tamppb.Timestamp{Seconds: 2} + testTransformDeltaSummaryWithValues(t, "Double With Min and Max", 2, 7, 1, 6) + testTransformDeltaSummaryWithValues(t, "Double With Min and No Max", 1, 1, 1, math.NaN()) + testTransformDeltaSummaryWithValues(t, "Double With Max and No Min", 1, 1, math.NaN(), 1) + testTransformDeltaSummaryWithValues(t, "Double With No Min and No Max", 0, 0, math.NaN(), math.NaN()) +} + +func testTransformDeltaSummaryWithValues(t *testing.T, testName string, count uint64, sum float64, min float64, max float64) { + start := pdata.TimestampFromTime(time.Unix(1, 0)) + end := pdata.TimestampFromTime(time.Unix(3, 0)) + expected := []telemetry.Metric{ telemetry.Summary{ Name: "summary", - Count: 2.0, - Sum: 7.0, - Timestamp: time.Unix(1, 0), - Interval: time.Second, + Count: float64(count), + Sum: sum, + Min: min, + Max: max, + Timestamp: time.Unix(1, 0).UTC(), + Interval: 2 * time.Second, Attributes: map[string]interface{}{ - collectorNameKey: name, - collectorVersionKey: version, - "resource": "R1", - "service.name": "test-service", + "description": "description", + "unit": "s", + "foo": "bar", }, }, } - gd := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "summary", - Type: metricspb.MetricDescriptor_GAUGE_DISTRIBUTION, - }, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: start, - Points: []*metricspb.Point{ - { - Timestamp: ts, - Value: &metricspb.Point_DistributionValue{ - DistributionValue: &metricspb.DistributionValue{ - Count: 2, - Sum: 7, - }, - }, - }, - }, - }, - }, + comparer := func(t *testing.T, want []telemetry.Metric, got []telemetry.Metric) { + assert.Equal(t, len(want), len(got)) + + for i := 0; i < len(want); i++ { + wantedSummary, ok := want[i].(telemetry.Summary) + assert.True(t, ok) + 
gotSummary, ok := got[i].(telemetry.Summary) + assert.True(t, ok) + assert.Equal(t, wantedSummary.Name, gotSummary.Name) + assert.Equal(t, wantedSummary.Count, gotSummary.Count) + assert.Equal(t, wantedSummary.Sum, gotSummary.Sum) + assert.Equal(t, wantedSummary.Timestamp, gotSummary.Timestamp) + assert.Equal(t, wantedSummary.Interval, gotSummary.Interval) + assert.Equal(t, wantedSummary.Attributes, gotSummary.Attributes) + if math.IsNaN(wantedSummary.Min) { + assert.True(t, math.IsNaN(gotSummary.Min)) + } else { + assert.Equal(t, wantedSummary.Min, gotSummary.Min) + } + if math.IsNaN(wantedSummary.Max) { + assert.True(t, math.IsNaN(gotSummary.Max)) + } else { + assert.Equal(t, wantedSummary.Max, gotSummary.Max) + } + } } - t.Run("Distribution", func(t *testing.T) { - testTransformMetric(t, gd, expected) - // Should be a delta, running twice should not change state. - testTransformMetric(t, gd, expected) - }) - - /* Remove only dependency on wrapperspb. - s := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "summary", - Type: metricspb.MetricDescriptor_SUMMARY, - }, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: start, - Points: []*metricspb.Point{ - { - Timestamp: ts, - Value: &metricspb.Point_SummaryValue{ - SummaryValue: &metricspb.SummaryValue{ - Count: &wrapperspb.Int64Value{Value: 2}, - Sum: &wrapperspb.DoubleValue{Value: 7}, - }, - }, - }, - }, - }, - }, + + m := pdata.NewMetric() + m.SetName("summary") + m.SetDescription("description") + m.SetUnit("s") + m.SetDataType(pdata.MetricDataTypeSummary) + ds := m.Summary() + dp := pdata.NewSummaryDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetSum(sum) + dp.SetCount(count) + dp.LabelsMap().Insert("foo", "bar") + q := dp.QuantileValues() + if !math.IsNaN(min) { + minQuantile := pdata.NewValueAtQuantile() + minQuantile.SetQuantile(0) + minQuantile.SetValue(min) + q.Append(minQuantile) + } + if !math.IsNaN(max) { + maxQuantile := 
pdata.NewValueAtQuantile() + maxQuantile.SetQuantile(1) + maxQuantile.SetValue(max) + q.Append(maxQuantile) } - t.Run("Summary", func(t *testing.T) { - testTransformMetric(t, s, expected) - // Should be a delta, running twice should not change state. - testTransformMetric(t, s, expected) - }) - */ + ds.DataPoints().Append(dp) + + t.Run(testName, func(t *testing.T) { testTransformMetricWithComparer(t, m, expected, comparer) }) } -func TestTransformCumulativeCount(t *testing.T) { - start := ×tamppb.Timestamp{Seconds: 1} - ts1 := ×tamppb.Timestamp{Seconds: 2} - ts2 := ×tamppb.Timestamp{Seconds: 3} - attrs := map[string]interface{}{ - collectorNameKey: name, - collectorVersionKey: version, - "resource": "R1", - "service.name": "test-service", +func TestUnsupportedMetricTypes(t *testing.T) { + start := pdata.TimestampFromTime(time.Unix(1, 0)) + end := pdata.TimestampFromTime(time.Unix(3, 0)) + + { + m := pdata.NewMetric() + m.SetName("no") + m.SetDescription("no") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeIntHistogram) + h := m.IntHistogram() + dp := pdata.NewIntHistogramDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetCount(2) + dp.SetSum(8) + dp.SetExplicitBounds([]float64{3, 7, 11}) + dp.SetBucketCounts([]uint64{1, 1, 0, 0}) + h.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + h.DataPoints().Append(dp) + + t.Run("IntHistogram", func(t *testing.T) { testTransformMetricWithError(t, m, &errUnsupportedMetricType{}) }) } - jsonAttrs, err := json.Marshal(attrs) - require.NoError(t, err) - expected := []telemetry.Metric{ - telemetry.Count{ - Name: "count", - Value: 5.0, - Timestamp: time.Unix(1, 0), - Interval: time.Second, - Attributes: attrs, - }, - telemetry.Count{ - Name: "count", - Value: 2.0, - Timestamp: time.Unix(2, 0), - Interval: time.Second, - AttributesJSON: jsonAttrs, - }, + { + m := pdata.NewMetric() + m.SetName("no") + m.SetDescription("no") + m.SetUnit("1") + m.SetDataType(pdata.MetricDataTypeHistogram) + h := 
m.Histogram() + dp := pdata.NewHistogramDataPoint() + dp.SetStartTimestamp(start) + dp.SetTimestamp(end) + dp.SetCount(2) + dp.SetSum(8.0) + dp.SetExplicitBounds([]float64{3, 7, 11}) + dp.SetBucketCounts([]uint64{1, 1, 0, 0}) + h.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + h.DataPoints().Append(dp) + + t.Run("DoubleHistogram", func(t *testing.T) { testTransformMetricWithError(t, m, &errUnsupportedMetricType{}) }) } +} - cd := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "count", - Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, +func TestTransformUnknownMetricType(t *testing.T) { + metric := pdata.NewMetric() + details := newMetricMetadata(context.Background()) + transform := newTransformer(&component.ApplicationStartInfo{ + ExeName: testCollectorName, + Version: testCollectorVersion, + }, &details) + + got, err := transform.Metric(metric) + + require.NoError(t, err) + assert.Nil(t, got) + assert.Equal(t, 1, details.metricMetadataCount[metricStatsKey{MetricType: pdata.MetricDataTypeNone}]) +} + +func TestTransformer_Log(t *testing.T) { + tests := []struct { + name string + logFunc func() pdata.LogRecord + want telemetry.Log + }{ + { + name: "Basic Conversion", + logFunc: func() pdata.LogRecord { + log := pdata.NewLogRecord() + timestamp := pdata.TimestampFromTime(time.Unix(0, 0).UTC()) + log.SetTimestamp(timestamp) + return log + }, + want: telemetry.Log{ + Message: "", + Timestamp: time.Unix(0, 0).UTC(), + Attributes: map[string]interface{}{"name": ""}, + }, }, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: start, - Points: []*metricspb.Point{ - { - Timestamp: ts1, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 5.0, - }, - }, - { - Timestamp: ts2, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 7.0, - }, - }, - }, + { + name: "With Log attributes", + logFunc: func() pdata.LogRecord { + log := pdata.NewLogRecord() + log.SetName("bloopbleep") + log.Attributes().InsertString("foo", 
"bar") + log.Body().SetStringVal("Hello World") + return log + }, + want: telemetry.Log{ + Message: "Hello World", + Timestamp: time.Unix(0, 0).UTC(), + Attributes: map[string]interface{}{"foo": "bar", "name": "bloopbleep"}, }, }, - } - t.Run("Double", func(t *testing.T) { testTransformMetric(t, cd, expected) }) - - ci := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "count", - Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, + { + name: "With severity number", + logFunc: func() pdata.LogRecord { + log := pdata.NewLogRecord() + log.SetName("bloopbleep") + log.SetSeverityNumber(pdata.SeverityNumberWARN) + return log + }, + want: telemetry.Log{ + Message: "bloopbleep", + Timestamp: time.Unix(0, 0).UTC(), + Attributes: map[string]interface{}{"name": "bloopbleep", "log.levelNum": int32(13)}, + }, }, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: start, - Points: []*metricspb.Point{ - { - Timestamp: ts1, - Value: &metricspb.Point_Int64Value{ - Int64Value: 5, - }, - }, - { - Timestamp: ts2, - Value: &metricspb.Point_Int64Value{ - Int64Value: 7, - }, - }, + { + name: "With severity text", + logFunc: func() pdata.LogRecord { + log := pdata.NewLogRecord() + log.SetName("bloopbleep") + log.SetSeverityText("SEVERE") + return log + }, + want: telemetry.Log{ + Message: "bloopbleep", + Timestamp: time.Unix(0, 0).UTC(), + Attributes: map[string]interface{}{"name": "bloopbleep", "log.level": "SEVERE"}, + }, + }, + { + name: "With traceID and spanID", + logFunc: func() pdata.LogRecord { + log := pdata.NewLogRecord() + timestamp := pdata.TimestampFromTime(time.Unix(0, 0).UTC()) + log.SetTraceID(pdata.NewTraceID([...]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})) + log.SetSpanID(pdata.NewSpanID([...]byte{0, 0, 0, 0, 0, 0, 0, 1})) + log.SetTimestamp(timestamp) + return log + }, + want: telemetry.Log{ + Message: "", + Timestamp: time.Unix(0, 0).UTC(), + Attributes: map[string]interface{}{ + "name": "", + "trace.id": 
"01010101010101010101010101010101", + "span.id": "0000000000000001", }, }, }, } - t.Run("Int64", func(t *testing.T) { testTransformMetric(t, ci, expected) }) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + details := newLogMetadata(context.TODO()) + transform := newTransformer(nil, &details) + got, _ := transform.Log(test.logFunc()) + assert.EqualValues(t, test.want, got) + }) + } } -func TestTransformCumulativeSummary(t *testing.T) { - start := ×tamppb.Timestamp{Seconds: 1} - ts1 := ×tamppb.Timestamp{Seconds: 2} - ts2 := ×tamppb.Timestamp{Seconds: 3} - attrs := map[string]interface{}{ - collectorNameKey: name, - collectorVersionKey: version, - "resource": "R1", - "service.name": "test-service", - } - expected := []telemetry.Metric{ - telemetry.Summary{ - Name: "summary", - Sum: 5.0, - Count: 53.0, - Timestamp: time.Unix(1, 0), - Interval: time.Second, - Attributes: attrs, - }, - telemetry.Summary{ - Name: "summary", - Sum: 3.0, - Count: 3.0, - Timestamp: time.Unix(2, 0), - Interval: time.Second, - Attributes: attrs, - }, - } +func TestCaptureLogAttributeMetadata(t *testing.T) { + log := pdata.NewLogRecord() + log.SetName("bloopbleep") + log.Attributes().InsertString("foo", "bar") + log.Body().SetStringVal("Hello World") - cd := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: "summary", - Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, - }, - Timeseries: []*metricspb.TimeSeries{ - { - StartTimestamp: start, - Points: []*metricspb.Point{ - { - Timestamp: ts1, - Value: &metricspb.Point_DistributionValue{ - DistributionValue: &metricspb.DistributionValue{ - Count: 53, - Sum: 5, - }, - }, - }, - { - Timestamp: ts2, - Value: &metricspb.Point_DistributionValue{ - DistributionValue: &metricspb.DistributionValue{ - Count: 56, - Sum: 8, - }, - }, - }, - }, - }, - }, + details := newLogMetadata(context.TODO()) + transform := newTransformer(nil, &details) + _, err := transform.Log(log) + + require.NoError(t, err) 
+ assert.Equal(t, 1, len(details.attributeMetadataCount)) + assert.Equal(t, 1, details.attributeMetadataCount[attributeStatsKey{location: attributeLocationLog, attributeType: pdata.AttributeValueSTRING}]) +} + +func TestDoesNotCaptureLogAttributeMetadata(t *testing.T) { + log := pdata.NewLogRecord() + log.SetName("bloopbleep") + log.Body().SetStringVal("Hello World") + + details := newLogMetadata(context.TODO()) + transform := newTransformer(nil, &details) + _, err := transform.Log(log) + + require.NoError(t, err) + assert.Equal(t, 0, len(details.attributeMetadataCount)) +} + +func TestUnsupportedMetricErrorCreation(t *testing.T) { + e := errUnsupportedMetricType{ + metricType: "testType", + metricName: "testName", + numDataPoints: 1, } - t.Run("Distribution", func(t *testing.T) { testTransformMetric(t, cd, expected) }) + + errorMessage := e.Error() + + assert.Equal(t, "unsupported metric testName (testType)", errorMessage) } diff --git a/go.sum b/go.sum index 1281ceccf822..5b73419879ac 100644 --- a/go.sum +++ b/go.sum @@ -1172,8 +1172,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= github.com/nbutton23/zxcvbn-go v0.0.0-20201221231540-e56b841a3c88/go.mod h1:KSVJerMDfblTH7p5MZaTt+8zaT2iEk3AkVb9PQdZuE8= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= -github.com/newrelic/newrelic-telemetry-sdk-go v0.6.0 h1:XAzKWY7T2speu5ZwW3pwgNBKaOeERj+pMXq7xmnnlCQ= -github.com/newrelic/newrelic-telemetry-sdk-go v0.6.0/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E= +github.com/newrelic/newrelic-telemetry-sdk-go v0.7.0 h1:tS/wanndQjrBFTQFeNb3jN1iJeUPSPLi74Y58ZtYAkQ= +github.com/newrelic/newrelic-telemetry-sdk-go v0.7.0/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod 
h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/exhaustive v0.1.0/go.mod h1:S1j9110vxV1ECdCudXRkeMnFQ/DQk9ajLT0Uf2MYZQQ= github.com/nishanths/predeclared v0.2.1/go.mod h1:HvkGJcA3naj4lOwnFXFDkFxVtSqQMB9sbB1usJ+xjQE=