Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleaned up IAsyncEnumerable Source to use local functions #6045

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 128 additions & 45 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Routing;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.Tests.Actor;
using Reactive.Streams;
using System.Runtime.CompilerServices;
using Akka.Util;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -31,24 +28,25 @@ public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }
private ITestOutputHelper _helper;

public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
{
_helper = helper;
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}


[Fact]
[Fact]
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();

var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
bool caught = false;
Expand All @@ -63,10 +61,10 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
caught = true;
}

caught.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_Source()
{
Expand All @@ -78,7 +76,8 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");

output.Length.ShouldBe(0, "Did not receive all elements!");
}

[Fact]
Expand All @@ -92,15 +91,17 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements!");


output.Length.ShouldBe(0, "Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}
output.Length.ShouldBe(0,"Did not receive all elements in second enumeration!!");

output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
}


Expand All @@ -110,8 +111,8 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);
var a = Task.Run( async () =>

var a = Task.Run(async () =>
{
await foreach (var notused in task)
{
Expand All @@ -122,22 +123,23 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
//we want to send messages so we aren't just waiting forever.
probe.SendNext(1);
probe.SendNext(2);
bool thrown = false;
var thrown = false;
try
{
await a;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (StreamDetachedException e)
{
thrown = true;
}
catch (AbruptTerminationException e)
{
thrown = true;
}

thrown.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
{
Expand All @@ -150,47 +152,128 @@ async Task ShouldThrow()
{
await foreach (var a in task)
{

}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}

[Fact]
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
[Fact]
public async Task
AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
{
Func<IAsyncEnumerable<int>> range = () =>
{
return RangeAsync(1, 100);
};
IAsyncEnumerable<int> Range() => RangeAsync(0, 0);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(range)
Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(100);
for (int i = 1; i <= 20; i++)
await subscriber.ExpectCompleteAsync();
}

[Fact]
public async Task AsyncEnumerableSource_Must_Process_All_Elements()
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
IAsyncEnumerable<int> Range() => RangeAsync(0, 100);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);

await subscriber.ExpectNextNAsync(Enumerable.Range(0, 100));

await subscriber.ExpectCompleteAsync();
}

[Fact]
public async Task AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
{
IAsyncEnumerable<int> Range() => ThrowingRangeAsync(0, 100, 50);
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(101);

await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));

var exception = await subscriber.ExpectErrorAsync();

// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
exception.Message.Should().Be("BOOM!");
}

[Fact]
public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream_Completes()
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
var latch = new AtomicBoolean();
IAsyncEnumerable<int> Range() => ProbeableRangeAsync(0, 100, latch);
var subscriber = this.CreateManualSubscriberProbe<int>();

var probe = Source.From(Range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(50);
await subscriber.ExpectNextNAsync(Enumerable.Range(0, 50));
subscription.Cancel();

await WithinAsync(3.Seconds(), async () => latch.Value);
latch.Value.Should().BeTrue();
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
var next = subscriber.ExpectNext(i);
_helper.WriteLine(i.ToString());
await Task.Delay(10, token);
if(token.IsCancellationRequested)
yield break;
yield return i;
}

//subscriber.ExpectComplete();
}

private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int count, int throwAt,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
yield break;

static async IAsyncEnumerable<int> RangeAsync(int start, int count)
if (i == throwAt)
throw new TestException("BOOM!");

yield return i;
}
}

private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
[EnumeratorCancellation] CancellationToken token = default)
{
for (var i = 0; i < count; i++)
token.Register(() =>
{
await Task.Delay(i);
yield return start + i;
latch.GetAndSet(true);
});
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
yield break;

yield return i;
}
}

}
#else
#endif

}
}
Loading