Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove thread contention from Activity Start/Stop #107333

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,105 +407,103 @@ internal void NotifyActivityAddException(Activity activity, Exception exception,
}
}

// SynchronizedList<T> is a helper collection which ensure thread safety on the collection
// and allow enumerating the collection items and execute some action on the enumerated item and can detect any change in the collection
// during the enumeration which force restarting the enumeration again.
// Caution: We can have the action executed on the same item more than once which is ok in our scenarios.
//this class uses Interlocked operations and a copy-on-write design to ensure thread safety
//all operations are thread safe
internal sealed class SynchronizedList<T>
{
private readonly List<T> _list;
private uint _version;

public SynchronizedList() => _list = new List<T>();
//This array must not be written to directly. Copy the array and then replace it with the new array.
private T[] _volatileArray;
public SynchronizedList() => _volatileArray = [];

public void Add(T item)
{
lock (_list)
while (true)
{
_list.Add(item);
_version++;
T[] local = _volatileArray;

if (TryAppendItemAndSwap(item, local))
{
return;
}
tarekgh marked this conversation as resolved.
Show resolved Hide resolved
else
{
//implicit continue
}
}
}

public bool AddIfNotExist(T item)
{
lock (_list)
while (true)
{
if (!_list.Contains(item))
T[] local = _volatileArray;

int index = Array.IndexOf(local, item);

if (index >= 0)
{
return false;
}

if (TryAppendItemAndSwap(item, local))
{
_list.Add(item);
_version++;
return true;
}
return false;
else
{
//implicit continue
}
}
}

public bool Remove(T item)
{
lock (_list)
while (true)
{
if (_list.Remove(item))
T[] local = _volatileArray;

int index = Array.IndexOf(local, item);

if (index < 0)
{
return false;
}

if (TryRemoveIndexAndSwap(index, local))
{
_version++;
return true;
}
return false;
else
{
//implicit continue
}
}
}

public int Count => _list.Count;

public void EnumWithFunc<TParent>(ActivitySource.Function<T, TParent> func, ref ActivityCreationOptions<TParent> data, ref ActivitySamplingResult samplingResult, ref ActivityCreationOptions<ActivityContext> dataWithContext)
public int Count
{
uint version = _version;
int index = 0;

while (index < _list.Count)
get
{
T item;
lock (_list)
{
if (version != _version)
{
version = _version;
index = 0;
continue;
}
T[] localArray = Volatile.Read(ref _volatileArray);

item = _list[index];
index++;
}
return localArray.Length;
}
}

// Important to call the func outside the lock.
// This is the whole point we are having this wrapper class.
public void EnumWithFunc<TParent>(ActivitySource.Function<T, TParent> func, ref ActivityCreationOptions<TParent> data, ref ActivitySamplingResult samplingResult, ref ActivityCreationOptions<ActivityContext> dataWithContext)
{
T[] localArray = Volatile.Read(ref _volatileArray);
foreach (T item in localArray)
{
func(item, ref data, ref samplingResult, ref dataWithContext);
}
}

public void EnumWithAction(Action<T, object> action, object arg)
{
uint version = _version;
int index = 0;

while (index < _list.Count)
T[] localArray = Volatile.Read(ref _volatileArray);
foreach (T item in localArray)
{
T item;
lock (_list)
{
if (version != _version)
{
version = _version;
index = 0;
continue;
}

item = _list[index];
index++;
}

// Important to call the action outside the lock.
// This is the whole point we are having this wrapper class.
action(item, arg);
}
}
Expand All @@ -517,29 +515,35 @@ public void EnumWithExceptionNotification(Activity activity, Exception exception
return;
}

uint version = _version;
int index = 0;

while (index < _list.Count)
T[] localArray = Volatile.Read(ref _volatileArray);
foreach (T item in localArray)
{
T item;
lock (_list)
{
if (version != _version)
{
version = _version;
index = 0;
continue;
}

item = _list[index];
index++;
}

// Important to notify outside the lock.
// This is the whole point we are having this wrapper class.
(item as ActivityListener)!.ExceptionRecorder?.Invoke(activity, exception, ref tags);
}
}

private bool TryAppendItemAndSwap(T item, T[] localCopy)
{
T[] newArray = new T[localCopy.Length + 1];

Array.Copy(localCopy, newArray, localCopy.Length);//copy existing items
newArray[localCopy.Length] = item;//copy new item

return Interlocked.CompareExchange(ref _volatileArray, newArray, localCopy) == localCopy;
}

private bool TryRemoveIndexAndSwap(int index, T[] localCopy)
{
T[] newArray = new T[localCopy.Length - 1];

Array.Copy(localCopy, newArray, index);//copy existing items before index

Array.Copy(
localCopy, index + 1, //position after the index, skipping it
newArray, index, localCopy.Length - index - 1//remaining items accounting for removed item
);

return Interlocked.CompareExchange(ref _volatileArray, newArray, localCopy) == localCopy;
}
}
}
Loading