Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Improve and fix CompositeDisposable #505

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 212 additions & 65 deletions Rx.NET/Source/src/System.Reactive/Disposables/CompositeDisposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace System.Reactive.Disposables
{
Expand All @@ -21,6 +22,10 @@ public sealed class CompositeDisposable : ICollection<IDisposable>, ICancelable
private int _count;
private const int SHRINK_THRESHOLD = 64;

// Default initial capacity of the _disposables list in case
// The number of items is not known upfront
private const int DEFAULT_CAPACITY = 16;

/// <summary>
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class with no disposables contained by it initially.
/// </summary>
Expand Down Expand Up @@ -49,8 +54,13 @@ public CompositeDisposable(int capacity)
/// <exception cref="ArgumentNullException"><paramref name="disposables"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Any of the disposables in the <paramref name="disposables"/> collection is <c>null</c>.</exception>
public CompositeDisposable(params IDisposable[] disposables)
: this((IEnumerable<IDisposable>)disposables)
{
if (disposables == null)
{
throw new ArgumentNullException(nameof(disposables));
}

Init(disposables, disposables.Length);
}

/// <summary>
Expand All @@ -64,21 +74,49 @@ public CompositeDisposable(IEnumerable<IDisposable> disposables)
if (disposables == null)
throw new ArgumentNullException(nameof(disposables));

_disposables = new List<IDisposable>(disposables);
// If the disposables is a collection, get its size
// and use it as a capacity hint for the copy.
if (disposables is ICollection<IDisposable> c)
{
Init(disposables, c.Count);
}
else
{
// Unknown sized disposables, use the default capacity hint
Init(disposables, DEFAULT_CAPACITY);
}
}

//
// Doing this on the list to avoid duplicate enumeration of disposables.
//
if (_disposables.Contains(null))
throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, nameof(disposables));
/// <summary>
/// Initialize the inner disposable list and count fields.
/// </summary>
/// <param name="disposables">The enumerable sequence of disposables.</param>
/// <param name="capacityHint">The number of items expected from <paramref name="disposables"/></param>
private void Init(IEnumerable<IDisposable> disposables, int capacityHint)
{
var list = new List<IDisposable>(capacityHint);

_count = _disposables.Count;
// do the copy and null-check in one step to avoid a
// second loop for just checking for null items
foreach (var d in disposables)
{
if (d == null)
{
throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, nameof(disposables));
}
list.Add(d);
}

_disposables = list;
// _count can be read by other threads and thus should be properly visible
// also releases the _disposables contents so it becomes thread-safe
Volatile.Write(ref _count, _disposables.Count());
}

/// <summary>
/// Gets the number of disposables contained in the <see cref="CompositeDisposable"/>.
/// </summary>
public int Count => _count;
public int Count => Volatile.Read(ref _count);

/// <summary>
/// Adds a disposable to the <see cref="CompositeDisposable"/> or disposes the disposable if the <see cref="CompositeDisposable"/> is disposed.
Expand All @@ -90,21 +128,20 @@ public void Add(IDisposable item)
if (item == null)
throw new ArgumentNullException(nameof(item));

var shouldDispose = false;
lock (_gate)
{
shouldDispose = _disposed;
if (!_disposed)
{
_disposables.Add(item);
_count++;
// If read atomically outside the lock, it should be written atomically inside
// the plain read on _count is fine here because manipulation always happens
// from inside a lock.
Volatile.Write(ref _count, _count + 1);
return;
}
}

if (shouldDispose)
{
item.Dispose();
}
item.Dispose();
}

/// <summary>
Expand All @@ -118,65 +155,80 @@ public bool Remove(IDisposable item)
if (item == null)
throw new ArgumentNullException(nameof(item));

var shouldDispose = false;

lock (_gate)
{
if (!_disposed)
// this composite was already disposed and if the item was in there
// it has been already removed/disposed
if (_disposed)
{
//
// List<T> doesn't shrink the size of the underlying array but does collapse the array
// by copying the tail one position to the left of the removal index. We don't need
// index-based lookup but only ordering for sequential disposal. So, instead of spending
// cycles on the Array.Copy imposed by Remove, we use a null sentinel value. We also
// do manual Swiss cheese detection to shrink the list if there's a lot of holes in it.
//
var i = _disposables.IndexOf(item);
if (i >= 0)
{
shouldDispose = true;
_disposables[i] = null;
_count--;
return false;
}

//
// List<T> doesn't shrink the size of the underlying array but does collapse the array
// by copying the tail one position to the left of the removal index. We don't need
// index-based lookup but only ordering for sequential disposal. So, instead of spending
// cycles on the Array.Copy imposed by Remove, we use a null sentinel value. We also
// do manual Swiss cheese detection to shrink the list if there's a lot of holes in it.
//

if (_disposables.Capacity > SHRINK_THRESHOLD && _count < _disposables.Capacity / 2)
// read fields as infrequently as possible
var current = _disposables;

var i = current.IndexOf(item);
if (i < 0)
{
// not found, just return
return false;
}

current[i] = null;

if (current.Capacity > SHRINK_THRESHOLD && _count < current.Capacity / 2)
{
var fresh = new List<IDisposable>(current.Capacity / 2);

foreach (var d in current)
{
if (d != null)
{
var old = _disposables;
_disposables = new List<IDisposable>(_disposables.Capacity / 2);

foreach (var d in old)
{
if (d != null)
{
_disposables.Add(d);
}
}
fresh.Add(d);
}
}

_disposables = fresh;
}
}

if (shouldDispose)
{
item.Dispose();
// make sure the Count property sees an atomic update
Volatile.Write(ref _count, _count - 1);
}

return shouldDispose;
// if we get here, the item was found and removed from the list
// just dispose it and report success

item.Dispose();

return true;
}

