Skip to content

Commit

Permalink
[BACKPORT #6044] Add IAsyncEnumerable as Akka.Streams Source (#6047)
Browse files Browse the repository at this point in the history
* Cherry-picked from af513b0

* Fix missing DefaultConfig

* Fix missing StreamTestDefaultMailbox.DefaultConfig

Co-authored-by: Ebere Abanonu <eaba@users.noreply.github.com>
  • Loading branch information
Arkatufus and eaba authored Jul 22, 2022
1 parent 2d4cbb5 commit 7293c6e
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 5 deletions.
10 changes: 10 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Empty<T>() { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Failed<T>(System.Exception cause) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> From<T>(System.Collections.Generic.IEnumerable<T> enumerable) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> From<T>(System.Func<System.Collections.Generic.IAsyncEnumerable<T>> asyncEnumerable) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromEnumerator<T>(System.Func<System.Collections.Generic.IEnumerator<T>> enumeratorFactory) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromEvent<TDelegate, T>(System.Func<System.Action<T>, TDelegate> conversion, System.Action<TDelegate> addHandler, System.Action<TDelegate> removeHandler, int maxBufferCapacity = 128, Akka.Streams.OverflowStrategy overflowStrategy = 2) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromEvent<T>(System.Action<System.EventHandler<T>> addHandler, System.Action<System.EventHandler<T>> removeHandler, int maxBufferCapacity = 128, Akka.Streams.OverflowStrategy overflowStrategy = 2) { }
Expand Down Expand Up @@ -3958,6 +3959,15 @@ namespace Akka.Streams.Implementation.Fusing
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class AsyncEnumerable<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.SourceShape<T>>
{
public AsyncEnumerable(System.Func<System.Collections.Generic.IAsyncEnumerable<T>> factory) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.SourceShape<T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class Batch<TIn, TOut> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<TIn, TOut>>
{
public Batch(long max, System.Func<TIn, long> costFunc, System.Func<TIn, TOut> seed, System.Func<TOut, TIn, TOut> aggregate) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace Akka.Streams.TestKit.Tests
[InternalApi]
public sealed class StreamTestDefaultMailbox : MailboxType, IProducesMessageQueue<UnboundedMessageQueue>
{
public static Config DefaultConfig =>
ConfigurationFactory.FromResource<StreamTestDefaultMailbox>("Akka.Streams.TestKit.Tests.reference.conf");

public override IMessageQueue Create(IActorRef owner, ActorSystem system)
{
Expand Down
52 changes: 47 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,32 @@
using Nito.AsyncEx.Synchronous;
using Xunit;
using Xunit.Abstractions;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.TestKit.Tests;
using Akka.Streams.Tests.Actor;
using Reactive.Streams;

namespace Akka.Streams.Tests.Dsl
{
#if NETCOREAPP
public class AsyncEnumerableSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public AsyncEnumerableSpec(ITestOutputHelper helper) : base(helper)
private ITestOutputHelper _helper;
public AsyncEnumerableSpec(ITestOutputHelper helper) : base(
AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig),
helper)
{
_helper = helper;
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(2, 16);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact] public async Task RunAsAsyncEnumerable_Uses_CancellationToken()

[Fact]
public async Task RunAsAsyncEnumerable_Uses_CancellationToken()
{
var input = Enumerable.Range(1, 6).ToList();

Expand Down Expand Up @@ -146,10 +157,41 @@ async Task ShouldThrow()

await Assert.ThrowsAsync<IllegalStateException>(ShouldThrow);
}


[Fact]
public void AsyncEnumerableSource_Must_Complete_Immediately_With_No_elements_When_An_Empty_IAsyncEnumerable_Is_Passed_In()
{
Func<IAsyncEnumerable<int>> range = () =>
{
return RangeAsync(1, 100);
};
var subscriber = this.CreateManualSubscriberProbe<int>();

Source.From(range)
.RunWith(Sink.FromSubscriber(subscriber), Materializer);

var subscription = subscriber.ExpectSubscription();
subscription.Request(100);
for (int i = 1; i <= 20; i++)
{
var next = subscriber.ExpectNext(i);
_helper.WriteLine(i.ToString());
}

//subscriber.ExpectComplete();
}

static async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
for (var i = 0; i < count; i++)
{
await Task.Delay(i);
yield return start + i;
}
}

}

#else
#endif

}
16 changes: 16 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,22 @@ public static Source<T, NotUsed> Cycle<T>(Func<IEnumerator<T>> enumeratorFactory
public static Source<T, NotUsed> From<T>(IEnumerable<T> enumerable)
=> Single(enumerable).SelectMany(x => x).WithAttributes(DefaultAttributes.EnumerableSource);


/// <summary>
/// Helper to create <see cref="Source{TOut,TMat}"/> from <see cref="IAsyncEnumerable{T}"/>.
/// Example usage: Source.From(Enumerable.Range(1, 10))
///
/// Starts a new <see cref="Source{TOut,TMat}"/> from the given <see cref="IAsyncEnumerable{T}"/>. This is like starting from an
/// Enumerator, but every Subscriber directly attached to the Publisher of this
/// stream will see an individual flow of elements (always starting from the
/// beginning) regardless of when they subscribed.
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <param name=" asyncEnumerable">TBD</param>
/// <returns>TBD</returns>
public static Source<T, NotUsed> From<T>(Func<IAsyncEnumerable<T>> asyncEnumerable)
=> FromGraph(new AsyncEnumerable<T>(asyncEnumerable)).WithAttributes(DefaultAttributes.EnumerableSource);

/// <summary>
/// Create a <see cref="Source{TOut,TMat}"/> with one element.
/// Every connected <see cref="Sink{TIn,TMat}"/> of this stream will see an individual stream consisting of one element.
Expand Down
112 changes: 112 additions & 0 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Akka.Annotations;
using Akka.Event;
Expand Down Expand Up @@ -3701,7 +3702,118 @@ public RecoverWith(Func<Exception, IGraph<SourceShape<TOut>, TMat>> partialFunct
/// </returns>
public override string ToString() => "RecoverWith";
}
/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="T">The type of IAsyncEnumerable.</typeparam>
///
//https://github.com/Horusiath/Akka.Persistence.Pulsar/blob/master/Akka.Persistence.Pulsar/AsyncEnumerableSource.cs
[InternalApi]
public sealed class AsyncEnumerable<T> : GraphStage<SourceShape<T>>
{
#region internal classes

private sealed class Logic : OutGraphStageLogic
{
private readonly IAsyncEnumerator<T> _enumerator;
private readonly Outlet<T> _outlet;
private readonly Action<T> _onSuccess;
private readonly Action<Exception> _onFailure;
private readonly Action _onComplete;
private readonly Action<Task<bool>> _handleContinuation;

public Logic(SourceShape<T> shape, IAsyncEnumerator<T> enumerator) : base(shape)
{
_enumerator = enumerator;
_outlet = shape.Outlet;
_onSuccess = GetAsyncCallback<T>(OnSuccess);
_onFailure = GetAsyncCallback<Exception>(OnFailure);
_onComplete = GetAsyncCallback(OnComplete);
_handleContinuation = task =>
{
// Since this Action is used as task continuation, we cannot safely call corresponding
// OnSuccess/OnFailure/OnComplete methods directly. We need to do that via async callbacks.
if (task.IsFaulted) _onFailure(task.Exception);
else if (task.IsCanceled) _onFailure(new TaskCanceledException(task));
else if (task.Result) _onSuccess(enumerator.Current);
else _onComplete();
};

SetHandler(_outlet, this);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnComplete() => CompleteStage();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnFailure(Exception exception) => FailStage(exception);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnSuccess(T element) => Push(_outlet, element);

public override void OnPull()
{
var vtask = _enumerator.MoveNextAsync();
if (vtask.IsCompletedSuccessfully)
{
// When MoveNextAsync returned immediatelly, we don't need to await.
// We can use fast path instead.
if (vtask.Result)
{
// if result is true, it means we got an element. Push it downstream.
Push(_outlet, _enumerator.Current);
}
else
{
// if result is false, it means enumerator was closed. Complete stage in that case.
CompleteStage();
}
}
else
{
vtask.AsTask().ContinueWith(_handleContinuation);
}
}

public override void OnDownstreamFinish()
{
var vtask = _enumerator.DisposeAsync();
if (vtask.IsCompletedSuccessfully)
{
CompleteStage(); // if dispose completed immediately, complete stage directly
}
else
{
// for async disposals use async callback
vtask.GetAwaiter().OnCompleted(_onComplete);
}
base.OnDownstreamFinish();
}

}

#endregion
private readonly Outlet<T> _outlet = new Outlet<T>("EnumerableSource.out");
private readonly Func<IAsyncEnumerable<T>> _factory;

public AsyncEnumerable(Func<IAsyncEnumerable<T>> factory)
{
_factory = factory;
Shape = new SourceShape<T>(_outlet);
}

public override SourceShape<T> Shape { get; }
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(Shape, _factory().GetAsyncEnumerator());

protected override Attributes InitialAttributes { get; } = DefaultAttributes.EnumerableSource;

/// <summary>
/// Returns a <see cref="string" /> that represents this instance.
/// </summary>
/// <returns>
/// A <see cref="string" /> that represents this instance.
/// </returns>
public override string ToString() => "EnumerableSource";
}
/// <summary>
/// INTERNAL API
/// </summary>
Expand Down

0 comments on commit 7293c6e

Please sign in to comment.