diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 6af1dcc0ee5..aca25f5b718 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1437,6 +1437,7 @@ namespace Akka.Streams.Dsl public sealed class FlowWithContext : Akka.Streams.GraphDelegate, System.ValueTuple>, TMat> { public Akka.Streams.Dsl.Flow, System.ValueTuple, TMat> AsFlow() { } + public Akka.Streams.Dsl.FlowWithContext MapMaterializedValue(System.Func combine) { } public Akka.Streams.Dsl.FlowWithContext Via(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow) { } public Akka.Streams.Dsl.FlowWithContext ViaMaterialized(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow, System.Func combine) { } } @@ -2106,7 +2107,10 @@ namespace Akka.Streams.Dsl { public SourceWithContext(Akka.Streams.Dsl.Source, TMat> source) { } public Akka.Streams.Dsl.Source, TMat> AsSource() { } + public Akka.Streams.Dsl.SourceWithContext MapMaterializedValue(System.Func combine) { } public TMat2 RunWith(Akka.Streams.IGraph>, TMat2> sink, Akka.Streams.IMaterializer materializer) { } + public Akka.Streams.Dsl.IRunnableGraph To(Akka.Streams.IGraph>, TMat2> sink) { } + public Akka.Streams.Dsl.IRunnableGraph ToMaterialized(Akka.Streams.IGraph>, TMat2> sink, System.Func combine) { } public Akka.Streams.Dsl.SourceWithContext Via(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow) { } public Akka.Streams.Dsl.SourceWithContext ViaMaterialized(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow, System.Func combine) { } } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs index 8a9d0858fb4..21116c20d05 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs @@ -40,6 +40,23 @@ public void A_FlowWithContext_must_get_created_from_FlowAsFlowWithContext() .ExpectNext((new Message("az", 1L), 1L)) .ExpectComplete(); } + + [Fact] + public void A_FlowWithContext_must_be_able_to_map_materialized_value_via_FlowWithContext_MapMaterializedValue() + { + var materializedValue = "MatedValue"; + var mapMaterializedValueFlow = FlowWithContext.Create().MapMaterializedValue(_ => materializedValue); + + var (matValue, probe) = Source.From(new[] { new Message("a", 1L) }) + .MapMaterializedValue(_ => 42) + .AsSourceWithContext(m => m.Offset) + .ViaMaterialized(mapMaterializedValueFlow, Keep.Both) + .ToMaterialized(this.SinkProbe<(Message, long)>(), Keep.Both) + .Run(Materializer); + + matValue.ShouldBe((42, materializedValue)); + probe.Request(1).ExpectNext((new Message("a", 1L), 1L)).ExpectComplete(); + } } sealed class Message : IEquatable diff --git a/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs b/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs index 60c7a849443..97a1b52c859 100644 --- a/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs @@ -65,17 +65,13 @@ public void SourceWithContext_must_get_created_from_AsSourceWithContext() { var msg = new Message("a", 1); - var sink = this.CreateSubscriberProbe<(Message, long)>(); - Source.From(new[] { msg }) .AsSourceWithContext(x => x.Offset) - .AsSource() - .RunWith(Sink.FromSubscriber(sink), Materializer); - - var sub = sink.ExpectSubscription(); - sub.Request(1); - sink.ExpectNext((msg, 1L)); - sink.ExpectComplete(); + .ToMaterialized(this.SinkProbe<(Message, long)>(), Keep.Right) + .Run(Materializer) + .Request(1) + .ExpectNext((msg, 1L)) + .ExpectComplete(); } [Fact] @@ -100,8 +96,6 @@ public void SourceWithContext_must_be_able_to_get_turned_back_into_a_normal_sour [Fact] public void SourceWithContext_must_pass_through_context_using_Select_and_Where() { - var sink = this.CreateSubscriberProbe<(string, long)>(); - Source.From(new[] { new Message("A", 1), @@ -113,14 +107,12 @@ public void SourceWithContext_must_pass_through_context_using_Select_and_Where() .Select(m => m.Data.ToLower()) .Where(x => x != "b") .WhereNot(x => x == "d") - .AsSource() - .RunWith(Sink.FromSubscriber(sink), Materializer); - - var sub = sink.ExpectSubscription(); - sub.Request(2); - sink.ExpectNext(("a", 1L)); - sink.ExpectNext(("c", 4L)); - sink.ExpectComplete(); + .ToMaterialized(this.SinkProbe<(string, long)>(), Keep.Right) + .Run(Materializer) + .Request(2) + .ExpectNext(("a", 1L)) + .ExpectNext(("c", 4L)) + .ExpectComplete(); } [Fact] @@ -191,5 +183,18 @@ public void SourceWithContext_must_pass_through_sequence_of_context_per_element_ sink.ExpectComplete(); } + + [Fact] + public void SourceWithContext_must_be_able_to_change_materialized_value_via_MapMaterializedValue() + { + var materializedValue = "MatedValue"; + + Source.Empty() + .AsSourceWithContext(m => m.Offset) + .MapMaterializedValue(_ => materializedValue) + .To(Sink.Ignore<(Message, long)>()) + .Run(Materializer) + .ShouldBe(materializedValue); + } } } diff --git a/src/core/Akka.Streams/Dsl/FlowWithContext.cs b/src/core/Akka.Streams/Dsl/FlowWithContext.cs index 359295483b4..3a46e24c045 100644 --- a/src/core/Akka.Streams/Dsl/FlowWithContext.cs +++ b/src/core/Akka.Streams/Dsl/FlowWithContext.cs @@ -53,6 +53,12 @@ public FlowWithContext ViaMaterialized, TMat2> viaFlow, Func combine) => FlowWithContext.From(Flow.FromGraph(Inner).ViaMaterialized(viaFlow, combine)); + /// + /// Context-preserving variant of . + /// + public FlowWithContext MapMaterializedValue(Func combine) => + FlowWithContext.From(Flow.FromGraph(Inner).MapMaterializedValue(combine)); + [MethodImpl(MethodImplOptions.AggressiveInlining)] public Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> AsFlow() => Flow.FromGraph(Inner); } diff --git a/src/core/Akka.Streams/Dsl/SourceWithContext.cs b/src/core/Akka.Streams/Dsl/SourceWithContext.cs index d3e011cc2a4..2284f508100 100644 --- a/src/core/Akka.Streams/Dsl/SourceWithContext.cs +++ b/src/core/Akka.Streams/Dsl/SourceWithContext.cs @@ -48,6 +48,26 @@ public SourceWithContext ViaMaterialized, TMat2> viaFlow, Func combine) => new SourceWithContext(Source.FromGraph(Inner).ViaMaterialized(viaFlow, combine)); + /// + /// Connect this to a , + /// concatenating the processing steps of both. + /// + public IRunnableGraph To(IGraph, TMat2> sink) => + Source.FromGraph(Inner).ToMaterialized(sink, Keep.Left); + + /// + /// Connect this to a , + /// concatenating the processing steps of both. + /// + public IRunnableGraph ToMaterialized(IGraph, TMat2> sink, Func combine) => + Source.FromGraph(Inner).ToMaterialized(sink, combine); + + /// + /// Context-preserving variant of . + /// + public SourceWithContext MapMaterializedValue(Func combine) => + new SourceWithContext(Source.FromGraph(Inner).MapMaterializedValue(combine)); + /// /// Connect this to a Sink and run it. The returned value is the materialized value of the Sink. /// Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.