Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Improve Amb #512

Merged
merged 1 commit into from
May 26, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 95 additions & 108 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>._>
internal sealed class Amb<TSource> : Producer<TSource, Amb<TSource>.AmbCoordinator>
{
private readonly IObservable<TSource> _left;
private readonly IObservable<TSource> _right;
Expand All @@ -17,152 +18,138 @@ public Amb(IObservable<TSource> left, IObservable<TSource> right)
_right = right;
}

protected override _ CreateSink(IObserver<TSource> observer, IDisposable cancel) => new _(observer, cancel);
protected override AmbCoordinator CreateSink(IObserver<TSource> observer, IDisposable cancel) => new AmbCoordinator(observer);

protected override IDisposable Run(_ sink) => sink.Run(this);
protected override IDisposable Run(AmbCoordinator sink) => sink.Run(_left, _right);

internal sealed class _ : Sink<TSource>
internal sealed class AmbCoordinator : IDisposable
{
public _(IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
readonly AmbObserver leftObserver;

readonly AmbObserver rightObserver;

int winner;

public AmbCoordinator(IObserver<TSource> observer)
{
leftObserver = new AmbObserver(observer, this, true);
rightObserver = new AmbObserver(observer, this, false);
}

private AmbState _choice;
public IDisposable Run(IObservable<TSource> left, IObservable<TSource> right)
{
leftObserver.OnSubscribe(left.Subscribe(leftObserver));
rightObserver.OnSubscribe(right.Subscribe(rightObserver));
return this;
}

public IDisposable Run(Amb<TSource> parent)
public void Dispose()
{
var ls = new SingleAssignmentDisposable();
var rs = new SingleAssignmentDisposable();
var d = StableCompositeDisposable.Create(ls, rs);
leftObserver.Dispose();
rightObserver.Dispose();
}

var gate = new object();
/// <summary>
/// Try winning the race for the right of emission.
/// </summary>
/// <param name="isLeft">If true, the contender is the left source.</param>
/// <returns>True if the contender has won the race.</returns>
public bool TryWin(bool isLeft)
{
var index = isLeft ? 1 : 2;

if (Volatile.Read(ref winner) == index)
{
return true;
}
if (Interlocked.CompareExchange(ref winner, index, 0) == 0)
{
(isLeft ? rightObserver : leftObserver).Dispose();
return true;
}
return false;
}

var lo = new AmbObserver();
lo._disposable = d;
lo._target = new DecisionObserver(this, gate, AmbState.Left, ls, rs, lo);
sealed class AmbObserver : IObserver<TSource>, IDisposable
{
readonly IObserver<TSource> downstream;

var ro = new AmbObserver();
ro._disposable = d;
ro._target = new DecisionObserver(this, gate, AmbState.Right, rs, ls, ro);
readonly AmbCoordinator parent;

_choice = AmbState.Neither;
readonly bool isLeft;

ls.Disposable = parent._left.SubscribeSafe(lo);
rs.Disposable = parent._right.SubscribeSafe(ro);
IDisposable upstream;

return d;
}
/// <summary>
/// If true, this observer won the race and now can emit
/// on a fast path.
/// </summary>
bool iwon;

private sealed class DecisionObserver : IObserver<TSource>
{
private readonly _ _parent;
private readonly AmbState _me;
private readonly IDisposable _subscription;
private readonly IDisposable _otherSubscription;
private readonly object _gate;
private readonly AmbObserver _observer;

public DecisionObserver(_ parent, object gate, AmbState me, IDisposable subscription, IDisposable otherSubscription, AmbObserver observer)
public AmbObserver(IObserver<TSource> downstream, AmbCoordinator parent, bool isLeft)
{
_parent = parent;
_gate = gate;
_me = me;
_subscription = subscription;
_otherSubscription = otherSubscription;
_observer = observer;
this.downstream = downstream;
this.parent = parent;
this.isLeft = isLeft;
}

public void OnNext(TSource value)
internal void OnSubscribe(IDisposable d)
{
lock (_gate)
if (Interlocked.CompareExchange(ref upstream, d, null) != null)
{
if (_parent._choice == AmbState.Neither)
{
_parent._choice = _me;
_otherSubscription.Dispose();
_observer._disposable = _subscription;
_observer._target = _parent._observer;
}

if (_parent._choice == _me)
{
_parent._observer.OnNext(value);
}
d?.Dispose();
}
}

public void OnError(Exception error)
public void Dispose()
{
lock (_gate)
{
if (_parent._choice == AmbState.Neither)
{
_parent._choice = _me;
_otherSubscription.Dispose();
_observer._disposable = _subscription;
_observer._target = _parent._observer;
}

if (_parent._choice == _me)
{
_parent._observer.OnError(error);
_parent.Dispose();
}
}
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
}

public void OnCompleted()
{
lock (_gate)
if (iwon)
{
if (_parent._choice == AmbState.Neither)
{
_parent._choice = _me;
_otherSubscription.Dispose();
_observer._disposable = _subscription;
_observer._target = _parent._observer;
}

if (_parent._choice == _me)
{
_parent._observer.OnCompleted();
_parent.Dispose();
}
downstream.OnCompleted();
}
}
}

private sealed class AmbObserver : IObserver<TSource>
{
public IObserver<TSource> _target;

public IDisposable _disposable;

public void OnNext(TSource value)
{
_target.OnNext(value);
else
if (parent.TryWin(isLeft))
{
iwon = true;
downstream.OnCompleted();
}
Dispose();
}

public void OnError(Exception error)
{
_target.OnError(error);
_disposable.Dispose();
if (iwon)
{
downstream.OnError(error);
}
else
if (parent.TryWin(isLeft))
{
iwon = true;
downstream.OnError(error);
}
Dispose();
}

public void OnCompleted()
public void OnNext(TSource value)
{
_target.OnCompleted();
_disposable.Dispose();
if (iwon)
{
downstream.OnNext(value);
}
else
if (parent.TryWin(isLeft))
{
iwon = true;
downstream.OnNext(value);
}
}
}

private enum AmbState
{
Left,
Right,
Neither,
}
}
}
}