Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove thread contention from Activity Start/Stop #107333

Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,105 +407,94 @@ internal void NotifyActivityAddException(Activity activity, Exception exception,
}
}

// SynchronizedList<T> is a helper collection which ensure thread safety on the collection
// and allow enumerating the collection items and execute some action on the enumerated item and can detect any change in the collection
// during the enumeration which force restarting the enumeration again.
// Caution: We can have the action executed on the same item more than once which is ok in our scenarios.
//this class uses a copy-on-write design to ensure thread safety all operations are thread safe.
tarekgh marked this conversation as resolved.
Show resolved Hide resolved
//However, it is possible for read-only operations to see stale versions of the item while a change
//is occurring. This should not be a practical issue if updates are infrequent
tarekgh marked this conversation as resolved.
Show resolved Hide resolved
internal sealed class SynchronizedList<T>
{
private readonly List<T> _list;
private uint _version;

public SynchronizedList() => _list = new List<T>();
private readonly object _writeLock;
//This array must not be mutated directly. To mutate, obtain the lock, copy the array and then replace it with the new array.
private T[] _volatileArray;
public SynchronizedList()
{
_volatileArray = [];
_writeLock = new();
}

public void Add(T item)
{
lock (_list)
lock (_writeLock)
tarekgh marked this conversation as resolved.
Show resolved Hide resolved
{
_list.Add(item);
_version++;
T[] newArray = new T[_volatileArray.Length + 1];

Array.Copy(_volatileArray, newArray, _volatileArray.Length);//copy existing items
tarekgh marked this conversation as resolved.
Show resolved Hide resolved
newArray[_volatileArray.Length] = item;//copy new item

_volatileArray = newArray;
}
}

public bool AddIfNotExist(T item)
{
lock (_list)
lock (_writeLock)
{
if (!_list.Contains(item))
int index = Array.IndexOf(_volatileArray, item);

if (index >= 0)
{
_list.Add(item);
_version++;
return true;
return false;
}
return false;

T[] newArray = new T[_volatileArray.Length + 1];

Array.Copy(_volatileArray, newArray, _volatileArray.Length);//copy existing items
newArray[_volatileArray.Length] = item;//copy new item

_volatileArray = newArray;

return true;
}
}

public bool Remove(T item)
{
lock (_list)
lock (_writeLock)
{
if (_list.Remove(item))
int index = Array.IndexOf(_volatileArray, item);

if (index < 0)
{
_version++;
return true;
return false;
}
return false;

T[] newArray = new T[_volatileArray.Length - 1];

Array.Copy(_volatileArray, newArray, index);//copy existing items before index

Array.Copy(
_volatileArray, index + 1, //position after the index, skipping it
newArray, index, _volatileArray.Length - index - 1//remaining items accounting for removed item
);

_volatileArray = newArray;
return true;
}
}

public int Count => _list.Count;
public int Count => _volatileArray.Length;

public void EnumWithFunc<TParent>(ActivitySource.Function<T, TParent> func, ref ActivityCreationOptions<TParent> data, ref ActivitySamplingResult samplingResult, ref ActivityCreationOptions<ActivityContext> dataWithContext)
{
uint version = _version;
int index = 0;

while (index < _list.Count)
foreach (T item in _volatileArray)
{
T item;
lock (_list)
{
if (version != _version)
{
version = _version;
index = 0;
continue;
}

item = _list[index];
index++;
}

// Important to call the func outside the lock.
// This is the whole point we are having this wrapper class.
func(item, ref data, ref samplingResult, ref dataWithContext);
}
}

public void EnumWithAction(Action<T, object> action, object arg)
{
uint version = _version;
int index = 0;

while (index < _list.Count)
foreach (T item in _volatileArray)
{
T item;
lock (_list)
{
if (version != _version)
{
version = _version;
index = 0;
continue;
}

item = _list[index];
index++;
}

// Important to call the action outside the lock.
// This is the whole point we are having this wrapper class.
action(item, arg);
}
}
Expand All @@ -517,27 +506,8 @@ public void EnumWithExceptionNotification(Activity activity, Exception exception
return;
}

uint version = _version;
int index = 0;

while (index < _list.Count)
foreach (T item in _volatileArray)
{
T item;
lock (_list)
{
if (version != _version)
{
version = _version;
index = 0;
continue;
}

item = _list[index];
index++;
}

// Important to notify outside the lock.
// This is the whole point we are having this wrapper class.
(item as ActivityListener)!.ExceptionRecorder?.Invoke(activity, exception, ref tags);
}
}
Expand Down
Loading