diff --git a/exporter/splunkhecexporter/batchperscope.go b/exporter/splunkhecexporter/batchperscope.go
index 5188200d9a9d..fdd49451976b 100644
--- a/exporter/splunkhecexporter/batchperscope.go
+++ b/exporter/splunkhecexporter/batchperscope.go
@@ -20,12 +20,14 @@ import (
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.uber.org/multierr"
+	"go.uber.org/zap"
 )

 // perScopeBatcher is a consumer.Logs that rebatches logs by a type found in the scope name: profiling or regular logs.
 type perScopeBatcher struct {
 	logsEnabled      bool
 	profilingEnabled bool
+	logger           *zap.Logger
 	next             consumer.Logs
 }

@@ -53,11 +55,17 @@ func (rb *perScopeBatcher) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
 	// if we don't have both types of logs, just call next if enabled
 	if !profilingFound || !otherLogsFound {
-		if rb.logsEnabled && otherLogsFound {
-			return rb.next.ConsumeLogs(ctx, logs)
+		if otherLogsFound {
+			if rb.logsEnabled {
+				return rb.next.ConsumeLogs(ctx, logs)
+			}
+			rb.logger.Debug("Log data is not allowed", zap.Int("dropped_records", logs.LogRecordCount()))
 		}
-		if rb.profilingEnabled && profilingFound {
-			return rb.next.ConsumeLogs(ctx, logs)
+		if profilingFound {
+			if rb.profilingEnabled {
+				return rb.next.ConsumeLogs(ctx, logs)
+			}
+			rb.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records", logs.LogRecordCount()))
 		}
 		return nil
 	}

@@ -95,9 +103,15 @@ func (rb *perScopeBatcher) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
 	var err error
 	if rb.logsEnabled {
 		err = multierr.Append(err, rb.next.ConsumeLogs(ctx, otherLogs))
+	} else {
+		rb.logger.Debug("Log data is not allowed", zap.Int("dropped_records",
+			logs.LogRecordCount()-profilingLogs.LogRecordCount()))
 	}
 	if rb.profilingEnabled {
 		err = multierr.Append(err, rb.next.ConsumeLogs(ctx, profilingLogs))
+	} else {
+		rb.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records",
+			logs.LogRecordCount()-otherLogs.LogRecordCount()))
 	}
 	return err
 }
