Skip to content

Commit

Permalink
Akka:Streams Resolve IAsyncEnumerator.DisposeAsync bug (akkadotnet#…
Browse files Browse the repository at this point in the history
…6290)

* Upgraded Akka.Streams and Akka.Streams.Tests to C# 9

* added reproduction for akkadotnet#6280

* close akkadotnet#6280

* added compiler directive back to fix compilation issues on Linux

* added comment

* bump CI
  • Loading branch information
Aaronontheweb authored Dec 6, 2022
1 parent 1974404 commit 3156272
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 38 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<PropertyGroup>
<AssemblyName>Akka.Streams.Tests</AssemblyName>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<LangVersion>9</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
84 changes: 60 additions & 24 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@
using Xunit;
using Xunit.Abstractions;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.TestKit.Tests;
using Akka.Streams.Tests.Actor;
using Reactive.Streams;
using System.Runtime.CompilerServices;
using Akka.Util;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
#if !NETFRAMEWORK // disabling this causes .NET Framework 4.7.2 builds to fail on Linux
public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }
private ITestOutputHelper _helper;

public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
Expand All @@ -50,7 +47,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()

var cts = new CancellationTokenSource();
var token = cts.Token;

var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer);
var output = input.ToArray();
bool caught = false;
Expand All @@ -65,7 +62,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
caught = true;
}

caught.ShouldBeTrue();
}

Expand All @@ -80,6 +77,7 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}

output.Length.ShouldBe(0, "Did not receive all elements!");
}

Expand All @@ -94,14 +92,16 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations()
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}

output.Length.ShouldBe(0, "Did not receive all elements!");

output = input.ToArray();
await foreach (var a in asyncEnumerable)
{
(output[0] == a).ShouldBeTrue("Did not get elements in order!");
output = output.Skip(1).ToArray();
}

output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!");
}

Expand All @@ -112,7 +112,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()
var materializer = ActorMaterializer.Create(Sys);
var probe = this.CreatePublisherProbe<int>();
var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer);

var a = Task.Run(async () =>
{
await foreach (var notused in task)
Expand Down Expand Up @@ -140,7 +140,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination()

thrown.ShouldBeTrue();
}

[Fact]
public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration()
{
Expand All @@ -155,7 +155,7 @@ async Task ShouldThrow()
{
}
}

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}

Expand Down Expand Up @@ -187,7 +187,7 @@ public void AsyncEnumerableSource_Must_Process_All_Elements()
subscription.Request(101);

subscriber.ExpectNextN(Enumerable.Range(0, 100));

subscriber.ExpectComplete();
}

Expand All @@ -206,7 +206,7 @@ public void AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws()
subscriber.ExpectNextN(Enumerable.Range(0, 50));

var exception = subscriber.ExpectError();

// Exception should be automatically unrolled, this SHOULD NOT be AggregateException
exception.Should().BeOfType<TestException>();
exception.Message.Should().Be("BOOM!");
Expand All @@ -231,13 +231,54 @@ public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream
await WithinAsync(3.Seconds(), async () => latch.Value);
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6280
/// </summary>
[Fact]
public async Task AsyncEnumerableSource_BugFix6280()
{
async IAsyncEnumerable<int> GenerateInts()
{
foreach (var i in Enumerable.Range(0, 100))
{
if (i > 50)
await Task.Delay(1000);
yield return i;
}
}

var source = Source.From(GenerateInts);
var subscriber = this.CreateManualSubscriberProbe<int>();

await EventFilter.Warning().ExpectAsync(0, async () =>
{
var mat = source
.WatchTermination(Keep.Right)
.ToMaterialized(Sink.FromSubscriber(subscriber), Keep.Left);

#pragma warning disable CS4014
var task = mat.Run(Materializer);
#pragma warning restore CS4014

var subscription = subscriber.ExpectSubscription();
subscription.Request(50);
subscriber.ExpectNextN(Enumerable.Range(0, 50));
subscription.Request(10); // the iterator is going to start delaying 1000ms per item here
subscription.Cancel();


// The cancellation token inside the IAsyncEnumerable should be cancelled
await task;
});
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
[EnumeratorCancellation] CancellationToken token = default)
{
foreach (var i in Enumerable.Range(start, count))
{
await Task.Delay(10, token);
if(token.IsCancellationRequested)
if (token.IsCancellationRequested)
yield break;
yield return i;
}
Expand All @@ -248,33 +289,28 @@ private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int cou
{
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
if (token.IsCancellationRequested)
yield break;

if (i == throwAt)
throw new TestException("BOOM!");

yield return i;
}
}

private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
[EnumeratorCancellation] CancellationToken token = default)
{
token.Register(() =>
{
latch.GetAndSet(true);
});
token.Register(() => { latch.GetAndSet(true); });
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
if (token.IsCancellationRequested)
yield break;

yield return i;
}
}

}
#else
#endif
}
1 change: 1 addition & 0 deletions src/core/Akka.Streams/Akka.Streams.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<TargetFrameworks>$(NetStandardLibVersion)</TargetFrameworks>
<PackageTags>$(AkkaPackageTags);reactive;stream</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>9</LangVersion>
</PropertyGroup>
<ItemGroup>
<EmbeddedResource Include="reference.conf" />
Expand Down
15 changes: 1 addition & 14 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3816,20 +3816,7 @@ public override void OnDownstreamFinish()
{
_completionCts.Cancel();
_completionCts.Dispose();

try
{
_enumerator.DisposeAsync().GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to dispose IAsyncEnumerator asynchronously");
}
finally
{
CompleteStage();
base.OnDownstreamFinish();
}
CompleteStage();
base.OnDownstreamFinish();
}

Expand Down

0 comments on commit 3156272

Please sign in to comment.