Skip to content

Commit

Permalink
Merge pull request #742 from akarnokd/ObserveOnQueueAccess
Browse files Browse the repository at this point in the history
4.x: Upgrade the ObserveOn operator to IdentitySink, fix queue usage
  • Loading branch information
danielcweber authored Jul 3, 2018
2 parents 8c0f185 + 8a05ef2 commit 89cde7a
Showing 1 changed file with 39 additions and 37 deletions.
76 changes: 39 additions & 37 deletions Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,8 @@ internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
/// techniques to signal events to the downstream.
/// </summary>
/// <typeparam name="T">The element type of the sequence.</typeparam>
internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
{
private readonly IObserver<T> _downstream;
private readonly IScheduler _scheduler;

/// <summary>
Expand All @@ -388,7 +387,6 @@ internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
/// </summary>
private readonly ISchedulerLongRunning _longRunning;
private readonly ConcurrentQueue<T> _queue;
private IDisposable _run;

/// <summary>
/// The current task representing a running drain operation.
Expand Down Expand Up @@ -417,54 +415,55 @@ internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
/// </summary>
private bool _disposed;

public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
{
_downstream = downstream;
_scheduler = scheduler;
_longRunning = scheduler.AsLongRunning();
_queue = new ConcurrentQueue<T>();
}

public void Run(IObservable<T> source)
{
Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
}

public void Dispose()
protected override void Dispose(bool disposing)
{
Volatile.Write(ref _disposed, true);
Disposable.TryDispose(ref _task);
Disposable.TryDispose(ref _run);
Clear();

base.Dispose(disposing);
if (disposing)
{
Disposable.TryDispose(ref _task);
Clear(_queue);
}
}

/// <summary>
/// Remove remaining elements from the queue upon
/// cancellation or failure.
/// </summary>
private void Clear()
/// <param name="q">The queue to use. The argument ensures that the
/// _queue field is not re-read from memory unnecessarily
/// due to the memory barriers inside TryDequeue mandating it
/// despite the field is read-only.</param>
private void Clear(ConcurrentQueue<T> q)
{
var q = _queue;
while (q.TryDequeue(out var _))
{
;
}
}

public void OnCompleted()
public override void OnCompleted()
{
Volatile.Write(ref _done, true);
Schedule();
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
_error = error;
Volatile.Write(ref _done, true);
Schedule();
}

public void OnNext(T value)
public override void OnNext(T value)
{
_queue.Enqueue(value);
Schedule();
Expand All @@ -485,11 +484,11 @@ private void Schedule()
var longRunning = _longRunning;
if (longRunning != null)
{
newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction);
}
else
{
newTask.Disposable = _scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
}
}

Expand All @@ -499,7 +498,7 @@ private void Schedule()
// is of a multi-consumer type.
if (Volatile.Read(ref _disposed))
{
Clear();
Clear(_queue);
}
}
}
Expand All @@ -509,15 +508,15 @@ private void Schedule()
/// Avoids creating a delegate that captures <code>this</code>
/// whenever the signals have to be drained.
/// </summary>
private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
(self, cancel) => self.DrainLongRunning();

/// <summary>
/// The static action to be scheduled on a simple scheduler.
/// Avoids creating a delegate that captures <code>this</code>
/// whenever the signals have to be drained.
/// </summary>
private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc =
(scheduler, self) => self.DrainShortRunning(scheduler);

/// <summary>
Expand All @@ -528,13 +527,13 @@ private void Schedule()
/// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
{
DrainStep(_queue, _downstream, false);
DrainStep(_queue, false);

if (Interlocked.Decrement(ref _wip) != 0)
{
// Don't return the disposable of Schedule() because that may chain together
// a long string of ScheduledItems causing StackOverflowException upon Dispose()
var d = recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
var d = recursiveScheduler.Schedule(this, DrainShortRunningFunc);
Disposable.TrySetMultiple(ref _task, d);
}
return Disposable.Empty;
Expand All @@ -546,18 +545,22 @@ private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
/// empty queue, issuing the appropriate signals to the
/// given downstream.
/// </summary>
/// <param name="q">The queue to use.</param>
/// <param name="downstream">The intended consumer of the events.</param>
/// <param name="q">The queue to use. The argument ensures that the
/// _queue field is not re-read from memory due to the memory barriers
/// inside TryDequeue mandating it despite the field is read-only.
/// In addition, the DrainStep is invoked from the DrainLongRunning's loop
/// so reading _queue inside this method would still incur the same barrier
/// overhead otherwise.</param>
/// <param name="delayError">Should the errors be delayed until all
/// queued items have been emitted to the downstream?</param>
/// <returns>True if the drain loop should stop.</returns>
private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
{
// Check if the operator has been disposed
if (Volatile.Read(ref _disposed))
{
// cleanup residue items in the queue
Clear();
Clear(q);
return true;
}

Expand All @@ -573,13 +576,13 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
if (ex != null)
{
Volatile.Write(ref _disposed, true);
downstream.OnError(ex);
ForwardOnError(ex);
return true;
}
}

// get the next item from the queue if any
var empty = !_queue.TryDequeue(out var v);
var empty = !q.TryDequeue(out var v);

// the upstream called OnComplete and the queue is empty
// that means we are done, no further signals can happen
Expand All @@ -592,12 +595,12 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
// if not null, there was an OnError call
if (ex != null)
{
downstream.OnError(ex);
ForwardOnError(ex);
}
else
{
// otherwise, complete normally
downstream.OnCompleted();
ForwardOnCompleted();
}
return true;
}
Expand All @@ -608,7 +611,7 @@ private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delay
return true;
}
// emit the item
downstream.OnNext(v);
ForwardOnNext(v);

// keep looping
return false;
Expand All @@ -627,7 +630,6 @@ private void DrainLongRunning()
// that would force the re-read of these constant values
// from memory, regardless of readonly, afaik
var q = _queue;
var downstream = _downstream;

for (; ; )
{
Expand All @@ -636,7 +638,7 @@ private void DrainLongRunning()
// delayError: true - because of
// ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
// expects something that almost looks like full delayError
if (DrainStep(q, downstream, true))
if (DrainStep(q, true))
{
break;
}
Expand Down

0 comments on commit 89cde7a

Please sign in to comment.