Skip to content

Commit

Permalink
Fix token passthrough for HEC
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Nov 16, 2021
1 parent e8dee66 commit bfc1f6e
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 53 deletions.
23 changes: 21 additions & 2 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func (c *client) pushMetricsData(
req.Header.Set("Content-Encoding", "gzip")
}

if md.ResourceMetrics().Len() != 0 {
accessToken, found := md.ResourceMetrics().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
if found {
req.Header.Set("Authorization", splunk.HECTokenHeader+" "+accessToken.StringVal())
}
}

resp, err := c.client.Do(req)
if err != nil {
return err
Expand Down Expand Up @@ -136,6 +143,18 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {

// Callback when each batch is to be sent.
send := func(ctx context.Context, buf *bytes.Buffer, headers map[string]string) (err error) {
localHeaders := headers
if ld.ResourceLogs().Len() != 0 {
accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
if found {
localHeaders = map[string]string{}
for k, v := range headers {
localHeaders[k] = v
}
localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.StringVal()
}
}

shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression

if shouldCompress {
Expand All @@ -150,10 +169,10 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
}

return c.postEvents(ctx, gzipBuffer, headers, shouldCompress)
return c.postEvents(ctx, gzipBuffer, localHeaders, shouldCompress)
}

return c.postEvents(ctx, buf, headers, shouldCompress)
return c.postEvents(ctx, buf, localHeaders, shouldCompress)
}

return c.pushLogDataInBatches(ctx, ld, send)
Expand Down
39 changes: 37 additions & 2 deletions exporter/splunkhecexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterhelper"
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr"
)

const (
Expand All @@ -34,6 +36,18 @@ const (
defaultHTTPTimeout = 10 * time.Second
)

// TODO: Find a place for this to be shared.
type baseMetricsExporter struct {
component.Component
consumer.Metrics
}

// TODO: Find a place for this to be shared.
type baseLogsExporter struct {
component.Component
consumer.Logs
}

// NewFactory creates a factory for Splunk HEC exporter.
func NewFactory() component.ExporterFactory {
return exporterhelper.NewFactory(
Expand Down Expand Up @@ -112,7 +126,7 @@ func createMetricsExporter(
return nil, err
}

return exporterhelper.NewMetricsExporter(
exporter, err := exporterhelper.NewMetricsExporter(
expCfg,
set,
exp.pushMetricsData,
Expand All @@ -122,6 +136,16 @@ func createMetricsExporter(
exporterhelper.WithQueue(expCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.stop))
if err != nil {
return nil, err
}

wrapped := &baseMetricsExporter{
Component: exporter,
Metrics: batchperresourceattr.NewBatchPerResourceMetrics(splunk.HecTokenLabel, exporter),
}

return wrapped, nil
}

func createLogsExporter(
Expand All @@ -140,7 +164,7 @@ func createLogsExporter(
return nil, err
}

return exporterhelper.NewLogsExporter(
logsExporter, err := exporterhelper.NewLogsExporter(
expCfg,
set,
exp.pushLogData,
Expand All @@ -150,4 +174,15 @@ func createLogsExporter(
exporterhelper.WithQueue(expCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.stop))

if err != nil {
return nil, err
}

wrapped := &baseLogsExporter{
Component: logsExporter,
Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, logsExporter),
}

return wrapped, nil
}
4 changes: 4 additions & 0 deletions exporter/splunkhecexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.17
require (
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.39.0
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.39.0
Expand Down Expand Up @@ -48,4 +50,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/splun

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus => ../../pkg/translator/opencensus

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr => ../../pkg/batchperresourceattr

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
4 changes: 4 additions & 0 deletions exporter/splunkhecexporter/logdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *C
sourcetype = v.StringVal()
case indexKey:
index = v.StringVal()
case splunk.HecTokenLabel:
// ignore
default:
fields[k] = convertAttributeValue(v, logger)
}
Expand All @@ -98,6 +100,8 @@ func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *C
sourcetype = v.StringVal()
case indexKey:
index = v.StringVal()
case splunk.HecTokenLabel:
// ignore
default:
fields[k] = convertAttributeValue(v, logger)
}
Expand Down
21 changes: 21 additions & 0 deletions exporter/splunkhecexporter/logdata_to_splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
"myhost", "myapp", "myapp-type"),
},
},
{
name: "with_hec_token",
logRecordFn: func() pdata.LogRecord {
logRecord := pdata.NewLogRecord()
logRecord.Body().SetStringVal("mylog")
logRecord.Attributes().InsertString(splunk.HecTokenLabel, "mytoken")
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
config := createDefaultConfig().(*Config)
config.Source = "source"
config.SourceType = "sourcetype"
return config
},
wantSplunkEvents: []*splunk.Event{
commonLogSplunkEvent("mylog", ts, map[string]interface{}{},
"unknown", "source", "sourcetype"),
},
},
{
name: "non-string attribute",
logRecordFn: func() pdata.LogRecord {
Expand Down
2 changes: 2 additions & 0 deletions exporter/splunkhecexporter/metricdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func metricDataToSplunk(logger *zap.Logger, data pdata.Metrics, config *Config)
sourceType = v.StringVal()
case indexKey:
index = v.StringVal()
case splunk.HecTokenLabel:
// ignore
default:
commonFields[k] = v.AsString()
}
Expand Down
2 changes: 2 additions & 0 deletions exporter/splunkhecexporter/tracedata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func traceDataToSplunk(logger *zap.Logger, data pdata.Traces, config *Config) ([
sourceType = v.StringVal()
case indexKey:
index = v.StringVal()
case splunk.HecTokenLabel:
// ignore
default:
commonFields[k] = v.AsString()
}
Expand Down
1 change: 1 addition & 0 deletions receiver/splunkhecreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.39.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.39.0
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.39.0
go.opentelemetry.io/collector/model v0.39.0
Expand Down
4 changes: 4 additions & 0 deletions receiver/splunkhecreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -367,10 +368,11 @@ func (r *splunkReceiver) consumeLogs(ctx context.Context, events []*splunk.Event

func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resource pdata.Resource) {
if r.config.AccessTokenPassthrough {
accessToken := req.Header.Get(splunk.HECTokenHeader)
if accessToken != "" {
accessToken := req.Header.Get("Authorization")
if strings.HasPrefix(accessToken, splunk.HECTokenHeader+" ") {
accessTokenValue := accessToken[len(splunk.HECTokenHeader)+1:]
return func(resource pdata.Resource) {
resource.Attributes().InsertString(splunk.HecTokenLabel, accessToken)
resource.Attributes().InsertString(splunk.HecTokenLabel, accessTokenValue)
}
}
}
Expand Down
Loading

0 comments on commit bfc1f6e

Please sign in to comment.