-
Notifications
You must be signed in to change notification settings - Fork 772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a value type version of CircularBuffer #1325
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
// <copyright file="CircularBufferStruct.cs" company="OpenTelemetry Authors"> | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// </copyright> | ||
|
||
using System; | ||
using System.Runtime.CompilerServices; | ||
using System.Threading; | ||
|
||
namespace OpenTelemetry.Internal | ||
{ | ||
/// <summary> | ||
/// Lock-free implementation of single-reader multi-writer circular buffer. | ||
/// </summary> | ||
/// <typeparam name="T">The type of the underlying value.</typeparam> | ||
internal class CircularBufferStruct<T> | ||
where T : struct | ||
{ | ||
private readonly Trait[] trait; | ||
private long head; | ||
private long tail; | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="CircularBufferStruct{T}"/> class. | ||
/// </summary> | ||
/// <param name="capacity">The capacity of the circular buffer, must be a positive integer.</param> | ||
public CircularBufferStruct(int capacity) | ||
{ | ||
if (capacity <= 0) | ||
{ | ||
throw new ArgumentOutOfRangeException(nameof(capacity)); | ||
} | ||
|
||
this.Capacity = capacity; | ||
this.trait = new Trait[capacity]; | ||
} | ||
|
||
/// <summary> | ||
/// Gets the capacity of the <see cref="CircularBufferStruct{T}"/>. | ||
/// </summary> | ||
public int Capacity { get; } | ||
|
||
/// <summary> | ||
/// Gets the number of items contained in the <see cref="CircularBufferStruct{T}"/>. | ||
/// </summary> | ||
public int Count | ||
{ | ||
get | ||
{ | ||
var tailSnapshot = this.tail; | ||
return (int)(this.head - tailSnapshot); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Gets the number of items added to the <see cref="CircularBufferStruct{T}"/>. | ||
/// </summary> | ||
public long AddedCount => this.head; | ||
|
||
/// <summary> | ||
/// Gets the number of items removed from the <see cref="CircularBufferStruct{T}"/>. | ||
/// </summary> | ||
public long RemovedCount => this.tail; | ||
|
||
/// <summary> | ||
/// Adds the specified item to the buffer. | ||
/// </summary> | ||
/// <param name="value">The value to add.</param> | ||
/// <returns> | ||
/// Returns <c>true</c> if the item was added to the buffer successfully; | ||
/// <c>false</c> if the buffer is full. | ||
/// </returns> | ||
public bool Add(T value) | ||
{ | ||
while (true) | ||
{ | ||
var tailSnapshot = this.tail; | ||
var headSnapshot = this.head; | ||
|
||
if (headSnapshot - tailSnapshot >= this.Capacity) | ||
{ | ||
return false; // buffer is full | ||
} | ||
|
||
var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot); | ||
if (head != headSnapshot) | ||
{ | ||
continue; | ||
} | ||
|
||
var index = (int)(head % this.Capacity); | ||
this.trait[index].Value = value; | ||
this.trait[index].IsReady = true; | ||
return true; | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Attempts to add the specified item to the buffer. | ||
/// </summary> | ||
/// <param name="value">The value to add.</param> | ||
/// <param name="maxSpinCount">The maximum allowed spin count, when set to a negative number or zero, will spin indefinitely.</param> | ||
/// <returns> | ||
/// Returns <c>true</c> if the item was added to the buffer successfully; | ||
/// <c>false</c> if the buffer is full or the spin count exceeded <paramref name="maxSpinCount"/>. | ||
/// </returns> | ||
public bool TryAdd(T value, int maxSpinCount) | ||
{ | ||
if (maxSpinCount <= 0) | ||
{ | ||
return this.Add(value); | ||
} | ||
|
||
var spinCountDown = maxSpinCount; | ||
|
||
while (true) | ||
{ | ||
var tailSnapshot = this.tail; | ||
var headSnapshot = this.head; | ||
|
||
if (headSnapshot - tailSnapshot >= this.Capacity) | ||
{ | ||
return false; // buffer is full | ||
} | ||
|
||
var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot); | ||
if (head != headSnapshot) | ||
{ | ||
if (spinCountDown-- == 0) | ||
{ | ||
return false; // exceeded maximum spin count | ||
} | ||
|
||
continue; | ||
} | ||
|
||
var index = (int)(head % this.Capacity); | ||
this.trait[index].Value = value; | ||
this.trait[index].IsReady = true; | ||
return true; | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Reads an item from the <see cref="CircularBufferStruct{T}"/>. | ||
/// </summary> | ||
/// <remarks> | ||
/// This function is not reentrant-safe, only one reader is allowed at any given time. | ||
/// Warning: There is no bounds check in this method. Do not call unless you have verified Count > 0. | ||
/// </remarks> | ||
/// <returns>Item read.</returns> | ||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
public T Read() | ||
{ | ||
var index = (int)(this.tail % this.Capacity); | ||
while (true) | ||
{ | ||
if (!this.trait[index].IsReady) | ||
{ | ||
// If we got here it means a writer isn't done. | ||
continue; | ||
} | ||
|
||
// TODO: we are doing an extra copy from the buffer, this can be optimized if Read() could take a callback | ||
var value = this.trait[index].Value; | ||
this.trait[index].IsReady = false; | ||
this.trait[index].Value = default(T); | ||
this.tail++; | ||
return value; | ||
} | ||
} | ||
|
||
private struct Trait | ||
{ | ||
internal bool IsReady; | ||
internal T Value; | ||
} | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would help to use ref return here? Something like this...
If T is readonly struct, compiler might be smart enough to do that automatically.
For max perf this might also work...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once
this.tail++
happened, other writer threads have the freedom to overwrite the struct value, which will cause race condition if the reader hasn't finished consuming the struct?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH, not sure. This is fringe stuff I only rarely get a chance to use 😄 I had a mistake in the first snippet, updated now. I removed a ref that shouldn't have been in there. The first snippet I think is safe. It makes a copy from the array, and then returns that copy by ref to the caller. At best, saves one copy. Second snippet might have a race. There are strict rules with ref locals, caller has to also use it as a local. So it will be extremely short-lived, but yes there could be an issue if the buffer loops around really quickly. Maybe we go as-is and then we can try to perf hack it while you continue on with the real effort?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I bet the 1st one is covered by NRV (named return value) optimization 😄