Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT #6045] cleaned up IAsyncEnumerable Source to use local functions #6048

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 116 additions & 33 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Routing;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;
using System.Collections.Generic;
Expand All @@ -24,6 +22,9 @@
using Akka.Streams.TestKit.Tests;
using Akka.Streams.Tests.Actor;
using Reactive.Streams;
using System.Runtime.CompilerServices;
using Akka.Util;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -33,16 +34,16 @@ public class AsyncEnumerableSpec : AkkaSpec
private ActorMaterializer Materializer { get; }
private ITestOutputHelper _helper;
public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
{
_helper = helper;
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}


[Fact]
[Fact]
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();
Expand All @@ -67,7 +68,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()

caught.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
Expand All @@ -79,7 +80,7 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");
output.Length.ShouldBe(0, "Did not receive all elements!");
}

[Fact]
Expand All @@ -93,15 +94,15 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");
output.Length.ShouldBe(0, "Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");
output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
}


Expand All @@ -112,7 +113,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);

var a = Task.Run( async () =>
var a = Task.Run(async () =>
{
await foreach (var notused in task)
{
Expand All @@ -123,19 +124,20 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
bool thrown = false;
var thrown = false;
try
{
await a;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (AbruptTerminationException e)
{
thrown = true;
}

thrown.ShouldBeTrue();
}

Expand All @@ -151,47 +153,128 @@ async Task ShouldThrow()
{
await foreach (var a in task)
{

}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}

[Fact]
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
[Fact]
public void
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
{
Func<IAsyncEnumerable<int>> range = () =>
{
return RangeAsync(1, 100);
};
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(range)
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(100);
for (int i = 1; i <= 20; i++)
subscriber.ExpectComplete();
}

[Fact]
public void AsyncEnumerableSource_Must_Process_All_Elements()
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(101);

subscriber.ExpectNextN(Enumerable.Range(0, 100));

subscriber.ExpectComplete();
}

[Fact]
public void AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
{
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(101);

subscriber.ExpectNextN(Enumerable.Range(0, 50));

var exception = subscriber.ExpectError();

// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
exception.Message.Should().Be("BOOM!");
}

[Fact]
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(50);
subscriber.ExpectNextN(Enumerable.Range(0, 50));
subscription.Cancel();

// The cancellation token inside the IAsyncEnumerable should be cancelled
await WithinAsync(3.Seconds(), async () => latch.Value);
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
var next = subscriber.ExpectNext(i);
_helper.WriteLine(i.ToString());
await Task.Delay(10, token);
if(token.IsCancellationRequested)
yield break;
yield return i;
}
}

//subscriber.ExpectComplete();
private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int count, int throwAt,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
yield break;

if (i == throwAt)
throw new TestException("BOOM!");

yield return i;
}
}

static async IAsyncEnumerable<int> RangeAsync(int start, int count)
private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
[EnumeratorCancellation] CancellationToken token = default)
{
for (var i = 0; i < count; i++)
token.Register(() =>
{
latch.GetAndSet(true);
});
foreach (var i in Enumerable.Range(start, count))
{
await Task.Delay(i);
yield return start + i;
if(token.IsCancellationRequested)
yield break;

yield return i;
}
}

}
#else
#endif

}
}
Loading