Skip to content

Commit

Permalink
Refactor exporter - step 4 (#1085)
Browse files Browse the repository at this point in the history
* add a circular buffer

* add memory barrier

* ++

* clean up

Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
  • Loading branch information
reyang and CodeBlanch authored Aug 16, 2020
1 parent 4895c59 commit 90c370f
Showing 1 changed file with 135 additions and 0 deletions.
135 changes: 135 additions & 0 deletions src/OpenTelemetry/Internal/CircularBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// <copyright file="CircularBuffer.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;

namespace OpenTelemetry.Internal
{
/// <summary>
/// Lock free implementation of single-reader multi-writer circular buffer.
/// </summary>
/// <typeparam name="T">The type of the underlying value.</typeparam>
internal class CircularBuffer<T>
where T : class
{
private readonly int capacity;
private readonly T[] trait;
private long head = 0;
private long tail = 0;

public CircularBuffer(int capacity)
{
if (capacity <= 0)
{
throw new ArgumentOutOfRangeException(nameof(capacity));
}

this.capacity = capacity;
this.trait = new T[capacity];
}

public int Capacity
{
get
{
return this.capacity;
}
}

/// <summary>
/// Gets the number of items contained in the <see cref="CircularBuffer{T}"/>.
/// </summary>
public int Count
{
get
{
var tailSnapshot = this.tail;
return (int)(this.head - tailSnapshot);
}
}

/// <summary>
/// Attempts to add the specified item to the buffer.
/// </summary>
/// <param name="value">The value to add.</param>
/// <returns>Returns true if the item was added to the buffer successfully; false if the buffer is full.</returns>
public bool TryAdd(T value)
{
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}

while (true)
{
var tailSnapshot = this.tail;
var headSnapshot = this.head;

if (headSnapshot - tailSnapshot >= this.capacity)
{
return false; // buffer is full
}

var index = (int)(headSnapshot % this.capacity);

if (this.SwapIfNull(index, value))
{
if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) == headSnapshot)
{
return true;
}

this.trait[index] = null;
}
}
}

public IEnumerable<T> Consume(int count)
{
if (count <= 0)
{
yield break;
}

count = Math.Min(count, this.Count);

for (int i = 0; i < count; i++)
{
var index = (int)(this.tail % this.capacity);
var value = this.trait[index];
this.trait[index] = null;
this.tail++;
yield return value;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool CompareAndSwap(int index, T value, T comparand)
{
var result = Interlocked.CompareExchange(ref this.trait[index], value, comparand);
return object.ReferenceEquals(result, comparand);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool SwapIfNull(int index, T value)
{
return this.CompareAndSwap(index, value, null);
}
}
}

0 comments on commit 90c370f

Please sign in to comment.