diff --git a/exporter/splunkhecexporter/batchperscope_test.go b/exporter/splunkhecexporter/batchperscope_test.go
index e138ae2b5c1e..188bcf278602 100644
--- a/exporter/splunkhecexporter/batchperscope_test.go
+++ b/exporter/splunkhecexporter/batchperscope_test.go
@@ -21,18 +21,28 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"go.uber.org/zap/zaptest/observer"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
 )

 func TestBatchLogs_ConsumeLogs(t *testing.T) {
+	type debugMsg struct {
+		text         string
+		droppedCount int64
+	}
+	profilingDropped := debugMsg{text: "Profiling data is not allowed", droppedCount: 4}
+	logsDropped := debugMsg{text: "Log data is not allowed", droppedCount: 5}
 	tests := []struct {
 		name             string
 		profilingEnabled bool
 		logsEnabled      bool
 		in               string
 		out              []string
+		wantDropped      []debugMsg
 	}{
 		{
 			name:             "profiling_only_both_enabled",
@@ -52,6 +62,7 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
 			logsEnabled: true,
 			in:          "profiling_only.yaml",
 			out:         []string{},
+			wantDropped: []debugMsg{profilingDropped},
 		},
 		{
 			name:             "regular_logs_only_both_enabled",
@@ -71,6 +82,7 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
 			profilingEnabled: true,
 			in:               "regular_logs_only.yaml",
 			out:              []string{},
+			wantDropped:      []debugMsg{logsDropped},
 		},
 		{
 			name:             "combined_both_enabled",
@@ -84,26 +96,27 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
 			logsEnabled: true,
 			in:          "combined.yaml",
 			out:         []string{"regular_logs_only.yaml"},
+			wantDropped: []debugMsg{profilingDropped},
 		},
 		{
 			name:             "combined_logs_disabled",
 			profilingEnabled: true,
 			in:               "combined.yaml",
 			out:              []string{"profiling_only.yaml"},
-		},
-		{
-			name: "combined_both_disabled",
-			in:   "combined.yaml",
-			out:  []string{},
+			wantDropped:      []debugMsg{logsDropped},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			sink := &consumertest.LogsSink{}
+			core, obs := observer.New(zapcore.DebugLevel)
+			logger := zap.New(core)
+
 			consumer := &perScopeBatcher{
 				profilingEnabled: tt.profilingEnabled,
 				logsEnabled:      tt.logsEnabled,
+				logger:           logger,
 				next:             sink,
 			}

@@ -113,13 +126,19 @@ func TestBatchLogs_ConsumeLogs(t *testing.T) {
 			err = consumer.ConsumeLogs(context.Background(), logs)
 			assert.NoError(t, err)

-			assert.Equal(t, len(tt.out), len(sink.AllLogs()))
+			require.Equal(t, len(tt.out), len(sink.AllLogs()))
 			for i, out := range tt.out {
 				expected, err := golden.ReadLogs("testdata/batchperscope/" + out)
 				require.NoError(t, err)
 				assert.NoError(t, plogtest.CompareLogs(expected, sink.AllLogs()[i]))
 			}
+
+			require.Equal(t, len(tt.wantDropped), obs.Len())
+			for _, entry := range tt.wantDropped {
+				filtered := obs.FilterMessage(entry.text)
+				require.Equal(t, 1, filtered.Len())
+				assert.Equal(t, entry.droppedCount, filtered.All()[0].ContextMap()["dropped_records"])
+			}
 		})
 	}
-
 }
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index de3d38425b46..b2f97be15694 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -114,11 +114,27 @@ func (c *client) pushLogData(ctx context.Context, ld plog.Logs) error {
 	c.wg.Add(1)
 	defer c.wg.Done()

+	if ld.ResourceLogs().Len() == 0 {
+		return nil
+	}
+
 	localHeaders := map[string]string{}
-	if ld.ResourceLogs().Len() != 0 {
-		accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
-		if found {
-			localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str()
+
+	// All logs in a batch have the same access token after batchperresourceattr, so we can just check the first one.
+	accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
+	if found {
+		localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str()
+	}
+
+	// All logs in a batch have only one type (regular or profiling logs) after perScopeBatcher,
+	// so we can just check the first one.
+	for i := 0; i < ld.ResourceLogs().Len(); i++ {
+		sls := ld.ResourceLogs().At(i).ScopeLogs()
+		if sls.Len() > 0 {
+			if isProfilingData(sls.At(0)) {
+				localHeaders[libraryHeaderName] = profilingLibraryName
+			}
+			break
 		}
 	}

@@ -131,10 +147,6 @@ const bufCapPadding = uint(4096)
 const libraryHeaderName = "X-Splunk-Instrumentation-Library"
 const profilingLibraryName = "otel.profiling"

-var profilingHeaders = map[string]string{
-	libraryHeaderName: profilingLibraryName,
-}
-
 func isProfilingData(sl plog.ScopeLogs) bool {
 	return sl.Scope().Name() == profilingLibraryName
 }
@@ -142,81 +154,34 @@ func isProfilingData(sl plog.ScopeLogs) bool {
 // pushLogDataInBatches sends batches of Splunk events in JSON format.
 // The batch content length is restricted to MaxContentLengthLogs.
 // ld log records are parsed to Splunk events.
-// The input data may contain both logs and profiling data.
-// They are batched separately and sent with different HTTP headers
 func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error {
-	profilingLocalHeaders := map[string]string{}
-	for k, v := range profilingHeaders {
-		profilingLocalHeaders[k] = v
-	}
-
-	for k, v := range headers {
-		profilingLocalHeaders[k] = v
-	}
+	bufState := c.bufferStatePool.get()
+	defer c.bufferStatePool.put(bufState)

-	var bufState *bufferState
-	var profilingBufState *bufferState
 	var permanentErrors []error

 	var rls = ld.ResourceLogs()
-	var droppedProfilingDataRecords, droppedLogRecords int
 	for i := 0; i < rls.Len(); i++ {
 		ills := rls.At(i).ScopeLogs()
 		for j := 0; j < ills.Len(); j++ {
 			var err error
 			var newPermanentErrors []error

-			if isProfilingData(ills.At(j)) {
-				if !c.config.ProfilingDataEnabled {
-					droppedProfilingDataRecords += ills.At(j).LogRecords().Len()
-					continue
-				}
-				if profilingBufState == nil {
-					profilingBufState = c.bufferStatePool.get()
-					defer c.bufferStatePool.put(profilingBufState)
-				}
-				profilingBufState.resource, profilingBufState.library = i, j
-				newPermanentErrors, err = c.pushLogRecords(ctx, rls, profilingBufState, profilingLocalHeaders)
-			} else {
-				if !c.config.LogDataEnabled {
-					droppedLogRecords += ills.At(j).LogRecords().Len()
-					continue
-				}
-				if bufState == nil {
-					bufState = c.bufferStatePool.get()
-					defer c.bufferStatePool.put(bufState)
-				}
-				bufState.resource, bufState.library = i, j
-				newPermanentErrors, err = c.pushLogRecords(ctx, rls, bufState, headers)
-			}
+			bufState.resource, bufState.library = i, j
+			newPermanentErrors, err = c.pushLogRecords(ctx, rls, bufState, headers)

 			if err != nil {
-				return consumererror.NewLogs(err, c.subLogs(ld, bufState, profilingBufState))
+				return consumererror.NewLogs(err, subLogs(ld, bufState))
 			}

 			permanentErrors = append(permanentErrors, newPermanentErrors...)
 		}
 	}

-	if droppedProfilingDataRecords != 0 {
-		c.logger.Debug("Profiling data is not allowed", zap.Int("dropped_records", droppedProfilingDataRecords))
-	}
-	if droppedLogRecords != 0 {
-		c.logger.Debug("Log data is not allowed", zap.Int("dropped_records", droppedLogRecords))
-	}
-
 	// There's some leftover unsent non-profiling data
-	if bufState != nil && bufState.containsData() {
+	if bufState.containsData() {
 		if err := c.postEvents(ctx, bufState, headers); err != nil {
-			return consumererror.NewLogs(err, c.subLogs(ld, bufState, profilingBufState))
-		}
-	}
-
-	// There's some leftover unsent profiling data
-	if profilingBufState != nil && profilingBufState.containsData() {
-		if err := c.postEvents(ctx, profilingBufState, profilingLocalHeaders); err != nil {
-			// Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above.
-			return consumererror.NewLogs(err, c.subLogs(ld, nil, profilingBufState))
+			return consumererror.NewLogs(err, subLogs(ld, bufState))
 		}
 	}

@@ -484,22 +449,9 @@ func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers map[string]string) error {
 	return c.hecWorker.send(ctx, bufState, headers)
 }

-// subLogs returns a subset of `ld` starting from `profilingState` for profiling data
-// plus starting from `state` for non-profiling data.
-func (c *client) subLogs(ld plog.Logs, state *bufferState, profilingState *bufferState) plog.Logs {
-	subset := plog.NewLogs()
-	if c.config.LogDataEnabled && state != nil {
-		subLogsByType(ld, state, subset, false)
-	}
-	if c.config.ProfilingDataEnabled && profilingState != nil {
-		subLogsByType(ld, profilingState, subset, true)
-	}
-
-	return subset
-}
-
-// subLogs returns a subset of logs starting the state.
-func subLogsByType(src plog.Logs, state *bufferState, dst plog.Logs, profiling bool) {
+// subLogs returns a subset of logs starting from the state.
+func subLogs(src plog.Logs, state *bufferState) plog.Logs {
+	dst := plog.NewLogs()
 	resources := src.ResourceLogs()
 	resourcesSub := dst.ResourceLogs()

@@ -517,11 +469,6 @@ func subLogsByType(src plog.Logs, state *bufferState, dst plog.Logs, profiling bool) {
 		for jSub := 0; j < libraries.Len(); j++ {
 			lib := libraries.At(j)

-			// Only copy profiling data if requested. If not requested, only copy non-profiling data
-			if profiling != isProfilingData(lib) {
-				continue
-			}
-
 			newLibSub := librariesSub.AppendEmpty()
 			lib.Scope().CopyTo(newLibSub.Scope())

@@ -540,6 +487,8 @@ func subLogsByType(src plog.Logs, state *bufferState, dst plog.Logs, profiling bool) {
 			}
 		}
 	}
+
+	return dst
 }

 // subMetrics returns a subset of metrics starting from the state.
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index d9cde0eda270..5ccf6792f5da 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -37,6 +37,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -1334,7 +1335,8 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {

 	c := newLogsClient(exportertest.NewNopCreateSettings(), config)

-	logs := createLogDataWithCustomLibraries(1, []string{"otel.logs", "otel.profiling"}, []int{10, 20})
+	logs := createLogDataWithCustomLibraries(1, []string{"otel.logs"}, []int{10})
+	profilingData := createLogDataWithCustomLibraries(1, []string{"otel.profiling"}, []int{20})
 	var headers *[]http.Header

 	httpClient, headers := newTestClient(200, "OK")
@@ -1343,6 +1345,8 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {

 	err := c.pushLogData(context.Background(), logs)
 	require.NoError(t, err)
+	err = c.pushLogData(context.Background(), profilingData)
+	require.NoError(t, err)
 	assert.Equal(t, 30, len(*headers))

 	profilingCount, nonProfilingCount := 0, 0
@@ -1394,6 +1398,17 @@ func benchPushLogData(b *testing.B, numResources int, numRecords int, bufSize uint) {
 	config.DisableCompression = true
 	c := newLogsClient(exportertest.NewNopCreateSettings(), config)
 	c.hecWorker = &mockHecWorker{}
+	exp, err := exporterhelper.NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), config,
+		c.pushLogData)
+	require.NoError(b, err)
+	exp = &baseLogsExporter{
+		Component: exp,
+		Logs: &perScopeBatcher{
+			logsEnabled: true,
+			logger:      zap.NewNop(),
+			next:        exp,
+		},
+	}

 	logs := createLogData(numResources, 1, numRecords)

@@ -1401,7 +1416,7 @@ func benchPushLogData(b *testing.B, numResources int, numRecords int, bufSize uint) {

 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		err := c.pushLogData(context.Background(), logs)
+		err := exp.ConsumeLogs(context.Background(), logs)
 		require.NoError(b, err)
 	}
 }
@@ -1483,13 +1498,9 @@ func TestSubLogs(t *testing.T) {
 	// Creating 12 logs (2 resources x 2 libraries x 3 records)
 	logs := createLogData(2, 2, 3)

-	c := client{
-		config: NewFactory().CreateDefaultConfig().(*Config),
-	}
-
 	// Logs subset from leftmost index (resource 0, library 0, record 0).
 	_0_0_0 := &bufferState{resource: 0, library: 0, record: 0} //revive:disable-line:var-naming
-	got := c.subLogs(logs, _0_0_0, nil)
+	got := subLogs(logs, _0_0_0)

 	// Number of logs in subset should equal original logs.
 	assert.Equal(t, logs.LogRecordCount(), got.LogRecordCount())
@@ -1503,7 +1514,7 @@ func TestSubLogs(t *testing.T) {

 	// Logs subset from some mid index (resource 0, library 1, log 2).
 	_0_1_2 := &bufferState{resource: 0, library: 1, record: 2} //revive:disable-line:var-naming
-	got = c.subLogs(logs, _0_1_2, nil)
+	got = subLogs(logs, _0_1_2)

 	assert.Equal(t, 7, got.LogRecordCount())

@@ -1516,7 +1527,7 @@ func TestSubLogs(t *testing.T) {

 	// Logs subset from rightmost index (resource 1, library 1, log 2).
 	_1_1_2 := &bufferState{resource: 1, library: 1, record: 2} //revive:disable-line:var-naming
-	got = c.subLogs(logs, _1_1_2, nil)
+	got = subLogs(logs, _1_1_2)

 	// Number of logs in subset should be 1.
 	assert.Equal(t, 1, got.LogRecordCount())
@@ -1524,31 +1535,6 @@ func TestSubLogs(t *testing.T) {
 	// The name of the sole log record should be 1_1_2.
 	val, _ = got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get(splunk.DefaultNameLabel)
 	assert.Equal(t, "1_1_2", val.AsString())
-
-	// Now see how profiling and log data are merged
-	logs = createLogDataWithCustomLibraries(2, []string{"otel.logs", "otel.profiling"}, []int{10, 10})
-	slice := &bufferState{resource: 1, library: 0, record: 5}
-	profSlice := &bufferState{resource: 0, library: 1, record: 8}
-
-	got = c.subLogs(logs, slice, profSlice)
-
-	assert.Equal(t, 5+2+10, got.LogRecordCount())
-	assert.Equal(t, "otel.logs", got.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Name())
-	val, _ = got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get(splunk.DefaultNameLabel)
-	assert.Equal(t, "1_0_5", val.AsString())
-	val, _ = got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(4).Attributes().Get(splunk.DefaultNameLabel)
-	assert.Equal(t, "1_0_9", val.AsString())
-
-	assert.Equal(t, "otel.profiling", got.ResourceLogs().At(1).ScopeLogs().At(0).Scope().Name())
-	val, _ = got.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get(splunk.DefaultNameLabel)
-	assert.Equal(t, "0_1_8", val.AsString())
-	val, _ = got.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().At(1).Attributes().Get(splunk.DefaultNameLabel)
-	assert.Equal(t, "0_1_9", val.AsString())
-	assert.Equal(t, "otel.profiling", got.ResourceLogs().At(2).ScopeLogs().At(0).Scope().Name())
-	val, _ = got.ResourceLogs().At(2).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get(splunk.DefaultNameLabel)
-	assert.Equal(t, "1_1_0", val.AsString())
-	val, _ = got.ResourceLogs().At(2).ScopeLogs().At(0).LogRecords().At(9).Attributes().Get(splunk.DefaultNameLabel)
-	assert.Equal(t, "1_1_9", val.AsString())
 }

 func TestPushLogRecordsBufferCounters(t *testing.T) {
diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go
index 95e4e2af5236..4b69f33a64b1 100644
--- a/exporter/splunkhecexporter/factory.go
+++ b/exporter/splunkhecexporter/factory.go
@@ -183,7 +183,12 @@ func createLogsExporter(

 	wrapped := &baseLogsExporter{
 		Component: logsExporter,
-		Logs:      batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, logsExporter),
+		Logs: batchperresourceattr.NewBatchPerResourceLogs(splunk.HecTokenLabel, &perScopeBatcher{
+			logsEnabled:      cfg.LogDataEnabled,
+			profilingEnabled: cfg.ProfilingDataEnabled,
+			logger:           set.Logger,
+			next:             logsExporter,
+		}),
 	}

 	return wrapped, nil
diff --git a/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml b/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml
index c41db108609c..e346367a9c91 100644
--- a/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml
+++ b/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml
@@ -10,43 +10,50 @@ resourceLogs:
     scopeLogs:
       - scope:
           name: otel.profiling
-          logRecords:
-            - attributes:
-                - key: resource1_prof_scope_log1_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_prof_scope_log1_body
-              timeUnixNano: "11651379494838206464"
-            - attributes:
-                - key: resource1_prof_scope_log1_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_prof_scope_log2_body
-              timeUnixNano: "11651379494838206472"
+        logRecords:
+          - attributes:
+              - key: resource1_prof_scope_log1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_prof_scope_log1_body
+            timeUnixNano: "11651379494838206464"
+          - attributes:
+              - key: resource1_prof_scope_log1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_prof_scope_log2_body
+            timeUnixNano: "11651379494838206472"
       - scope:
           name: otel_collector
          version: 0.1.0
-          logRecords:
-            - attributes:
-                - key: resource1_scope1_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_scope1_body
-              timeUnixNano: "11651379494838207123"
+        logRecords:
+          - attributes:
+              - key: resource1_scope1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_scope1_body
+            timeUnixNano: "11651379494838207123"
       - scope:
           name: external_service
           version: 0.2.0
-          logRecords:
-            - attributes:
-                - key: resource1_scope2_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_log2_body
-              timeUnixNano: "11651379494838207153"
+        logRecords:
+          - attributes:
+              - key: resource1_scope2_log1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_scope2_log1_body
+            timeUnixNano: "11651379494838207153"
+          - attributes:
+              - key: resource1_scope2_log2_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_scope2_log2_body
+            timeUnixNano: "11651379494838208553"
   - resource:
       attributes:
         - key: resource2_attribute1
diff --git a/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml b/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml
index a017eeba18e8..c26f24c55b7f 100644
--- a/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml
+++ b/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml
@@ -10,21 +10,21 @@ resourceLogs:
     scopeLogs:
      - scope:
           name: otel.profiling
-          logRecords:
-            - attributes:
-                - key: resource1_prof_scope_log1_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_prof_scope_log1_body
-              timeUnixNano: "11651379494838206464"
-            - attributes:
-                - key: resource1_prof_scope_log1_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_prof_scope_log2_body
-              timeUnixNano: "11651379494838206472"
+        logRecords:
+          - attributes:
+              - key: resource1_prof_scope_log1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_prof_scope_log1_body
+            timeUnixNano: "11651379494838206464"
+          - attributes:
+              - key: resource1_prof_scope_log1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_prof_scope_log2_body
+            timeUnixNano: "11651379494838206472"
   - resource:
       attributes:
         - key: resource2_attribute1
diff --git a/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml b/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml
index 787813c27364..135213a0324e 100644
--- a/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml
+++ b/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml
@@ -11,25 +11,32 @@ resourceLogs:
       - scope:
           name: otel_collector
           version: 0.1.0
-          logRecords:
-            - attributes:
-                - key: resource1_scope1_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_scope1_body
-              timeUnixNano: "11651379494838207123"
+        logRecords:
+          - attributes:
+              - key: resource1_scope1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_scope1_body
+            timeUnixNano: "11651379494838207123"
       - scope:
           name: external_service
           version: 0.2.0
-          logRecords:
-            - attributes:
-                - key: resource1_scope2_attr1
-                  value:
-                    stringValue: value1
-              body:
-                stringValue: resource1_log2_body
-              timeUnixNano: "11651379494838207153"
+        logRecords:
+          - attributes:
+              - key: resource1_scope2_log1_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_scope2_log1_body
+            timeUnixNano: "11651379494838207153"
+          - attributes:
+              - key: resource1_scope2_log2_attr1
+                value:
+                  stringValue: value1
+            body:
+              stringValue: resource1_scope2_log2_body
+            timeUnixNano: "11651379494838208553"
   - resource:
       attributes:
         - key: resource2_attribute1