[chore] [exporter/splunkhec] Split the profiling data before processing (#21909)

To simplify the logic and make future improvements possible. Benchmarks show no performance degradation for typical use cases (regular logs only or profiling only), with a small improvement in memory allocation. Benchmarks were adjusted to be applied to `ConsumeLogs` for both the before and after states. The only performance hit appears for batches containing both regular and profiling logs, but since that use case is fairly rare, we can ignore it.
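For context, a rough sketch of what a `ConsumeLogs` benchmark for the new batcher could look like; this is an assumption-level example, not the benchmark from this commit, and it only exercises the batcher's fast path for regular logs inside the splunkhecexporter package:

```go
// Hypothetical benchmark sketch, not part of this commit: drives the
// batcher's ConsumeLogs with a batch of regular log records only.
package splunkhecexporter

import (
	"context"
	"testing"

	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.uber.org/zap"
)

func BenchmarkPerScopeBatcher_RegularLogs(b *testing.B) {
	// Typical use case: a single scope containing only regular (non-profiling) records.
	logs := plog.NewLogs()
	sl := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
	for i := 0; i < 100; i++ {
		sl.LogRecords().AppendEmpty().Body().SetStr("regular log record")
	}

	batcher := &perScopeBatcher{
		logsEnabled:      true,
		profilingEnabled: true,
		logger:           zap.NewNop(),
		next:             consumertest.NewNop(), // nop consumer stands in for the HEC client
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if err := batcher.ConsumeLogs(context.Background(), logs); err != nil {
			b.Fatal(err)
		}
	}
}
```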
dmitryax authored May 14, 2023
1 parent 64eea31 commit d67cbca
Showing 8 changed files with 178 additions and 191 deletions.
22 changes: 18 additions & 4 deletions exporter/splunkhecexporter/batchperscope.go
@@ -20,12 +20,14 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"
)

// perScopeBatcher is a consumer.Logs that rebatches logs by a type found in the scope name: profiling or regular logs.
type perScopeBatcher struct {
logsEnabled bool
profilingEnabled bool
logger *zap.Logger
next consumer.Logs
}

@@ -53,11 +55,17 @@ func (rb *perScopeBatcher) ConsumeLogs(ctx context.Context, logs plog.Logs) error

// if we don't have both types of logs, just call next if enabled
if !profilingFound || !otherLogsFound {
if rb.logsEnabled && otherLogsFound {
return rb.next.ConsumeLogs(ctx, logs)
if otherLogsFound {
if rb.logsEnabled {
return rb.next.ConsumeLogs(ctx, logs)
}
rb.logger.Debug("Log data is not allowed", zap.Int("dropped_records", logs.LogRecordCount()))
}
if rb.profilingEnabled && profilingFound {
return rb.next.ConsumeLogs(ctx, logs)
if profilingFound {
if rb.profilingEnabled {
return rb.next.ConsumeLogs(ctx, logs)
}
rb.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records", logs.LogRecordCount()))
}
return nil
}
Expand Down Expand Up @@ -95,9 +103,15 @@ func (rb *perScopeBatcher) ConsumeLogs(ctx context.Context, logs plog.Logs) erro
var err error
if rb.logsEnabled {
err = multierr.Append(err, rb.next.ConsumeLogs(ctx, otherLogs))
} else {
rb.logger.Debug("Log data is not allowed", zap.Int("dropped_records",
logs.LogRecordCount()-profilingLogs.LogRecordCount()))
}
if rb.profilingEnabled {
err = multierr.Append(err, rb.next.ConsumeLogs(ctx, profilingLogs))
} else {
rb.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records",
logs.LogRecordCount()-otherLogs.LogRecordCount()))
}
return err
}
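For illustration, a minimal sketch (hypothetical, not one of the test cases added by this commit) of how the rebatching behaves on a payload that mixes regular and profiling scopes; it assumes the same package and relies on the scope name `otel.profiling` marking profiling data, as `isProfilingData` does:

```go
// Hypothetical example in the splunkhecexporter package: a mixed payload is
// rebatched into two separate batches, regular logs first, profiling second.
package splunkhecexporter

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.uber.org/zap"
)

func TestPerScopeBatcher_SplitsMixedBatch(t *testing.T) {
	logs := plog.NewLogs()
	sls := logs.ResourceLogs().AppendEmpty().ScopeLogs()

	regular := sls.AppendEmpty()
	regular.LogRecords().AppendEmpty().Body().SetStr("regular record")

	profiling := sls.AppendEmpty()
	profiling.Scope().SetName("otel.profiling") // same value as profilingLibraryName
	profiling.LogRecords().AppendEmpty().Body().SetStr("profiling record")

	sink := &consumertest.LogsSink{}
	batcher := &perScopeBatcher{
		logsEnabled:      true,
		profilingEnabled: true,
		logger:           zap.NewNop(),
		next:             sink,
	}

	require.NoError(t, batcher.ConsumeLogs(context.Background(), logs))
	// One downstream batch per data type, so the HEC client can set the
	// profiling header by looking only at the first scope of a batch.
	require.Equal(t, 2, len(sink.AllLogs()))
}
```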
33 changes: 26 additions & 7 deletions exporter/splunkhecexporter/batchperscope_test.go
@@ -21,18 +21,28 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func TestBatchLogs_ConsumeLogs(t *testing.T) {
type debugMsg struct {
text string
droppedCount int64
}
profilingDropped := debugMsg{text: "Profiling data is not allowed", droppedCount: 4}
logsDropped := debugMsg{text: "Log data is not allowed", droppedCount: 5}
tests := []struct {
name string
profilingEnabled bool
logsEnabled bool
in string
out []string
wantDropped []debugMsg
}{
{
name: "profiling_only_both_enabled",
@@ -52,6 +62,7 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
logsEnabled: true,
in: "profiling_only.yaml",
out: []string{},
wantDropped: []debugMsg{profilingDropped},
},
{
name: "regular_logs_only_both_enabled",
@@ -71,6 +82,7 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
profilingEnabled: true,
in: "regular_logs_only.yaml",
out: []string{},
wantDropped: []debugMsg{logsDropped},
},
{
name: "combined_both_enabled",
@@ -84,26 +96,27 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
logsEnabled: true,
in: "combined.yaml",
out: []string{"regular_logs_only.yaml"},
wantDropped: []debugMsg{profilingDropped},
},
{
name: "combined_logs_disabled",
profilingEnabled: true,
in: "combined.yaml",
out: []string{"profiling_only.yaml"},
},
{
name: "combined_both_disabled",
in: "combined.yaml",
out: []string{},
wantDropped: []debugMsg{logsDropped},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := &consumertest.LogsSink{}
core, obs := observer.New(zapcore.DebugLevel)
logger := zap.New(core)

consumer := &perScopeBatcher{
profilingEnabled: tt.profilingEnabled,
logsEnabled: tt.logsEnabled,
logger: logger,
next: sink,
}

@@ -113,13 +126,19 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
err = consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)

assert.Equal(t, len(tt.out), len(sink.AllLogs()))
require.Equal(t, len(tt.out), len(sink.AllLogs()))
for i, out := range tt.out {
expected, err := golden.ReadLogs("testdata/batchperscope/" + out)
require.NoError(t, err)
assert.NoError(t, plogtest.CompareLogs(expected, sink.AllLogs()[i]))
}

require.Equal(t, len(tt.wantDropped), obs.Len())
for _, entry := range tt.wantDropped {
filtered := obs.FilterMessage(entry.text)
require.Equal(t, 1, filtered.Len())
assert.Equal(t, entry.droppedCount, filtered.All()[0].ContextMap()["dropped_records"])
}
})
}

}
115 changes: 32 additions & 83 deletions exporter/splunkhecexporter/client.go
@@ -114,11 +114,27 @@ func (c *client) pushLogData(ctx context.Context, ld plog.Logs) error {
c.wg.Add(1)
defer c.wg.Done()

if ld.ResourceLogs().Len() == 0 {
return nil
}

localHeaders := map[string]string{}
if ld.ResourceLogs().Len() != 0 {
accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
if found {
localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str()

// All logs in a batch have the same access token after batchperresourceattr, so we can just check the first one.
accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
if found {
localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str()
}

// All logs in a batch have only one type (regular or profiling logs) after perScopeBatcher,
// so we can just check the first one.
for i := 0; i < ld.ResourceLogs().Len(); i++ {
sls := ld.ResourceLogs().At(i).ScopeLogs()
if sls.Len() > 0 {
if isProfilingData(sls.At(0)) {
localHeaders[libraryHeaderName] = profilingLibraryName
}
break
}
}

@@ -131,92 +147,41 @@ const bufCapPadding = uint(4096)
const libraryHeaderName = "X-Splunk-Instrumentation-Library"
const profilingLibraryName = "otel.profiling"

var profilingHeaders = map[string]string{
libraryHeaderName: profilingLibraryName,
}

func isProfilingData(sl plog.ScopeLogs) bool {
return sl.Scope().Name() == profilingLibraryName
}

// pushLogDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthLogs.
// ld log records are parsed to Splunk events.
// The input data may contain both logs and profiling data.
// They are batched separately and sent with different HTTP headers
func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error {
profilingLocalHeaders := map[string]string{}
for k, v := range profilingHeaders {
profilingLocalHeaders[k] = v
}

for k, v := range headers {
profilingLocalHeaders[k] = v
}
bufState := c.bufferStatePool.get()
defer c.bufferStatePool.put(bufState)

var bufState *bufferState
var profilingBufState *bufferState
var permanentErrors []error

var rls = ld.ResourceLogs()
var droppedProfilingDataRecords, droppedLogRecords int
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).ScopeLogs()
for j := 0; j < ills.Len(); j++ {
var err error
var newPermanentErrors []error

if isProfilingData(ills.At(j)) {
if !c.config.ProfilingDataEnabled {
droppedProfilingDataRecords += ills.At(j).LogRecords().Len()
continue
}
if profilingBufState == nil {
profilingBufState = c.bufferStatePool.get()
defer c.bufferStatePool.put(profilingBufState)
}
profilingBufState.resource, profilingBufState.library = i, j
newPermanentErrors, err = c.pushLogRecords(ctx, rls, profilingBufState, profilingLocalHeaders)
} else {
if !c.config.LogDataEnabled {
droppedLogRecords += ills.At(j).LogRecords().Len()
continue
}
if bufState == nil {
bufState = c.bufferStatePool.get()
defer c.bufferStatePool.put(bufState)
}
bufState.resource, bufState.library = i, j
newPermanentErrors, err = c.pushLogRecords(ctx, rls, bufState, headers)
}
bufState.resource, bufState.library = i, j
newPermanentErrors, err = c.pushLogRecords(ctx, rls, bufState, headers)

if err != nil {
return consumererror.NewLogs(err, c.subLogs(ld, bufState, profilingBufState))
return consumererror.NewLogs(err, subLogs(ld, bufState))
}

permanentErrors = append(permanentErrors, newPermanentErrors...)
}
}

if droppedProfilingDataRecords != 0 {
c.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records", droppedProfilingDataRecords))
}
if droppedLogRecords != 0 {
c.logger.Debug("Log data is not allowed", zap.Int("dropped_records", droppedLogRecords))
}

// There's some leftover unsent non-profiling data
if bufState != nil && bufState.containsData() {
if bufState.containsData() {
if err := c.postEvents(ctx, bufState, headers); err != nil {
return consumererror.NewLogs(err, c.subLogs(ld, bufState, profilingBufState))
}
}

// There's some leftover unsent profiling data
if profilingBufState != nil && profilingBufState.containsData() {
if err := c.postEvents(ctx, profilingBufState, profilingLocalHeaders); err != nil {
// Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above.
return consumererror.NewLogs(err, c.subLogs(ld, nil, profilingBufState))
return consumererror.NewLogs(err, subLogs(ld, bufState))
}
}

@@ -484,22 +449,9 @@ func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers
return c.hecWorker.send(ctx, bufState, headers)
}

// subLogs returns a subset of `ld` starting from `profilingState` for profiling data
// plus starting from `state` for non-profiling data.
func (c *client) subLogs(ld plog.Logs, state *bufferState, profilingState *bufferState) plog.Logs {
subset := plog.NewLogs()
if c.config.LogDataEnabled && state != nil {
subLogsByType(ld, state, subset, false)
}
if c.config.ProfilingDataEnabled && profilingState != nil {
subLogsByType(ld, profilingState, subset, true)
}

return subset
}

// subLogs returns a subset of logs starting the state.
func subLogsByType(src plog.Logs, state *bufferState, dst plog.Logs, profiling bool) {
// subLogs returns a subset of logs starting from the state.
func subLogs(src plog.Logs, state *bufferState) plog.Logs {
dst := plog.NewLogs()
resources := src.ResourceLogs()
resourcesSub := dst.ResourceLogs()

@@ -517,11 +469,6 @@ func subLogsByType(src plog.Logs, state *bufferState, dst plog.Logs, profiling bool) {
for jSub := 0; j < libraries.Len(); j++ {
lib := libraries.At(j)

// Only copy profiling data if requested. If not requested, only copy non-profiling data
if profiling != isProfilingData(lib) {
continue
}

newLibSub := librariesSub.AppendEmpty()
lib.Scope().CopyTo(newLibSub.Scope())

Expand All @@ -540,6 +487,8 @@ func subLogsByType(src plog.Logs, state *bufferState, dst plog.Logs, profiling b
}
}
}

return dst
}

// subMetrics returns a subset of metrics starting from the state.