Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the helper methods for IDisposable-fields, avoid repeating code patterns #556

Merged
merged 1 commit into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Reactive.Disposables
/// </summary>
public sealed class ContextDisposable : ICancelable
{
private volatile IDisposable _disposable;
private IDisposable _disposable;

/// <summary>
/// Initializes a new instance of the <see cref="ContextDisposable"/> class that uses the specified <see cref="SynchronizationContext"/> on which to dispose the specified disposable resource.
Expand All @@ -28,7 +28,7 @@ public ContextDisposable(SynchronizationContext context, IDisposable disposable)
throw new ArgumentNullException(nameof(disposable));

Context = context;
_disposable = disposable;
Disposable.SetSingle(ref _disposable, disposable);
}

/// <summary>
Expand All @@ -39,19 +39,14 @@ public ContextDisposable(SynchronizationContext context, IDisposable disposable)
/// <summary>
/// Gets a value that indicates whether the object is disposed.
/// </summary>
public bool IsDisposed => _disposable == BooleanDisposable.True;
public bool IsDisposed => Disposable.GetIsDisposed(ref _disposable);

/// <summary>
/// Disposes the underlying disposable on the provided <see cref="SynchronizationContext"/>.
/// </summary>
public void Dispose()
{
var disposable = Interlocked.Exchange(ref _disposable, BooleanDisposable.True);

if (disposable != BooleanDisposable.True)
{
Context.PostWithStartComplete(d => d.Dispose(), disposable);
}
Disposable.TryRelease(ref _disposable, this.Context, (disposable, context) => context.PostWithStartComplete(d => d.Dispose(), disposable));
}
}
}
11 changes: 11 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Disposables/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,16 @@ internal static bool TryDispose(ref IDisposable fieldRef)
old?.Dispose();
return true;
}

internal static bool TryRelease<TState>(ref IDisposable fieldRef, TState state, Action<IDisposable, TState> disposeAction)
{
var old = Interlocked.Exchange(ref fieldRef, BooleanDisposable.True);

if (old == BooleanDisposable.True)
return false;

disposeAction(old, state);
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Reactive.Disposables
/// </summary>
public sealed class ScheduledDisposable : ICancelable
{
private volatile IDisposable _disposable;
private IDisposable _disposable;

/// <summary>
/// Initializes a new instance of the <see cref="ScheduledDisposable"/> class that uses an <see cref="IScheduler"/> on which to dispose the disposable.
Expand All @@ -28,7 +28,7 @@ public ScheduledDisposable(IScheduler scheduler, IDisposable disposable)
throw new ArgumentNullException(nameof(disposable));

Scheduler = scheduler;
_disposable = disposable;
Disposables.Disposable.SetSingle(ref _disposable, disposable);
}

/// <summary>
Expand All @@ -39,39 +39,16 @@ public ScheduledDisposable(IScheduler scheduler, IDisposable disposable)
/// <summary>
/// Gets the underlying disposable. After disposal, the result is undefined.
/// </summary>
public IDisposable Disposable
{
get
{
var current = _disposable;

if (current == BooleanDisposable.True)
{
return Disposables.Disposable.Empty; // Don't leak the sentinel value.
}

return current;
}
}
public IDisposable Disposable => Disposables.Disposable.GetValueOrDefault(ref _disposable);

/// <summary>
/// Gets a value that indicates whether the object is disposed.
/// </summary>
public bool IsDisposed => _disposable == BooleanDisposable.True;
public bool IsDisposed => Disposables.Disposable.GetIsDisposed(ref _disposable);

/// <summary>
/// Disposes the wrapped disposable on the provided scheduler.
/// </summary>
public void Dispose() => Scheduler.Schedule(DisposeInner);

private void DisposeInner()
{
var disposable = Interlocked.Exchange(ref _disposable, BooleanDisposable.True);

if (disposable != BooleanDisposable.True)
{
disposable.Dispose();
}
}
public void Dispose() => Scheduler.Schedule(scheduler => Disposables.Disposable.TryDispose(ref scheduler._disposable), this);
}
}
16 changes: 3 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Internal/AutoDetachObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal sealed class AutoDetachObserver<T> : ObserverBase<T>
{
private readonly IObserver<T> _observer;

private IDisposable disposable;
private IDisposable _disposable;

public AutoDetachObserver(IObserver<T> observer)
{
Expand All @@ -20,17 +20,7 @@ public AutoDetachObserver(IObserver<T> observer)

public IDisposable Disposable
{
set
{
if (Interlocked.CompareExchange(ref disposable, value, null) != null)
{
value?.Dispose();
if (Volatile.Read(ref disposable) != BooleanDisposable.True)
{
throw new InvalidOperationException(Strings_Core.DISPOSABLE_ALREADY_ASSIGNED);
}
}
}
set => Disposables.Disposable.SetSingle(ref _disposable, value);
}

protected override void OnNextCore(T value)
Expand Down Expand Up @@ -111,7 +101,7 @@ protected override void Dispose(bool disposing)

if (disposing)
{
Interlocked.Exchange(ref disposable, BooleanDisposable.True)?.Dispose();
Disposables.Disposable.TryDispose(ref _disposable);
}
}
}
Expand Down
10 changes: 3 additions & 7 deletions Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream, IDisp
public void Dispose()
{
Volatile.Write(ref disposed, true);
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
Interlocked.Exchange(ref task, BooleanDisposable.True)?.Dispose();
Disposable.TryDispose(ref upstream);
Disposable.TryDispose(ref task);
Clear();
}

