Skip to content

Commit

Permalink
Use async iterators in Expand.
Browse files Browse the repository at this point in the history
  • Loading branch information
bartdesmet committed Jan 29, 2019
1 parent c835b01 commit 038c969
Showing 1 changed file with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,28 @@ public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TS
if (selector == null)
throw Error.ArgumentNull(nameof(selector));

#if USE_ASYNC_ITERATOR
return AsyncEnumerable.Create(Core);

async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
{
var queue = new Queue<IAsyncEnumerable<TSource>>();

queue.Enqueue(source);

while (queue.Count > 0)
{
await foreach (TSource item in queue.Dequeue().WithCancellation(cancellationToken).ConfigureAwait(false))
{
queue.Enqueue(selector(item));

yield return item;
}
}
}
#else
return new ExpandAsyncIterator<TSource>(source, selector);
#endif
}

public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TSource>>> selector)
Expand All @@ -28,7 +49,28 @@ public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TS
if (selector == null)
throw Error.ArgumentNull(nameof(selector));

#if USE_ASYNC_ITERATOR
return AsyncEnumerable.Create(Core);

async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
{
var queue = new Queue<IAsyncEnumerable<TSource>>();

queue.Enqueue(source);

while (queue.Count > 0)
{
await foreach (TSource item in queue.Dequeue().WithCancellation(cancellationToken).ConfigureAwait(false))
{
queue.Enqueue(await selector(item).ConfigureAwait(false));

yield return item;
}
}
}
#else
return new ExpandAsyncIteratorWithTask<TSource>(source, selector);
#endif
}

#if !NO_DEEP_CANCELLATION
Expand All @@ -39,10 +81,32 @@ public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TS
if (selector == null)
throw Error.ArgumentNull(nameof(selector));

#if USE_ASYNC_ITERATOR
return AsyncEnumerable.Create(Core);

async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
{
var queue = new Queue<IAsyncEnumerable<TSource>>();

queue.Enqueue(source);

while (queue.Count > 0)
{
await foreach (TSource item in queue.Dequeue().WithCancellation(cancellationToken).ConfigureAwait(false))
{
queue.Enqueue(await selector(item, cancellationToken).ConfigureAwait(false));

yield return item;
}
}
}
#else
return new ExpandAsyncIteratorWithTaskAndCancellation<TSource>(source, selector);
#endif
}
#endif

#if !USE_ASYNC_ITERATOR
private sealed class ExpandAsyncIterator<TSource> : AsyncIterator<TSource>
{
private readonly Func<TSource, IAsyncEnumerable<TSource>> _selector;
Expand Down Expand Up @@ -313,6 +377,7 @@ protected override async ValueTask<bool> MoveNextCore()
return false;
}
}
#endif
#endif
}
}

0 comments on commit 038c969

Please sign in to comment.