Skip to content

Commit

Permalink
[Backports #6290] Akka:Streams Resolve `IAsyncEnumerator.DisposeAsync…
Browse files Browse the repository at this point in the history
…` bug (#6296)

* Backports #6290

Fixes #6280

We no longer attempt to dispose IAsyncEnumerators inside Source stages due to the reasons outlined here: #6280 (comment)

This prevents the log from being filled with NotSupportedException warnings from failed DisposeAsync operations.

(cherry-picked from 3156272)

* post-merge fix

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored Dec 7, 2022
1 parent ae953d0 commit bedce24
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<AssemblyName>Akka.Streams.Tests</AssemblyName>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<LangVersion>8.0</LangVersion>
<LangVersion>9</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
50 changes: 43 additions & 7 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
#if !NETFRAMEWORK // disabling this causes .NET Framework 4.7.2 builds to fail on Linux
public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }
Expand Down Expand Up @@ -227,6 +227,47 @@ await this.AssertAllStagesStoppedAsync(async () =>
}, Materializer);
}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6280
/// </summary>
[Fact]
public async Task AsyncEnumerableSource_BugFix6280()
{
async IAsyncEnumerable<int> GenerateInts()
{
foreach (var i in Enumerable.Range(0, 100))
{
if (i > 50)
await Task.Delay(1000);
yield return i;
}
}

var source = Source.From(GenerateInts);
var subscriber = this.CreateManualSubscriberProbe<int>();

await EventFilter.Warning().ExpectAsync(0, async () =>
{
var mat = source
.WatchTermination(Keep.Right)
.ToMaterialized(Sink.FromSubscriber(subscriber), Keep.Left);

#pragma warning disable CS4014
var task = mat.Run(Materializer);
#pragma warning restore CS4014

var subscription = await subscriber.ExpectSubscriptionAsync();
subscription.Request(50);
subscriber.ExpectNextN(Enumerable.Range(0, 50));
subscription.Request(10); // the iterator is going to start delaying 1000ms per item here
subscription.Cancel();


// The cancellation token inside the IAsyncEnumerable should be cancelled
await task;
});
}

private static async IAsyncEnumerable<int> RangeAsync(int start, int count,
[EnumeratorCancellation] CancellationToken token = default)
{
Expand Down Expand Up @@ -257,10 +298,7 @@ private static async IAsyncEnumerable<int> ThrowingRangeAsync(int start, int cou
private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int count, AtomicBoolean latch,
[EnumeratorCancellation] CancellationToken token = default)
{
token.Register(() =>
{
latch.GetAndSet(true);
});
token.Register(() => { latch.GetAndSet(true); });
foreach (var i in Enumerable.Range(start, count))
{
if(token.IsCancellationRequested)
Expand All @@ -269,8 +307,6 @@ private static async IAsyncEnumerable<int> ProbeableRangeAsync(int start, int co
yield return i;
}
}

}
#else
#endif
}
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Akka.Streams.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TargetFrameworks>$(NetStandardLibVersion);$(NetLibVersion)</TargetFrameworks>
<PackageTags>$(AkkaPackageTags);reactive;stream</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>8.0</LangVersion>
<LangVersion>9</LangVersion>
</PropertyGroup>
<ItemGroup>
<EmbeddedResource Include="reference.conf" />
Expand Down
16 changes: 2 additions & 14 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3871,20 +3871,8 @@ public override void OnDownstreamFinish(Exception cause)
{
_completionCts.Cancel();
_completionCts.Dispose();

try
{
_enumerator.DisposeAsync().GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to dispose IAsyncEnumerator asynchronously");
}
finally
{
CompleteStage();
base.OnDownstreamFinish(cause);
}
CompleteStage();
base.OnDownstreamFinish(cause);
}

}
Expand Down

0 comments on commit bedce24

Please sign in to comment.