Skip to content

Commit

Permalink
Add MapMaterializedValue for Source/Flow WithContext (#5711)
Browse files Browse the repository at this point in the history
Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
ismaelhamed and Arkatufus authored Mar 16, 2022
1 parent 19811a1 commit 7732b36
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,7 @@ namespace Akka.Streams.Dsl
public sealed class FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> : Akka.Streams.GraphDelegate<Akka.Streams.FlowShape<System.ValueTuple<TIn, TCtxIn>, System.ValueTuple<TOut, TCtxOut>>, TMat>
{
public Akka.Streams.Dsl.Flow<System.ValueTuple<TIn, TCtxIn>, System.ValueTuple<TOut, TCtxOut>, TMat> AsFlow() { }
public Akka.Streams.Dsl.FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> combine) { }
public Akka.Streams.Dsl.FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat> Via<TOut2, TCtx2, TMat2>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtxOut>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow) { }
public Akka.Streams.Dsl.FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtxOut>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow, System.Func<TMat, TMat2, TMat3> combine) { }
}
Expand Down Expand Up @@ -2106,7 +2107,10 @@ namespace Akka.Streams.Dsl
{
public SourceWithContext(Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtx>, TMat> source) { }
public Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtx>, TMat> AsSource() { }
public Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> combine) { }
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<System.ValueTuple<TOut, TCtx>>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat> To<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<System.ValueTuple<TOut, TCtx>>, TMat2> sink) { }
public Akka.Streams.Dsl.IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.SinkShape<System.ValueTuple<TOut, TCtx>>, TMat2> sink, System.Func<TMat, TMat2, TMat3> combine) { }
public Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx2, TMat> Via<TOut2, TCtx2, TMat2>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtx>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow) { }
public Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtx>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow, System.Func<TMat, TMat2, TMat3> combine) { }
}
Expand Down
17 changes: 17 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ public void A_FlowWithContext_must_get_created_from_FlowAsFlowWithContext()
.ExpectNext((new Message("az", 1L), 1L))
.ExpectComplete();
}

[Fact]
public void A_FlowWithContext_must_be_able_to_map_materialized_value_via_FlowWithContext_MapMaterializedValue()
{
var materializedValue = "MatedValue";
var mapMaterializedValueFlow = FlowWithContext.Create<Message, long>().MapMaterializedValue(_ => materializedValue);

var (matValue, probe) = Source.From(new[] { new Message("a", 1L) })
.MapMaterializedValue(_ => 42)
.AsSourceWithContext(m => m.Offset)
.ViaMaterialized(mapMaterializedValueFlow, Keep.Both)
.ToMaterialized(this.SinkProbe<(Message, long)>(), Keep.Both)
.Run(Materializer);

matValue.ShouldBe((42, materializedValue));
probe.Request(1).ExpectNext((new Message("a", 1L), 1L)).ExpectComplete();
}
}

sealed class Message : IEquatable<Message>
Expand Down
43 changes: 24 additions & 19 deletions src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,13 @@ public void SourceWithContext_must_get_created_from_AsSourceWithContext()
{
var msg = new Message("a", 1);

var sink = this.CreateSubscriberProbe<(Message, long)>();

Source.From(new[] { msg })
.AsSourceWithContext(x => x.Offset)
.AsSource()
.RunWith(Sink.FromSubscriber(sink), Materializer);

var sub = sink.ExpectSubscription();
sub.Request(1);
sink.ExpectNext((msg, 1L));
sink.ExpectComplete();
.ToMaterialized(this.SinkProbe<(Message, long)>(), Keep.Right)
.Run(Materializer)
.Request(1)
.ExpectNext((msg, 1L))
.ExpectComplete();
}