/// <summary>
/// Disposes all disposables in the group and removes them from the group.
/// </summary>
public void Dispose()
{
var currentDisposables = default(IDisposable[]);
var currentDisposables = default(List<IDisposable>);
lock (_gate)
{
if (!_disposed)
{
_disposed = true;
currentDisposables = _disposables.ToArray();
_disposables.Clear();
_count = 0;
currentDisposables = _disposables;
// nulling out the reference is faster no risk to
// future Add/Remove because _disposed will be true
// and thus _disposables won't be touched again.
_disposables = null;

Volatile.Write(ref _count, 0);
Volatile.Write(ref _disposed, true);
}
}

Expand All @@ -194,15 +246,24 @@ public void Dispose()
/// </summary>
public void Clear()
{
var currentDisposables = default(IDisposable[]);
var previousDisposables = default(IDisposable[]);
lock (_gate)
{
currentDisposables = _disposables.ToArray();
_disposables.Clear();
_count = 0;
// disposed composites are always clear
if (_disposed)
{
return;
}

var current = _disposables;

previousDisposables = current.ToArray();
current.Clear();

Volatile.Write(ref _count, 0);
}

foreach (var d in currentDisposables)
foreach (var d in previousDisposables)
{
d?.Dispose();
}
Expand All @@ -221,6 +282,10 @@ public bool Contains(IDisposable item)

lock (_gate)
{
if (_disposed)
{
return false;
}
return _disposables.Contains(item);
}
}
Expand All @@ -241,7 +306,26 @@ public void CopyTo(IDisposable[] array, int arrayIndex)

lock (_gate)
{
Array.Copy(_disposables.Where(d => d != null).ToArray(), 0, array, arrayIndex, array.Length - arrayIndex);
// disposed composites are always empty
if (_disposed)
{
return;
}

if (arrayIndex + _count > array.Length)
{
// there is not enough space beyond arrayIndex
// to accomodate all _count disposables in this composite
throw new ArgumentOutOfRangeException(nameof(arrayIndex));
}
var i = arrayIndex;
foreach (var d in _disposables)
{
if (d != null)
{
array[i++] = d;
}
}
}
}

Expand All @@ -256,14 +340,16 @@ public void CopyTo(IDisposable[] array, int arrayIndex)
/// <returns>An enumerator to iterate over the disposables.</returns>
public IEnumerator<IDisposable> GetEnumerator()
{
var res = default(IEnumerable<IDisposable>);

lock (_gate)
{
res = _disposables.Where(d => d != null).ToList();
if (_disposed || _count == 0)
{
return EMPTY_ENUMERATOR;
}
// the copy is unavoidable but the creation
// of an outer IEnumerable is avoidable
return new CompositeEnumerator(_disposables.ToArray());
}

return res.GetEnumerator();
}

/// <summary>
Expand All @@ -275,6 +361,67 @@ public IEnumerator<IDisposable> GetEnumerator()
/// <summary>
/// Gets a value that indicates whether the object is disposed.
/// </summary>
public bool IsDisposed => _disposed;
public bool IsDisposed => Volatile.Read(ref _disposed);

/// <summary>
/// An empty enumerator for the <see cref="GetEnumerator"/>
/// method to avoid allocation on disposed or empty composites.
/// </summary>
static readonly CompositeEnumerator EMPTY_ENUMERATOR =
new CompositeEnumerator(new IDisposable[0]);

/// <summary>
/// An enumerator for an array of disposables.
/// </summary>
sealed class CompositeEnumerator : IEnumerator<IDisposable>
{
readonly IDisposable[] disposables;

int index;

public CompositeEnumerator(IDisposable[] disposables)
{
this.disposables = disposables;
this.index = -1;
}

public IDisposable Current => disposables[index];

object IEnumerator.Current => disposables[index];

public void Dispose()
{
// Avoid retention of the referenced disposables
// beyond the lifecycle of the enumerator.
// Not sure if this happens by default to
// generic array enumerators though.
var disposables = this.disposables;
Array.Clear(disposables, 0, disposables.Length);
}

public bool MoveNext()
{
var disposables = this.disposables;

for (; ; )
{
var idx = ++index;
if (idx >= disposables.Length)
{
return false;
}
// inlined that filter for null elements
if (disposables[idx] != null)
{
return true;
}
}
}

public void Reset()
{
index = -1;
}
}
}
}