From 0e948dba0bbcd13c58a126e01dfbf7e9b7cdad7e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 20 May 2022 19:14:59 +0700 Subject: [PATCH] [PORT] Akka/Akka#27266 - Propagate stream cancellation causes (#5949) * [PORT] Akka#27266 - Propagate stream cancellation causes * Add MaybeSourceSpec * Fix tests * Update API Verify list --- ...reAPISpec.ApproveStreams.Core.verified.txt | 69 +++++-- ...APISpec.ApproveStreams.DotNet.verified.txt | 70 +++++-- ...oreAPISpec.ApproveStreams.Net.verified.txt | 69 +++++-- .../Akka.Streams.TestKit/TestGraphStage.cs | 4 +- .../InterpreterBenchmark.cs | 2 +- .../Dsl/GraphStageTimersSpec.cs | 2 +- .../Dsl/GraphUnzipWithSpec.cs | 53 +++++ .../Akka.Streams.Tests/Dsl/LazySourceSpec.cs | 36 ++++ .../Akka.Streams.Tests/Dsl/MaybeSourceSpec.cs | 183 ++++++++++++++++++ .../Fusing/ActorGraphInterpreterSpec.cs | 8 +- .../GraphInterpreterFailureModesSpec.cs | 12 +- .../Fusing/GraphInterpreterPortsSpec.cs | 14 +- .../Fusing/GraphInterpreterSpecKit.cs | 33 +++- .../Implementation/Fusing/InterpreterSpec.cs | 20 +- .../Fusing/InterpreterStressSpec.cs | 4 +- .../Fusing/InterpreterSupervisionSpec.cs | 8 +- .../Fusing/LifecycleInterpreterSpec.cs | 4 +- .../Implementation/GraphStageLogicSpec.cs | 2 +- src/core/Akka.Streams/Akka.Streams.csproj | 15 +- .../Akka.Streams/CodeGen/Dsl/UnzipWith.cs | 118 +++++------ .../Akka.Streams/CodeGen/Dsl/UnzipWith.tt | 4 +- src/core/Akka.Streams/Dsl/DelayFlow.cs | 2 +- src/core/Akka.Streams/Dsl/Graph.cs | 31 ++- src/core/Akka.Streams/Dsl/KeepAliveConcat.cs | 2 +- src/core/Akka.Streams/Dsl/One2OneBidiFlow.cs | 4 +- src/core/Akka.Streams/Dsl/Pulse.cs | 2 +- src/core/Akka.Streams/Dsl/RestartFlow.cs | 25 ++- src/core/Akka.Streams/Dsl/Retry.cs | 4 +- src/core/Akka.Streams/Dsl/UnfoldFlow.cs | 3 +- src/core/Akka.Streams/Dsl/Valve.cs | 2 +- .../Fusing/ActorGraphInterpreter.cs | 68 ++++--- .../Fusing/EnumeratorInterpreter.cs | 2 +- .../Implementation/Fusing/GraphInterpreter.cs | 33 +++- .../Implementation/Fusing/GraphStages.cs | 23 ++- .../Akka.Streams/Implementation/Fusing/Ops.cs | 44 +++-- .../Implementation/Fusing/StreamOfStreams.cs | 92 +++------ .../Implementation/IO/TcpStages.cs | 31 ++- .../ReactiveStreamsCompliance.cs | 15 +- src/core/Akka.Streams/Implementation/Sinks.cs | 6 +- .../Akka.Streams/Implementation/Sources.cs | 23 +-- .../Implementation/StreamLayout.cs | 62 +++--- .../Implementation/StreamRef/SourceRefImpl.cs | 5 +- .../Akka.Streams/Implementation/Throttle.cs | 2 +- .../Akka.Streams/Implementation/Timers.cs | 16 +- src/core/Akka.Streams/KillSwitch.cs | 4 +- src/core/Akka.Streams/Stage/AbstractStage.cs | 17 +- src/core/Akka.Streams/Stage/Context.cs | 2 + src/core/Akka.Streams/Stage/GraphStage.cs | 166 +++++++++++----- src/core/Akka.Streams/StreamTcpException.cs | 6 + .../SubscriptionWithCancelException.cs | 39 ++++ 50 files changed, 1004 insertions(+), 457 deletions(-) create mode 100644 src/core/Akka.Streams.Tests/Dsl/MaybeSourceSpec.cs create mode 100644 src/core/Akka.Streams/SubscriptionWithCancelException.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt index 8c31c106e4b..9f6c9951b53 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt @@ -723,6 +723,10 @@ namespace Akka.Streams { Akka.Streams.Dsl.Source Source { get; } } + public interface ISubscriptionWithCancelException : Reactive.Streams.ISubscription + { + void Cancel(System.Exception cause); + } public interface ITransformerLike { bool IsComplete { get; } @@ -919,6 +923,7 @@ namespace Akka.Streams public static readonly Akka.Streams.StreamDetachedException Instance; public StreamDetachedException() { } public StreamDetachedException(string message) { } + public StreamDetachedException(string message, System.Exception innerException) { } } public class StreamLimitReachedException : System.Exception { @@ -997,6 +1002,23 @@ namespace Akka.Streams public StreamTcpException(string message, System.Exception innerException) { } protected StreamTcpException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } + public class static SubscriptionWithCancelException + { + public sealed class NoMoreElementsNeeded : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation + { + public static readonly Akka.Streams.SubscriptionWithCancelException.NoMoreElementsNeeded Instance; + } + [Akka.Annotations.DoNotInheritAttribute()] + public abstract class NonFailureCancellation : System.Exception + { + protected NonFailureCancellation() { } + public virtual string StackTrace { get; } + } + public sealed class StageWasCompleted : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation + { + public static readonly Akka.Streams.SubscriptionWithCancelException.StageWasCompleted Instance; + } + } public enum SubstreamCancelStrategy { Propagate = 0, @@ -3461,7 +3483,7 @@ namespace Akka.Streams.Implementation public static void RequireNonNullException(System.Exception exception) { } public static void RequireNonNullSubscriber(Reactive.Streams.ISubscriber subscriber) { } public static void RequireNonNullSubscription(Reactive.Streams.ISubscription subscription) { } - public static void TryCancel(Reactive.Streams.ISubscription subscription) { } + public static void TryCancel(Reactive.Streams.ISubscription subscription, System.Exception cause) { } public static void TryOnComplete(Reactive.Streams.ISubscriber subscriber) { } public static void TryOnError(Reactive.Streams.ISubscriber subscriber, System.Exception cause) { } public static void TryOnNext(Reactive.Streams.ISubscriber subscriber, T element) { } @@ -3835,7 +3857,7 @@ namespace Akka.Streams.Implementation.Fusing { public BatchingActorInputBoundary(int size, int id) { } public override Akka.Streams.Outlet Out { get; } - public void Cancel() { } + public void Cancel(System.Exception cause) { } public void OnComplete() { } public void OnError(System.Exception reason) { } public void OnInternalError(System.Exception reason) { } @@ -3856,17 +3878,19 @@ namespace Akka.Streams.Implementation.Fusing public void OnNext(T element) { } public void OnSubscribe(Reactive.Streams.ISubscription subscription) { } } - public sealed class BoundarySubscription : Reactive.Streams.ISubscription + public sealed class BoundarySubscription : Akka.Streams.ISubscriptionWithCancelException, Reactive.Streams.ISubscription { public BoundarySubscription(Akka.Actor.IActorRef parent, Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { } public void Cancel() { } + public void Cancel(System.Exception cause) { } public void Request(long elements) { } public override string ToString() { } } public struct Cancel : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent { public readonly int Id; - public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { } + public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id, System.Exception cause) { } + public System.Exception Cause { get; } public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; } } public struct ExposedPublisher : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent @@ -4063,6 +4087,11 @@ namespace Akka.Streams.Implementation.Fusing public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { } public override string ToString() { } + public sealed class Cancelled + { + public readonly System.Exception Cause; + public Cancelled(System.Exception cause) { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class Connection { @@ -4627,7 +4656,7 @@ namespace Akka.Streams.Stage protected AbstractStage() { } protected virtual bool IsDetached { get; } public virtual Akka.Streams.Supervision.Directive Decide(System.Exception cause) { } - public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context); + public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause); public abstract Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context); public abstract Akka.Streams.Stage.IDirective OnPush(TIn element, Akka.Streams.Stage.IContext context); public abstract Akka.Streams.Stage.ITerminationDirective OnUpstreamFailure(System.Exception cause, Akka.Streams.Stage.IContext context); @@ -4644,8 +4673,8 @@ namespace Akka.Streams.Stage { protected TContext Context; protected AbstractStage() { } - public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context) { } - public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context) { } + public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause) { } + public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context, System.Exception cause) { } public abstract TPullDirective OnPull(TContext context); public override Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context) { } public abstract TPushDirective OnPush(TIn element, TContext context); @@ -4666,7 +4695,7 @@ namespace Akka.Streams.Stage public class ConditionalTerminateOutput : Akka.Streams.Stage.OutHandler { public ConditionalTerminateOutput(System.Func predicate) { } - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } [System.ObsoleteAttribute("Please use GraphStage instead. [1.1.0]")] @@ -4713,7 +4742,9 @@ namespace Akka.Streams.Stage protected void AbortReading(Akka.Streams.Inlet inlet) { } protected virtual void AfterPostStop() { } protected virtual void BeforePreStart() { } + protected void Cancel(Akka.Streams.Inlet inlet, System.Exception cause) { } protected void Cancel(Akka.Streams.Inlet inlet) { } + public void CancelStage(System.Exception cause) { } protected void Complete(Akka.Streams.Outlet outlet) { } public void CompleteStage() { } public static Akka.Streams.Stage.InHandler ConditionalTerminateInput(System.Func predicate) { } @@ -4735,6 +4766,8 @@ namespace Akka.Streams.Stage protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { } protected T Grab(Akka.Streams.Inlet inlet) { } protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } + [Akka.Annotations.InternalApiAttribute()] + public void InternalOnDownstreamFinish(System.Exception cause) { } protected bool IsAvailable(Akka.Streams.Inlet inlet) { } protected bool IsAvailable(Akka.Streams.Outlet outlet) { } protected bool IsClosed(Akka.Streams.Inlet inlet) { } @@ -4750,7 +4783,7 @@ namespace Akka.Streams.Stage protected void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Stage.IInHandler handler) { } protected void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action onUpstreamFailure = null) { } protected void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { } - protected void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } + protected void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } [System.ObsoleteAttribute("Use method `SetHandlers` instead. Will be removed in v1.5")] protected void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } protected void SetHandlers(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } @@ -4765,8 +4798,8 @@ namespace Akka.Streams.Stage } protected sealed class LambdaOutHandler : Akka.Streams.Stage.OutHandler { - public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { } - public override void OnDownstreamFinish() { } + public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } [Akka.Annotations.InternalApiAttribute()] @@ -4778,6 +4811,7 @@ namespace Akka.Streams.Stage public bool IsClosed { get; } public Akka.Streams.IGraph, Akka.NotUsed> Sink { get; } public void Cancel() { } + public void Cancel(System.Exception cause) { } public T Grab() { } public void Pull() { } public void SetHandler(Akka.Streams.Stage.IInHandler handler) { } @@ -4838,6 +4872,7 @@ namespace Akka.Streams.Stage Akka.Streams.Stage.ITerminationDirective AbsorbTermination(); Akka.Streams.Stage.FreeDirective Fail(System.Exception cause); Akka.Streams.Stage.FreeDirective Finish(); + Akka.Streams.Stage.FreeDirective Finish(System.Exception cause); Akka.Streams.Stage.IUpstreamDirective Pull(); Akka.Streams.Stage.IDownstreamDirective Push(object element); Akka.Streams.Stage.IDownstreamDirective PushAndFinish(object element); @@ -4888,7 +4923,7 @@ namespace Akka.Streams.Stage } public interface IOutHandler { - void OnDownstreamFinish(); + void OnDownstreamFinish(System.Exception cause); void OnPull(); } public interface IStageLogging @@ -4909,14 +4944,14 @@ namespace Akka.Streams.Stage public sealed class IgnoreTerminateOutput : Akka.Streams.Stage.OutHandler { public static readonly Akka.Streams.Stage.IgnoreTerminateOutput Instance; - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } public abstract class InAndOutGraphStageLogic : Akka.Streams.Stage.GraphStageLogic, Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler { protected InAndOutGraphStageLogic(int inCount, int outCount) { } protected InAndOutGraphStageLogic(Akka.Streams.Shape shape) { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); public abstract void OnPush(); public virtual void OnUpstreamFailure(System.Exception e) { } @@ -4925,7 +4960,7 @@ namespace Akka.Streams.Stage public abstract class InAndOutHandler : Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler { protected InAndOutHandler() { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); public abstract void OnPush(); public virtual void OnUpstreamFailure(System.Exception e) { } @@ -4956,13 +4991,13 @@ namespace Akka.Streams.Stage { protected OutGraphStageLogic(int inCount, int outCount) { } protected OutGraphStageLogic(Akka.Streams.Shape shape) { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); } public abstract class OutHandler : Akka.Streams.Stage.IOutHandler { protected OutHandler() { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); } public class PushPullGraphStageWithMaterializedValue : Akka.Streams.Stage.GraphStageWithMaterializedValue, TMat> diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index fa5aaa6ea97..a1a6a7f942b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -723,6 +723,10 @@ namespace Akka.Streams { Akka.Streams.Dsl.Source Source { get; } } + public interface ISubscriptionWithCancelException : Reactive.Streams.ISubscription + { + void Cancel(System.Exception cause); + } public interface ITransformerLike { bool IsComplete { get; } @@ -919,6 +923,7 @@ namespace Akka.Streams public static readonly Akka.Streams.StreamDetachedException Instance; public StreamDetachedException() { } public StreamDetachedException(string message) { } + public StreamDetachedException(string message, System.Exception innerException) { } } public class StreamLimitReachedException : System.Exception { @@ -997,6 +1002,23 @@ namespace Akka.Streams public StreamTcpException(string message, System.Exception innerException) { } protected StreamTcpException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } + public class static SubscriptionWithCancelException + { + public sealed class NoMoreElementsNeeded : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation + { + public static readonly Akka.Streams.SubscriptionWithCancelException.NoMoreElementsNeeded Instance; + } + [Akka.Annotations.DoNotInheritAttribute()] + public abstract class NonFailureCancellation : System.Exception + { + protected NonFailureCancellation() { } + public virtual string StackTrace { get; } + } + public sealed class StageWasCompleted : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation + { + public static readonly Akka.Streams.SubscriptionWithCancelException.StageWasCompleted Instance; + } + } public enum SubstreamCancelStrategy { Propagate = 0, @@ -3461,7 +3483,7 @@ namespace Akka.Streams.Implementation public static void RequireNonNullException(System.Exception exception) { } public static void RequireNonNullSubscriber(Reactive.Streams.ISubscriber subscriber) { } public static void RequireNonNullSubscription(Reactive.Streams.ISubscription subscription) { } - public static void TryCancel(Reactive.Streams.ISubscription subscription) { } + public static void TryCancel(Reactive.Streams.ISubscription subscription, System.Exception cause) { } public static void TryOnComplete(Reactive.Streams.ISubscriber subscriber) { } public static void TryOnError(Reactive.Streams.ISubscriber subscriber, System.Exception cause) { } public static void TryOnNext(Reactive.Streams.ISubscriber subscriber, T element) { } @@ -3837,7 +3859,7 @@ namespace Akka.Streams.Implementation.Fusing { public BatchingActorInputBoundary(int size, int id) { } public override Akka.Streams.Outlet Out { get; } - public void Cancel() { } + public void Cancel(System.Exception cause) { } public void OnComplete() { } public void OnError(System.Exception reason) { } public void OnInternalError(System.Exception reason) { } @@ -3858,17 +3880,20 @@ namespace Akka.Streams.Implementation.Fusing public void OnNext(T element) { } public void OnSubscribe(Reactive.Streams.ISubscription subscription) { } } - public sealed class BoundarySubscription : Reactive.Streams.ISubscription + public sealed class BoundarySubscription : Akka.Streams.ISubscriptionWithCancelException, Reactive.Streams.ISubscription { public BoundarySubscription(Akka.Actor.IActorRef parent, Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { } public void Cancel() { } + public void Cancel(System.Exception cause) { } public void Request(long elements) { } public override string ToString() { } } public struct Cancel : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent { public readonly int Id; - public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { } + public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id, System.Exception cause) { } + [get: System.Runtime.CompilerServices.IsReadOnlyAttribute()] + public System.Exception Cause { get; } [get: System.Runtime.CompilerServices.IsReadOnlyAttribute()] public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; } } @@ -4074,6 +4099,11 @@ namespace Akka.Streams.Implementation.Fusing public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { } public override string ToString() { } + public sealed class Cancelled + { + public readonly System.Exception Cause; + public Cancelled(System.Exception cause) { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class Connection { @@ -4638,7 +4668,7 @@ namespace Akka.Streams.Stage protected AbstractStage() { } protected virtual bool IsDetached { get; } public virtual Akka.Streams.Supervision.Directive Decide(System.Exception cause) { } - public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context); + public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause); public abstract Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context); public abstract Akka.Streams.Stage.IDirective OnPush(TIn element, Akka.Streams.Stage.IContext context); public abstract Akka.Streams.Stage.ITerminationDirective OnUpstreamFailure(System.Exception cause, Akka.Streams.Stage.IContext context); @@ -4655,8 +4685,8 @@ namespace Akka.Streams.Stage { protected TContext Context; protected AbstractStage() { } - public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context) { } - public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context) { } + public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause) { } + public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context, System.Exception cause) { } public abstract TPullDirective OnPull(TContext context); public override Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context) { } public abstract TPushDirective OnPush(TIn element, TContext context); @@ -4677,7 +4707,7 @@ namespace Akka.Streams.Stage public class ConditionalTerminateOutput : Akka.Streams.Stage.OutHandler { public ConditionalTerminateOutput(System.Func predicate) { } - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } [System.ObsoleteAttribute("Please use GraphStage instead. [1.1.0]")] @@ -4724,7 +4754,9 @@ namespace Akka.Streams.Stage protected void AbortReading(Akka.Streams.Inlet inlet) { } protected virtual void AfterPostStop() { } protected virtual void BeforePreStart() { } + protected void Cancel(Akka.Streams.Inlet inlet, System.Exception cause) { } protected void Cancel(Akka.Streams.Inlet inlet) { } + public void CancelStage(System.Exception cause) { } protected void Complete(Akka.Streams.Outlet outlet) { } public void CompleteStage() { } public static Akka.Streams.Stage.InHandler ConditionalTerminateInput(System.Func predicate) { } @@ -4746,6 +4778,8 @@ namespace Akka.Streams.Stage protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { } protected T Grab(Akka.Streams.Inlet inlet) { } protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } + [Akka.Annotations.InternalApiAttribute()] + public void InternalOnDownstreamFinish(System.Exception cause) { } protected bool IsAvailable(Akka.Streams.Inlet inlet) { } protected bool IsAvailable(Akka.Streams.Outlet outlet) { } protected bool IsClosed(Akka.Streams.Inlet inlet) { } @@ -4761,7 +4795,7 @@ namespace Akka.Streams.Stage protected void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Stage.IInHandler handler) { } protected void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action onUpstreamFailure = null) { } protected void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { } - protected void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } + protected void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } [System.ObsoleteAttribute("Use method `SetHandlers` instead. Will be removed in v1.5")] protected void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } protected void SetHandlers(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } @@ -4776,8 +4810,8 @@ namespace Akka.Streams.Stage } protected sealed class LambdaOutHandler : Akka.Streams.Stage.OutHandler { - public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { } - public override void OnDownstreamFinish() { } + public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } [Akka.Annotations.InternalApiAttribute()] @@ -4789,6 +4823,7 @@ namespace Akka.Streams.Stage public bool IsClosed { get; } public Akka.Streams.IGraph, Akka.NotUsed> Sink { get; } public void Cancel() { } + public void Cancel(System.Exception cause) { } public T Grab() { } public void Pull() { } public void SetHandler(Akka.Streams.Stage.IInHandler handler) { } @@ -4849,6 +4884,7 @@ namespace Akka.Streams.Stage Akka.Streams.Stage.ITerminationDirective AbsorbTermination(); Akka.Streams.Stage.FreeDirective Fail(System.Exception cause); Akka.Streams.Stage.FreeDirective Finish(); + Akka.Streams.Stage.FreeDirective Finish(System.Exception cause); Akka.Streams.Stage.IUpstreamDirective Pull(); Akka.Streams.Stage.IDownstreamDirective Push(object element); Akka.Streams.Stage.IDownstreamDirective PushAndFinish(object element); @@ -4899,7 +4935,7 @@ namespace Akka.Streams.Stage } public interface IOutHandler { - void OnDownstreamFinish(); + void OnDownstreamFinish(System.Exception cause); void OnPull(); } public interface IStageLogging @@ -4920,14 +4956,14 @@ namespace Akka.Streams.Stage public sealed class IgnoreTerminateOutput : Akka.Streams.Stage.OutHandler { public static readonly Akka.Streams.Stage.IgnoreTerminateOutput Instance; - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } public abstract class InAndOutGraphStageLogic : Akka.Streams.Stage.GraphStageLogic, Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler { protected InAndOutGraphStageLogic(int inCount, int outCount) { } protected InAndOutGraphStageLogic(Akka.Streams.Shape shape) { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); public abstract void OnPush(); public virtual void OnUpstreamFailure(System.Exception e) { } @@ -4936,7 +4972,7 @@ namespace Akka.Streams.Stage public abstract class InAndOutHandler : Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler { protected InAndOutHandler() { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); public abstract void OnPush(); public virtual void OnUpstreamFailure(System.Exception e) { } @@ -4969,13 +5005,13 @@ namespace Akka.Streams.Stage { protected OutGraphStageLogic(int inCount, int outCount) { } protected OutGraphStageLogic(Akka.Streams.Shape shape) { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); } public abstract class OutHandler : Akka.Streams.Stage.IOutHandler { protected OutHandler() { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); } public class PushPullGraphStageWithMaterializedValue : Akka.Streams.Stage.GraphStageWithMaterializedValue, TMat> diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 8c31c106e4b..9f6c9951b53 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -723,6 +723,10 @@ namespace Akka.Streams { Akka.Streams.Dsl.Source Source { get; } } + public interface ISubscriptionWithCancelException : Reactive.Streams.ISubscription + { + void Cancel(System.Exception cause); + } public interface ITransformerLike { bool IsComplete { get; } @@ -919,6 +923,7 @@ namespace Akka.Streams public static readonly Akka.Streams.StreamDetachedException Instance; public StreamDetachedException() { } public StreamDetachedException(string message) { } + public StreamDetachedException(string message, System.Exception innerException) { } } public class StreamLimitReachedException : System.Exception { @@ -997,6 +1002,23 @@ namespace Akka.Streams public StreamTcpException(string message, System.Exception innerException) { } protected StreamTcpException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { } } + public class static SubscriptionWithCancelException + { + public sealed class NoMoreElementsNeeded : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation + { + public static readonly Akka.Streams.SubscriptionWithCancelException.NoMoreElementsNeeded Instance; + } + [Akka.Annotations.DoNotInheritAttribute()] + public abstract class NonFailureCancellation : System.Exception + { + protected NonFailureCancellation() { } + public virtual string StackTrace { get; } + } + public sealed class StageWasCompleted : Akka.Streams.SubscriptionWithCancelException.NonFailureCancellation + { + public static readonly Akka.Streams.SubscriptionWithCancelException.StageWasCompleted Instance; + } + } public enum SubstreamCancelStrategy { Propagate = 0, @@ -3461,7 +3483,7 @@ namespace Akka.Streams.Implementation public static void RequireNonNullException(System.Exception exception) { } public static void RequireNonNullSubscriber(Reactive.Streams.ISubscriber subscriber) { } public static void RequireNonNullSubscription(Reactive.Streams.ISubscription subscription) { } - public static void TryCancel(Reactive.Streams.ISubscription subscription) { } + public static void TryCancel(Reactive.Streams.ISubscription subscription, System.Exception cause) { } public static void TryOnComplete(Reactive.Streams.ISubscriber subscriber) { } public static void TryOnError(Reactive.Streams.ISubscriber subscriber, System.Exception cause) { } public static void TryOnNext(Reactive.Streams.ISubscriber subscriber, T element) { } @@ -3835,7 +3857,7 @@ namespace Akka.Streams.Implementation.Fusing { public BatchingActorInputBoundary(int size, int id) { } public override Akka.Streams.Outlet Out { get; } - public void Cancel() { } + public void Cancel(System.Exception cause) { } public void OnComplete() { } public void OnError(System.Exception reason) { } public void OnInternalError(System.Exception reason) { } @@ -3856,17 +3878,19 @@ namespace Akka.Streams.Implementation.Fusing public void OnNext(T element) { } public void OnSubscribe(Reactive.Streams.ISubscription subscription) { } } - public sealed class BoundarySubscription : Reactive.Streams.ISubscription + public sealed class BoundarySubscription : Akka.Streams.ISubscriptionWithCancelException, Reactive.Streams.ISubscription { public BoundarySubscription(Akka.Actor.IActorRef parent, Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { } public void Cancel() { } + public void Cancel(System.Exception cause) { } public void Request(long elements) { } public override string ToString() { } } public struct Cancel : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent { public readonly int Id; - public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id) { } + public Cancel(Akka.Streams.Implementation.Fusing.GraphInterpreterShell shell, int id, System.Exception cause) { } + public System.Exception Cause { get; } public Akka.Streams.Implementation.Fusing.GraphInterpreterShell Shell { get; } } public struct ExposedPublisher : Akka.Actor.INoSerializationVerificationNeeded, Akka.Event.IDeadLetterSuppression, Akka.Streams.Implementation.Fusing.ActorGraphInterpreter.IBoundaryEvent @@ -4063,6 +4087,11 @@ namespace Akka.Streams.Implementation.Fusing public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IInHandler handler) { } public void SetHandler(Akka.Streams.Implementation.Fusing.GraphInterpreter.Connection connection, Akka.Streams.Stage.IOutHandler handler) { } public override string ToString() { } + public sealed class Cancelled + { + public readonly System.Exception Cause; + public Cancelled(System.Exception cause) { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class Connection { @@ -4627,7 +4656,7 @@ namespace Akka.Streams.Stage protected AbstractStage() { } protected virtual bool IsDetached { get; } public virtual Akka.Streams.Supervision.Directive Decide(System.Exception cause) { } - public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context); + public abstract Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause); public abstract Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context); public abstract Akka.Streams.Stage.IDirective OnPush(TIn element, Akka.Streams.Stage.IContext context); public abstract Akka.Streams.Stage.ITerminationDirective OnUpstreamFailure(System.Exception cause, Akka.Streams.Stage.IContext context); @@ -4644,8 +4673,8 @@ namespace Akka.Streams.Stage { protected TContext Context; protected AbstractStage() { } - public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context) { } - public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context) { } + public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(Akka.Streams.Stage.IContext context, System.Exception cause) { } + public virtual Akka.Streams.Stage.ITerminationDirective OnDownstreamFinish(TContext context, System.Exception cause) { } public abstract TPullDirective OnPull(TContext context); public override Akka.Streams.Stage.IDirective OnPull(Akka.Streams.Stage.IContext context) { } public abstract TPushDirective OnPush(TIn element, TContext context); @@ -4666,7 +4695,7 @@ namespace Akka.Streams.Stage public class ConditionalTerminateOutput : Akka.Streams.Stage.OutHandler { public ConditionalTerminateOutput(System.Func predicate) { } - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } [System.ObsoleteAttribute("Please use GraphStage instead. [1.1.0]")] @@ -4713,7 +4742,9 @@ namespace Akka.Streams.Stage protected void AbortReading(Akka.Streams.Inlet inlet) { } protected virtual void AfterPostStop() { } protected virtual void BeforePreStart() { } + protected void Cancel(Akka.Streams.Inlet inlet, System.Exception cause) { } protected void Cancel(Akka.Streams.Inlet inlet) { } + public void CancelStage(System.Exception cause) { } protected void Complete(Akka.Streams.Outlet outlet) { } public void CompleteStage() { } public static Akka.Streams.Stage.InHandler ConditionalTerminateInput(System.Func predicate) { } @@ -4735,6 +4766,8 @@ namespace Akka.Streams.Stage protected Akka.Streams.Stage.StageActor GetStageActor(Akka.Streams.Stage.StageActorRef.Receive receive) { } protected T Grab(Akka.Streams.Inlet inlet) { } protected bool HasBeenPulled(Akka.Streams.Inlet inlet) { } + [Akka.Annotations.InternalApiAttribute()] + public void InternalOnDownstreamFinish(System.Exception cause) { } protected bool IsAvailable(Akka.Streams.Inlet inlet) { } protected bool IsAvailable(Akka.Streams.Outlet outlet) { } protected bool IsClosed(Akka.Streams.Inlet inlet) { } @@ -4750,7 +4783,7 @@ namespace Akka.Streams.Stage protected void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Stage.IInHandler handler) { } protected void SetHandler(Akka.Streams.Inlet inlet, System.Action onPush, System.Action onUpstreamFinish = null, System.Action onUpstreamFailure = null) { } protected void SetHandler(Akka.Streams.Outlet outlet, Akka.Streams.Stage.IOutHandler handler) { } - protected void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } + protected void SetHandler(Akka.Streams.Outlet outlet, System.Action onPull, System.Action onDownstreamFinish = null) { } [System.ObsoleteAttribute("Use method `SetHandlers` instead. Will be removed in v1.5")] protected void SetHandler(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } protected void SetHandlers(Akka.Streams.Inlet inlet, Akka.Streams.Outlet outlet, Akka.Streams.Stage.InAndOutGraphStageLogic handler) { } @@ -4765,8 +4798,8 @@ namespace Akka.Streams.Stage } protected sealed class LambdaOutHandler : Akka.Streams.Stage.OutHandler { - public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { } - public override void OnDownstreamFinish() { } + public LambdaOutHandler(System.Action onPull, System.Action onDownstreamFinish = null) { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } [Akka.Annotations.InternalApiAttribute()] @@ -4778,6 +4811,7 @@ namespace Akka.Streams.Stage public bool IsClosed { get; } public Akka.Streams.IGraph, Akka.NotUsed> Sink { get; } public void Cancel() { } + public void Cancel(System.Exception cause) { } public T Grab() { } public void Pull() { } public void SetHandler(Akka.Streams.Stage.IInHandler handler) { } @@ -4838,6 +4872,7 @@ namespace Akka.Streams.Stage Akka.Streams.Stage.ITerminationDirective AbsorbTermination(); Akka.Streams.Stage.FreeDirective Fail(System.Exception cause); Akka.Streams.Stage.FreeDirective Finish(); + Akka.Streams.Stage.FreeDirective Finish(System.Exception cause); Akka.Streams.Stage.IUpstreamDirective Pull(); Akka.Streams.Stage.IDownstreamDirective Push(object element); Akka.Streams.Stage.IDownstreamDirective PushAndFinish(object element); @@ -4888,7 +4923,7 @@ namespace Akka.Streams.Stage } public interface IOutHandler { - void OnDownstreamFinish(); + void OnDownstreamFinish(System.Exception cause); void OnPull(); } public interface IStageLogging @@ -4909,14 +4944,14 @@ namespace Akka.Streams.Stage public sealed class IgnoreTerminateOutput : Akka.Streams.Stage.OutHandler { public static readonly Akka.Streams.Stage.IgnoreTerminateOutput Instance; - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(System.Exception cause) { } public override void OnPull() { } } public abstract class InAndOutGraphStageLogic : Akka.Streams.Stage.GraphStageLogic, Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler { protected InAndOutGraphStageLogic(int inCount, int outCount) { } protected InAndOutGraphStageLogic(Akka.Streams.Shape shape) { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); public abstract void OnPush(); public virtual void OnUpstreamFailure(System.Exception e) { } @@ -4925,7 +4960,7 @@ namespace Akka.Streams.Stage public abstract class InAndOutHandler : Akka.Streams.Stage.IInHandler, Akka.Streams.Stage.IOutHandler { protected InAndOutHandler() { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); public abstract void OnPush(); public virtual void OnUpstreamFailure(System.Exception e) { } @@ -4956,13 +4991,13 @@ namespace Akka.Streams.Stage { protected OutGraphStageLogic(int inCount, int outCount) { } protected OutGraphStageLogic(Akka.Streams.Shape shape) { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); } public abstract class OutHandler : Akka.Streams.Stage.IOutHandler { protected OutHandler() { } - public virtual void OnDownstreamFinish() { } + public virtual void OnDownstreamFinish(System.Exception cause) { } public abstract void OnPull(); } public class PushPullGraphStageWithMaterializedValue : Akka.Streams.Stage.GraphStageWithMaterializedValue, TMat> diff --git a/src/core/Akka.Streams.TestKit/TestGraphStage.cs b/src/core/Akka.Streams.TestKit/TestGraphStage.cs index 6531f04440e..ab86bc4909c 100644 --- a/src/core/Akka.Streams.TestKit/TestGraphStage.cs +++ b/src/core/Akka.Streams.TestKit/TestGraphStage.cs @@ -126,10 +126,10 @@ public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue { _probe.Ref.Tell(GraphStageMessages.Pull.Instance); outHandler.OnPull(); - }, onDownstreamFinish: () => + }, onDownstreamFinish: cause => { _probe.Ref.Tell(GraphStageMessages.DownstreamFinish.Instance); - outHandler.OnDownstreamFinish(); + outHandler.OnDownstreamFinish(cause); }); return logicAndMaterialized; diff --git a/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs b/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs index 3ea7fdc5516..2fec05a957a 100644 --- a/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs +++ b/src/core/Akka.Streams.Tests.Performance/InterpreterBenchmark.cs @@ -94,7 +94,7 @@ public GraphDataSource(string toString, T[] data) } else CompleteStage(); - }, onDownstreamFinish: CompleteStage); + }, onDownstreamFinish: InternalOnDownstreamFinish); Console.WriteLine("Handler Set"); } diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs index b0a245a823a..220039faf7c 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphStageTimersSpec.cs @@ -305,7 +305,7 @@ public Logic(TestStage2 stage) : base(stage.Shape) onUpstreamFailure: FailStage); - SetHandler(stage.Outlet, onPull: DoNothing, onDownstreamFinish: CompleteStage); + SetHandler(stage.Outlet, onPull: DoNothing, onDownstreamFinish: cause => CompleteStage()); } public override void PreStart() => ScheduleRepeatedly(TimerKey, TimeSpan.FromMilliseconds(100)); diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs index 23caeb70a9c..d2300113155 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs @@ -8,6 +8,8 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; @@ -173,6 +175,57 @@ public void UnzipWith_must_work_in_the_sad_case() }, Materializer); } + [Fact] + public void UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all_downstream_have_cancelled() + { + this.AssertAllStagesStopped(() => + { + var probe = CreateTestProbe(); + RunnableGraph.FromGraph(GraphDsl.Create(b => + { + var source = Source + .Maybe() + .WatchTermination(Keep.Right) + .MapMaterializedValue(t => + { + // side effecting our way out of this + probe.Ref.Tell(t, Nobody.Instance); + return NotUsed.Instance; + }); + + var unzip = b.Add(new UnzipWith(i => (1 / i, $"1 / {i}"))); + + b.From(source).To(unzip.In); + + Flow KillSwitchFlow() + => Flow.Create() + .ViaMaterialized(KillSwitches.Single(), Keep.Right) + .MapMaterializedValue(killSwitch => + { + probe.Ref.Tell(killSwitch); + return NotUsed.Instance; + }); + + b.From(unzip.Out0).Via(KillSwitchFlow()).To(Sink.Ignore()); + b.From(unzip.Out1).Via(KillSwitchFlow()).To(Sink.Ignore()); + + return ClosedShape.Instance; + })).Run(Materializer); + + var termination = probe.ExpectMsg>(); + var killSwitch1 = probe.ExpectMsg(); + var killSwitch2 = probe.ExpectMsg(); + var boom = new TestException("Boom"); + killSwitch1.Abort(boom); + killSwitch2.Abort(boom); + termination.ContinueWith(t => + { + t.Exception.Should().NotBeNull(); + t.Exception.InnerException.Should().Be(boom); + }); + }, Materializer); + } + [Fact] public void UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs() { diff --git a/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs index 11a3e7493a3..c773fd4cf1b 100644 --- a/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/LazySourceSpec.cs @@ -8,6 +8,8 @@ using System; using System.Collections.Immutable; using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; using Akka.Streams.Dsl; using Akka.Streams.Stage; using Akka.Streams.TestKit; @@ -111,6 +113,40 @@ public void A_lazy_source_must_materialize_when_the_source_has_been_created() }, Materializer); } + [Fact] + public void A_lazy_source_must_propagate_downstream_cancellation_cause_when_inner_source_has_been_materialized() + { + this.AssertAllStagesStopped(() => + { + var probe = CreateTestProbe(); + var (doneF, killSwitch) = Source.Lazily(() => + { + return Source + .Maybe() + .WatchTermination(Keep.Right) + .MapMaterializedValue(done => + { + probe.Ref.Tell(Done.Instance, Nobody.Instance); + return done; + }); + }) + .MapMaterializedValue(t => t.Unwrap()) + .ViaMaterialized(KillSwitches.Single(), Keep.Both) + .To(Sink.Ignore()) + .Run(Materializer); + + var boom = new TestException("boom"); + probe.ExpectMsg(); + killSwitch.Abort(boom); + doneF.ContinueWith(t => + { + t.Exception.Should().NotBeNull(); + t.Exception.InnerException.Should().NotBeNull(); + t.Exception.InnerException.Should().Be(boom); + }); + }, Materializer); + } + [Fact] public void A_lazy_source_must_fail_stage_when_upstream_fails() { diff --git a/src/core/Akka.Streams.Tests/Dsl/MaybeSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/MaybeSourceSpec.cs new file mode 100644 index 00000000000..daa4c300532 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/MaybeSourceSpec.cs @@ -0,0 +1,183 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation; +using Akka.Streams.Implementation.Fusing; +using Akka.Streams.TestKit; +using Akka.Streams.TestKit.Tests; +using Akka.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; +using static FluentAssertions.FluentActions; + +namespace Akka.Streams.Tests.Dsl +{ + public class MaybeSourceSpec : AkkaSpec + { + private readonly ActorMaterializer _materializer; + public MaybeSourceSpec(ITestOutputHelper output) : base(output) + { + _materializer = Sys.Materializer(); + } + + [Fact(DisplayName = "The Maybe Source must complete materialized promise with None when stream cancels")] + public void CompleteMaterializedPromiseWithNoneWhenCancelled() + { + this.AssertAllStagesStopped(() => + { + var neverSource = Source.Maybe(); + var pubSink = Sink.AsPublisher(false); + + var (tcs, neverPub) = neverSource + .ToMaterialized(pubSink, Keep.Both) + .Run(_materializer); + + var c = this.CreateManualSubscriberProbe(); + neverPub.Subscribe(c); + var subs = c.ExpectSubscription(); + + subs.Request(1000); + c.ExpectNoMsg(100.Milliseconds()); + + subs.Cancel(); + + tcs.Task.Wait(3.Seconds()).Should().BeTrue(); + tcs.Task.Result.Should().Be(0); + }, _materializer); + } + + [Fact(DisplayName = "The Maybe Source must complete materialized promise with 0 when stream cancels with a failure cause")] + public void CompleteMaterializedTaskWithNoneWhenStreamCancelsWithFailure() + { + this.AssertAllStagesStopped(() => + { + var (tcs, killSwitch) = Source.Maybe() + .ViaMaterialized(KillSwitches.Single(), Keep.Both) + .To(Sink.Ignore()) + .Run(_materializer); + + var boom = new TestException("Boom"); + killSwitch.Abort(boom); + // Could make sense to fail it with the propagated exception instead but that breaks + // the assumptions in the CoupledTerminationFlowSpec + tcs.Task.Wait(3.Seconds()).Should().BeTrue(); + tcs.Task.Result.Should().Be(0); + }, _materializer); + } + + [Fact(DisplayName = "The Maybe Source must allow external triggering of empty completion")] + public void AllowExternalTriggeringOfEmptyCompletion() + { + this.AssertAllStagesStopped(() => + { + var neverSource = Source.Maybe().Where(_ => false); + var counterSink = Sink.Aggregate(0, (acc, _) => acc + 1); + var (neverPromise, counterFuture) = neverSource + .ToMaterialized(counterSink, Keep.Both) + .Run(_materializer); + + // external cancellation + neverPromise.TrySetResult(0).Should().BeTrue(); + counterFuture.Wait(3.Seconds()).Should().BeTrue(); + counterFuture.Result.Should().Be(0); + }, _materializer); + } + + // MaybeSource code is different compared to JVM, maybe that's why? -- Greg + // TODO: Why isn't the probe receive an OnComplete? + [Fact( + DisplayName = "The Maybe Source must allow external triggering of empty completion when there was no demand", + Skip = "Not working, check Maybe source.")] + public void AllowExternalTriggerOfEmptyCompletionWhenNoDemand() + { + this.AssertAllStagesStopped(() => + { + var probe = this.CreateSubscriberProbe(); + var promise = Source + .Maybe() + .To(Sink.FromSubscriber(probe)) + .Run(_materializer); + + // external cancellation + probe.EnsureSubscription(); + promise.TrySetResult(0).Should().BeTrue(); + probe.ExpectComplete(); + }, _materializer); + } + + [Fact(DisplayName = "The Maybe Source must allow external triggering of non-empty completion")] + public void AllowExternalTriggerNonEmptyCompletion() + { + this.AssertAllStagesStopped(() => + { + var neverSource = Source.Maybe(); + var counterSink = Sink.First(); + + var (neverPromise, counterFuture) = neverSource + .ToMaterialized(counterSink, Keep.Both) + .Run(_materializer); + + // external cancellation + neverPromise.TrySetResult(6).Should().BeTrue(); + counterFuture.Wait(3.Seconds()).Should().BeTrue(); + counterFuture.Result.Should().Be(6); + }, _materializer); + } + + [Fact(DisplayName = "The Maybe Source must allow external triggering of onError")] + public void AllowExternalTriggerOnError() + { + this.AssertAllStagesStopped(() => + { + var neverSource = Source.Maybe(); + var counterSink = Sink.Aggregate(0, (acc, _) => acc + 1); + + var (neverPromise, counterFuture) = neverSource + .ToMaterialized(counterSink, Keep.Both) + .Run(_materializer); + + // external cancellation + neverPromise.TrySetException(new TestException("Boom")).Should().BeTrue(); + + Invoking(() => counterFuture.Wait(3.Seconds())) + .Should().Throw() + .WithInnerException() + .WithInnerException() + .WithMessage("Boom"); + }, _materializer); + } + + // MaybeSource code is different compared to JVM, maybe that's why? -- Greg + // TODO: Why isn't Maybe throws AbruptStageTerminationException? + [Fact( + DisplayName = "The Maybe Source must complete materialized future when materializer is shutdown", + Skip = "Not working, no exception is thrown. Check Maybe source.")] + public void CompleteMaterializedTaskWhenShutDown() + { + var mat = ActorMaterializer.Create(Sys); + var neverSource = Source.Maybe(); + var pubSink = Sink.AsPublisher(false); + + var (f, neverPub) = neverSource + .ToMaterialized(pubSink, Keep.Both) + .Run(mat); + + var c = this.CreateManualSubscriberProbe(); + neverPub.Subscribe(c); + c.ExpectSubscription(); + + mat.Shutdown(); + Invoking(() => f.Task.Wait(3.Seconds())).Should() + .Throw(); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/ActorGraphInterpreterSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/ActorGraphInterpreterSpec.cs index 9886191c6fa..a556dee84ea 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/ActorGraphInterpreterSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/ActorGraphInterpreterSpec.cs @@ -346,11 +346,11 @@ public Logic(BidiShape shape) : base(shape) SetHandler(shape.Outlet1, onPull: () => Pull(shape.Inlet1), - onDownstreamFinish: () => Cancel(shape.Inlet1)); + onDownstreamFinish: cause => Cancel(shape.Inlet1, cause)); SetHandler(shape.Outlet2, onPull: () => Pull(shape.Inlet2), - onDownstreamFinish: () => Cancel(shape.Inlet2)); + onDownstreamFinish: cause => Cancel(shape.Inlet2, cause)); } } @@ -395,11 +395,11 @@ public Logic(BidiShape shape) : base(shape) SetHandler(shape.Outlet1, onPull: () => Pull(shape.Inlet2), - onDownstreamFinish: () => Cancel(shape.Inlet2)); + onDownstreamFinish: cause => Cancel(shape.Inlet2, cause)); SetHandler(shape.Outlet2, onPull: () => Pull(shape.Inlet1), - onDownstreamFinish: () => Cancel(shape.Inlet1)); + onDownstreamFinish: cause => Cancel(shape.Inlet1, cause)); } } diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterFailureModesSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterFailureModesSpec.cs index 007bb6d4901..2292b99e3e8 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterFailureModesSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterFailureModesSpec.cs @@ -57,7 +57,7 @@ public void GraphInterpreter_should_handle_failure_on_OnPull() lastEvents() .Should() - .BeEquivalentTo(new Cancel(upstream), new OnError(downstream, testException()), new PostStop(stage.Value)); + .BeEquivalentTo(new Cancel(upstream, testException()), new OnError(downstream, testException()), new PostStop(stage.Value)); } [Fact] @@ -74,7 +74,7 @@ public void GraphInterpreter_should_handle_failure_on_OnPush() lastEvents() .Should() - .BeEquivalentTo(new Cancel(upstream), new OnError(downstream, testException()), new PostStop(stage.Value)); + .BeEquivalentTo(new Cancel(upstream, testException()), new OnError(downstream, testException()), new PostStop(stage.Value)); } [Fact] @@ -89,7 +89,7 @@ public void GraphInterpreter_should_handle_failure_on_OnPull_while_cancel_is_pen lastEvents() .Should() - .BeEquivalentTo(new Cancel(upstream), new PostStop(stage.Value)); + .BeEquivalentTo(new Cancel(upstream, testException()), new PostStop(stage.Value)); } [Fact] @@ -149,7 +149,7 @@ public void GraphInterpreter_should_handle_failure_on_OnDownstreamFinish() lastEvents() .Should() - .BeEquivalentTo(new Cancel(upstream), new PostStop(stage.Value)); + .BeEquivalentTo(new Cancel(upstream, testException()), new PostStop(stage.Value)); } [Fact] @@ -161,8 +161,8 @@ public void GraphInterpreter_should_handle_failure_in_PreStart() setup.LastEvents() .Should() - .BeEquivalentTo(new Cancel(setup.Upstream), new OnError(setup.Downstream, setup.TestException()), - new PostStop(setup.Stage.Value)); + .BeEquivalentTo(new Cancel(setup.Upstream, setup.TestException()), + new OnError(setup.Downstream, setup.TestException()), new PostStop(setup.Stage.Value)); } [Fact] diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterPortsSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterPortsSpec.cs index 9599e360a46..76164d74efb 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterPortsSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterPortsSpec.cs @@ -29,7 +29,7 @@ public class GraphInterpreterPortsSpec : GraphInterpreterSpecKit private Action step; private Action clearEvents; - public GraphInterpreterPortsSpec(ITestOutputHelper output = null) : base(output) + public GraphInterpreterPortsSpec(ITestOutputHelper output) : base(output) { } @@ -514,7 +514,7 @@ public void Port_states_should_propagate_cancel_while_downstream_is_active(bool stepAll(); - lastEvents().Should().BeEquivalentTo(new Cancel(outlet)); + lastEvents().Should().BeEquivalentTo(new Cancel(outlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); outlet.IsAvailable().Should().Be(false); outlet.IsClosed().Should().Be(true); inlet.IsAvailable().Should().Be(false); @@ -581,7 +581,7 @@ public void Port_states_should_propagate_cancel_while_upstream_is_active(bool ch stepAll(); - lastEvents().Should().BeEquivalentTo(new Cancel(outlet)); + lastEvents().Should().BeEquivalentTo(new Cancel(outlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); outlet.IsAvailable().Should().Be(false); outlet.IsClosed().Should().Be(true); inlet.IsAvailable().Should().Be(false); @@ -647,7 +647,7 @@ public void Port_states_should_propagate_cancel_while_pull_is_in_flight(bool cha stepAll(); - lastEvents().Should().BeEquivalentTo(new Cancel(outlet)); + lastEvents().Should().BeEquivalentTo(new Cancel(outlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); outlet.IsAvailable().Should().Be(false); outlet.IsClosed().Should().Be(true); inlet.IsAvailable().Should().Be(false); @@ -716,7 +716,7 @@ public void Port_states_should_propagate_cancel_while_push_is_in_flight(bool cha stepAll(); - lastEvents().Should().BeEquivalentTo(new Cancel(outlet)); + lastEvents().Should().BeEquivalentTo(new Cancel(outlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); outlet.IsAvailable().Should().Be(false); outlet.IsClosed().Should().Be(true); inlet.IsAvailable().Should().Be(false); @@ -771,7 +771,7 @@ public void Port_states_should_ignore_push_while_cancelling(bool chasing) stepAll(); - lastEvents().Should().BeEquivalentTo(new Cancel(outlet)); + lastEvents().Should().BeEquivalentTo(new Cancel(outlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); outlet.IsAvailable().Should().Be(false); outlet.IsClosed().Should().Be(true); inlet.IsAvailable().Should().Be(false); @@ -809,7 +809,7 @@ public void Port_states_should_clear_ungrabbed_element_even_when_cancelled(bool inlet.Invoking(x => x.Grab()).Should().Throw(); stepAll(); - lastEvents().Should().BeEquivalentTo(new Cancel(outlet)); + lastEvents().Should().BeEquivalentTo(new Cancel(outlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); outlet.IsAvailable().Should().Be(false); outlet.IsClosed().Should().Be(true); inlet.IsAvailable().Should().Be(false); diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs index 3e115f05018..e72751006f3 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/GraphInterpreterSpecKit.cs @@ -205,10 +205,18 @@ public override bool Equals(object obj) public class Cancel : ITestEvent { public GraphStageLogic Source { get; } + public Exception Cause { get; } - public Cancel(GraphStageLogic source) => Source = source; + public Cancel(GraphStageLogic source, Exception cause) + { + Source = source; + Cause = cause; + } - protected bool Equals(Cancel other) => Equals(Source, other.Source); + protected bool Equals(Cancel other) => + Equals(Source, other.Source) + && Cause.GetType() == other.Cause.GetType() + && Cause.Message == other.Cause.Message; public override bool Equals(object obj) { @@ -387,7 +395,7 @@ public UpstreamProbe(TestSetup setup, string name) Outlet = new Outlet("out") {Id = 0}; var probe = this; - SetHandler(Outlet, () => setup.LastEvent.Add(new RequestOne(probe)), () => setup.LastEvent.Add(new Cancel(probe))); + SetHandler(Outlet, () => setup.LastEvent.Add(new RequestOne(probe)), cause => setup.LastEvent.Add(new Cancel(probe, cause))); } public sealed override Outlet Out => Outlet; @@ -513,7 +521,7 @@ public Logic(EventPropagateStage stage) :base(stage.Shape) public void OnPull() => Pull(_stage.In); - public void OnDownstreamFinish() => Cancel(_stage.In); + public void OnDownstreamFinish(Exception cause) => Cancel(_stage.In, cause); } public EventPropagateStage() => Shape = new FlowShape(In, Out); @@ -640,7 +648,7 @@ public FailingGraphStageLogic(FailingStageSetup setup, Shape shape) : base(shape SetHandler(setup._stageOut, () => MayFail(() => Pull(setup._stageIn)), - () => MayFail(CompleteStage)); + cause => MayFail(CompleteStage)); } private void MayFail(Action task) @@ -728,7 +736,16 @@ public override bool Equals(object obj) public class Cancel : ITestEvent { - protected bool Equals(Cancel other) => true; + public Cancel(Exception cause) + { + Cause = cause; + } + + public Exception Cause { get; } + + protected bool Equals(Cancel other) => + Cause.GetType() == other.Cause.GetType() + && Cause.Message == other.Cause.Message; public override bool Equals(object obj) { @@ -846,7 +863,7 @@ public UpstreamOneBoundedProbe(OneBoundedSetup setup) setup.LastEvent.Add(new RequestAnother()); else setup.LastEvent.Add(new RequestOne()); - }, () => setup.LastEvent.Add(new Cancel())); + }, cause => setup.LastEvent.Add(new Cancel(cause))); } public override Outlet Out => _outlet; @@ -902,7 +919,7 @@ public void RequestOne() public void Cancel() { - Cancel(_inlet); + Cancel(_inlet, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); _setup.Run(); } } diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSpec.cs index 3fb4bd669f0..4c143c0de3a 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSpec.cs @@ -172,7 +172,7 @@ public void Interpreter_should_implement_many_to_one_one_to_many_chain_correctly lastEvents().Should().BeEquivalentTo(new RequestOne()); downstream.Cancel(); - lastEvents().Should().BeEquivalentTo(new Cancel()); + lastEvents().Should().BeEquivalentTo(new Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); }); } @@ -194,7 +194,7 @@ public void Interpreter_should_implement_take() lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(1); - lastEvents().Should().BeEquivalentTo(new OnNext(1), new Cancel(), new OnComplete()); + lastEvents().Should().BeEquivalentTo(new OnNext(1), new Cancel(SubscriptionWithCancelException.StageWasCompleted.Instance), new OnComplete()); }); } @@ -224,7 +224,7 @@ public void Interpreter_should_implement_take_inside_a_chain() lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(2); - lastEvents().Should().BeEquivalentTo(new OnNext(3), new Cancel(), new OnComplete()); + lastEvents().Should().BeEquivalentTo(new OnNext(3), new Cancel(SubscriptionWithCancelException.StageWasCompleted.Instance), new OnComplete()); }); } @@ -274,7 +274,7 @@ public void Interpreter_should_implement_fold_with_proper_cancel() lastEvents().Should().BeEquivalentTo(new RequestOne()); downstream.Cancel(); - lastEvents().Should().BeEquivalentTo(new Cancel()); + lastEvents().Should().BeEquivalentTo(new Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); }); } @@ -355,7 +355,7 @@ public void Interpreter_should_implement_batch_conflate() lastEvents().Should().BeEquivalentTo(new OnNext(4), new RequestOne()); downstream.Cancel(); - lastEvents().Should().BeEquivalentTo(new Cancel()); + lastEvents().Should().BeEquivalentTo(new Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); }); } @@ -424,7 +424,7 @@ public void Interpreter_should_work_with_batch_batch_conflate_conflate() lastEvents().Should().BeEquivalentTo(new RequestOne(), new OnNext(4)); downstream.Cancel(); - lastEvents().Should().BeEquivalentTo(new Cancel()); + lastEvents().Should().BeEquivalentTo(new Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); }); } @@ -502,7 +502,7 @@ public void Interpreter_should_implement_batch_expand_conflate_expand() lastEvents().Should().BeEquivalentTo(new OnNext(2)); downstream.Cancel(); - lastEvents().Should().BeEquivalentTo(new Cancel()); + lastEvents().Should().BeEquivalentTo(new Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance)); }); } @@ -662,7 +662,7 @@ public void Interpreter_should_implement_take_take() lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(1); - lastEvents().Should().BeEquivalentTo(new Cancel(), new OnNext(1), new OnComplete()); + lastEvents().Should().BeEquivalentTo(new Cancel(SubscriptionWithCancelException.StageWasCompleted.Instance), new OnNext(1), new OnComplete()); }); } @@ -700,7 +700,7 @@ public void Interpreter_should_not_allow_AbsorbTermination_from_OnDownstreamFini .ExpectOne(() => { downstream.Cancel(); - lastEvents().Should().BeEquivalentTo(new Cancel()); + lastEvents().Should().BeEquivalentTo(new Cancel(new NotSupportedException("It is not allowed to call AbsorbTermination() from OnDownstreamFinish."))); }); }); } @@ -835,7 +835,7 @@ public override ISyncDirective OnPull(IContext context) return context.Pull(); } - public override ITerminationDirective OnDownstreamFinish(IContext context) + public override ITerminationDirective OnDownstreamFinish(IContext context, Exception cause) { return context.AbsorbTermination(); } diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterStressSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterStressSpec.cs index 316ab5ce1be..6ab539e5494 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterStressSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterStressSpec.cs @@ -103,7 +103,7 @@ public void Interpreter_must_work_with_a_massive_chain_of_maps_with_early_comple lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(0); - lastEvents().Should().BeEquivalentTo(new OnNext(0 + ChainLength), new Cancel(), new OnComplete()); + lastEvents().Should().BeEquivalentTo(new OnNext(0 + ChainLength), new Cancel(SubscriptionWithCancelException.StageWasCompleted.Instance), new OnComplete()); tstamp.Stop(); var time = tstamp.Elapsed.TotalSeconds; @@ -125,7 +125,7 @@ public void Interpreter_must_work_with_a_massive_chain_of_takes() lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(0); - lastEvents().Should().BeEquivalentTo(new OnNext(0), new Cancel(), new OnComplete()); + lastEvents().Should().BeEquivalentTo(new OnNext(0), new Cancel(SubscriptionWithCancelException.StageWasCompleted.Instance), new OnComplete()); }); } diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs index cf94720d411..3b55fc85e47 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/InterpreterSupervisionSpec.cs @@ -119,7 +119,7 @@ public void Interpreter_error_handling_should_emit_failure_when_op_throws() downstream.RequestOne(); lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(0); // boom - lastEvents().Should().BeEquivalentTo(new Cancel(), new OnError(TE())); + lastEvents().Should().BeEquivalentTo(new Cancel(TE()), new OnError(TE())); }); } @@ -141,7 +141,7 @@ public void Interpreter_error_handling_should_emit_failure_when_op_throws_in_mid downstream.RequestOne(); lastEvents().Should().BeEquivalentTo(new RequestOne()); upstream.OnNext(-1); // boom - lastEvents().Should().BeEquivalentTo(new Cancel(), new OnError(TE())); + lastEvents().Should().BeEquivalentTo(new Cancel(TE()), new OnError(TE())); }); } @@ -244,7 +244,7 @@ public void Interpreter_error_handling_should_fail_when_Expand_seed_throws() lastEvents().Should().BeEquivalentTo(new OnNext(-1)); upstream.OnNext(2); // boom - lastEvents().Should().BeEquivalentTo(new OnError(TE()), new Cancel()); + lastEvents().Should().BeEquivalentTo(new OnError(TE()), new Cancel(TE())); }); } @@ -271,7 +271,7 @@ public void Interpreter_error_handling_should_fail_when_Expand_extrapolate_throw downstream.RequestOne(); var events = lastEvents(); events.OfType().Select(x => x.Cause.InnerException).Should().BeEquivalentTo(TE()); - events.OfType().Should().BeEquivalentTo(new Cancel()); + events.OfType().Should().BeEquivalentTo(new Cancel(new AggregateException(TE()))); }); } diff --git a/src/core/Akka.Streams.Tests/Implementation/Fusing/LifecycleInterpreterSpec.cs b/src/core/Akka.Streams.Tests/Implementation/Fusing/LifecycleInterpreterSpec.cs index 501ace53308..c77b76532f8 100644 --- a/src/core/Akka.Streams.Tests/Implementation/Fusing/LifecycleInterpreterSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/Fusing/LifecycleInterpreterSpec.cs @@ -284,7 +284,7 @@ public void Interpreter_must_call_OnError_when_PreStart_fails() WithOneBoundedSetup(op, (lastEvents, upstream, downstream) => { var events = lastEvents().ToArray(); - events[0].Should().Be(new Cancel()); + events[0].Should().Be(new Cancel(new TestException("Boom!"))); events[1].Should().BeOfType(); ((OnError) events[1]).Cause.Should().BeOfType(); ((OnError) events[1]).Cause.Message.Should().Be("Boom!"); @@ -322,7 +322,7 @@ public void Interpreter_must_call_OnError_when_PreStart_fails_with_stages_after( WithOneBoundedSetup(ops, (lastEvents, upstream, downstream) => { var events = lastEvents().ToArray(); - events[0].Should().Be(new Cancel()); + events[0].Should().Be(new Cancel(new TestException("Boom!"))); events[1].Should().BeOfType(); ((OnError)events[1]).Cause.Should().BeOfType(); ((OnError)events[1]).Cause.Message.Should().Be("Boom!"); diff --git a/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs b/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs index cc43d434a71..fc109b57339 100644 --- a/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/GraphStageLogicSpec.cs @@ -494,7 +494,7 @@ public void A_GraphStageLogic_must_not_double_terminate_a_single_stage() interpreter => { interpreter.Complete(interpreter.Connections[0]); - interpreter.Cancel(interpreter.Connections[1]); + interpreter.Cancel(interpreter.Connections[1], SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); interpreter.Execute(2); ExpectMsg("postStop2"); diff --git a/src/core/Akka.Streams/Akka.Streams.csproj b/src/core/Akka.Streams/Akka.Streams.csproj index 62121b1ebb8..e127751f93f 100644 --- a/src/core/Akka.Streams/Akka.Streams.csproj +++ b/src/core/Akka.Streams/Akka.Streams.csproj @@ -1,19 +1,20 @@  - + Akka.Streams Reactive stream support for Akka.NET $(NetStandardLibVersion);$(NetLibVersion) $(AkkaPackageTags);reactive;stream true + 8.0 - - + + - - + + @@ -63,8 +64,8 @@ - - + + $(DefineConstants);RELEASE diff --git a/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.cs b/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.cs index 98b915d1894..ac721057559 100644 --- a/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.cs +++ b/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.cs @@ -1,10 +1,10 @@ -//----------------------------------------------------------------------- +// --- auto generated: 5/18/2022 2:05:03 AM --- // +//----------------------------------------------------------------------- // -// Copyright (C) 2009-2021 Lightbend Inc. -// Copyright (C) 2013-2021 .NET Foundation +// Copyright (C) 2015-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project // //----------------------------------------------------------------------- - using System; using Akka.Streams.Stage; @@ -277,9 +277,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : base(sha _pending0 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending0) _pendingCount--; @@ -292,9 +292,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : base(sha _pending1 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending1) _pendingCount--; @@ -401,9 +401,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : base _pending0 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending0) _pendingCount--; @@ -416,9 +416,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : base _pending1 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending1) _pendingCount--; @@ -431,9 +431,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : base _pending2 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending2) _pendingCount--; @@ -552,9 +552,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : _pending0 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending0) _pendingCount--; @@ -567,9 +567,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : _pending1 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending1) _pendingCount--; @@ -582,9 +582,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : _pending2 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending2) _pendingCount--; @@ -597,9 +597,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage) : _pending3 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending3) _pendingCount--; @@ -730,9 +730,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage _pending0 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending0) _pendingCount--; @@ -745,9 +745,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage _pending1 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending1) _pendingCount--; @@ -760,9 +760,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage _pending2 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending2) _pendingCount--; @@ -775,9 +775,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage _pending3 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending3) _pendingCount--; @@ -790,9 +790,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith stage _pending4 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending4) _pendingCount--; @@ -935,9 +935,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith s _pending0 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending0) _pendingCount--; @@ -950,9 +950,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith s _pending1 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending1) _pendingCount--; @@ -965,9 +965,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith s _pending2 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending2) _pendingCount--; @@ -980,9 +980,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith s _pending3 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending3) _pendingCount--; @@ -995,9 +995,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith s _pending4 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending4) _pendingCount--; @@ -1010,9 +1010,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith s _pending5 = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending5) _pendingCount--; @@ -1167,9 +1167,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending0) _pendingCount--; @@ -1182,9 +1182,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending1) _pendingCount--; @@ -1197,9 +1197,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending2) _pendingCount--; @@ -1212,9 +1212,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending3) _pendingCount--; @@ -1227,9 +1227,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending4) _pendingCount--; @@ -1242,9 +1242,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending5) _pendingCount--; @@ -1257,9 +1257,9 @@ public UnzipWithStageLogic(Shape shape, UnzipWith { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending6) _pendingCount--; @@ -1387,4 +1387,4 @@ protected sealed override GraphStageLogic CreateLogic(Attributes inheritedAttrib return new UnzipWithStageLogic(Shape, this); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.tt b/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.tt index 675c931c2e1..32369058e06 100644 --- a/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.tt +++ b/src/core/Akka.Streams/CodeGen/Dsl/UnzipWith.tt @@ -115,9 +115,9 @@ namespace Akka.Streams.Dsl _pending<#=t#> = false; if (_pendingCount == 0) Pull(stage.In); }, - onDownstreamFinish: () => { + onDownstreamFinish: cause => { _downstreamRunning--; - if (_downstreamRunning == 0) CompleteStage(); + if (_downstreamRunning == 0) CancelStage(cause); else { if (_pending<#=t#>) _pendingCount--; diff --git a/src/core/Akka.Streams/Dsl/DelayFlow.cs b/src/core/Akka.Streams/Dsl/DelayFlow.cs index d4791b81fad..fcc48307750 100644 --- a/src/core/Akka.Streams/Dsl/DelayFlow.cs +++ b/src/core/Akka.Streams/Dsl/DelayFlow.cs @@ -149,7 +149,7 @@ public void OnPull() Pull(_delayFlow.Inlet); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { diff --git a/src/core/Akka.Streams/Dsl/Graph.cs b/src/core/Akka.Streams/Dsl/Graph.cs index d8eabf5ced3..a9d81a9b169 100644 --- a/src/core/Akka.Streams/Dsl/Graph.cs +++ b/src/core/Akka.Streams/Dsl/Graph.cs @@ -939,13 +939,13 @@ public Logic(Shape shape, Broadcast stage) : base(shape) _pendingCount--; TryPull(); }, - onDownstreamFinish: () => + onDownstreamFinish: cause => { - if (stage._eagerCancel) CompleteStage(); + if (stage._eagerCancel) CancelStage(cause); else { _downstreamsRunning--; - if (_downstreamsRunning == 0) CompleteStage(); + if (_downstreamsRunning == 0) CancelStage(cause); else if (_pending[i]) { _pending[i] = false; @@ -1099,19 +1099,18 @@ public Logic(Partition stage) : base(stage.Shape) } else if (!HasBeenPulled(stage.In)) Pull(stage.In); - }, onDownstreamFinish: () => + }, onDownstreamFinish: cause => { downstreamRunning--; if(downstreamRunning == 0) - CompleteStage(); - else if (_outPendingElement != null) + CancelStage(cause); + else if (_outPendingElement != null && index == _outPendingIndex) { - if (index == _outPendingIndex) - { - _outPendingElement = null; - if(!HasBeenPulled(stage.In)) - Pull(stage.In); - } + _outPendingElement = null; + if(IsClosed(stage.In)) + CancelStage(cause); + else if(!HasBeenPulled(stage.In)) + Pull(stage.In); } }); } @@ -1282,10 +1281,10 @@ public Logic(Shape shape, Balance stage) : base(shape) } else _pendingQueue.Enqueue(outlet); }, - onDownstreamFinish: () => + onDownstreamFinish: cause => { downstreamsRunning--; - if (downstreamsRunning == 0) CompleteStage(); + if (downstreamsRunning == 0) CancelStage(cause); else if (!hasPulled && needDownstreamPulls > 0) { needDownstreamPulls--; @@ -1977,7 +1976,7 @@ public Logic(WireTap stage) : base(stage.Shape) _pendingTap = elem; }); - SetHandler(stage.OutMain, () => Pull(stage.In), CompleteStage); + SetHandler(stage.OutMain, () => Pull(stage.In), CancelStage); // The 'tap' output can neither backpressure, nor cancel, the stage. SetHandler(stage.OutTap, @@ -1992,7 +1991,7 @@ public Logic(WireTap stage) : base(stage.Shape) Push(stage.OutTap, _pendingTap.Value); _pendingTap = Option.None; }, - () => + cause => { SetHandler(stage.In, () => Push(stage.OutMain, Grab(stage.In))); // Allow any outstanding element to be garbage-collected diff --git a/src/core/Akka.Streams/Dsl/KeepAliveConcat.cs b/src/core/Akka.Streams/Dsl/KeepAliveConcat.cs index 89f13dbe9a9..56a68850b64 100644 --- a/src/core/Akka.Streams/Dsl/KeepAliveConcat.cs +++ b/src/core/Akka.Streams/Dsl/KeepAliveConcat.cs @@ -95,7 +95,7 @@ public void OnPull() Pull(_keepAliveConcat.In); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { diff --git a/src/core/Akka.Streams/Dsl/One2OneBidiFlow.cs b/src/core/Akka.Streams/Dsl/One2OneBidiFlow.cs index 1a4b17b16a0..50e0d8c498d 100644 --- a/src/core/Akka.Streams/Dsl/One2OneBidiFlow.cs +++ b/src/core/Akka.Streams/Dsl/One2OneBidiFlow.cs @@ -119,7 +119,7 @@ private void SetInOutletHandler() else _pullSuppressed = true; }, - onDownstreamFinish: () => Cancel(_inInlet)); + onDownstreamFinish: cause => Cancel(_inInlet, cause)); } private void SetOutInletHandler() @@ -152,7 +152,7 @@ private void SetOutInletHandler() private void SetOutOutletHandler() { - SetHandler(_outOutlet, onPull: () => Pull(_outInlet), onDownstreamFinish: () => Cancel(_outInlet)); + SetHandler(_outOutlet, onPull: () => Pull(_outInlet), onDownstreamFinish: cause => Cancel(_outInlet, cause)); } } diff --git a/src/core/Akka.Streams/Dsl/Pulse.cs b/src/core/Akka.Streams/Dsl/Pulse.cs index bccde3766ee..96cb824a72c 100644 --- a/src/core/Akka.Streams/Dsl/Pulse.cs +++ b/src/core/Akka.Streams/Dsl/Pulse.cs @@ -57,7 +57,7 @@ public void OnPull() } } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { diff --git a/src/core/Akka.Streams/Dsl/RestartFlow.cs b/src/core/Akka.Streams/Dsl/RestartFlow.cs index df55c44f69b..5b68e8a8ee2 100644 --- a/src/core/Akka.Streams/Dsl/RestartFlow.cs +++ b/src/core/Akka.Streams/Dsl/RestartFlow.cs @@ -9,6 +9,7 @@ using Akka.Pattern; using Akka.Streams.Implementation.Fusing; using Akka.Streams.Stage; +using Akka.Util; namespace Akka.Streams.Dsl { @@ -326,10 +327,10 @@ protected SubSinkInlet CreateSubInlet(Outlet outlet) SetHandler(Out, onPull: () => sinkIn.Pull(), - onDownstreamFinish: () => + onDownstreamFinish: cause => { _finishing = true; - sinkIn.Cancel(); + sinkIn.Cancel(cause); }); return sinkIn; @@ -349,10 +350,10 @@ protected SubSourceOutlet CreateSubOutlet(Inlet inlet) Pull(In); } }, - onDownstreamFinish: () => + onDownstreamFinish: cause => { if (_finishing || MaxRestartsReached() || _onlyOnFailures) - Cancel(In); + Cancel(In, cause); else { ScheduleRestartTimer(); @@ -471,7 +472,8 @@ public DelayCancellationStage(TimeSpan delay, string name = null) : base(name) private sealed class Logic : TimerGraphStageLogic { private readonly DelayCancellationStage _stage; - + private Option _cause = Option.None; + public Logic(DelayCancellationStage stage, Attributes inheritedAttributes) : base(stage.Shape) { _stage = stage; @@ -484,9 +486,9 @@ public Logic(DelayCancellationStage stage, Attributes inheritedAttributes) : /// /// We should really. port the Cause parameter functionality for the OnDownStreamFinished delegate /// - private void OnDownStreamFinished() + private void OnDownStreamFinished(Exception cause) { - //_cause = new Option(/*cause*/); + _cause = cause; ScheduleOnce("CompleteState", _stage._delay); SetHandler(_stage.Inlet, onPush:DoNothing); } @@ -494,15 +496,10 @@ private void OnDownStreamFinished() protected internal override void OnTimer(object timerKey) { Log.Debug($"Stage was cancelled after delay of {_stage._delay}"); - CompleteStage(); - - // this code will replace the CompleteStage() call once we port the Exception Cause parameter for the OnDownStreamFinished delegate - /*if(_cause != null) - FailStage(_cause.Value); //<-- is this the same as cancelStage ? + if(_cause.HasValue) + CancelStage(_cause.Value); else - { throw new IllegalStateException("Timer hitting without first getting a cancel cannot happen"); - }*/ } } } diff --git a/src/core/Akka.Streams/Dsl/Retry.cs b/src/core/Akka.Streams/Dsl/Retry.cs index 01918a97c42..d1cdc02d804 100644 --- a/src/core/Akka.Streams/Dsl/Retry.cs +++ b/src/core/Akka.Streams/Dsl/Retry.cs @@ -176,7 +176,7 @@ public Logic(RetryCoordinator retry) : base(retry.Shape) else if (!HasBeenPulled(retry.In1)) Pull(retry.In1); } - }, onDownstreamFinish: () => + }, onDownstreamFinish: cause => { //Do Nothing, intercept completion as downstream }); @@ -327,7 +327,7 @@ public Logic(RetryConcatCoordinator retry) : base(retry.Shape Pull(_retry.In2); } } - }, onDownstreamFinish: () => + }, onDownstreamFinish: cause => { //Do Nothing, intercept completion as downstream }); diff --git a/src/core/Akka.Streams/Dsl/UnfoldFlow.cs b/src/core/Akka.Streams/Dsl/UnfoldFlow.cs index 9211222b828..2597e9dc196 100644 --- a/src/core/Akka.Streams/Dsl/UnfoldFlow.cs +++ b/src/core/Akka.Streams/Dsl/UnfoldFlow.cs @@ -57,7 +57,8 @@ public void OnPull() } } - public void OnDownstreamFinish() + // TODO: Is this correct? check JVM please + public void OnDownstreamFinish(Exception cause) { // Do Nothing until `timeout` to try and intercept completion as downstream, // but cancel stream after timeout if inlet is not closed to prevent deadlock. diff --git a/src/core/Akka.Streams/Dsl/Valve.cs b/src/core/Akka.Streams/Dsl/Valve.cs index bd5a9224f91..bdf3ba53a64 100644 --- a/src/core/Akka.Streams/Dsl/Valve.cs +++ b/src/core/Akka.Streams/Dsl/Valve.cs @@ -147,7 +147,7 @@ public void OnPull() Pull(_valve.In); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); } #endregion diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index 4e482fa1fd1..756ae9a2893 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -8,13 +8,13 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Reflection; using System.Text; using Akka.Actor; using Akka.Annotations; using Akka.Event; using Akka.Pattern; using Akka.Streams.Stage; +using Akka.Util; using Reactive.Streams; using static Akka.Streams.Implementation.Fusing.GraphInterpreter; @@ -247,7 +247,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) break; case ActorGraphInterpreter.OnSubscribe onSubscribe: - ReactiveStreamsCompliance.TryCancel(onSubscribe.Subscription); + ReactiveStreamsCompliance.TryCancel(onSubscribe.Subscription, SubscriptionWithCancelException.StageWasCompleted.Instance); _subscribersPending--; if (CanShutdown) _interpreterCompleted = true; @@ -307,7 +307,7 @@ public int Receive(ActorGraphInterpreter.IBoundaryEvent e, int eventLimit) case ActorGraphInterpreter.Cancel cancel: if (IsDebug) Console.WriteLine($"{Interpreter.Name} Cancel id={cancel.Id}"); - _outputs[cancel.Id].Cancel(); + _outputs[cancel.Id].Cancel(cancel.Cause); return RunBatch(eventLimit); case ActorGraphInterpreter.SubscribePending subscribePending: @@ -362,7 +362,7 @@ public void TryAbort(Exception reason) foreach (var output in _outputs) output.Fail(ex); foreach (var input in _inputs) - input.Cancel(); + input.Cancel(ex); } } @@ -614,21 +614,26 @@ public struct Cancel : IBoundaryEvent /// TBD /// public readonly int Id; + /// /// TBD /// /// TBD /// TBD - public Cancel(GraphInterpreterShell shell, int id) + /// + public Cancel(GraphInterpreterShell shell, int id, Exception cause) { Shell = shell; Id = id; + Cause = cause; } /// /// TBD /// public GraphInterpreterShell Shell { get; } + + public Exception Cause { get; } } /// @@ -800,7 +805,7 @@ public BoundaryPublisher(IActorRef parent, GraphInterpreterShell shell, int id) /// /// TBD /// - public sealed class BoundarySubscription : ISubscription + public sealed class BoundarySubscription : ISubscriptionWithCancelException { private readonly IActorRef _parent; private readonly GraphInterpreterShell _shell; @@ -828,7 +833,9 @@ public BoundarySubscription(IActorRef parent, GraphInterpreterShell shell, int i /// /// TBD /// - public void Cancel() => _parent.Tell(new Cancel(_shell, _id)); + public void Cancel() => Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); + + public void Cancel(Exception cause) => _parent.Tell(new Cancel(_shell, _id, cause)); /// /// TBD @@ -925,7 +932,7 @@ public override void OnPull() else if (upstreamCompleted) _that.Complete(_that._outlet); } - public override void OnDownstreamFinish() => _that.Cancel(); + public override void OnDownstreamFinish(Exception cause) => _that.Cancel(cause); public override string ToString() => _that.ToString(); } @@ -941,7 +948,7 @@ public override void OnPull() private int _inputBufferElements; private int _nextInputElementCursor; private bool _upstreamCompleted; - private bool _downstreamCanceled; + private Option _downstreamCanceled = Option.None; private readonly int _requestBatchSize; private int _batchRemaining; private readonly Outlet _outlet; @@ -981,7 +988,7 @@ public BatchingActorInputBoundary(int size, int id) /// TBD public void OnInternalError(Exception reason) { - if (!(_upstreamCompleted || _downstreamCanceled) && !ReferenceEquals(_upstream, null)) + if (!(_upstreamCompleted || _downstreamCanceled.HasValue) && !ReferenceEquals(_upstream, null)) _upstream.Cancel(); if (!IsClosed(_outlet)) @@ -994,7 +1001,7 @@ public void OnInternalError(Exception reason) /// TBD public void OnError(Exception reason) { - if (!_upstreamCompleted || !_downstreamCanceled) + if (!_upstreamCompleted || _downstreamCanceled.IsEmpty) { _upstreamCompleted = true; Clear(); @@ -1023,11 +1030,17 @@ public void OnComplete() public void OnSubscribe(ISubscription subscription) { if (subscription == null) throw new ArgumentException("Subscription cannot be null"); - if (_upstreamCompleted) ReactiveStreamsCompliance.TryCancel(subscription); - else if (_downstreamCanceled) + if (_upstreamCompleted) + ReactiveStreamsCompliance.TryCancel(subscription, SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); + else if (_downstreamCanceled.HasValue) { _upstreamCompleted = true; - ReactiveStreamsCompliance.TryCancel(subscription); + ReactiveStreamsCompliance.TryCancel(subscription, _downstreamCanceled.Value); + } + else if (_upstream != null) + { + // reactive streams spec 2.5 + ReactiveStreamsCompliance.TryCancel(subscription, new IllegalStateException("Publisher can only be subscribed once.")); } else { @@ -1058,14 +1071,14 @@ public void OnNext(object element) /// /// TBD /// - public void Cancel() + public void Cancel(Exception cause) { - _downstreamCanceled = true; + _downstreamCanceled = cause; if (!_upstreamCompleted) { _upstreamCompleted = true; if (!ReferenceEquals(_upstream, null)) - ReactiveStreamsCompliance.TryCancel(_upstream); + ReactiveStreamsCompliance.TryCancel(_upstream, cause); Clear(); } } @@ -1124,7 +1137,7 @@ internal interface IActorOutputBoundary /// /// TBD /// - void Cancel(); + void Cancel(Exception cause); /// /// TBD /// @@ -1148,8 +1161,8 @@ private sealed class InHandler : Stage.InHandler public override void OnPush() { _that.OnNext(_that.Grab(_that._inlet)); - if (_that._downstreamCompleted) - _that.Cancel(_that._inlet); + if (_that.DownstreamCompleted) + _that.Cancel(_that._inlet, _that._downstreamCompletionCause.Value); else if (_that._downstreamDemand > 0) _that.Pull(_that._inlet); } @@ -1172,7 +1185,8 @@ public override void OnPush() // This flag is only used if complete/fail is called externally since this op turns into a Finished one inside the // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked) - private bool _downstreamCompleted; + private Option _downstreamCompletionCause = Option.None; + private bool DownstreamCompleted => _downstreamCompletionCause.HasValue; // when upstream failed before we got the exposed publisher private Exception _upstreamFailed; private bool _upstreamCompleted; @@ -1207,7 +1221,7 @@ public void RequestMore(long elements) { if (elements < 1) { - Cancel((Inlet) In); + Cancel((Inlet) In, ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException); Fail(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveException); } else @@ -1261,12 +1275,12 @@ public void ExposedPublisher(ActorPublisher publisher) /// /// TBD /// - public void Cancel() + public void Cancel(Exception cause) { - _downstreamCompleted = true; + _downstreamCompletionCause = cause; _subscriber = null; _exposedPublisher.Shutdown(new NormalShutdownException("UpstreamBoundary")); - Cancel(_inlet); + Cancel(_inlet, cause); } /// @@ -1276,7 +1290,7 @@ public void Cancel() public void Fail(Exception reason) { // No need to fail if had already been cancelled, or we closed earlier - if (!(_downstreamCompleted || _upstreamCompleted)) + if (!(DownstreamCompleted || _upstreamCompleted)) { _upstreamCompleted = true; _upstreamFailed = reason; @@ -1297,7 +1311,7 @@ private void OnNext(T element) private void Complete() { // No need to complete if had already been cancelled, or we closed earlier - if (!(_upstreamCompleted || _downstreamCompleted)) + if (!(_upstreamCompleted || DownstreamCompleted)) { _upstreamCompleted = true; if (!ReferenceEquals(_exposedPublisher, null)) diff --git a/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs index f75b6918d0c..4c732e3fc79 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs @@ -55,7 +55,7 @@ public EnumeratorUpstream(IEnumerator input) else Push(_outlet, element); } }, - onDownstreamFinish: CompleteStage); + onDownstreamFinish: InternalOnDownstreamFinish); } /// diff --git a/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs index 9083dc31468..d885602c9eb 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/GraphInterpreter.cs @@ -125,7 +125,7 @@ private Empty() /// - /// TBD + /// Marker class that indicates that a port was failed with a given cause and a potential outstanding element /// public sealed class Failed { @@ -149,6 +149,19 @@ public Failed(Exception reason, object previousElement) PreviousElement = previousElement; } } + + /// + /// Marker class that indicates that a port was cancelled with a given cause + /// + public sealed class Cancelled + { + public readonly Exception Cause; + + public Cancelled(Exception cause) + { + Cause = cause; + } + } /// /// TBD @@ -253,12 +266,15 @@ public Connection(int id, int inOwnerId, GraphStageLogic inOwner, int outOwnerId public IOutHandler OutHandler { get; set; } /// - /// TBD + /// See about possible states /// public int PortState { get; set; } = InReady; /// - /// TBD + /// Can either be: + /// * An in-flight element + /// * A failure (with an optional in-flight element), if elem is an instance of + /// * A cancellation cause, is elem is an instance of /// public object Slot { get; set; } = Empty.Instance; @@ -845,7 +861,9 @@ private void ProcessEvent(Connection connection) if (IsDebug) Console.WriteLine($"{Name} CANCEL {InOwnerName(connection)} -> {OutOwnerName(connection)} ({connection.OutHandler}) [{OutLogicName(connection)}]"); connection.PortState |= OutClosed; CompleteConnection(connection.OutOwnerId); - connection.OutHandler.OnDownstreamFinish(); + var cause = ((Cancelled)connection.Slot).Cause; + connection.Slot = Empty.Instance; + connection.OutHandler.OnDownstreamFinish(cause); } else if ((code & (OutClosed | InClosed)) == OutClosed) { @@ -1067,14 +1085,15 @@ internal void Fail(Connection connection, Exception reason) /// TBD /// /// TBD - internal void Cancel(Connection connection) + /// + internal void Cancel(Connection connection, Exception cause) { var currentState = connection.PortState; - if (IsDebug) Console.WriteLine($"{Name} Cancel({connection}) [{currentState}]"); + if (IsDebug) Console.WriteLine($"{Name} Cancel({connection}) [{currentState}] [{cause.Message}]"); connection.PortState = currentState | InClosed; if ((currentState & OutClosed) == 0) { - connection.Slot = Empty.Instance; + connection.Slot = new Cancelled(cause); if ((currentState & (Pulling | Pushing | InClosed)) == 0) Enqueue(connection); else if (_chasedPull == connection) diff --git a/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs b/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs index 59a3d7f51ee..58fa9b11627 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/GraphStages.cs @@ -309,7 +309,8 @@ public Logic(TerminationWatcher stage, TaskCompletionSource finishPromi _stage = stage; _finishPromise = finishPromise; - SetHandler(stage._inlet, stage._outlet, this); + SetHandler(stage._inlet, this); + SetHandler(stage._outlet, this); } public override void OnPush() => Push(_stage._outlet, Grab(_stage._inlet)); @@ -330,11 +331,15 @@ public override void OnUpstreamFailure(Exception e) public override void OnPull() => Pull(_stage._inlet); - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { - _finishPromise.TrySetResult(Done.Instance); + if (cause is SubscriptionWithCancelException.NonFailureCancellation) + _finishPromise.TrySetResult(Done.Instance); + else + _finishPromise.TrySetException(cause); + _completedSignalled = true; - CompleteStage(); + CancelStage(cause); } public override void PostStop() @@ -457,9 +462,9 @@ public override void OnUpstreamFailure(Exception e) public override void OnPull() => Pull(_stage.In); - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { - CompleteStage(); + InternalOnDownstreamFinish(cause); _monitor.Value = FlowMonitor.Finished.Instance; } @@ -843,16 +848,16 @@ public Logic(TaskFlattenSource stage, TaskCompletionSource materialized // initial handler (until task completes) SetHandler(stage.Outlet, new LambdaOutHandler( onPull: () => { }, - onDownstreamFinish: () => + onDownstreamFinish: cause => { if (!_materialized.Task.IsCompleted) { // we used to try to materialize the "inner" source here just to get // the materialized value, but that is not safe and may cause the graph shell // to leak/stay alive after the stage completes - _materialized.TrySetException(new StreamDetachedException("Stream cancelled before Source Task completed")); + _materialized.TrySetException(new StreamDetachedException("Stream cancelled before Source Task completed", cause)); } - OnDownstreamFinish(); + InternalOnDownstreamFinish(cause); })); } diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 70933b45109..de61fb36a80 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -330,7 +330,7 @@ public void OnPush() public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected override void OnResume(Exception ex) { @@ -514,7 +514,7 @@ public void OnPush() public void OnPull() => Pull(_stage.In); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected override void OnResume(Exception ex) { @@ -1556,7 +1556,7 @@ public Logic(Intersperse stage) : base(stage.Shape) public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); } #endregion @@ -1758,7 +1758,7 @@ public void OnPush() public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected override void OnResume(Exception ex) => TryPull(); @@ -2923,12 +2923,17 @@ public override void OnUpstreamFailure(Exception ex) public override void OnPull() => Pull(_stage.Inlet); - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { if (IsEnabled(_logLevels.OnFinish)) - _log.Log(_logLevels.OnFinish, $"[{_stage._name}] Downstream finished."); + _log.Log( + _logLevels.OnFinish, + "[{0}] Downstream finished. cause: {1}: {2}.", + _stage._name, + Logging.SimpleName(cause.GetType()), + cause.Message); - CompleteStage(); + InternalOnDownstreamFinish(cause); } public override void PreStart() @@ -3076,7 +3081,7 @@ public void OnPull() EmitGroup(); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); public override void PreStart() { @@ -3231,7 +3236,7 @@ public void OnPull() CompleteIfReady(); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); private long NextElementWaitTime => (long)_stage._delay.TotalMilliseconds - (DateTime.UtcNow.Ticks - _buffer.Peek().Item1) * 1000 * 10; @@ -3382,7 +3387,7 @@ public Logic(TakeWithin stage) : base(stage.Shape) public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) => CompleteStage(); @@ -3448,7 +3453,7 @@ public void OnPush() public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); public override void PreStart() => ScheduleOnce("DropWithinTimer", _stage._timeout); @@ -3651,7 +3656,7 @@ void PushOut() { if (sinkIn.IsAvailable) PushOut(); - }, onDownstreamFinish: () => sinkIn.Cancel()); + }, onDownstreamFinish: cause => sinkIn.Cancel(cause)); Source.FromGraph(source).RunWith(sinkIn.Sink, Interpreter.SubFusingMaterializer); SetHandler(_stage.Outlet, outHandler); @@ -3725,7 +3730,8 @@ public Logic(StatefulSelectMany stage, Attributes inheritedAttributes _decider = inheritedAttributes.GetAttribute(new ActorAttributes.SupervisionStrategy(Deciders.StoppingDecider)).Decider; _plainConcat = stage._concatFactory(); - SetHandler(stage._in, stage._out, this); + SetHandler(stage._in, this); + SetHandler(stage._out, this); } public override void OnPush() @@ -3918,10 +3924,10 @@ public override void OnUpstreamFailure(Exception ex) base.OnUpstreamFailure(ex); } - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { _promise.TrySetResult(Option.None); - base.OnDownstreamFinish(); + base.OnDownstreamFinish(cause); } private TMat SwitchTo(Flow flow, TIn firstElement) @@ -3979,9 +3985,9 @@ void MaybeCompleteStage() SetHandler(_stage.Out, new LambdaOutHandler( () => subInlet.Pull(), - () => + cause => { - subInlet.Cancel(); + subInlet.Cancel(cause); MaybeCompleteStage(); })); @@ -4004,9 +4010,9 @@ void MaybeCompleteStage() } } }, - () => + cause => { - if (!IsClosed(_stage.In)) Cancel(_stage.In); + if (!IsClosed(_stage.In)) Cancel(_stage.In, cause); MaybeCompleteStage(); })); diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index eb620ce9d1f..d45c6e29eda 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -303,10 +303,10 @@ public void OnUpstreamFailure(Exception ex) FailStage(ex); } - public void OnDownstreamFinish() + public void OnDownstreamFinish(Exception cause) { if (!IsPrefixComplete) - CompleteStage(); + CancelStage(cause); // Otherwise substream is open, ignore } } @@ -461,9 +461,9 @@ public void OnUpstreamFinish() public void OnUpstreamFailure(Exception ex) => Fail(ex); - public void OnDownstreamFinish() + public void OnDownstreamFinish(Exception cause) { - if (!TryCancel()) + if (!TryCancel(cause)) SetKeepGoing(true); } @@ -490,12 +490,12 @@ private bool TryCompleteAll() return false; } - private bool TryCancel() + private bool TryCancel(Exception cause) { // if there's no active substreams or there's only one but it's not been pushed yet if (_activeSubstreams.Count == 0 || (_activeSubstreams.Count == 1 && _substreamWaitingToBePushed.HasValue)) { - CompleteStage(); + CancelStage(cause); return true; } @@ -610,12 +610,12 @@ public void OnPull() TryCompleteHandler(); } - public void OnDownstreamFinish() + public void OnDownstreamFinish(Exception cause) { if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) _logic.ClearNextElement(); if (FirstPush) _logic._firstPushCounter--; CompleteSubStream(); - if (_logic.IsClosed(_logic._stage.Out)) _logic.TryCancel(); + if (_logic.IsClosed(_logic._stage.Out)) _logic.TryCancel(cause); if (_logic.IsClosed(_logic._stage.In)) _logic.TryCompleteAll(); else if (_logic.NeedToPull) _logic.Pull(_logic._stage.In); } @@ -781,15 +781,21 @@ public override void OnPull() _logic.Pull(_inlet); } - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { _logic._substreamCancelled = true; if (_logic.IsClosed(_inlet) || _logic._stage._propagateSubstreamCancel) - _logic.CompleteStage(); + { + _logic.CancelStage(cause); + } else + { // Start draining if (!_logic.HasBeenPulled(_inlet)) - _logic.Pull(_inlet); + { + _logic.Pull(_inlet); + } + } } public override void OnPush() @@ -891,11 +897,11 @@ public void OnPull() PushSubstreamSource(); } - public void OnDownstreamFinish() + public void OnDownstreamFinish(Exception cause) { // If the substream is already cancelled or it has not been handed out, we can go away if (_substreamSource == null || _substreamWaitingToBePushed || _substreamCancelled) - CompleteStage(); + CancelStage(cause); } public override void PreStart() @@ -1032,9 +1038,7 @@ private RequestOneScheduledBeforeMaterialization(ICommand command) : base(comman /// internal sealed class CancelScheduledBeforeMaterialization : CommandScheduledBeforeMaterialization { - public static readonly CancelScheduledBeforeMaterialization Instance = new CancelScheduledBeforeMaterialization(Cancel.Instance); - - private CancelScheduledBeforeMaterialization(ICommand command) : base(command) + public CancelScheduledBeforeMaterialization(Exception cause) : base(new Cancel(cause)) { } } @@ -1058,13 +1062,14 @@ private RequestOne() } } - internal class Cancel : ICommand + internal sealed class Cancel : ICommand { - public static readonly Cancel Instance = new Cancel(); - - private Cancel() + public Cancel(Exception cause) { + Cause = cause; } + + public Exception Cause { get; } } } @@ -1168,7 +1173,7 @@ public SubSink(string name, Action externalCallback) /// /// TBD /// - public void CancelSubstream() => DispatchCommand(SubSink.CancelScheduledBeforeMaterialization.Instance); + public void CancelSubstream(Exception cause) => DispatchCommand(new SubSink.CancelScheduledBeforeMaterialization(cause)); private void DispatchCommand(SubSink.CommandScheduledBeforeMaterialization newState) { @@ -1179,7 +1184,7 @@ private void DispatchCommand(SubSink.CommandScheduledBeforeMaterialization newSt if(!_status.CompareAndSet(SubSink.Uninitialized.Instance, newState)) DispatchCommand(newState); // changed to materialized in the meantime break; - case SubSink.RequestOneScheduledBeforeMaterialization _ when newState == SubSink.CancelScheduledBeforeMaterialization.Instance: + case SubSink.RequestOneScheduledBeforeMaterialization _ when newState is SubSink.CancelScheduledBeforeMaterialization: // cancellation is allowed to replace pull if(!_status.CompareAndSet(SubSink.RequestOneScheduledBeforeMaterialization.Instance, newState)) DispatchCommand(SubSink.RequestOneScheduledBeforeMaterialization.Instance); @@ -1203,47 +1208,6 @@ private void DispatchCommand(SubSink.CommandScheduledBeforeMaterialization newSt public override string ToString() => _name; } - /// - /// INTERNAL API - /// - internal static class SubSource - { - /// - /// INTERNAL API - /// - /// HERE ACTUALLY ARE DRAGONS, YOU HAVE BEEN WARNED! - /// - /// FIXME #19240 (jvm) - /// - /// TBD - /// TBD - /// TBD - /// TBD - [InternalApi] - public static void Kill(Source s) - { - var module = s.Module as GraphStageModule; - if (module?.Stage is SubSource) - { - ((SubSource) module.Stage).ExternalCallback(SubSink.Cancel.Instance); - return; - } - - var pub = s.Module as PublisherSource; - if (pub != null) - { - NotUsed _; - pub.Create(default(MaterializationContext), out _).Subscribe(CancelingSubscriber.Instance); - return; - } - - var intp = GraphInterpreter.CurrentInterpreterOrNull; - if (intp == null) - throw new NotSupportedException($"cannot drop Source of type {s.Module.GetType().Name}"); - s.RunWith(Sink.Ignore(), intp.SubFusingMaterializer); - } - } - /// /// INTERNAL API /// @@ -1265,7 +1229,7 @@ public Logic(SubSource stage) : base(stage.Shape) public override void OnPull() => _stage.ExternalCallback(SubSink.RequestOne.Instance); - public override void OnDownstreamFinish() => _stage.ExternalCallback(SubSink.Cancel.Instance); + public override void OnDownstreamFinish(Exception cause) => _stage.ExternalCallback(new SubSink.Cancel(cause)); private void SetCallback(Action callback) { diff --git a/src/core/Akka.Streams/Implementation/IO/TcpStages.cs b/src/core/Akka.Streams/Implementation/IO/TcpStages.cs index a66884d042d..95a2c097f5c 100644 --- a/src/core/Akka.Streams/Implementation/IO/TcpStages.cs +++ b/src/core/Akka.Streams/Implementation/IO/TcpStages.cs @@ -56,7 +56,19 @@ public void OnPull() _listener?.Tell(new Tcp.ResumeAccepting(1), StageActor.Ref); } - public void OnDownstreamFinish() => TryUnbind(); + public void OnDownstreamFinish(Exception cause) + { + if (Log.IsDebugEnabled) + { + var endpoint = (IPEndPoint)_stage._endpoint; + if (cause is SubscriptionWithCancelException.NonFailureCancellation) + Log.Debug("Unbinding from {0} because downstream cancelled stream", endpoint); + else + Log.Debug(cause, "Unbinding from {0} because of downstream failure", endpoint); + } + + TryUnbind(); + } private StreamTcp.IncomingConnection ConnectionFor(Tcp.Connected connected, IActorRef connection) { @@ -407,14 +419,23 @@ public TcpStreamLogic(FlowShape shape, ITcpRole role, En _readHandler = new LambdaOutHandler( onPull: () => _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref), - onDownstreamFinish: () => + onDownstreamFinish: cause => { - if (!IsClosed(_bytesIn)) - _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref); + if (cause is SubscriptionWithCancelException.NonFailureCancellation) + { + if(Log.IsDebugEnabled) + Log.Debug("Closing connection from {0} because downstream cancelled stream without failure", (IPEndPoint)_remoteAddress); + if(IsClosed(_bytesIn)) + _connection.Tell(Tcp.Close.Instance, StageActor.Ref); + else + _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref); + } else { + if(Log.IsDebugEnabled) + Log.Debug(cause, "Aborting connection from {0} because of downstream failure", (IPEndPoint)_remoteAddress); _connection.Tell(Tcp.Abort.Instance, StageActor.Ref); - CompleteStage(); + FailStage(cause); } }); diff --git a/src/core/Akka.Streams/Implementation/ReactiveStreamsCompliance.cs b/src/core/Akka.Streams/Implementation/ReactiveStreamsCompliance.cs index cb41aa26b82..a104b5ee579 100644 --- a/src/core/Akka.Streams/Implementation/ReactiveStreamsCompliance.cs +++ b/src/core/Akka.Streams/Implementation/ReactiveStreamsCompliance.cs @@ -362,14 +362,25 @@ public static void RequireNonNullElement(object element) /// TBD /// /// TBD + /// TBD /// /// This exception is thrown when an exception occurs while canceling the specified . /// - public static void TryCancel(ISubscription subscription) + public static void TryCancel(ISubscription subscription, Exception cause) { + if (subscription == null) + throw new IllegalStateException("Subscription must be not null on cancel() call, rule 1.3"); + try { - subscription.Cancel(); + if (subscription is ISubscriptionWithCancelException s) + { + s.Cancel(cause); + } + else + { + subscription.Cancel(); + } } catch (Exception e) { diff --git a/src/core/Akka.Streams/Implementation/Sinks.cs b/src/core/Akka.Streams/Implementation/Sinks.cs index ea00edea5c7..d58fb0da039 100644 --- a/src/core/Akka.Streams/Implementation/Sinks.cs +++ b/src/core/Akka.Streams/Implementation/Sinks.cs @@ -988,7 +988,7 @@ void MaybeCompleteStage() })); subOutlet.SetHandler(new LambdaOutHandler( - () => + onPull: () => { if (firstElementPushed) Pull(_stage.In); @@ -1006,9 +1006,9 @@ void MaybeCompleteStage() } } }, - () => + onDownstreamFinish: cause => { - if (!IsClosed(_stage.In)) Cancel(_stage.In); + if (!IsClosed(_stage.In)) Cancel(_stage.In, cause); MaybeCompleteStage(); })); diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index 276ec7a938c..be84136a87f 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -147,7 +147,7 @@ public void OnPull() } } - public void OnDownstreamFinish() + public void OnDownstreamFinish(Exception cause) { if (_pendingOffer != null) { @@ -454,7 +454,7 @@ public override void OnPull() } } - public override void OnDownstreamFinish() => CloseStage(); + public override void OnDownstreamFinish(Exception cause) => CloseStage(); public override void PreStart() { @@ -784,9 +784,9 @@ public Logic(LazySource stage, TaskCompletionSource completion SetHandler(stage.Out, this); } - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { - _completion.SetException(new Exception("Downstream canceled without triggering lazy source materialization")); + _completion.SetException(new Exception("Downstream canceled without triggering lazy source materialization", cause)); CompleteStage(); } @@ -797,10 +797,10 @@ public override void OnPull() var subSink = new SubSinkInlet(this, "LazySource"); subSink.Pull(); - SetHandler(_stage.Out, () => subSink.Pull(), () => + SetHandler(_stage.Out, () => subSink.Pull(), cause => { - subSink.Cancel(); - CompleteStage(); + subSink.Cancel(cause); + InternalOnDownstreamFinish(cause); }); subSink.SetHandler(new LambdaInHandler(() => Push(_stage.Out, subSink.Grab()))); @@ -813,7 +813,7 @@ public override void OnPull() } catch (Exception e) { - subSink.Cancel(); + subSink.Cancel(e); FailStage(e); _completion.TrySetException(e); } @@ -974,7 +974,7 @@ private sealed class Logic : GraphStageLogic, IObserver private readonly Action _onOverflow; private readonly Action _onEvent; private readonly Action _onError; - private readonly Action _onCompleted; + private readonly Action _onCompleted; private IDisposable _disposable; @@ -996,7 +996,7 @@ public Logic(ObservableSourceStage stage) : base(stage.Shape) } }); _onError = GetAsyncCallback(e => Fail(_stage.Outlet, e)); - _onCompleted = GetAsyncCallback(() => Complete(_stage.Outlet)); + _onCompleted = GetAsyncCallback(InternalOnDownstreamFinish); _onOverflow = SetupOverflowStrategy(stage._overflowStrategy); SetHandler(stage.Outlet, onPull: () => @@ -1011,7 +1011,8 @@ public Logic(ObservableSourceStage stage) : base(stage.Shape) public void OnNext(T value) => _onEvent(value); public void OnError(Exception error) => _onError(error); - public void OnCompleted() => _onCompleted(); + public void OnCompleted() => _onCompleted(SubscriptionWithCancelException.StageWasCompleted.Instance); + public void OnCompleted(Exception cause) => _onCompleted(cause); public override void PreStart() { diff --git a/src/core/Akka.Streams/Implementation/StreamLayout.cs b/src/core/Akka.Streams/Implementation/StreamLayout.cs index 6e5dfc96ffa..8f98d38dcd9 100644 --- a/src/core/Akka.Streams/Implementation/StreamLayout.cs +++ b/src/core/Akka.Streams/Implementation/StreamLayout.cs @@ -1647,41 +1647,37 @@ public void OnSubscribe(ISubscription subscription) private void TryOnSubscribe(object obj, ISubscription s) { - if (Value == null) - { - if (!CompareAndSet(null, obj)) - TryOnSubscribe(obj, s); - return; - } - - var subscriber = Value as ISubscriber; - if (subscriber != null) + while (true) { - var subscription = obj as ISubscription; - if (subscription != null) - { - if (CompareAndSet(subscriber, new Both(subscriber))) - EstablishSubscription(subscriber, subscription); - else - TryOnSubscribe(obj, s); - - return; - } - - var publisher = Value as IPublisher; - if (publisher != null) + switch (Value) { - var inert = GetAndSet(Inert.Instance); - if (inert != Inert.Instance) - publisher.Subscribe(subscriber); - return; + case null: + if (!CompareAndSet(null, obj)) + continue; + return; + case ISubscriber subscriber: + switch (obj) + { + case ISubscription subscription: + if (CompareAndSet(subscriber, new Both(subscriber))) + EstablishSubscription(subscriber, subscription); + else + continue; + return; + case IPublisher publisher: + var inert = GetAndSet(Inert.Instance); + if (inert != Inert.Instance) + publisher.Subscribe(subscriber); + return; + case var other: + throw new IllegalStateException($"Unexpected state in VirtualProcessor: {other.GetType()}"); + } + case var state: + // spec violation + ReactiveStreamsCompliance.TryCancel(s, new IllegalStateException($"Spec violation: VirtualProcessor in wrong state [{state.GetType()}].")); + return; } - - return; } - - // spec violation - ReactiveStreamsCompliance.TryCancel(s); } private void EstablishSubscription(ISubscriber subscriber, ISubscription subscription) @@ -1697,7 +1693,7 @@ private void EstablishSubscription(ISubscriber subscriber, ISubscription subs catch (Exception ex) { Value = Inert.Instance; - ReactiveStreamsCompliance.TryCancel(subscription); + ReactiveStreamsCompliance.TryCancel(subscription, ex); ReactiveStreamsCompliance.TryOnError(subscriber, ex); } } @@ -1956,7 +1952,7 @@ public void Request(long n) { if (n < 1) { - ReactiveStreamsCompliance.TryCancel(_real); + ReactiveStreamsCompliance.TryCancel(_real, new IllegalStateException($"Demand must not be < 1. Was: {n}")); var value = _processor.GetAndSet(Inert.Instance); var both = value as Both; if (both != null) diff --git a/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs b/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs index 293a95d3117..720596c5626 100644 --- a/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs +++ b/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs @@ -149,10 +149,7 @@ public void OnPull() TriggerCumulativeDemand(); } - public void OnDownstreamFinish() - { - CompleteStage(); - } + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); private void TriggerCumulativeDemand() { diff --git a/src/core/Akka.Streams/Implementation/Throttle.cs b/src/core/Akka.Streams/Implementation/Throttle.cs index 8be5557d21f..6ad2c2f2d0d 100644 --- a/src/core/Akka.Streams/Implementation/Throttle.cs +++ b/src/core/Akka.Streams/Implementation/Throttle.cs @@ -73,7 +73,7 @@ public void OnUpstreamFinish() public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { diff --git a/src/core/Akka.Streams/Implementation/Timers.cs b/src/core/Akka.Streams/Implementation/Timers.cs index a4caabd6b36..6ad38d392b1 100644 --- a/src/core/Akka.Streams/Implementation/Timers.cs +++ b/src/core/Akka.Streams/Implementation/Timers.cs @@ -75,7 +75,7 @@ public void OnPush() public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { @@ -149,7 +149,7 @@ public Logic(Completion stage) : base(stage.Shape) public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) => FailStage(new TimeoutException($"The stream has not been completed in {_stage.Timeout}.")); @@ -227,7 +227,7 @@ public void OnPush() public void OnPull() => Pull(_stage.Inlet); - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { @@ -315,7 +315,7 @@ public void OnPull() Pull(_stage.Inlet); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { @@ -394,7 +394,7 @@ public Logic(IdleTimeoutBidi stage) : base(stage.Shape) SetHandler(_stage.Out2, onPull: () => Pull(_stage.In2), - onDownstreamFinish: () => Cancel(_stage.In2)); + onDownstreamFinish: cause => Cancel(_stage.In2, cause)); } public void OnPush() @@ -409,7 +409,7 @@ public void OnPush() public void OnPull() => Pull(_stage.In1); - public void OnDownstreamFinish() => Cancel(_stage.In1); + public void OnDownstreamFinish(Exception cause) => Cancel(_stage.In1, cause); protected internal override void OnTimer(object timerKey) { @@ -514,7 +514,7 @@ public void OnPull() Pull(_stage.Inlet); } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { @@ -634,7 +634,7 @@ public void OnPull() } } - public void OnDownstreamFinish() => CompleteStage(); + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); protected internal override void OnTimer(object timerKey) { diff --git a/src/core/Akka.Streams/KillSwitch.cs b/src/core/Akka.Streams/KillSwitch.cs index 82e74b2805f..6e4a0be83f1 100644 --- a/src/core/Akka.Streams/KillSwitch.cs +++ b/src/core/Akka.Streams/KillSwitch.cs @@ -254,7 +254,7 @@ public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSwitc SetHandler(killSwitch.Out2, onPull: () => Pull(killSwitch.In2), - onDownstreamFinish: () => Cancel(killSwitch.In2)); + onDownstreamFinish: cause => Cancel(killSwitch.In2, cause)); } public override void OnPush() => Push(_killSwitch.Out1, Grab(_killSwitch.In1)); @@ -265,7 +265,7 @@ public Logic(Task terminationSignal, UniqueBidiKillSwitchStage killSwitc public override void OnPull() => Pull(_killSwitch.In1); - public override void OnDownstreamFinish() => Cancel(_killSwitch.In1); + public override void OnDownstreamFinish(Exception cause) => Cancel(_killSwitch.In1, cause); } #endregion diff --git a/src/core/Akka.Streams/Stage/AbstractStage.cs b/src/core/Akka.Streams/Stage/AbstractStage.cs index b1f69dfa6fe..b4bfb93ced6 100644 --- a/src/core/Akka.Streams/Stage/AbstractStage.cs +++ b/src/core/Akka.Streams/Stage/AbstractStage.cs @@ -52,7 +52,7 @@ public PushPullGraphLogic( SetHandler(_shape.Outlet, onPull: () => _currentStage.OnPull(Context), - onDownstreamFinish: () => _currentStage.OnDownstreamFinish(Context)); + onDownstreamFinish: cause => _currentStage.OnDownstreamFinish(Context, cause)); } /// @@ -116,7 +116,12 @@ public IUpstreamDirective Pull() /// TBD public FreeDirective Finish() { - CompleteStage(); + return Finish(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); + } + + public FreeDirective Finish(Exception cause) + { + CancelStage(cause); return null; } @@ -452,7 +457,7 @@ public virtual void PreStart(ILifecycleContext context) /// /// TBD /// TBD - public abstract ITerminationDirective OnDownstreamFinish(IContext context); + public abstract ITerminationDirective OnDownstreamFinish(IContext context, Exception cause); /// /// @@ -625,16 +630,18 @@ public abstract class AbstractStage. /// /// TBD + /// /// TBD - public sealed override ITerminationDirective OnDownstreamFinish(IContext context) => OnDownstreamFinish((TContext) context); + public sealed override ITerminationDirective OnDownstreamFinish(IContext context, Exception cause) => OnDownstreamFinish((TContext) context, cause); /// /// This method is called when downstream has cancelled. /// By default the cancel signal is immediately propagated with . /// /// TBD + /// /// TBD - public virtual ITerminationDirective OnDownstreamFinish(TContext context) => context.Finish(); + public virtual ITerminationDirective OnDownstreamFinish(TContext context, Exception cause) => context.Finish(cause); /// /// diff --git a/src/core/Akka.Streams/Stage/Context.cs b/src/core/Akka.Streams/Stage/Context.cs index 5efbb01f22d..f8b15351583 100644 --- a/src/core/Akka.Streams/Stage/Context.cs +++ b/src/core/Akka.Streams/Stage/Context.cs @@ -106,6 +106,8 @@ public interface IContext : ILifecycleContext /// TBD FreeDirective Finish(); + FreeDirective Finish(Exception cause); + /// /// Cancel upstreams and complete downstreams with failure. /// diff --git a/src/core/Akka.Streams/Stage/GraphStage.cs b/src/core/Akka.Streams/Stage/GraphStage.cs index 22d3bc2a5af..075008e91cf 100644 --- a/src/core/Akka.Streams/Stage/GraphStage.cs +++ b/src/core/Akka.Streams/Stage/GraphStage.cs @@ -565,7 +565,7 @@ private Emitting Dequeue() return result; } - public override void OnDownstreamFinish() => Previous.OnDownstreamFinish(); + public override void OnDownstreamFinish(Exception cause) => Previous.OnDownstreamFinish(cause); } private sealed class EmittingSingle : Emitting @@ -699,14 +699,14 @@ public override void OnUpstreamFailure(Exception e) protected sealed class LambdaOutHandler : OutHandler { private readonly Action _onPull; - private readonly Action _onDownstreamFinish; + private readonly Action _onDownstreamFinish; /// /// TBD /// /// TBD /// TBD - public LambdaOutHandler(Action onPull, Action onDownstreamFinish = null) + public LambdaOutHandler(Action onPull, Action onDownstreamFinish = null) { _onPull = onPull; _onDownstreamFinish = onDownstreamFinish; @@ -720,12 +720,12 @@ public LambdaOutHandler(Action onPull, Action onDownstreamFinish = null) /// /// TBD /// - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { if (_onDownstreamFinish != null) - _onDownstreamFinish(); + _onDownstreamFinish(cause); else - base.OnDownstreamFinish(); + base.OnDownstreamFinish(cause); } } @@ -962,7 +962,7 @@ private void SetHandler(Outlet outlet, IOutHandler handler) /// /// This exception is thrown when the specified is undefined. /// - protected internal void SetHandler(Outlet outlet, Action onPull, Action onDownstreamFinish = null) + protected internal void SetHandler(Outlet outlet, Action onPull, Action onDownstreamFinish = null) { if (onPull == null) throw new ArgumentNullException(nameof(onPull), "GraphStageLogic onPull handler must be provided"); @@ -1069,7 +1069,9 @@ protected internal void TryPull(Inlet inlet) /// Requests to stop receiving events from a given input port. Cancelling clears any ungrabbed elements from the port. /// /// TBD - protected void Cancel(Inlet inlet) => Interpreter.Cancel(GetConnection(inlet)); + /// + protected void Cancel(Inlet inlet, Exception cause) => Interpreter.Cancel(GetConnection(inlet), cause); + protected void Cancel(Inlet inlet) => Interpreter.Cancel(GetConnection(inlet), SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); /// /// Once the callback for an input port has been invoked, the element that has been pushed @@ -1089,23 +1091,29 @@ private T Grab(Inlet inlet) var connection = GetConnection(inlet); var element = connection.Slot; - if ((connection.PortState & (InReady | InFailed)) == - InReady && !ReferenceEquals(element, Empty.Instance)) + if ((connection.PortState & (InReady | InFailed | InClosed)) == InReady && !ReferenceEquals(element, Empty.Instance)) { // fast path connection.Slot = Empty.Instance; + return (T)element; } - else + + // Slow path for grabbing element from already failed or completed connections + if (!IsAvailable(inlet)) + throw new ArgumentException($"Cannot get element from already empty input port ({inlet})"); + + if ((connection.PortState & (InReady | InFailed)) == (InReady | InFailed)) { - // slow path - if (!IsAvailable(inlet)) - throw new ArgumentException("Cannot get element from already empty input port"); + // failed var failed = (GraphInterpreter.Failed)element; - element = failed.PreviousElement; connection.Slot = new GraphInterpreter.Failed(failed.Reason, Empty.Instance); + return (T)failed.PreviousElement; } - - return (T)element; + + // Completed + var elem = (T) connection.Slot; + connection.Slot = Empty.Instance; + return elem; } /// @@ -1148,7 +1156,7 @@ private bool HasBeenPulled(Inlet inlet) private bool IsAvailable(Inlet inlet) { var connection = GetConnection(inlet); - var normalArrived = (connection.PortState & (InReady | InFailed)) == InReady; + var normalArrived = (connection.PortState & (InReady | InFailed | InClosed)) == InReady; if (normalArrived) { @@ -1157,11 +1165,23 @@ private bool IsAvailable(Inlet inlet) } // slow path on failure + if ((connection.PortState & (InReady | InClosed | InFailed)) == (InReady | InClosed)) + { + return connection.Slot switch + { + Empty _ => false, // cancelled (element is discarded when cancelled) + Cancelled _ => false, // cancelled (element is discarded when cancelled) + _ => true // completed but element still there to grab + }; + } + if ((connection.PortState & (InReady | InFailed)) == (InReady | InFailed)) { - // This can only be Empty actually (if a cancel was concurrent with a failure) - return connection.Slot is GraphInterpreter.Failed failed && - !ReferenceEquals(failed.PreviousElement, Empty.Instance); + return connection.Slot switch + { + GraphInterpreter.Failed failed => !ReferenceEquals(failed.PreviousElement, Empty.Instance), // failed but element still there to grab + _ => false + }; } return false; @@ -1264,26 +1284,51 @@ private void Complete(Outlet outlet) protected void Fail(Outlet outlet, Exception reason) => Interpreter.Fail(GetConnection(outlet), reason); /// - /// Automatically invokes or on all the input or output ports that have been called, - /// then marks the stage as stopped. + /// INTERNAL API + /// + /// Variable used from `OutHandler.onDownstreamFinish` to carry over cancellation cause in cases where + /// `OutHandler` implementations call `InternalOnDownstreamFinish()`. /// - public void CompleteStage() + [InternalApi] private Exception _lastCancellationCause = null; + + [InternalApi] + public void InternalOnDownstreamFinish(Exception cause) { - for (var i = 0; i < PortToConn.Length; i++) + try { - if (i < InCount) - Interpreter.Cancel(PortToConn[i]); - else - { - if (Handlers[i] is Emitting e) - e.AddFollowUp(new EmittingCompletion(e.Out, e.Previous, this)); - else - Interpreter.Complete(PortToConn[i]); - } + if (cause == null) + throw new ArgumentException("Cancellation cause must not be null", nameof(cause)); + if (_lastCancellationCause != null) + throw new ArgumentException("OnDownstreamFinish must not be called recursively", nameof(cause)); + _lastCancellationCause = cause; + CancelStage(_lastCancellationCause); + } + finally + { + _lastCancellationCause = null; + } + } + + public void CancelStage(Exception cause) + { + + switch (cause) + { + case SubscriptionWithCancelException.NonFailureCancellation _: + InternalCompleteStage(cause, Option.None); + break; + default: + InternalCompleteStage(cause, cause); + break; } - - SetKeepGoing(false); } + + /// + /// Automatically invokes or on all the input or output ports that have been called, + /// then marks the stage as stopped. + /// + public void CompleteStage() + => InternalCompleteStage(SubscriptionWithCancelException.StageWasCompleted.Instance, Option.None); /// /// Automatically invokes or on all the input or output ports that have been called, @@ -1291,13 +1336,27 @@ public void CompleteStage() /// /// TBD public void FailStage(Exception reason) + => InternalCompleteStage(reason, reason); + + private void InternalCompleteStage(Exception cancelCause, Option optionalFailureCause) { for (var i = 0; i < PortToConn.Length; i++) { if (i < InCount) - Interpreter.Cancel(PortToConn[i]); + { + Interpreter.Cancel(PortToConn[i], cancelCause); + } + else if (optionalFailureCause.HasValue) + { + Interpreter.Fail(PortToConn[i], optionalFailureCause.Value); + } else - Interpreter.Fail(PortToConn[i], reason); + { + if (Handlers[i] is Emitting e) + e.AddFollowUp(new EmittingCompletion(e.Out, e.Previous, this)); + else + Interpreter.Complete(PortToConn[i]); + } } SetKeepGoing(false); @@ -1798,10 +1857,15 @@ public void Pull() /// /// TBD /// - public void Cancel() + public void Cancel() => Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded.Instance); + + /// + /// TBD + /// + public void Cancel(Exception cause) { _closed = true; - _sink.CancelSubstream(); + _sink.CancelSubstream(cause); } /// @@ -1856,13 +1920,13 @@ public SubSourceOutlet(GraphStageLogic logic, string name) _handler.OnPull(); } } - else if (command is SubSink.Cancel) + else if (command is SubSink.Cancel cancel) { if (!_closed) { _available = false; _closed = true; - _handler.OnDownstreamFinish(); + _handler.OnDownstreamFinish(SubscriptionWithCancelException.StageWasCompleted.Instance); } } })); @@ -1997,7 +2061,7 @@ public interface IOutHandler /// /// Called when the output port will no longer accept any new elements. After this callback no other callbacks will be called for this port. /// - void OnDownstreamFinish(); + void OnDownstreamFinish(Exception cause); } /// @@ -2014,7 +2078,8 @@ public abstract class OutHandler : IOutHandler /// /// Called when the output port will no longer accept any new elements. After this callback no other callbacks will be called for this port. /// - public virtual void OnDownstreamFinish() => Current.ActiveStage.CompleteStage(); + public virtual void OnDownstreamFinish(Exception cause) => + Current.ActiveStage.InternalOnDownstreamFinish(cause); } /// @@ -2049,7 +2114,8 @@ public abstract class InAndOutHandler : IInHandler, IOutHandler /// /// Called when the output port will no longer accept any new elements. After this callback no other callbacks will be called for this port. /// - public virtual void OnDownstreamFinish() => Current.ActiveStage.CompleteStage(); + public virtual void OnDownstreamFinish(Exception cause) => + Current.ActiveStage.InternalOnDownstreamFinish(cause); } /// @@ -2128,7 +2194,7 @@ protected OutGraphStageLogic(Shape shape) : base(shape) /// /// Called when the output port will no longer accept any new elements. After this callback no other callbacks will be called for this port. /// - public virtual void OnDownstreamFinish() => CompleteStage(); + public virtual void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); } /// @@ -2184,7 +2250,7 @@ protected InAndOutGraphStageLogic(Shape shape) : base(shape) /// /// Called when the output port will no longer accept any new elements. After this callback no other callbacks will be called for this port. /// - public virtual void OnDownstreamFinish() => CompleteStage(); + public virtual void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); } /// @@ -2338,7 +2404,7 @@ public override void OnPull() { } /// /// TBD /// - public override void OnDownstreamFinish() { } + public override void OnDownstreamFinish(Exception cause) { } } /// @@ -2362,10 +2428,10 @@ public override void OnPull() { } /// /// TBD /// - public override void OnDownstreamFinish() + public override void OnDownstreamFinish(Exception cause) { if (_predicate()) - Current.ActiveStage.CompleteStage(); + Current.ActiveStage.CancelStage(cause); } } diff --git a/src/core/Akka.Streams/StreamTcpException.cs b/src/core/Akka.Streams/StreamTcpException.cs index ef0ebe5185f..eb786f26f1d 100644 --- a/src/core/Akka.Streams/StreamTcpException.cs +++ b/src/core/Akka.Streams/StreamTcpException.cs @@ -62,6 +62,12 @@ public StreamDetachedException(string message) : base(message) { } + + public StreamDetachedException(string message, Exception innerException) + : base(message, innerException) + { + } + } /// diff --git a/src/core/Akka.Streams/SubscriptionWithCancelException.cs b/src/core/Akka.Streams/SubscriptionWithCancelException.cs new file mode 100644 index 00000000000..55289d2a969 --- /dev/null +++ b/src/core/Akka.Streams/SubscriptionWithCancelException.cs @@ -0,0 +1,39 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Annotations; +using Reactive.Streams; + +namespace Akka.Streams +{ + public static class SubscriptionWithCancelException + { + [DoNotInherit] + public abstract class NonFailureCancellation : Exception + { + public sealed override string StackTrace => ""; + } + + public sealed class NoMoreElementsNeeded : NonFailureCancellation + { + public static readonly NoMoreElementsNeeded Instance = new NoMoreElementsNeeded(); + private NoMoreElementsNeeded() { } + } + + public sealed class StageWasCompleted : NonFailureCancellation + { + public static readonly StageWasCompleted Instance = new StageWasCompleted(); + private StageWasCompleted() { } + } + } + + public interface ISubscriptionWithCancelException: ISubscription + { + void Cancel(Exception cause); + } +}