[Fact]
Expand All @@ -100,8 +96,6 @@ public void SourceWithContext_must_be_able_to_get_turned_back_into_a_normal_sour
[Fact]
public void SourceWithContext_must_pass_through_context_using_Select_and_Where()
{
var sink = this.CreateSubscriberProbe<(string, long)>();

Source.From(new[]
{
new Message("A", 1),
Expand All @@ -113,14 +107,12 @@ public void SourceWithContext_must_pass_through_context_using_Select_and_Where()
.Select(m => m.Data.ToLower())
.Where(x => x != "b")
.WhereNot(x => x == "d")
.AsSource()
.RunWith(Sink.FromSubscriber(sink), Materializer);

var sub = sink.ExpectSubscription();
sub.Request(2);
sink.ExpectNext(("a", 1L));
sink.ExpectNext(("c", 4L));
sink.ExpectComplete();
.ToMaterialized(this.SinkProbe<(string, long)>(), Keep.Right)
.Run(Materializer)
.Request(2)
.ExpectNext(("a", 1L))
.ExpectNext(("c", 4L))
.ExpectComplete();
}

[Fact]
Expand Down Expand Up @@ -191,5 +183,18 @@ public void SourceWithContext_must_pass_through_sequence_of_context_per_element_

sink.ExpectComplete();
}

[Fact]
public void SourceWithContext_must_be_able_to_change_materialized_value_via_MapMaterializedValue()
{
var materializedValue = "MatedValue";

Source.Empty<Message>()
.AsSourceWithContext(m => m.Offset)
.MapMaterializedValue(_ => materializedValue)
.To(Sink.Ignore<(Message, long)>())
.Run(Materializer)
.ShouldBe(materializedValue);
}
}
}
6 changes: 6 additions & 0 deletions src/core/Akka.Streams/Dsl/FlowWithContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat3> ViaMaterialized<TOut2,
IGraph<FlowShape<(TOut, TCtxOut), (TOut2, TCtx2)>, TMat2> viaFlow, Func<TMat, TMat2, TMat3> combine) =>
FlowWithContext.From(Flow.FromGraph(Inner).ViaMaterialized(viaFlow, combine));

/// <summary>
/// Context-preserving variant of <see cref="Flow{TIn, TOut, TMat2}.MapMaterializedValue{TMat2}(Func{TMat2, TMat2})"/>.
/// </summary>
public FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat2> MapMaterializedValue<TMat2>(Func<TMat, TMat2> combine) =>
FlowWithContext.From(Flow.FromGraph(Inner).MapMaterializedValue(combine));

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> AsFlow() => Flow.FromGraph(Inner);
}
Expand Down
20 changes: 20 additions & 0 deletions src/core/Akka.Streams/Dsl/SourceWithContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@ public SourceWithContext<TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat
IGraph<FlowShape<(TOut, TCtx), (TOut2, TCtx2)>, TMat2> viaFlow, Func<TMat, TMat2, TMat3> combine) =>
new SourceWithContext<TOut2, TCtx2, TMat3>(Source.FromGraph(Inner).ViaMaterialized(viaFlow, combine));

/// <summary>
/// Connect this <see cref="SourceWithContext{TOut, TCtx, TMat2}"/> to a <see cref="Sink"/>,
/// concatenating the processing steps of both.
/// </summary>
public IRunnableGraph<TMat> To<TMat2>(IGraph<SinkShape<(TOut, TCtx)>, TMat2> sink) =>
Source.FromGraph(Inner).ToMaterialized(sink, Keep.Left);

/// <summary>
/// Connect this <see cref="SourceWithContext{TOut, TCtx, TMat2}"/> to a <see cref="Sink"/>,
/// concatenating the processing steps of both.
/// </summary>
public IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(IGraph<SinkShape<(TOut, TCtx)>, TMat2> sink, Func<TMat, TMat2, TMat3> combine) =>
Source.FromGraph(Inner).ToMaterialized(sink, combine);

/// <summary>
/// Context-preserving variant of <see cref="Source{TOut, TMat2}.MapMaterializedValue{TMat2}(Func{TMat2, TMat2})"/>.
/// </summary>
public SourceWithContext<TOut, TCtx, TMat2> MapMaterializedValue<TMat2>(Func<TMat, TMat2> combine) =>
new SourceWithContext<TOut, TCtx, TMat2>(Source.FromGraph(Inner).MapMaterializedValue(combine));

/// <summary>
/// Connect this <see cref="SourceWithContext{TOut,TCtx,TMat}"/> to a Sink and run it. The returned value is the materialized value of the Sink.
/// Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
Expand Down

0 comments on commit 7732b36

Please sign in to comment.