diff --git a/src/OpenTelemetry/Internal/CircularBuffer.cs b/src/OpenTelemetry/Internal/CircularBuffer.cs index 245fd653f74..d0b59631f7b 100644 --- a/src/OpenTelemetry/Internal/CircularBuffer.cs +++ b/src/OpenTelemetry/Internal/CircularBuffer.cs @@ -174,7 +174,7 @@ public T Read() var index = (int)(this.tail % this.Capacity); while (true) { - T value = this.trait[index]; + var value = this.trait[index]; if (value == null) { // If we got here it means a writer isn't done. diff --git a/src/OpenTelemetry/Internal/CircularBufferStruct.cs b/src/OpenTelemetry/Internal/CircularBufferStruct.cs new file mode 100644 index 00000000000..e174af24183 --- /dev/null +++ b/src/OpenTelemetry/Internal/CircularBufferStruct.cs @@ -0,0 +1,190 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace OpenTelemetry.Internal +{ + /// + /// Lock-free implementation of single-reader multi-writer circular buffer. + /// + /// The type of the underlying value. + internal class CircularBufferStruct + where T : struct + { + private readonly Trait[] trait; + private long head; + private long tail; + + /// + /// Initializes a new instance of the class. + /// + /// The capacity of the circular buffer, must be a positive integer. + public CircularBufferStruct(int capacity) + { + if (capacity <= 0) + { + throw new ArgumentOutOfRangeException(nameof(capacity)); + } + + this.Capacity = capacity; + this.trait = new Trait[capacity]; + } + + /// + /// Gets the capacity of the . + /// + public int Capacity { get; } + + /// + /// Gets the number of items contained in the . + /// + public int Count + { + get + { + var tailSnapshot = this.tail; + return (int)(this.head - tailSnapshot); + } + } + + /// + /// Gets the number of items added to the . + /// + public long AddedCount => this.head; + + /// + /// Gets the number of items removed from the . + /// + public long RemovedCount => this.tail; + + /// + /// Adds the specified item to the buffer. + /// + /// The value to add. + /// + /// Returns true if the item was added to the buffer successfully; + /// false if the buffer is full. + /// + public bool Add(T value) + { + while (true) + { + var tailSnapshot = this.tail; + var headSnapshot = this.head; + + if (headSnapshot - tailSnapshot >= this.Capacity) + { + return false; // buffer is full + } + + var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot); + if (head != headSnapshot) + { + continue; + } + + var index = (int)(head % this.Capacity); + this.trait[index].Value = value; + this.trait[index].IsReady = true; + return true; + } + } + + /// + /// Attempts to add the specified item to the buffer. + /// + /// The value to add. + /// The maximum allowed spin count, when set to a negative number or zero, will spin indefinitely. + /// + /// Returns true if the item was added to the buffer successfully; + /// false if the buffer is full or the spin count exceeded . + /// + public bool TryAdd(T value, int maxSpinCount) + { + if (maxSpinCount <= 0) + { + return this.Add(value); + } + + var spinCountDown = maxSpinCount; + + while (true) + { + var tailSnapshot = this.tail; + var headSnapshot = this.head; + + if (headSnapshot - tailSnapshot >= this.Capacity) + { + return false; // buffer is full + } + + var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot); + if (head != headSnapshot) + { + if (spinCountDown-- == 0) + { + return false; // exceeded maximum spin count + } + + continue; + } + + var index = (int)(head % this.Capacity); + this.trait[index].Value = value; + this.trait[index].IsReady = true; + return true; + } + } + + /// + /// Reads an item from the . + /// + /// + /// This function is not reentrant-safe, only one reader is allowed at any given time. + /// Warning: There is no bounds check in this method. Do not call unless you have verified Count > 0. + /// + /// Item read. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public T Read() + { + var index = (int)(this.tail % this.Capacity); + while (true) + { + if (!this.trait[index].IsReady) + { + // If we got here it means a writer isn't done. + continue; + } + + // TODO: we are doing an extra copy from the buffer, this can be optimized if Read() could take a callback + var value = this.trait[index].Value; + this.trait[index].IsReady = false; + this.trait[index].Value = default(T); + this.tail++; + return value; + } + } + + private struct Trait + { + internal bool IsReady; + internal T Value; + } + } +}