Skip to content

Commit

Permalink
Rewrite actor ref sink as a graph stage (#5930)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
ismaelhamed and Aaronontheweb authored May 11, 2022
1 parent 3c7a8e4 commit 1893800
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1896,6 +1896,8 @@ namespace Akka.Streams.Dsl
}
public class static Sink
{
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
[System.ObsoleteAttribute("Use overload accepting both on complete and on failure message")]
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRefWithAck<TIn>(Akka.Actor.IActorRef actorRef, object onInitMessage, object ackMessage, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage = null) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.Actor.IActorRef> ActorSubscriber<TIn>(Akka.Actor.Props props) { }
Expand Down Expand Up @@ -2738,25 +2740,14 @@ namespace Akka.Streams.Implementation
public void Subscribe(Reactive.Streams.ISubscriber<TOut> subscriber) { }
public System.Collections.Generic.IEnumerable<Reactive.Streams.ISubscriber<TOut>> TakePendingSubscribers() { }
}
public class ActorRefSinkActor : Akka.Streams.Actors.ActorSubscriber
{
protected readonly int HighWatermark;
protected readonly object OnCompleteMessage;
protected readonly Akka.Actor.IActorRef Ref;
public ActorRefSinkActor(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
protected Akka.Event.ILoggingAdapter Log { get; }
public override Akka.Streams.Actors.IRequestStrategy RequestStrategy { get; }
public static Akka.Actor.Props Props(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
protected override bool Receive(object message) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class ActorRefSink<TIn> : Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed>
public sealed class ActorRefSinkStage<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.SinkShape<T>>
{
public ActorRefSink(Akka.Actor.IActorRef @ref, object onCompleteMessage, Akka.Streams.Attributes attributes, Akka.Streams.SinkShape<TIn> shape) { }
public override Akka.Streams.Attributes Attributes { get; }
public override object Create(Akka.Streams.MaterializationContext context, out Akka.NotUsed materializer) { }
protected override Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed> NewInstance(Akka.Streams.SinkShape<TIn> shape) { }
public override Akka.Streams.Implementation.IModule WithAttributes(Akka.Streams.Attributes attributes) { }
public ActorRefSinkStage(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
public Akka.Streams.Inlet<T> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class ActorRefSource<TOut> : Akka.Streams.Implementation.SourceModule<TOut, Akka.Actor.IActorRef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1896,6 +1896,8 @@ namespace Akka.Streams.Dsl
}
public class static Sink
{
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
[System.ObsoleteAttribute("Use overload accepting both on complete and on failure message")]
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRefWithAck<TIn>(Akka.Actor.IActorRef actorRef, object onInitMessage, object ackMessage, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage = null) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.Actor.IActorRef> ActorSubscriber<TIn>(Akka.Actor.Props props) { }
Expand Down Expand Up @@ -2738,25 +2740,14 @@ namespace Akka.Streams.Implementation
public void Subscribe(Reactive.Streams.ISubscriber<TOut> subscriber) { }
public System.Collections.Generic.IEnumerable<Reactive.Streams.ISubscriber<TOut>> TakePendingSubscribers() { }
}
public class ActorRefSinkActor : Akka.Streams.Actors.ActorSubscriber
{
protected readonly int HighWatermark;
protected readonly object OnCompleteMessage;
protected readonly Akka.Actor.IActorRef Ref;
public ActorRefSinkActor(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
protected Akka.Event.ILoggingAdapter Log { get; }
public override Akka.Streams.Actors.IRequestStrategy RequestStrategy { get; }
public static Akka.Actor.Props Props(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
protected override bool Receive(object message) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class ActorRefSink<TIn> : Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed>
public sealed class ActorRefSinkStage<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.SinkShape<T>>
{
public ActorRefSink(Akka.Actor.IActorRef @ref, object onCompleteMessage, Akka.Streams.Attributes attributes, Akka.Streams.SinkShape<TIn> shape) { }
public override Akka.Streams.Attributes Attributes { get; }
public override object Create(Akka.Streams.MaterializationContext context, out Akka.NotUsed materializer) { }
protected override Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed> NewInstance(Akka.Streams.SinkShape<TIn> shape) { }
public override Akka.Streams.Implementation.IModule WithAttributes(Akka.Streams.Attributes attributes) { }
public ActorRefSinkStage(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
public Akka.Streams.Inlet<T> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class ActorRefSource<TOut> : Akka.Streams.Implementation.SourceModule<TOut, Akka.Actor.IActorRef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1896,6 +1896,8 @@ namespace Akka.Streams.Dsl
}
public class static Sink
{
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
[System.ObsoleteAttribute("Use overload accepting both on complete and on failure message")]
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRef<TIn>(Akka.Actor.IActorRef actorRef, object onCompleteMessage) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> ActorRefWithAck<TIn>(Akka.Actor.IActorRef actorRef, object onInitMessage, object ackMessage, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage = null) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.Actor.IActorRef> ActorSubscriber<TIn>(Akka.Actor.Props props) { }
Expand Down Expand Up @@ -2738,25 +2740,14 @@ namespace Akka.Streams.Implementation
public void Subscribe(Reactive.Streams.ISubscriber<TOut> subscriber) { }
public System.Collections.Generic.IEnumerable<Reactive.Streams.ISubscriber<TOut>> TakePendingSubscribers() { }
}
public class ActorRefSinkActor : Akka.Streams.Actors.ActorSubscriber
{
protected readonly int HighWatermark;
protected readonly object OnCompleteMessage;
protected readonly Akka.Actor.IActorRef Ref;
public ActorRefSinkActor(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
protected Akka.Event.ILoggingAdapter Log { get; }
public override Akka.Streams.Actors.IRequestStrategy RequestStrategy { get; }
public static Akka.Actor.Props Props(Akka.Actor.IActorRef @ref, int highWatermark, object onCompleteMessage) { }
protected override bool Receive(object message) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class ActorRefSink<TIn> : Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed>
public sealed class ActorRefSinkStage<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.SinkShape<T>>
{
public ActorRefSink(Akka.Actor.IActorRef @ref, object onCompleteMessage, Akka.Streams.Attributes attributes, Akka.Streams.SinkShape<TIn> shape) { }
public override Akka.Streams.Attributes Attributes { get; }
public override object Create(Akka.Streams.MaterializationContext context, out Akka.NotUsed materializer) { }
protected override Akka.Streams.Implementation.SinkModule<TIn, Akka.NotUsed> NewInstance(Akka.Streams.SinkShape<TIn> shape) { }
public override Akka.Streams.Implementation.IModule WithAttributes(Akka.Streams.Attributes attributes) { }
public ActorRefSinkStage(Akka.Actor.IActorRef actorRef, object onCompleteMessage, System.Func<System.Exception, object> onFailureMessage) { }
public Akka.Streams.Inlet<T> In { get; }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.SinkShape<T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class ActorRefSource<TOut> : Akka.Streams.Implementation.SourceModule<TOut, Akka.Actor.IActorRef>
Expand Down
12 changes: 12 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams.Dsl;
Expand Down Expand Up @@ -58,5 +59,16 @@ public void ActorRefSink_should_cancel_a_stream_when_actor_terminates()
Sys.Stop(fw);
publisher.ExpectCancellation();
}

[Fact]
public void ActorRefSink_should_sends_error_message_if_upstream_fails()
{
var actorProbe = CreateTestProbe();
var probe = this.SourceProbe<string>().To(Sink.ActorRef<string>(actorProbe.Ref, "complete", _ => "failure"))
.Run(Materializer);

probe.SendError(new Exception("oh dear"));
actorProbe.ExpectMsg("failure");
}
}
}
29 changes: 28 additions & 1 deletion src/core/Akka.Streams/Dsl/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,32 @@ public static Sink<TIn, NotUsed> OnComplete<TIn>(Action success, Action<Exceptio
.To(Ignore<NotUsed>())
.Named("OnCompleteSink");

/// <summary>
/// INTERNAL API
///
/// <para>
/// Sends the elements of the stream to the given <see cref="IActorRef"/>.
/// If the target actor terminates the stream will be canceled.
/// When the stream is completed successfully the given <paramref name="onCompleteMessage"/>
/// will be sent to the destination actor.
/// When the stream is completed with failure the <paramref name="onFailureMessage"/> will be
/// invoked and its result will be sent to the destination actor.
/// </para>
/// <para>
/// It will request at most <see cref="ActorMaterializerSettings.MaxInputBufferSize"/> number of elements from
/// upstream, but there is no back-pressure signal from the destination actor,
/// i.e. if the actor is not consuming the messages fast enough the mailbox
/// of the actor will grow. For potentially slow consumer actors it is recommended
/// to use a bounded mailbox with zero <see cref="BoundedMessageQueue.PushTimeOut"/> or use a rate
/// limiting stage in front of this <see cref="Sink{TIn, TMat}"/>.
/// </para>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <param name="actorRef">TBD</param>
/// <param name="onCompleteMessage">TBD</param>
/// <param name="onFailureMessage">TBD</param>
public static Sink<TIn, NotUsed> ActorRef<TIn>(IActorRef actorRef, object onCompleteMessage, Func<Exception, object> onFailureMessage)
=> FromGraph(new ActorRefSinkStage<TIn>(actorRef, onCompleteMessage, onFailureMessage));

///<summary>
/// Sends the elements of the stream to the given <see cref="IActorRef"/>.
Expand All @@ -462,8 +488,9 @@ public static Sink<TIn, NotUsed> OnComplete<TIn>(Action success, Action<Exceptio
/// <param name="actorRef">TBD</param>
/// <param name="onCompleteMessage">TBD</param>
/// <returns>TBD</returns>
[Obsolete("Use overload accepting both on complete and on failure message")]
public static Sink<TIn, NotUsed> ActorRef<TIn>(IActorRef actorRef, object onCompleteMessage)
=> new Sink<TIn, NotUsed>(new ActorRefSink<TIn>(actorRef, onCompleteMessage, DefaultAttributes.ActorRefSink, Shape<TIn>("ActorRefSink")));
=> FromGraph(new ActorRefSinkStage<TIn>(actorRef, onCompleteMessage, ex => new Status.Failure(ex)));

/// <summary>
/// Sends the elements of the stream to the given <see cref="IActorRef"/> that sends back back-pressure signal.
Expand Down
102 changes: 0 additions & 102 deletions src/core/Akka.Streams/Implementation/ActorRefSinkActor.cs

This file was deleted.

Loading

0 comments on commit 1893800

Please sign in to comment.