Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Reimplement Timeout(TimeSpan) with lock-free methods #546

Merged
merged 5 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Internal/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ public void Dispose()

protected virtual void Dispose(bool disposing)
{
_observer = NopObserver<TTarget>.Instance;
ClearObserver();

Interlocked.Exchange(ref _cancel, null)?.Dispose();
}

protected void ClearObserver()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why divide this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because _cancel holds a SingleAssignmentDisposable which holds this in itself and would call Dispose again on the implementing instance. In fact, any newer Run implementation returning this is prone to this wasteful call.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an issue I have been trying to wrap my head around for some time now. Can we defer this change for later, going with the wasteful call for now, because it would add more complexity to the Sink and I am thinking around ways to lower complexity of Sink.

{
_observer = NopObserver<TTarget>.Instance;
}

protected void ForwardOnNext(TTarget value)
{
_observer.OnNext(value);
Expand Down
105 changes: 45 additions & 60 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
Expand Down Expand Up @@ -34,9 +35,11 @@ internal sealed class _ : IdentitySink<TSource>
private readonly IObservable<TSource> _other;
private readonly IScheduler _scheduler;

private readonly object _gate = new object();
private SerialDisposable _subscription = new SerialDisposable();
private SerialDisposable _timer = new SerialDisposable();
long _index;

IDisposable _mainDisposable;
IDisposable _otherDisposable;
IDisposable _timerDisposable;

public _(Relative parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
Expand All @@ -46,100 +49,82 @@ public _(Relative parent, IObserver<TSource> observer, IDisposable cancel)
_scheduler = parent._scheduler;
}

private ulong _id;
private bool _switched;

public IDisposable Run(IObservable<TSource> source)
{
var original = new SingleAssignmentDisposable();

_subscription.Disposable = original;
CreateTimer(0L);

_id = 0UL;
_switched = false;

CreateTimer();

original.Disposable = source.SubscribeSafe(this);
Disposable.SetSingle(ref _mainDisposable, source.SubscribeSafe(this));

return StableCompositeDisposable.Create(_subscription, _timer);
return this;
}

private void CreateTimer()
protected override void Dispose(bool disposing)
{
_timer.Disposable = _scheduler.Schedule(_id, _dueTime, Timeout);
if (disposing)
{
Disposable.TryDispose(ref _mainDisposable);
Disposable.TryDispose(ref _otherDisposable);
Disposable.TryDispose(ref _timerDisposable);
}
base.ClearObserver();
}

private IDisposable Timeout(IScheduler _, ulong myid)
private void CreateTimer(long idx)
{
var timerWins = false;

lock (_gate)
if (Disposable.TrySetMultiple(ref _timerDisposable, null))
{
_switched = (_id == myid);
timerWins = _switched;
}

if (timerWins)
_subscription.Disposable = _other.SubscribeSafe(GetForwarder());
var d = _scheduler.Schedule((idx, instance: this), _dueTime, (_, state) => { state.instance.Timeout(state.idx); return Disposable.Empty; });

return Disposable.Empty;
Disposable.TrySetMultiple(ref _timerDisposable, d);
}
}

public override void OnNext(TSource value)
private void Timeout(long idx)
{
var onNextWins = false;

lock (_gate)
if (Volatile.Read(ref _index) == idx && Interlocked.CompareExchange(ref _index, long.MaxValue, idx) == idx)
{
onNextWins = !_switched;
if (onNextWins)
{
_id = unchecked(_id + 1);
}
Disposable.TryDispose(ref _mainDisposable);

var d = _other.Subscribe(GetForwarder());

Disposable.SetSingle(ref _otherDisposable, d);
}
}

if (onNextWins)
public override void OnNext(TSource value)
{
var idx = Volatile.Read(ref _index);
if (idx != long.MaxValue && Interlocked.CompareExchange(ref _index, idx + 1, idx) == idx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many messages would it have to produce per second for how many years to actually break this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you could produce 1 message per nanosecond, this would overflow in 500+ years.

{
// Do not swap in the BooleanDisposable.True here
// As we'll need _timerDisposable to store the next timer
// BD.True would cancel it immediately and break the operation
Volatile.Read(ref _timerDisposable)?.Dispose();

ForwardOnNext(value);
CreateTimer();

CreateTimer(idx + 1);
}
}

public override void OnError(Exception error)
{
var onErrorWins = false;

lock (_gate)
if (Interlocked.Exchange(ref _index, long.MaxValue) != long.MaxValue)
{
onErrorWins = !_switched;
if (onErrorWins)
{
_id = unchecked(_id + 1);
}
}
Disposable.TryDispose(ref _timerDisposable);

if (onErrorWins)
{
ForwardOnError(error);
}
}

public override void OnCompleted()
{
var onCompletedWins = false;

lock (_gate)
if (Interlocked.Exchange(ref _index, long.MaxValue) != long.MaxValue)
{
onCompletedWins = !_switched;
if (onCompletedWins)
{
_id = unchecked(_id + 1);
}
}
Disposable.TryDispose(ref _timerDisposable);

if (onCompletedWins)
{
ForwardOnCompleted();
}
}
Expand Down