Expand Down Expand Up @@ -443,14 +443,10 @@ void Schedule()
{
if (Interlocked.Increment(ref wip) == 1)
{
var oldTask = Volatile.Read(ref task);

var newTask = new SingleAssignmentDisposable();

if (oldTask != BooleanDisposable.True
&& Interlocked.CompareExchange(ref task, newTask, oldTask) == oldTask)
if (Disposable.TrySetMultiple(ref task, newTask))
{

var longRunning = this.longRunning;
if (longRunning != null)
{
Expand Down
20 changes: 6 additions & 14 deletions Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ void Drain()
var enumerator = stack.Pop();
enumerator.Dispose();
}
if (Volatile.Read(ref currentSubscription) != BooleanDisposable.True)
{
Interlocked.Exchange(ref currentSubscription, BooleanDisposable.True)?.Dispose();
}

Disposable.TryDispose(ref currentSubscription);
}
else
{
Expand Down Expand Up @@ -131,7 +129,8 @@ void Drain()
else
{
var sad = new SingleAssignmentDisposable();
if (Interlocked.CompareExchange(ref currentSubscription, sad, null) == null)

if (Disposable.TrySetSingle(ref currentSubscription, sad) == TrySetSingleResult.Success)
Copy link
Collaborator Author

@danielcweber danielcweber May 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd This change was added (the original reason to split SetSingle and TrySetSingle).

{
sad.Disposable = next.SubscribeSafe(this);
}
Expand Down Expand Up @@ -172,15 +171,8 @@ void DisposeAll()

protected void Recurse()
{
var d = Volatile.Read(ref currentSubscription);
if (d != BooleanDisposable.True)
{
d?.Dispose();
if (Interlocked.CompareExchange(ref currentSubscription, null, d) == d)
{
Drain();
}
}
if (Disposable.TrySetSerial(ref currentSubscription, null))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this still capture the original semantics?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a loop in TrySetSerial where a concurrent dispose failing the CAS will have it repeat the loop, detect that it needs to dispose d and then quits. This logic relies on the fact that if the single CAS fails, it can be only due to cancellation and there is nothing to do further.

Drain();
}

protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);
Expand Down
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ internal void OnSubscribe(IDisposable d)

public void Dispose()
{
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
Disposable.TryDispose(ref upstream);
}

public void OnCompleted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public InnerObserver(AmbCoordinator<T> parent, int index)

public void Dispose()
{
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
Disposable.TryDispose(ref upstream);
}

public void OnCompleted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public void Dispose()

void DisposeMain()
{
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
Disposable.TryDispose(ref upstream);
}

bool IsDisposed()
{
return Volatile.Read(ref upstream) == BooleanDisposable.True;
return Disposable.GetIsDisposed(ref upstream);
}

public void OnCompleted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ internal void OnSubscribe(IDisposable d)

public void Dispose()
{
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
Disposable.TryDispose(ref upstream);
}

public void OnCompleted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public TaskDisposeCompletionObserver(IObserver<TResult> observer)

public void Dispose()
{
Interlocked.Exchange(ref disposable, BooleanDisposable.True)?.Dispose();
Disposable.TryDispose(ref disposable);
}

public void OnCompleted()
Expand Down