Skip to content

Commit

Permalink
Merge pull request #905 from dotnet/UseAsyncIterators
Browse files Browse the repository at this point in the history
Remove dead code of hand-rolled iterators
  • Loading branch information
bartdesmet authored May 24, 2019
2 parents 3529def + 6335194 commit 447c3fb
Show file tree
Hide file tree
Showing 30 changed files with 5 additions and 6,827 deletions.
217 changes: 0 additions & 217 deletions Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -18,7 +17,6 @@ public static IAsyncEnumerable<TSource> Amb<TSource>(this IAsyncEnumerable<TSour
if (second == null)
throw Error.ArgumentNull(nameof(second));

#if USE_ASYNC_ITERATOR
return AsyncEnumerable.Create(Core);

async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
Expand Down Expand Up @@ -126,17 +124,13 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
await disposeLoser.ConfigureAwait(false);
}
}
#else
return new AmbAsyncIterator<TSource>(first, second);
#endif
}

public static IAsyncEnumerable<TSource> Amb<TSource>(params IAsyncEnumerable<TSource>[] sources)
{
if (sources == null)
throw Error.ArgumentNull(nameof(sources));

#if USE_ASYNC_ITERATOR
return AsyncEnumerable.Create(Core);

async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
Expand Down Expand Up @@ -224,24 +218,16 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
await cleanupLosers.ConfigureAwait(false);
}
}
#else
return new AmbAsyncIteratorN<TSource>(sources);
#endif
}

public static IAsyncEnumerable<TSource> Amb<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
{
if (sources == null)
throw Error.ArgumentNull(nameof(sources));

#if USE_ASYNC_ITERATOR
return Amb(sources.ToArray());
#else
return new AmbAsyncIteratorN<TSource>(sources.ToArray());
#endif
}

#if USE_ASYNC_ITERATOR
private static async Task AwaitMoveNextAsyncAndDispose<T>(Task<bool> moveNextAsync, IAsyncEnumerator<T> enumerator)
{
if (enumerator != null)
Expand All @@ -255,208 +241,5 @@ private static async Task AwaitMoveNextAsyncAndDispose<T>(Task<bool> moveNextAsy
}
}
}
#endif

#if !USE_ASYNC_ITERATOR
private sealed class AmbAsyncIterator<TSource> : AsyncIterator<TSource>
{
private readonly IAsyncEnumerable<TSource> _first;
private readonly IAsyncEnumerable<TSource> _second;

private IAsyncEnumerator<TSource> _enumerator;

public AmbAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
{
Debug.Assert(first != null);
Debug.Assert(second != null);

_first = first;
_second = second;
}

public override AsyncIteratorBase<TSource> Clone()
{
return new AmbAsyncIterator<TSource>(_first, _second);
}

public override async ValueTask DisposeAsync()
{
if (_enumerator != null)
{
await _enumerator.DisposeAsync().ConfigureAwait(false);
_enumerator = null;
}

await base.DisposeAsync().ConfigureAwait(false);
}

protected override async ValueTask<bool> MoveNextCore()
{
switch (_state)
{
case AsyncIteratorState.Allocated:
//
// REVIEW: Exceptions in any of these steps don't cause cleanup. This has been fixed in the new implementation.
//

var firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
var secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);

var firstMoveNext = firstEnumerator.MoveNextAsync().AsTask();
var secondMoveNext = secondEnumerator.MoveNextAsync().AsTask();

var winner = await Task.WhenAny(firstMoveNext, secondMoveNext).ConfigureAwait(false);

//
// REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
//
// 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
// 2. The winner elected by Amb is blocked to yield results until the loser unblocks.
//
// The approach below has one drawback, namely that exceptions raised by loser are dropped on the floor.
//

if (winner == firstMoveNext)
{
_enumerator = firstEnumerator;

_ = secondMoveNext.ContinueWith(_ =>
{
secondEnumerator.DisposeAsync();
});
}
else
{
_enumerator = secondEnumerator;

_ = firstMoveNext.ContinueWith(_ =>
{
firstEnumerator.DisposeAsync();
});
}

_state = AsyncIteratorState.Iterating;

if (await winner.ConfigureAwait(false))
{
_current = _enumerator.Current;
return true;
}

break;

case AsyncIteratorState.Iterating:
if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
{
_current = _enumerator.Current;
return true;
}

break;
}

await DisposeAsync().ConfigureAwait(false);
return false;
}
}

private sealed class AmbAsyncIteratorN<TSource> : AsyncIterator<TSource>
{
private readonly IAsyncEnumerable<TSource>[] _sources;

private IAsyncEnumerator<TSource> _enumerator;

public AmbAsyncIteratorN(IAsyncEnumerable<TSource>[] sources)
{
Debug.Assert(sources != null);

_sources = sources;
}

public override AsyncIteratorBase<TSource> Clone()
{
return new AmbAsyncIteratorN<TSource>(_sources);
}

public override async ValueTask DisposeAsync()
{
if (_enumerator != null)
{
await _enumerator.DisposeAsync().ConfigureAwait(false);
_enumerator = null;
}

await base.DisposeAsync().ConfigureAwait(false);
}

protected override async ValueTask<bool> MoveNextCore()
{
switch (_state)
{
case AsyncIteratorState.Allocated:
var n = _sources.Length;

var enumerators = new IAsyncEnumerator<TSource>[n];
var moveNexts = new Task<bool>[n];

for (var i = 0; i < n; i++)
{
var enumerator = _sources[i].GetAsyncEnumerator(_cancellationToken);

enumerators[i] = enumerator;
moveNexts[i] = enumerator.MoveNextAsync().AsTask();
}

var winner = await Task.WhenAny(moveNexts).ConfigureAwait(false);

//
// REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
//
// 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
// 2. The winner elected by Amb is blocked to yield results until all losers unblocks.
//
// The approach below has one drawback, namely that exceptions raised by any loser are dropped on the floor.
//

var winnerIndex = Array.IndexOf(moveNexts, winner);

_enumerator = enumerators[winnerIndex];

for (var i = 0; i < n; i++)
{
if (i != winnerIndex)
{
_ = moveNexts[i].ContinueWith(_ =>
{
enumerators[i].DisposeAsync();
});
}
}

_state = AsyncIteratorState.Iterating;

if (await winner.ConfigureAwait(false))
{
_current = _enumerator.Current;
return true;
}

break;

case AsyncIteratorState.Iterating:
if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
{
_current = _enumerator.Current;
return true;
}

break;
}

await DisposeAsync().ConfigureAwait(false);
return false;
}
}
#endif
}
}
Loading

0 comments on commit 447c3fb

Please sign in to comment.