diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index cf6b1b013be9..c4708b3f863e 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -90,6 +90,13 @@ func (c *client) pushMetricsData( req.Header.Set("Content-Encoding", "gzip") } + if md.ResourceMetrics().Len() != 0 { + accessToken, found := md.ResourceMetrics().At(0).Resource().Attributes().Get(splunk.HecTokenLabel) + if found { + req.Header.Set("Authorization", splunk.HECTokenHeader+" "+accessToken.StringVal()) + } + } + resp, err := c.client.Do(req) if err != nil { return err @@ -136,6 +143,18 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error { // Callback when each batch is to be sent. send := func(ctx context.Context, buf *bytes.Buffer, headers map[string]string) (err error) { + localHeaders := headers + if ld.ResourceLogs().Len() != 0 { + accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel) + if found { + localHeaders = map[string]string{} + for k, v := range headers { + localHeaders[k] = v + } + localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.StringVal() + } + } + shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression if shouldCompress { @@ -150,10 +169,10 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error { return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err) } - return c.postEvents(ctx, gzipBuffer, headers, shouldCompress) + return c.postEvents(ctx, gzipBuffer, localHeaders, shouldCompress) } - return c.postEvents(ctx, buf, headers, shouldCompress) + return c.postEvents(ctx, buf, localHeaders, shouldCompress) } return c.pushLogDataInBatches(ctx, ld, send) diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go index c998cdafae50..44db2d14c958 100644 --- a/exporter/splunkhecexporter/factory.go +++ b/exporter/splunkhecexporter/factory.go @@ -21,10 +21,12 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter/exporterhelper" conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr" ) const ( @@ -34,6 +36,18 @@ const ( defaultHTTPTimeout = 10 * time.Second ) +// TODO: Find a place for this to be shared. +type baseMetricsExporter struct { + component.Component + consumer.Metrics +} + +// TODO: Find a place for this to be shared. +type baseLogsExporter struct { + component.Component + consumer.Logs +} + // NewFactory creates a factory for Splunk HEC exporter. func NewFactory() component.ExporterFactory { return exporterhelper.NewFactory( @@ -112,7 +126,7 @@ func createMetricsExporter( return nil, err } - return exporterhelper.NewMetricsExporter( + exporter, err := exporterhelper.NewMetricsExporter( expCfg, set, exp.pushMetricsData, @@ -122,6 +136,16 @@ func createMetricsExporter( exporterhelper.WithQueue(expCfg.QueueSettings), exporterhelper.WithStart(exp.start), exporterhelper.WithShutdown(exp.stop)) + if err != nil { + return nil, err + } + + wrapped := &baseMetricsExporter{ + Component: exporter, + Metrics: batchperresourceattr.NewBatchPerResourceMetrics(splunk.HecTokenLabel, exporter), + } + + return wrapped, nil } func createLogsExporter( @@ -140,7 +164,7 @@ func createLogsExporter( return nil, err } - return exporterhelper.NewLogsExporter( + logsExporter, err := exporterhelper.NewLogsExporter( expCfg, set, exp.pushLogData, @@ -150,4 +174,15 @@ func createLogsExporter( exporterhelper.WithQueue(expCfg.QueueSettings), exporterhelper.WithStart(exp.start), exporterhelper.WithShutdown(exp.stop)) + + if err != nil { + return nil, err + } + + wrapped := &baseLogsExporter{ + Component: logsExporter, + Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, logsExporter), + } + + return wrapped, nil } diff --git a/exporter/splunkhecexporter/go.mod b/exporter/splunkhecexporter/go.mod index 7273e3e44691..8b90706fbd9a 100644 --- a/exporter/splunkhecexporter/go.mod +++ b/exporter/splunkhecexporter/go.mod @@ -5,7 +5,9 @@ go 1.17 require ( github.com/census-instrumentation/opencensus-proto v0.3.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.39.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.39.0 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.39.0 @@ -48,4 +50,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/splun replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus => ../../pkg/translator/opencensus +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr => ../../pkg/batchperresourceattr + replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal diff --git a/exporter/splunkhecexporter/logdata_to_splunk.go b/exporter/splunkhecexporter/logdata_to_splunk.go index 1a197fae55b3..02b2da054a50 100644 --- a/exporter/splunkhecexporter/logdata_to_splunk.go +++ b/exporter/splunkhecexporter/logdata_to_splunk.go @@ -82,6 +82,8 @@ func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *C sourcetype = v.StringVal() case indexKey: index = v.StringVal() + case splunk.HecTokenLabel: + // ignore default: fields[k] = convertAttributeValue(v, logger) } @@ -98,6 +100,8 @@ func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *C sourcetype = v.StringVal() case indexKey: index = v.StringVal() + case splunk.HecTokenLabel: + // ignore default: fields[k] = convertAttributeValue(v, logger) } diff --git a/exporter/splunkhecexporter/logdata_to_splunk_test.go b/exporter/splunkhecexporter/logdata_to_splunk_test.go index e84b34a8b3b1..63b7530d6479 100644 --- a/exporter/splunkhecexporter/logdata_to_splunk_test.go +++ b/exporter/splunkhecexporter/logdata_to_splunk_test.go @@ -85,6 +85,27 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) { "myhost", "myapp", "myapp-type"), }, }, + { + name: "with_hec_token", + logRecordFn: func() pdata.LogRecord { + logRecord := pdata.NewLogRecord() + logRecord.Body().SetStringVal("mylog") + logRecord.Attributes().InsertString(splunk.HecTokenLabel, "mytoken") + logRecord.SetTimestamp(ts) + return logRecord + }, + logResourceFn: pdata.NewResource, + configDataFn: func() *Config { + config := createDefaultConfig().(*Config) + config.Source = "source" + config.SourceType = "sourcetype" + return config + }, + wantSplunkEvents: []*splunk.Event{ + commonLogSplunkEvent("mylog", ts, map[string]interface{}{}, + "unknown", "source", "sourcetype"), + }, + }, { name: "non-string attribute", logRecordFn: func() pdata.LogRecord { diff --git a/exporter/splunkhecexporter/metricdata_to_splunk.go b/exporter/splunkhecexporter/metricdata_to_splunk.go index 9975eb6bf7c3..8e8817b24791 100644 --- a/exporter/splunkhecexporter/metricdata_to_splunk.go +++ b/exporter/splunkhecexporter/metricdata_to_splunk.go @@ -86,6 +86,8 @@ func metricDataToSplunk(logger *zap.Logger, data pdata.Metrics, config *Config) sourceType = v.StringVal() case indexKey: index = v.StringVal() + case splunk.HecTokenLabel: + // ignore default: commonFields[k] = v.AsString() } diff --git a/exporter/splunkhecexporter/tracedata_to_splunk.go b/exporter/splunkhecexporter/tracedata_to_splunk.go index 4d4eff4b595b..83eec0052591 100644 --- a/exporter/splunkhecexporter/tracedata_to_splunk.go +++ b/exporter/splunkhecexporter/tracedata_to_splunk.go @@ -85,6 +85,8 @@ func traceDataToSplunk(logger *zap.Logger, data pdata.Traces, config *Config) ([ sourceType = v.StringVal() case indexKey: index = v.StringVal() + case splunk.HecTokenLabel: + // ignore default: commonFields[k] = v.AsString() } diff --git a/receiver/splunkhecreceiver/go.mod b/receiver/splunkhecreceiver/go.mod index e6078d3d2e18..546ece2e309c 100644 --- a/receiver/splunkhecreceiver/go.mod +++ b/receiver/splunkhecreceiver/go.mod @@ -7,6 +7,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter v0.39.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.39.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.39.0 go.opentelemetry.io/collector/model v0.39.0 diff --git a/receiver/splunkhecreceiver/go.sum b/receiver/splunkhecreceiver/go.sum index 080d4769cee9..f14ce48f9bc9 100644 --- a/receiver/splunkhecreceiver/go.sum +++ b/receiver/splunkhecreceiver/go.sum @@ -315,6 +315,10 @@ github.com/mostynb/go-grpc-compression v1.1.14/go.mod h1:z6FFnn1nEs6zxqgicvXTqb9 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.38.0 h1:6DCsNg1Du+SHmYywdOBhQdrzrSMj5d3XTGHigUrYEiw= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.38.0/go.mod h1:xBn59tQWM94iv9UorkzsaOnZtAnyEPWreubcIV1TKCs= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0 h1:cBLG+mcF+JoyQ+XR+Bv9NYEATecvUkgzFrcwCIm01gc= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0/go.mod h1:3B8fsDcmbCLO72Qf7J+4ZWc4APtK8sRKlw3ocmC0X4E= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/receiver/splunkhecreceiver/receiver.go b/receiver/splunkhecreceiver/receiver.go index d990491565af..40aa8199c4d1 100644 --- a/receiver/splunkhecreceiver/receiver.go +++ b/receiver/splunkhecreceiver/receiver.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net" "net/http" + "strings" "sync" "time" @@ -367,10 +368,11 @@ func (r *splunkReceiver) consumeLogs(ctx context.Context, events []*splunk.Event func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resource pdata.Resource) { if r.config.AccessTokenPassthrough { - accessToken := req.Header.Get(splunk.HECTokenHeader) - if accessToken != "" { + accessToken := req.Header.Get("Authorization") + if strings.HasPrefix(accessToken, splunk.HECTokenHeader+" ") { + accessTokenValue := accessToken[len(splunk.HECTokenHeader)+1:] return func(resource pdata.Resource) { - resource.Attributes().InsertString(splunk.HecTokenLabel, accessToken) + resource.Attributes().InsertString(splunk.HecTokenLabel, accessTokenValue) } } } diff --git a/receiver/splunkhecreceiver/receiver_test.go b/receiver/splunkhecreceiver/receiver_test.go index 13cac6447e71..a1c7eab28171 100644 --- a/receiver/splunkhecreceiver/receiver_test.go +++ b/receiver/splunkhecreceiver/receiver_test.go @@ -447,29 +447,63 @@ func Test_splunkhecReceiver_TLS(t *testing.T) { func Test_splunkhecReceiver_AccessTokenPassthrough(t *testing.T) { tests := []struct { - name string - passthrough bool - token pdata.AttributeValue + name string + passthrough bool + tokenProvided string + tokenExpected string + metric bool }{ { - name: "No token provided and passthrough false", - passthrough: false, - token: pdata.NewAttributeValueEmpty(), + name: "Log, No token provided and passthrough false", + tokenExpected: "ignored", + passthrough: false, + metric: false, }, { - name: "No token provided and passthrough true", - passthrough: true, - token: pdata.NewAttributeValueEmpty(), + name: "Log, No token provided and passthrough true", + tokenExpected: "ignored", + passthrough: true, + metric: false, }, { - name: "token provided and passthrough false", - passthrough: false, - token: pdata.NewAttributeValueString("myToken"), + name: "Log, token provided and passthrough false", + tokenProvided: "passthroughToken", + passthrough: false, + tokenExpected: "ignored", + metric: false, }, { - name: "token provided and passthrough true", - passthrough: true, - token: pdata.NewAttributeValueString("myToken"), + name: "Log, token provided and passthrough true", + passthrough: true, + tokenProvided: "passthroughToken", + tokenExpected: "passthroughToken", + metric: false, + }, + { + name: "Metric, No token provided and passthrough false", + tokenExpected: "ignored", + passthrough: false, + metric: true, + }, + { + name: "Metric, No token provided and passthrough true", + tokenExpected: "ignored", + passthrough: true, + metric: true, + }, + { + name: "Metric, token provided and passthrough false", + tokenProvided: "passthroughToken", + passthrough: false, + tokenExpected: "ignored", + metric: true, + }, + { + name: "Metric, token provided and passthrough true", + passthrough: true, + tokenProvided: "passthroughToken", + tokenExpected: "passthroughToken", + metric: true, }, } @@ -478,47 +512,78 @@ func Test_splunkhecReceiver_AccessTokenPassthrough(t *testing.T) { config := createDefaultConfig().(*Config) config.Endpoint = "localhost:0" config.AccessTokenPassthrough = tt.passthrough + accessTokensChan := make(chan string) - sink := new(consumertest.LogsSink) - rcv, err := newLogsReceiver(componenttest.NewNopReceiverCreateSettings(), *config, sink) - assert.NoError(t, err) + endServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + _, err := io.Copy(io.Discard, req.Body) + assert.NoError(t, err) + rw.WriteHeader(http.StatusAccepted) + accessTokensChan <- req.Header.Get("Authorization") + })) + defer endServer.Close() + factory := splunkhecexporter.NewFactory() + exporterConfig := factory.CreateDefaultConfig().(*splunkhecexporter.Config) + exporterConfig.Token = "ignored" + exporterConfig.SourceType = "defaultsourcetype" + exporterConfig.Index = "defaultindex" + exporterConfig.DisableCompression = true + exporterConfig.Endpoint = endServer.URL currentTime := float64(time.Now().UnixNano()) / 1e6 - splunkhecMsg := buildSplunkHecMsg(currentTime, 3) + var splunkhecMsg *splunk.Event + if tt.metric { + splunkhecMsg = buildSplunkHecMetricsMsg(currentTime, 1.0, 3) + } else { + splunkhecMsg = buildSplunkHecMsg(currentTime, 3) + } msgBytes, _ := json.Marshal(splunkhecMsg) req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) - if tt.token.Type() != pdata.AttributeValueTypeEmpty { - req.Header.Set("Splunk", tt.token.StringVal()) + if tt.passthrough { + if tt.tokenProvided != "" { + req.Header.Set("Authorization", "Splunk "+tt.tokenProvided) + } } - r := rcv.(*splunkReceiver) - w := httptest.NewRecorder() - r.handleReq(w, req) - - resp := w.Result() - respBytes, err := ioutil.ReadAll(resp.Body) - assert.NoError(t, err) - - var bodyStr string - assert.NoError(t, json.Unmarshal(respBytes, &bodyStr)) - - assert.Equal(t, http.StatusAccepted, resp.StatusCode) - assert.Equal(t, responseOK, bodyStr) - - got := sink.AllLogs() - - resource := got[0].ResourceLogs().At(0).Resource() - tokenLabel, exists := resource.Attributes().Get("com.splunk.hec.access_token") + done := make(chan bool) + go func() { + tokenReceived := <-accessTokensChan + assert.Equal(t, "Splunk "+tt.tokenExpected, tokenReceived) + done <- true + }() - if tt.passthrough { - if tt.token.Type() == pdata.AttributeValueTypeEmpty { - assert.False(t, exists) - } else { - assert.Equal(t, tt.token.StringVal(), tokenLabel.StringVal()) - } + if tt.metric { + exporter, err := factory.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), exporterConfig) + exporter.Start(context.Background(), nil) + assert.NoError(t, err) + rcv, err := newMetricsReceiver(componenttest.NewNopReceiverCreateSettings(), *config, exporter) + assert.NoError(t, err) + r := rcv.(*splunkReceiver) + w := httptest.NewRecorder() + r.handleReq(w, req) + resp := w.Result() + _, err = ioutil.ReadAll(resp.Body) + assert.NoError(t, err) } else { - assert.Empty(t, tokenLabel) + exporter, err := factory.CreateLogsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), exporterConfig) + exporter.Start(context.Background(), nil) + assert.NoError(t, err) + rcv, err := newLogsReceiver(componenttest.NewNopReceiverCreateSettings(), *config, exporter) + assert.NoError(t, err) + r := rcv.(*splunkReceiver) + w := httptest.NewRecorder() + r.handleReq(w, req) + resp := w.Result() + _, err = ioutil.ReadAll(resp.Body) + assert.NoError(t, err) } + + select { + case <-done: + break + case <-time.After(5 * time.Second): + assert.Fail(t, "Timeout") + } + }) } }