diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index 7beebfbd95b..b4ddd08cf17 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -71,7 +71,11 @@ public void Dispose() T item = this.circularBuffer.Read(); if (typeof(T) == typeof(LogRecord)) { - LogRecordSharedPool.Current.Return((LogRecord)(object)item); + var logRecord = (LogRecord)(object)item; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } } } } @@ -134,7 +138,11 @@ public struct Enumerator : IEnumerator if (currentItem != null) { - LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem); + var logRecord = (LogRecord)(object)currentItem; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } } if (circularBuffer!.RemovedCount < enumerator.targetCount) @@ -215,7 +223,12 @@ public void Dispose() var currentItem = this.current; if (currentItem != null) { - LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem); + var logRecord = (LogRecord)(object)currentItem; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } + this.current = null; } } diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 3809ccee889..4a284473573 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -13,6 +13,12 @@ state for cumulative temporality. [#5230](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5230) +* Fixed an issue causing `LogRecord`s to be incorrectly reused when wrapping an + instance of `BatchLogRecordExportProcessor` inside another + `BaseProcessor` which leads to missing or incorrect data during + export. + [#5255](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5255) + ## 1.7.0 Released 2023-Dec-08 diff --git a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs index f62758dd793..c1e341585a8 100644 --- a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs @@ -42,13 +42,27 @@ public override void OnEnd(LogRecord data) // happen here. Debug.Assert(data != null, "LogRecord was null."); - data!.Buffer(); + switch (data!.Source) + { + case LogRecord.LogRecordSource.FromSharedPool: + data.Buffer(); + data.AddReference(); + if (!this.TryExport(data)) + { + LogRecordSharedPool.Current.Return(data); + } - data.AddReference(); + break; + case LogRecord.LogRecordSource.CreatedManually: + data.Buffer(); + this.TryExport(data); + break; + default: + Debug.Assert(data.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "LogRecord source was something unexpected"); - if (!this.TryExport(data)) - { - LogRecordSharedPool.Current.Return(data); + // Note: If we are using ThreadStatic pool we make a copy of the record. + this.TryExport(data.Copy()); + break; } } } diff --git a/src/OpenTelemetry/Logs/LogRecord.cs b/src/OpenTelemetry/Logs/LogRecord.cs index 877ab83d1dd..f98363ab766 100644 --- a/src/OpenTelemetry/Logs/LogRecord.cs +++ b/src/OpenTelemetry/Logs/LogRecord.cs @@ -21,6 +21,7 @@ public sealed class LogRecord internal IReadOnlyList>? AttributeData; internal List>? AttributeStorage; internal List? ScopeStorage; + internal LogRecordSource Source = LogRecordSource.CreatedManually; internal int PoolReferenceCount = int.MaxValue; private static readonly Action> AddScopeToBufferedList = (object? scope, List state) => @@ -80,6 +81,24 @@ internal LogRecord( } } + internal enum LogRecordSource + { + /// + /// A created manually. + /// + CreatedManually, + + /// + /// A rented from the . + /// + FromThreadStaticPool, + + /// + /// A rented from the . + /// + FromSharedPool, + } + /// /// Gets or sets the log timestamp. /// diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs index ccaa47678a9..f5daa5a9904 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using OpenTelemetry.Internal; @@ -17,7 +18,7 @@ internal sealed class LogRecordSharedPool : ILogRecordPool private long rentIndex; private long returnIndex; - public LogRecordSharedPool(int capacity) + private LogRecordSharedPool(int capacity) { this.Capacity = capacity; this.pool = new LogRecord?[capacity]; @@ -54,18 +55,24 @@ public LogRecord Rent() continue; } + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool"); logRecord.ResetReferenceCount(); return logRecord; } } - var newLogRecord = new LogRecord(); + var newLogRecord = new LogRecord() + { + Source = LogRecord.LogRecordSource.FromSharedPool, + }; newLogRecord.ResetReferenceCount(); return newLogRecord; } public void Return(LogRecord logRecord) { + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool"); + if (logRecord.RemoveReference() != 0) { return; diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs index eba6aad18a3..8763cf8679d 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Diagnostics; + namespace OpenTelemetry.Logs; internal sealed class LogRecordThreadStaticPool : ILogRecordPool @@ -19,15 +21,23 @@ public LogRecord Rent() var logRecord = Storage; if (logRecord != null) { + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool"); Storage = null; - return logRecord; + } + else + { + logRecord = new() + { + Source = LogRecord.LogRecordSource.FromThreadStaticPool, + }; } - return new(); + return logRecord; } public void Return(LogRecord logRecord) { + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool"); if (Storage == null) { LogRecordPoolHelper.Clear(logRecord); diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs index c19997c5761..d285302fcfd 100644 --- a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -23,7 +23,9 @@ public void StateValuesAndScopeBufferingTest() using var scope = scopeProvider.Push(exportedItems); - var logRecord = new LogRecord(); + var pool = LogRecordSharedPool.Current; + + var logRecord = pool.Rent(); var state = new LogRecordTest.DisposingState("Hello world"); @@ -60,6 +62,7 @@ public void StateValuesAndScopeBufferingTest() processor.Shutdown(); Assert.Single(exportedItems); + Assert.Same(logRecord, exportedItems[0]); } [Fact] @@ -74,7 +77,9 @@ public void StateBufferingTest() using var processor = new BatchLogRecordExportProcessor( new InMemoryExporter(exportedItems)); - var logRecord = new LogRecord(); + var pool = LogRecordSharedPool.Current; + + var logRecord = pool.Rent(); var state = new LogRecordTest.DisposingState("Hello world"); logRecord.State = state; @@ -82,6 +87,9 @@ public void StateBufferingTest() processor.OnEnd(logRecord); processor.Shutdown(); + Assert.Single(exportedItems); + Assert.Same(logRecord, exportedItems[0]); + state.Dispose(); Assert.Throws(() => @@ -93,5 +101,41 @@ public void StateBufferingTest() } }); } + + [Fact] + public void CopyMadeWhenLogRecordIsFromThreadStaticPoolTest() + { + List exportedItems = new(); + + using var processor = new BatchLogRecordExportProcessor( + new InMemoryExporter(exportedItems)); + + var pool = LogRecordThreadStaticPool.Instance; + + var logRecord = pool.Rent(); + + processor.OnEnd(logRecord); + processor.Shutdown(); + + Assert.Single(exportedItems); + Assert.NotSame(logRecord, exportedItems[0]); + } + + [Fact] + public void LogRecordAddedToBatchIfNotFromAnyPoolTest() + { + List exportedItems = new(); + + using var processor = new BatchLogRecordExportProcessor( + new InMemoryExporter(exportedItems)); + + var logRecord = new LogRecord(); + + processor.OnEnd(logRecord); + processor.Shutdown(); + + Assert.Single(exportedItems); + Assert.Same(logRecord, exportedItems[0]); + } } #endif diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs index 69aa4a958be..d4be69397b1 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs @@ -44,20 +44,23 @@ public void RentReturnTests() Assert.Equal(1, pool.Count); - // Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue. - LogRecord manualRecord = new(); - Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount); - pool.Return(manualRecord); + var logRecordWithReferencesAdded = pool.Rent(); - Assert.Equal(1, pool.Count); + // Note: This record won't be returned to the pool because we add a reference to it. + logRecordWithReferencesAdded.AddReference(); + + Assert.Equal(2, logRecordWithReferencesAdded.PoolReferenceCount); + pool.Return(logRecordWithReferencesAdded); + + Assert.Equal(0, pool.Count); pool.Return(logRecord2); - Assert.Equal(2, pool.Count); + Assert.Equal(1, pool.Count); logRecord1 = pool.Rent(); Assert.NotNull(logRecord1); - Assert.Equal(1, pool.Count); + Assert.Equal(0, pool.Count); logRecord2 = pool.Rent(); Assert.NotNull(logRecord2); @@ -70,7 +73,7 @@ public void RentReturnTests() pool.Return(logRecord1); pool.Return(logRecord2); - pool.Return(logRecord3); + pool.Return(logRecord3); // <- Discarded due to pool size of 2 pool.Return(logRecord4); // <- Discarded due to pool size of 2 Assert.Equal(2, pool.Count); @@ -163,7 +166,7 @@ public async Task ExportTest(bool warmup) { for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++) { - pool.Return(new LogRecord { PoolReferenceCount = 1 }); + pool.Return(new LogRecord { Source = LogRecord.LogRecordSource.FromSharedPool, PoolReferenceCount = 1 }); } } diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs index ff337bccd4f..59c0b53454e 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs @@ -22,16 +22,17 @@ public void RentReturnTests() Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); - LogRecordThreadStaticPool.Instance.Return(new()); + // Note: This record will be ignored because there is already something in the ThreadStatic storage. + LogRecordThreadStaticPool.Instance.Return(new() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }); Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); LogRecordThreadStaticPool.Storage = null; - var manual = new LogRecord(); - LogRecordThreadStaticPool.Instance.Return(manual); + var newLogRecord = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }; + LogRecordThreadStaticPool.Instance.Return(newLogRecord); Assert.NotNull(LogRecordThreadStaticPool.Storage); - Assert.Equal(manual, LogRecordThreadStaticPool.Storage); + Assert.Equal(newLogRecord, LogRecordThreadStaticPool.Storage); } [Fact]