diff --git a/CHANGELOG.md b/CHANGELOG.md index fe29d0a6e7e..6eb6c251775 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed - The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819) +- Fix log records duplication in case of heterogeneous resource attributes by correctly mapping each log record to it's resource and scope. (#5803) diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/transform/log.go b/exporters/otlp/otlplog/otlploggrpc/internal/transform/log.go index 886d983d143..b03f3fb54ac 100644 --- a/exporters/otlp/otlplog/otlploggrpc/internal/transform/log.go +++ b/exporters/otlp/otlplog/otlploggrpc/internal/transform/log.go @@ -9,7 +9,6 @@ package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform" import ( - "sync" "time" cpb "go.opentelemetry.io/proto/otlp/common/v1" @@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs { return nil } - resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs) - defer func() { - clear(resMap) - resourceLogsMapPool.Put(resMap) - }() - resourceLogsMap(&resMap, records) + resMap := make(map[attribute.Distinct]*lpb.ResourceLogs) - out := make([]*lpb.ResourceLogs, 0, len(resMap)) - for _, rl := range resMap { - out = append(out, rl) + type key struct { + r attribute.Distinct + is instrumentation.Scope } - return out -} - -var resourceLogsMapPool = sync.Pool{ - New: func() any { - return make(map[attribute.Distinct]*lpb.ResourceLogs) - }, -} + scopeMap := make(map[key]*lpb.ScopeLogs) -func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) { + var resources int for _, r := range records { res := r.Resource() - rl, ok := (*dst)[res.Equivalent()] - if !ok { - rl = new(lpb.ResourceLogs) - if res.Len() > 0 { - rl.Resource = &rpb.Resource{ - Attributes: AttrIter(res.Iter()), - } - } - rl.SchemaUrl = res.SchemaURL() - (*dst)[res.Equivalent()] = rl - } - rl.ScopeLogs = ScopeLogs(records) - } -} - -// ScopeLogs returns a slice of OTLP ScopeLogs generated from records. -func ScopeLogs(records []log.Record) []*lpb.ScopeLogs { - scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs) - defer func() { - clear(scopeMap) - scopeLogsMapPool.Put(scopeMap) - }() - scopeLogsMap(&scopeMap, records) - - out := make([]*lpb.ScopeLogs, 0, len(scopeMap)) - for _, sl := range scopeMap { - out = append(out, sl) - } - return out -} - -var scopeLogsMapPool = sync.Pool{ - New: func() any { - return make(map[instrumentation.Scope]*lpb.ScopeLogs) - }, -} - -func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) { - for _, r := range records { + rKey := res.Equivalent() scope := r.InstrumentationScope() - sl, ok := (*dst)[scope] - if !ok { + k := key{ + r: rKey, + is: scope, + } + sl, iOk := scopeMap[k] + if !iOk { sl = new(lpb.ScopeLogs) var emptyScope instrumentation.Scope if scope != emptyScope { @@ -102,10 +55,34 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R } sl.SchemaUrl = scope.SchemaURL } - (*dst)[scope] = sl + scopeMap[k] = sl } + sl.LogRecords = append(sl.LogRecords, LogRecord(r)) + rl, rOk := resMap[rKey] + if !rOk { + resources++ + rl = new(lpb.ResourceLogs) + if res.Len() > 0 { + rl.Resource = &rpb.Resource{ + Attributes: AttrIter(res.Iter()), + } + } + rl.SchemaUrl = res.SchemaURL() + resMap[rKey] = rl + } + if !iOk { + rl.ScopeLogs = append(rl.ScopeLogs, sl) + } } + + // Transform the categorized map into a slice + resLogs := make([]*lpb.ResourceLogs, 0, resources) + for _, rl := range resMap { + resLogs = append(resLogs, rl) + } + + return resLogs } // LogRecord returns an OTLP LogRecord generated from record. diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/transform/log_test.go b/exporters/otlp/otlplog/otlploggrpc/internal/transform/log_test.go index aaa2fad3dcc..c4623ea29b9 100644 --- a/exporters/otlp/otlplog/otlploggrpc/internal/transform/log_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/internal/transform/log_test.go @@ -30,73 +30,106 @@ var ( ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0)) obs = ts.Add(30 * time.Second) + tom = api.String("user", "tom") + jerry = api.String("user", "jerry") // A time before unix 0. negativeTs = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC) - alice = api.String("user", "alice") - bob = api.String("user", "bob") - - pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + pbTom = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "tom"}, }} - pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + pbJerry = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "jerry"}, }} - sevA = api.SeverityInfo - sevB = api.SeverityError + sevC = api.SeverityInfo + sevD = api.SeverityError - pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO - pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + pbSevC = lpb.SeverityNumber_SEVERITY_NUMBER_INFO + pbSevD = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR - bodyA = api.StringValue("a") - bodyB = api.StringValue("b") + bodyC = api.StringValue("c") + bodyD = api.StringValue("d") - pbBodyA = &cpb.AnyValue{ + pbBodyC = &cpb.AnyValue{ Value: &cpb.AnyValue_StringValue{ - StringValue: "a", + StringValue: "c", }, } - pbBodyB = &cpb.AnyValue{ + pbBodyD = &cpb.AnyValue{ Value: &cpb.AnyValue_StringValue{ - StringValue: "b", + StringValue: "d", }, } - spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1} - spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2} - traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} - traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} - flagsA = byte(1) - flagsB = byte(0) + spanIDC = []byte{0, 0, 0, 0, 0, 0, 0, 1} + spanIDD = []byte{0, 0, 0, 0, 0, 0, 0, 2} + traceIDC = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + traceIDD = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + flagsC = byte(1) + flagsD = byte(0) scope = instrumentation.Scope{ - Name: "test/code/path", - Version: "v0.1.0", + Name: "otel/test/code/path1", + Version: "v0.1.1", + SchemaURL: semconv.SchemaURL, + } + scope2 = instrumentation.Scope{ + Name: "otel/test/code/path2", + Version: "v0.2.2", SchemaURL: semconv.SchemaURL, } + scopeList = []instrumentation.Scope{scope, scope2} + pbScope = &cpb.InstrumentationScope{ - Name: "test/code/path", - Version: "v0.1.0", + Name: "otel/test/code/path1", + Version: "v0.1.1", + } + pbScope2 = &cpb.InstrumentationScope{ + Name: "otel/test/code/path2", + Version: "v0.2.2", } res = resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceName("test server"), - semconv.ServiceVersion("v0.1.0"), + semconv.ServiceName("service1"), + semconv.ServiceVersion("v0.1.1"), + ) + res2 = resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("service2"), + semconv.ServiceVersion("v0.2.2"), ) + resList = []*resource.Resource{res, res2} + pbRes = &rpb.Resource{ Attributes: []*cpb.KeyValue{ { Key: "service.name", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + Value: &cpb.AnyValue_StringValue{StringValue: "service1"}, + }, + }, + { + Key: "service.version", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.1"}, + }, + }, + }, + } + pbRes2 = &rpb.Resource{ + Attributes: []*cpb.KeyValue{ + { + Key: "service.name", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "service2"}, }, }, { Key: "service.version", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + Value: &cpb.AnyValue_StringValue{StringValue: "v0.2.2"}, }, }, }, @@ -105,75 +138,79 @@ var ( records = func() []log.Record { var out []log.Record - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevA, - SeverityText: "A", - Body: bodyA, - Attributes: []api.KeyValue{alice}, - TraceID: trace.TraceID(traceIDA), - SpanID: trace.SpanID(spanIDA), - TraceFlags: trace.TraceFlags(flagsA), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + for _, r := range resList { + for _, s := range scopeList { + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevC, + SeverityText: "C", + Body: bodyC, + Attributes: []api.KeyValue{tom}, + TraceID: trace.TraceID(traceIDC), + SpanID: trace.SpanID(spanIDC), + TraceFlags: trace.TraceFlags(flagsC), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevA, - SeverityText: "A", - Body: bodyA, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDA), - SpanID: trace.SpanID(spanIDA), - TraceFlags: trace.TraceFlags(flagsA), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevC, + SeverityText: "C", + Body: bodyC, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDC), + SpanID: trace.SpanID(spanIDC), + TraceFlags: trace.TraceFlags(flagsC), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{alice}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{tom}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: negativeTs, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: negativeTs, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) + } + } return out }() @@ -182,76 +219,90 @@ var ( { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevA, - SeverityText: "A", - Body: pbBodyA, - Attributes: []*cpb.KeyValue{pbAlice}, - Flags: uint32(flagsA), - TraceId: traceIDA, - SpanId: spanIDA, + SeverityNumber: pbSevC, + SeverityText: "C", + Body: pbBodyC, + Attributes: []*cpb.KeyValue{pbTom}, + Flags: uint32(flagsC), + TraceId: traceIDC, + SpanId: spanIDC, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevA, - SeverityText: "A", - Body: pbBodyA, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsA), - TraceId: traceIDA, - SpanId: spanIDA, + SeverityNumber: pbSevC, + SeverityText: "C", + Body: pbBodyC, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsC), + TraceId: traceIDC, + SpanId: spanIDC, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbAlice}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbTom}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, { TimeUnixNano: 0, ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, } - pbScopeLogs = &lpb.ScopeLogs{ - Scope: pbScope, - SchemaUrl: semconv.SchemaURL, - LogRecords: pbLogRecords, + pbScopeLogsList = []*lpb.ScopeLogs{ + { + Scope: pbScope, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + }, + { + Scope: pbScope2, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + }, } - pbResourceLogs = &lpb.ResourceLogs{ - Resource: pbRes, - SchemaUrl: semconv.SchemaURL, - ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs}, + pbResourceLogsList = []*lpb.ResourceLogs{ + { + Resource: pbRes, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: pbScopeLogsList, + }, + { + Resource: pbRes2, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: pbScopeLogsList, + }, } ) func TestResourceLogs(t *testing.T) { - want := []*lpb.ResourceLogs{pbResourceLogs} - assert.Equal(t, want, ResourceLogs(records)) + want := pbResourceLogsList + assert.ElementsMatch(t, want, ResourceLogs(records)) } func TestSeverityNumber(t *testing.T) { diff --git a/exporters/otlp/otlplog/otlploghttp/internal/transform/log.go b/exporters/otlp/otlplog/otlploghttp/internal/transform/log.go index a911450e29d..1ab95f93948 100644 --- a/exporters/otlp/otlplog/otlploghttp/internal/transform/log.go +++ b/exporters/otlp/otlplog/otlploghttp/internal/transform/log.go @@ -9,7 +9,6 @@ package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/transform" import ( - "sync" "time" cpb "go.opentelemetry.io/proto/otlp/common/v1" @@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs { return nil } - resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs) - defer func() { - clear(resMap) - resourceLogsMapPool.Put(resMap) - }() - resourceLogsMap(&resMap, records) + resMap := make(map[attribute.Distinct]*lpb.ResourceLogs) - out := make([]*lpb.ResourceLogs, 0, len(resMap)) - for _, rl := range resMap { - out = append(out, rl) + type key struct { + r attribute.Distinct + is instrumentation.Scope } - return out -} - -var resourceLogsMapPool = sync.Pool{ - New: func() any { - return make(map[attribute.Distinct]*lpb.ResourceLogs) - }, -} + scopeMap := make(map[key]*lpb.ScopeLogs) -func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) { + var resources int for _, r := range records { res := r.Resource() - rl, ok := (*dst)[res.Equivalent()] - if !ok { - rl = new(lpb.ResourceLogs) - if res.Len() > 0 { - rl.Resource = &rpb.Resource{ - Attributes: AttrIter(res.Iter()), - } - } - rl.SchemaUrl = res.SchemaURL() - (*dst)[res.Equivalent()] = rl - } - rl.ScopeLogs = ScopeLogs(records) - } -} - -// ScopeLogs returns a slice of OTLP ScopeLogs generated from records. -func ScopeLogs(records []log.Record) []*lpb.ScopeLogs { - scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs) - defer func() { - clear(scopeMap) - scopeLogsMapPool.Put(scopeMap) - }() - scopeLogsMap(&scopeMap, records) - - out := make([]*lpb.ScopeLogs, 0, len(scopeMap)) - for _, sl := range scopeMap { - out = append(out, sl) - } - return out -} - -var scopeLogsMapPool = sync.Pool{ - New: func() any { - return make(map[instrumentation.Scope]*lpb.ScopeLogs) - }, -} - -func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) { - for _, r := range records { + rKey := res.Equivalent() scope := r.InstrumentationScope() - sl, ok := (*dst)[scope] - if !ok { + k := key{ + r: rKey, + is: scope, + } + sl, iOk := scopeMap[k] + if !iOk { sl = new(lpb.ScopeLogs) var emptyScope instrumentation.Scope if scope != emptyScope { @@ -102,10 +55,34 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R } sl.SchemaUrl = scope.SchemaURL } - (*dst)[scope] = sl + scopeMap[k] = sl } + sl.LogRecords = append(sl.LogRecords, LogRecord(r)) + rl, rOk := resMap[rKey] + if !rOk { + resources++ + rl = new(lpb.ResourceLogs) + if res.Len() > 0 { + rl.Resource = &rpb.Resource{ + Attributes: AttrIter(res.Iter()), + } + } + rl.SchemaUrl = res.SchemaURL() + resMap[rKey] = rl + } + if !iOk { + rl.ScopeLogs = append(rl.ScopeLogs, sl) + } } + + // Transform the categorized map into a slice + resLogs := make([]*lpb.ResourceLogs, 0, resources) + for _, rl := range resMap { + resLogs = append(resLogs, rl) + } + + return resLogs } // LogRecord returns an OTLP LogRecord generated from record. diff --git a/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go b/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go index aaa2fad3dcc..c4623ea29b9 100644 --- a/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go +++ b/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go @@ -30,73 +30,106 @@ var ( ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0)) obs = ts.Add(30 * time.Second) + tom = api.String("user", "tom") + jerry = api.String("user", "jerry") // A time before unix 0. negativeTs = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC) - alice = api.String("user", "alice") - bob = api.String("user", "bob") - - pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + pbTom = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "tom"}, }} - pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + pbJerry = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "jerry"}, }} - sevA = api.SeverityInfo - sevB = api.SeverityError + sevC = api.SeverityInfo + sevD = api.SeverityError - pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO - pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + pbSevC = lpb.SeverityNumber_SEVERITY_NUMBER_INFO + pbSevD = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR - bodyA = api.StringValue("a") - bodyB = api.StringValue("b") + bodyC = api.StringValue("c") + bodyD = api.StringValue("d") - pbBodyA = &cpb.AnyValue{ + pbBodyC = &cpb.AnyValue{ Value: &cpb.AnyValue_StringValue{ - StringValue: "a", + StringValue: "c", }, } - pbBodyB = &cpb.AnyValue{ + pbBodyD = &cpb.AnyValue{ Value: &cpb.AnyValue_StringValue{ - StringValue: "b", + StringValue: "d", }, } - spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1} - spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2} - traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} - traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} - flagsA = byte(1) - flagsB = byte(0) + spanIDC = []byte{0, 0, 0, 0, 0, 0, 0, 1} + spanIDD = []byte{0, 0, 0, 0, 0, 0, 0, 2} + traceIDC = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + traceIDD = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + flagsC = byte(1) + flagsD = byte(0) scope = instrumentation.Scope{ - Name: "test/code/path", - Version: "v0.1.0", + Name: "otel/test/code/path1", + Version: "v0.1.1", + SchemaURL: semconv.SchemaURL, + } + scope2 = instrumentation.Scope{ + Name: "otel/test/code/path2", + Version: "v0.2.2", SchemaURL: semconv.SchemaURL, } + scopeList = []instrumentation.Scope{scope, scope2} + pbScope = &cpb.InstrumentationScope{ - Name: "test/code/path", - Version: "v0.1.0", + Name: "otel/test/code/path1", + Version: "v0.1.1", + } + pbScope2 = &cpb.InstrumentationScope{ + Name: "otel/test/code/path2", + Version: "v0.2.2", } res = resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceName("test server"), - semconv.ServiceVersion("v0.1.0"), + semconv.ServiceName("service1"), + semconv.ServiceVersion("v0.1.1"), + ) + res2 = resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("service2"), + semconv.ServiceVersion("v0.2.2"), ) + resList = []*resource.Resource{res, res2} + pbRes = &rpb.Resource{ Attributes: []*cpb.KeyValue{ { Key: "service.name", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + Value: &cpb.AnyValue_StringValue{StringValue: "service1"}, + }, + }, + { + Key: "service.version", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.1"}, + }, + }, + }, + } + pbRes2 = &rpb.Resource{ + Attributes: []*cpb.KeyValue{ + { + Key: "service.name", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "service2"}, }, }, { Key: "service.version", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + Value: &cpb.AnyValue_StringValue{StringValue: "v0.2.2"}, }, }, }, @@ -105,75 +138,79 @@ var ( records = func() []log.Record { var out []log.Record - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevA, - SeverityText: "A", - Body: bodyA, - Attributes: []api.KeyValue{alice}, - TraceID: trace.TraceID(traceIDA), - SpanID: trace.SpanID(spanIDA), - TraceFlags: trace.TraceFlags(flagsA), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + for _, r := range resList { + for _, s := range scopeList { + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevC, + SeverityText: "C", + Body: bodyC, + Attributes: []api.KeyValue{tom}, + TraceID: trace.TraceID(traceIDC), + SpanID: trace.SpanID(spanIDC), + TraceFlags: trace.TraceFlags(flagsC), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevA, - SeverityText: "A", - Body: bodyA, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDA), - SpanID: trace.SpanID(spanIDA), - TraceFlags: trace.TraceFlags(flagsA), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevC, + SeverityText: "C", + Body: bodyC, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDC), + SpanID: trace.SpanID(spanIDC), + TraceFlags: trace.TraceFlags(flagsC), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{alice}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{tom}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: negativeTs, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: negativeTs, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) + } + } return out }() @@ -182,76 +219,90 @@ var ( { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevA, - SeverityText: "A", - Body: pbBodyA, - Attributes: []*cpb.KeyValue{pbAlice}, - Flags: uint32(flagsA), - TraceId: traceIDA, - SpanId: spanIDA, + SeverityNumber: pbSevC, + SeverityText: "C", + Body: pbBodyC, + Attributes: []*cpb.KeyValue{pbTom}, + Flags: uint32(flagsC), + TraceId: traceIDC, + SpanId: spanIDC, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevA, - SeverityText: "A", - Body: pbBodyA, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsA), - TraceId: traceIDA, - SpanId: spanIDA, + SeverityNumber: pbSevC, + SeverityText: "C", + Body: pbBodyC, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsC), + TraceId: traceIDC, + SpanId: spanIDC, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbAlice}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbTom}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, { TimeUnixNano: 0, ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, } - pbScopeLogs = &lpb.ScopeLogs{ - Scope: pbScope, - SchemaUrl: semconv.SchemaURL, - LogRecords: pbLogRecords, + pbScopeLogsList = []*lpb.ScopeLogs{ + { + Scope: pbScope, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + }, + { + Scope: pbScope2, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + }, } - pbResourceLogs = &lpb.ResourceLogs{ - Resource: pbRes, - SchemaUrl: semconv.SchemaURL, - ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs}, + pbResourceLogsList = []*lpb.ResourceLogs{ + { + Resource: pbRes, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: pbScopeLogsList, + }, + { + Resource: pbRes2, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: pbScopeLogsList, + }, } ) func TestResourceLogs(t *testing.T) { - want := []*lpb.ResourceLogs{pbResourceLogs} - assert.Equal(t, want, ResourceLogs(records)) + want := pbResourceLogsList + assert.ElementsMatch(t, want, ResourceLogs(records)) } func TestSeverityNumber(t *testing.T) { diff --git a/internal/shared/otlp/otlplog/transform/log.go.tmpl b/internal/shared/otlp/otlplog/transform/log.go.tmpl index a911450e29d..1ab95f93948 100644 --- a/internal/shared/otlp/otlplog/transform/log.go.tmpl +++ b/internal/shared/otlp/otlplog/transform/log.go.tmpl @@ -9,7 +9,6 @@ package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/transform" import ( - "sync" "time" cpb "go.opentelemetry.io/proto/otlp/common/v1" @@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs { return nil } - resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs) - defer func() { - clear(resMap) - resourceLogsMapPool.Put(resMap) - }() - resourceLogsMap(&resMap, records) + resMap := make(map[attribute.Distinct]*lpb.ResourceLogs) - out := make([]*lpb.ResourceLogs, 0, len(resMap)) - for _, rl := range resMap { - out = append(out, rl) + type key struct { + r attribute.Distinct + is instrumentation.Scope } - return out -} - -var resourceLogsMapPool = sync.Pool{ - New: func() any { - return make(map[attribute.Distinct]*lpb.ResourceLogs) - }, -} + scopeMap := make(map[key]*lpb.ScopeLogs) -func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) { + var resources int for _, r := range records { res := r.Resource() - rl, ok := (*dst)[res.Equivalent()] - if !ok { - rl = new(lpb.ResourceLogs) - if res.Len() > 0 { - rl.Resource = &rpb.Resource{ - Attributes: AttrIter(res.Iter()), - } - } - rl.SchemaUrl = res.SchemaURL() - (*dst)[res.Equivalent()] = rl - } - rl.ScopeLogs = ScopeLogs(records) - } -} - -// ScopeLogs returns a slice of OTLP ScopeLogs generated from records. -func ScopeLogs(records []log.Record) []*lpb.ScopeLogs { - scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs) - defer func() { - clear(scopeMap) - scopeLogsMapPool.Put(scopeMap) - }() - scopeLogsMap(&scopeMap, records) - - out := make([]*lpb.ScopeLogs, 0, len(scopeMap)) - for _, sl := range scopeMap { - out = append(out, sl) - } - return out -} - -var scopeLogsMapPool = sync.Pool{ - New: func() any { - return make(map[instrumentation.Scope]*lpb.ScopeLogs) - }, -} - -func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) { - for _, r := range records { + rKey := res.Equivalent() scope := r.InstrumentationScope() - sl, ok := (*dst)[scope] - if !ok { + k := key{ + r: rKey, + is: scope, + } + sl, iOk := scopeMap[k] + if !iOk { sl = new(lpb.ScopeLogs) var emptyScope instrumentation.Scope if scope != emptyScope { @@ -102,10 +55,34 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R } sl.SchemaUrl = scope.SchemaURL } - (*dst)[scope] = sl + scopeMap[k] = sl } + sl.LogRecords = append(sl.LogRecords, LogRecord(r)) + rl, rOk := resMap[rKey] + if !rOk { + resources++ + rl = new(lpb.ResourceLogs) + if res.Len() > 0 { + rl.Resource = &rpb.Resource{ + Attributes: AttrIter(res.Iter()), + } + } + rl.SchemaUrl = res.SchemaURL() + resMap[rKey] = rl + } + if !iOk { + rl.ScopeLogs = append(rl.ScopeLogs, sl) + } } + + // Transform the categorized map into a slice + resLogs := make([]*lpb.ResourceLogs, 0, resources) + for _, rl := range resMap { + resLogs = append(resLogs, rl) + } + + return resLogs } // LogRecord returns an OTLP LogRecord generated from record. diff --git a/internal/shared/otlp/otlplog/transform/log_test.go.tmpl b/internal/shared/otlp/otlplog/transform/log_test.go.tmpl index aaa2fad3dcc..c4623ea29b9 100644 --- a/internal/shared/otlp/otlplog/transform/log_test.go.tmpl +++ b/internal/shared/otlp/otlplog/transform/log_test.go.tmpl @@ -30,73 +30,106 @@ var ( ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0)) obs = ts.Add(30 * time.Second) + tom = api.String("user", "tom") + jerry = api.String("user", "jerry") // A time before unix 0. negativeTs = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC) - alice = api.String("user", "alice") - bob = api.String("user", "bob") - - pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "alice"}, + pbTom = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "tom"}, }} - pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "bob"}, + pbJerry = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "jerry"}, }} - sevA = api.SeverityInfo - sevB = api.SeverityError + sevC = api.SeverityInfo + sevD = api.SeverityError - pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO - pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR + pbSevC = lpb.SeverityNumber_SEVERITY_NUMBER_INFO + pbSevD = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR - bodyA = api.StringValue("a") - bodyB = api.StringValue("b") + bodyC = api.StringValue("c") + bodyD = api.StringValue("d") - pbBodyA = &cpb.AnyValue{ + pbBodyC = &cpb.AnyValue{ Value: &cpb.AnyValue_StringValue{ - StringValue: "a", + StringValue: "c", }, } - pbBodyB = &cpb.AnyValue{ + pbBodyD = &cpb.AnyValue{ Value: &cpb.AnyValue_StringValue{ - StringValue: "b", + StringValue: "d", }, } - spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1} - spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2} - traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} - traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} - flagsA = byte(1) - flagsB = byte(0) + spanIDC = []byte{0, 0, 0, 0, 0, 0, 0, 1} + spanIDD = []byte{0, 0, 0, 0, 0, 0, 0, 2} + traceIDC = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} + traceIDD = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2} + flagsC = byte(1) + flagsD = byte(0) scope = instrumentation.Scope{ - Name: "test/code/path", - Version: "v0.1.0", + Name: "otel/test/code/path1", + Version: "v0.1.1", + SchemaURL: semconv.SchemaURL, + } + scope2 = instrumentation.Scope{ + Name: "otel/test/code/path2", + Version: "v0.2.2", SchemaURL: semconv.SchemaURL, } + scopeList = []instrumentation.Scope{scope, scope2} + pbScope = &cpb.InstrumentationScope{ - Name: "test/code/path", - Version: "v0.1.0", + Name: "otel/test/code/path1", + Version: "v0.1.1", + } + pbScope2 = &cpb.InstrumentationScope{ + Name: "otel/test/code/path2", + Version: "v0.2.2", } res = resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceName("test server"), - semconv.ServiceVersion("v0.1.0"), + semconv.ServiceName("service1"), + semconv.ServiceVersion("v0.1.1"), + ) + res2 = resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("service2"), + semconv.ServiceVersion("v0.2.2"), ) + resList = []*resource.Resource{res, res2} + pbRes = &rpb.Resource{ Attributes: []*cpb.KeyValue{ { Key: "service.name", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "test server"}, + Value: &cpb.AnyValue_StringValue{StringValue: "service1"}, + }, + }, + { + Key: "service.version", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.1"}, + }, + }, + }, + } + pbRes2 = &rpb.Resource{ + Attributes: []*cpb.KeyValue{ + { + Key: "service.name", + Value: &cpb.AnyValue{ + Value: &cpb.AnyValue_StringValue{StringValue: "service2"}, }, }, { Key: "service.version", Value: &cpb.AnyValue{ - Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"}, + Value: &cpb.AnyValue_StringValue{StringValue: "v0.2.2"}, }, }, }, @@ -105,75 +138,79 @@ var ( records = func() []log.Record { var out []log.Record - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevA, - SeverityText: "A", - Body: bodyA, - Attributes: []api.KeyValue{alice}, - TraceID: trace.TraceID(traceIDA), - SpanID: trace.SpanID(spanIDA), - TraceFlags: trace.TraceFlags(flagsA), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + for _, r := range resList { + for _, s := range scopeList { + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevC, + SeverityText: "C", + Body: bodyC, + Attributes: []api.KeyValue{tom}, + TraceID: trace.TraceID(traceIDC), + SpanID: trace.SpanID(spanIDC), + TraceFlags: trace.TraceFlags(flagsC), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevA, - SeverityText: "A", - Body: bodyA, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDA), - SpanID: trace.SpanID(spanIDA), - TraceFlags: trace.TraceFlags(flagsA), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevC, + SeverityText: "C", + Body: bodyC, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDC), + SpanID: trace.SpanID(spanIDC), + TraceFlags: trace.TraceFlags(flagsC), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{alice}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{tom}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: ts, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) - out = append(out, logtest.RecordFactory{ - Timestamp: negativeTs, - ObservedTimestamp: obs, - Severity: sevB, - SeverityText: "B", - Body: bodyB, - Attributes: []api.KeyValue{bob}, - TraceID: trace.TraceID(traceIDB), - SpanID: trace.SpanID(spanIDB), - TraceFlags: trace.TraceFlags(flagsB), - InstrumentationScope: &scope, - Resource: res, - }.NewRecord()) + out = append(out, logtest.RecordFactory{ + Timestamp: negativeTs, + ObservedTimestamp: obs, + Severity: sevD, + SeverityText: "D", + Body: bodyD, + Attributes: []api.KeyValue{jerry}, + TraceID: trace.TraceID(traceIDD), + SpanID: trace.SpanID(spanIDD), + TraceFlags: trace.TraceFlags(flagsD), + InstrumentationScope: &s, + Resource: r, + }.NewRecord()) + } + } return out }() @@ -182,76 +219,90 @@ var ( { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevA, - SeverityText: "A", - Body: pbBodyA, - Attributes: []*cpb.KeyValue{pbAlice}, - Flags: uint32(flagsA), - TraceId: traceIDA, - SpanId: spanIDA, + SeverityNumber: pbSevC, + SeverityText: "C", + Body: pbBodyC, + Attributes: []*cpb.KeyValue{pbTom}, + Flags: uint32(flagsC), + TraceId: traceIDC, + SpanId: spanIDC, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevA, - SeverityText: "A", - Body: pbBodyA, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsA), - TraceId: traceIDA, - SpanId: spanIDA, + SeverityNumber: pbSevC, + SeverityText: "C", + Body: pbBodyC, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsC), + TraceId: traceIDC, + SpanId: spanIDC, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbAlice}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbTom}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, { TimeUnixNano: uint64(ts.UnixNano()), ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, { TimeUnixNano: 0, ObservedTimeUnixNano: uint64(obs.UnixNano()), - SeverityNumber: pbSevB, - SeverityText: "B", - Body: pbBodyB, - Attributes: []*cpb.KeyValue{pbBob}, - Flags: uint32(flagsB), - TraceId: traceIDB, - SpanId: spanIDB, + SeverityNumber: pbSevD, + SeverityText: "D", + Body: pbBodyD, + Attributes: []*cpb.KeyValue{pbJerry}, + Flags: uint32(flagsD), + TraceId: traceIDD, + SpanId: spanIDD, }, } - pbScopeLogs = &lpb.ScopeLogs{ - Scope: pbScope, - SchemaUrl: semconv.SchemaURL, - LogRecords: pbLogRecords, + pbScopeLogsList = []*lpb.ScopeLogs{ + { + Scope: pbScope, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + }, + { + Scope: pbScope2, + SchemaUrl: semconv.SchemaURL, + LogRecords: pbLogRecords, + }, } - pbResourceLogs = &lpb.ResourceLogs{ - Resource: pbRes, - SchemaUrl: semconv.SchemaURL, - ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs}, + pbResourceLogsList = []*lpb.ResourceLogs{ + { + Resource: pbRes, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: pbScopeLogsList, + }, + { + Resource: pbRes2, + SchemaUrl: semconv.SchemaURL, + ScopeLogs: pbScopeLogsList, + }, } ) func TestResourceLogs(t *testing.T) { - want := []*lpb.ResourceLogs{pbResourceLogs} - assert.Equal(t, want, ResourceLogs(records)) + want := pbResourceLogsList + assert.ElementsMatch(t, want, ResourceLogs(records)) } func TestSeverityNumber(t *testing.T) {