diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 2176971be9e..6af1dcc0ee5 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1324,7 +1324,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow AggregateAsync(this Akka.Streams.Dsl.Flow flow, TOut2 zero, System.Func> fold) { } public static Akka.Streams.Dsl.Flow AlsoTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that) { } public static Akka.Streams.Dsl.Flow AlsoToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } - public static Akka.Streams.Dsl.FlowWithContext AsFlowWithContext(this Akka.Streams.Dsl.Flow flow, System.Func collapseContext, System.Func extractContext) { } + public static Akka.Streams.Dsl.FlowWithContext AsFlowWithContext(this Akka.Streams.Dsl.Flow flow, System.Func collapseContext, System.Func extractContext) { } public static Akka.Streams.Dsl.Flow BackpressureTimeout(this Akka.Streams.Dsl.Flow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow Batch(this Akka.Streams.Dsl.Flow flow, long max, System.Func seed, System.Func aggregate) { } public static Akka.Streams.Dsl.Flow BatchWeighted(this Akka.Streams.Dsl.Flow flow, long max, System.Func costFunction, System.Func seed, System.Func aggregate) { } @@ -1416,28 +1416,29 @@ namespace Akka.Streams.Dsl } public class static FlowWithContext { - public static Akka.Streams.Dsl.FlowWithContext Create() { } - public static Akka.Streams.Dsl.FlowWithContext From(Akka.Streams.Dsl.Flow, System.ValueTuple, TMat> flow) { } + public static Akka.Streams.Dsl.FlowWithContext Create() { } + public static Akka.Streams.Dsl.FlowWithContext From(Akka.Streams.Dsl.Flow, System.ValueTuple, TMat> flow) { } } public class static FlowWithContextOperations { - public static Akka.Streams.Dsl.FlowWithContext Collect(this Akka.Streams.Dsl.FlowWithContext fn) + public static Akka.Streams.Dsl.FlowWithContext Collect(this Akka.Streams.Dsl.FlowWithContext fn) where TOut2 : class { } - public static Akka.Streams.Dsl.FlowWithContext, System.Collections.Generic.IReadOnlyList, TMat> Grouped(this Akka.Streams.Dsl.FlowWithContext Select(this Akka.Streams.Dsl.FlowWithContext fn) { } - public static Akka.Streams.Dsl.FlowWithContext SelectAsync(this Akka.Streams.Dsl.FlowWithContext> fn) { } - public static Akka.Streams.Dsl.FlowWithContext SelectConcat(this Akka.Streams.Dsl.FlowWithContext> fn) { } - public static Akka.Streams.Dsl.FlowWithContext SelectContext(this Akka.Streams.Dsl.FlowWithContext mapContext) { } - public static Akka.Streams.Dsl.FlowWithContext, System.Collections.Generic.IReadOnlyList, TMat> Sliding(this Akka.Streams.Dsl.FlowWithContext StatefulSelectConcat(this Akka.Streams.Dsl.FlowWithContext>> fn) { } - public static Akka.Streams.Dsl.FlowWithContext Where(this Akka.Streams.Dsl.FlowWithContext predicate) { } - public static Akka.Streams.Dsl.FlowWithContext WhereNot(this Akka.Streams.Dsl.FlowWithContext predicate) { } - } - public sealed class FlowWithContext : Akka.Streams.GraphDelegate, System.ValueTuple>, TMat> + public static Akka.Streams.Dsl.FlowWithContext, System.Collections.Generic.IReadOnlyList, TMat> Grouped(this Akka.Streams.Dsl.FlowWithContext Select(this Akka.Streams.Dsl.FlowWithContext fn) { } + public static Akka.Streams.Dsl.FlowWithContext SelectAsync(this Akka.Streams.Dsl.FlowWithContext> fn) { } + public static Akka.Streams.Dsl.FlowWithContext SelectConcat(this Akka.Streams.Dsl.FlowWithContext> fn) { } + public static Akka.Streams.Dsl.FlowWithContext SelectContext(this Akka.Streams.Dsl.FlowWithContext mapContext) { } + public static Akka.Streams.Dsl.FlowWithContext, System.Collections.Generic.IReadOnlyList, TMat> Sliding(this Akka.Streams.Dsl.FlowWithContext StatefulSelectConcat(this Akka.Streams.Dsl.FlowWithContext>> fn) { } + public static Akka.Streams.Dsl.FlowWithContext Where(this Akka.Streams.Dsl.FlowWithContext predicate) { } + public static Akka.Streams.Dsl.FlowWithContext WhereNot(this Akka.Streams.Dsl.FlowWithContext predicate) { } + } + [Akka.Annotations.ApiMayChangeAttribute()] + public sealed class FlowWithContext : Akka.Streams.GraphDelegate, System.ValueTuple>, TMat> { public Akka.Streams.Dsl.Flow, System.ValueTuple, TMat> AsFlow() { } - public Akka.Streams.Dsl.FlowWithContext Via(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow) { } - public Akka.Streams.Dsl.FlowWithContext ViaMaterialized(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow, System.Func combine) { } + public Akka.Streams.Dsl.FlowWithContext Via(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow) { } + public Akka.Streams.Dsl.FlowWithContext ViaMaterialized(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow, System.Func combine) { } } public sealed class Flow : Akka.Streams.Dsl.IFlow, Akka.Streams.IGraph>, Akka.Streams.IGraph, TMat> { @@ -1996,7 +1997,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source AggregateAsync(this Akka.Streams.Dsl.Source flow, TOut2 zero, System.Func> fold) { } public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that) { } public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } - public static Akka.Streams.Dsl.SourceWithContext AsSourceWithContext(this Akka.Streams.Dsl.Source flow, System.Func fn) { } + public static Akka.Streams.Dsl.SourceWithContext AsSourceWithContext(this Akka.Streams.Dsl.Source flow, System.Func fn) { } public static Akka.Streams.Dsl.Source BackpressureTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source Batch(this Akka.Streams.Dsl.Source flow, long max, System.Func seed, System.Func aggregate) { } public static Akka.Streams.Dsl.Source BatchWeighted(this Akka.Streams.Dsl.Source flow, long max, System.Func costFunction, System.Func seed, System.Func aggregate) { } @@ -2088,24 +2089,26 @@ namespace Akka.Streams.Dsl } public class static SourceWithContextOperations { - public static Akka.Streams.Dsl.SourceWithContext Collect(this Akka.Streams.Dsl.SourceWithContext flow, System.Func fn) + public static Akka.Streams.Dsl.SourceWithContext Collect(this Akka.Streams.Dsl.SourceWithContext flow, System.Func fn) where TOut2 : class { } - public static Akka.Streams.Dsl.SourceWithContext, System.Collections.Generic.IReadOnlyList, TMat> Grouped(this Akka.Streams.Dsl.SourceWithContext flow, int n) { } - public static Akka.Streams.Dsl.SourceWithContext Select(this Akka.Streams.Dsl.SourceWithContext flow, System.Func fn) { } - public static Akka.Streams.Dsl.SourceWithContext SelectAsync(this Akka.Streams.Dsl.SourceWithContext flow, int parallelism, System.Func> fn) { } - public static Akka.Streams.Dsl.SourceWithContext SelectConcat(this Akka.Streams.Dsl.SourceWithContext flow, System.Func> fn) { } - public static Akka.Streams.Dsl.SourceWithContext SelectContext(this Akka.Streams.Dsl.SourceWithContext flow, System.Func mapContext) { } - public static Akka.Streams.Dsl.SourceWithContext, System.Collections.Generic.IReadOnlyList, TMat> Sliding(this Akka.Streams.Dsl.SourceWithContext flow, int n, int step = 1) { } - public static Akka.Streams.Dsl.SourceWithContext StatefulSelectConcat(this Akka.Streams.Dsl.SourceWithContext flow, System.Func>> fn) { } - public static Akka.Streams.Dsl.SourceWithContext Where(this Akka.Streams.Dsl.SourceWithContext flow, System.Func predicate) { } - public static Akka.Streams.Dsl.SourceWithContext WhereNot(this Akka.Streams.Dsl.SourceWithContext flow, System.Func predicate) { } - } - public sealed class SourceWithContext : Akka.Streams.GraphDelegate>, TMat> + public static Akka.Streams.Dsl.SourceWithContext, System.Collections.Generic.IReadOnlyList, TMat> Grouped(this Akka.Streams.Dsl.SourceWithContext flow, int n) { } + public static Akka.Streams.Dsl.SourceWithContext Select(this Akka.Streams.Dsl.SourceWithContext flow, System.Func fn) { } + public static Akka.Streams.Dsl.SourceWithContext SelectAsync(this Akka.Streams.Dsl.SourceWithContext flow, int parallelism, System.Func> fn) { } + public static Akka.Streams.Dsl.SourceWithContext SelectConcat(this Akka.Streams.Dsl.SourceWithContext flow, System.Func> fn) { } + public static Akka.Streams.Dsl.SourceWithContext SelectContext(this Akka.Streams.Dsl.SourceWithContext flow, System.Func mapContext) { } + public static Akka.Streams.Dsl.SourceWithContext, System.Collections.Generic.IReadOnlyList, TMat> Sliding(this Akka.Streams.Dsl.SourceWithContext flow, int n, int step = 1) { } + public static Akka.Streams.Dsl.SourceWithContext StatefulSelectConcat(this Akka.Streams.Dsl.SourceWithContext flow, System.Func>> fn) { } + public static Akka.Streams.Dsl.SourceWithContext Where(this Akka.Streams.Dsl.SourceWithContext flow, System.Func predicate) { } + public static Akka.Streams.Dsl.SourceWithContext WhereNot(this Akka.Streams.Dsl.SourceWithContext flow, System.Func predicate) { } + } + [Akka.Annotations.ApiMayChangeAttribute()] + public sealed class SourceWithContext : Akka.Streams.GraphDelegate>, TMat> { public SourceWithContext(Akka.Streams.Dsl.Source, TMat> source) { } public Akka.Streams.Dsl.Source, TMat> AsSource() { } - public Akka.Streams.Dsl.SourceWithContext Via(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow) { } - public Akka.Streams.Dsl.SourceWithContext ViaMaterialized(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow, System.Func combine) { } + public TMat2 RunWith(Akka.Streams.IGraph>, TMat2> sink, Akka.Streams.IMaterializer materializer) { } + public Akka.Streams.Dsl.SourceWithContext Via(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow) { } + public Akka.Streams.Dsl.SourceWithContext ViaMaterialized(Akka.Streams.IGraph, System.ValueTuple>, TMat2> viaFlow, System.Func combine) { } } public sealed class Source : Akka.Streams.Dsl.IFlow, Akka.Streams.IGraph>, Akka.Streams.IGraph, TMat> { diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs new file mode 100644 index 00000000000..8a9d0858fb4 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs @@ -0,0 +1,80 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl +{ + public class FlowWithContextSpec : AkkaSpec + { + private ActorMaterializer Materializer { get; } + + public FlowWithContextSpec(ITestOutputHelper helper) : base(helper) + { + var settings = ActorMaterializerSettings.Create(Sys); + Materializer = ActorMaterializer.Create(Sys, settings); + } + + [Fact] + public void A_FlowWithContext_must_get_created_from_FlowAsFlowWithContext() + { + var flow = Flow.Create().Select(m => m.Copy(data: m.Data + "z")); + var flowWithContext = flow.AsFlowWithContext((m, o) => new Message(m.Data, o), m => m.Offset); + + Source.From(new[] { new Message("a", 1L) }) + .AsSourceWithContext(m => m.Offset) + .Via(flowWithContext) + .AsSource() + .RunWith(this.SinkProbe<(Message, long)>(), Materializer) + .Request(1) + .ExpectNext((new Message("az", 1L), 1L)) + .ExpectComplete(); + } + } + + sealed class Message : IEquatable + { + public string Data { get; } + public long Offset { get; } + + public Message(string data, long offset) + { + Data = data; + Offset = offset; + } + + public Message Copy(string data = null, long? offset = null) => new Message(data ?? Data, offset ?? Offset); + + public bool Equals(Message other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return string.Equals(Data, other.Data) && Offset == other.Offset; + } + + public override bool Equals(object obj) + { + if (obj is null) return false; + if (ReferenceEquals(this, obj)) return true; + return obj is Message other && Equals(other); + } + + public override int GetHashCode() + { + unchecked + { + return ((Data != null ? Data.GetHashCode() : 0) * 397) ^ Offset.GetHashCode(); + } + } + } +} diff --git a/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs b/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs index fae6584584a..60c7a849443 100644 --- a/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs @@ -126,17 +126,14 @@ public void SourceWithContext_must_pass_through_context_using_Select_and_Where() [Fact] public void SourceWithContext_must_pass_through_context_using_FlowWithContext() { - var flowWithContext = FlowWithContext.Create(); - - var msg = new Message("a", 1); + var flowWithContext = FlowWithContext.Create(); var sink = this.CreateSubscriberProbe<(string, long)>(); - Source.From(new[] { msg }) + Source.From(new[] { new Message("a", 1L) }) .AsSourceWithContext(x => x.Offset) .Select(x => x.Data) .Via(flowWithContext.Select(s => s + "b")) - .AsSource() .RunWith(Sink.FromSubscriber(sink), Materializer); var sub = sink.ExpectSubscription(); diff --git a/src/core/Akka.Streams.Tests/Dsl/WithContextUsageSpec.cs b/src/core/Akka.Streams.Tests/Dsl/WithContextUsageSpec.cs index ba9d243562b..42dc1c8b1cf 100644 --- a/src/core/Akka.Streams.Tests/Dsl/WithContextUsageSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/WithContextUsageSpec.cs @@ -214,7 +214,7 @@ private static CommittableMessage[] GenInput(int start, int end) => new CommittableMessage(new Record(GenKey(i), GenValue(i)), new CommittableOffsetImpl(i))) .ToArray(); - private static SourceWithContext CreateSourceWithContext( + private static SourceWithContext CreateSourceWithContext( params CommittableMessage[] messages) => CommittableConsumer.CommittableSource(messages) .AsSourceWithContext(m => new Offset(m.Offset.Offset)) diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index af18310fb73..b8359b7beea 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -2399,7 +2399,7 @@ public static Flow Watch(this Flow flow, IActor /// Resulting context type /// Materialized value type /// Type of passed flow elements - public static FlowWithContext AsFlowWithContext( + public static FlowWithContext AsFlowWithContext( this Flow flow, Func collapseContext, Func extractContext) diff --git a/src/core/Akka.Streams/Dsl/FlowWithContext.cs b/src/core/Akka.Streams/Dsl/FlowWithContext.cs index 5edc6d8665f..359295483b4 100644 --- a/src/core/Akka.Streams/Dsl/FlowWithContext.cs +++ b/src/core/Akka.Streams/Dsl/FlowWithContext.cs @@ -7,6 +7,7 @@ using System; using System.Runtime.CompilerServices; +using Akka.Annotations; namespace Akka.Streams.Dsl { @@ -17,17 +18,16 @@ namespace Akka.Streams.Dsl /// operations. /// /// An "empty" flow can be created by calling . - /// - /// API MAY CHANGE - /// - public sealed class FlowWithContext + /// + [ApiMayChange] + public sealed class FlowWithContext : GraphDelegate, TMat> { - internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) + internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) : base(flow) { } - + /// /// Transform this flow by the regular flow. The given flow must support manual context propagation by /// taking and producing tuples of (data, context). @@ -35,10 +35,10 @@ internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) /// This can be used as an escape hatch for operations that are not (yet) provided with automatic /// context propagation here. /// - public FlowWithContext Via( + public FlowWithContext Via( IGraph, TMat2> viaFlow) => FlowWithContext.From(Flow.FromGraph(Inner).Via(viaFlow)); - + /// /// Transform this flow by the regular flow. The given flow must support manual context propagation by /// taking and producing tuples of (data, context). @@ -49,7 +49,7 @@ public FlowWithContext Via /// The function is used to compose the materialized values of this flow and that /// flow into the materialized value of the resulting Flow. /// - public FlowWithContext ViaMaterialized( + public FlowWithContext ViaMaterialized( IGraph, TMat2> viaFlow, Func combine) => FlowWithContext.From(Flow.FromGraph(Inner).ViaMaterialized(viaFlow, combine)); @@ -60,17 +60,17 @@ public FlowWithContext ViaMaterialized - /// Creates an "empty" that passes elements through with their context unchanged. + /// Creates an "empty" that passes elements through with their context unchanged. /// - /// /// + /// /// - public static FlowWithContext Create() + public static FlowWithContext Create() { var under = Flow.Create<(TIn, TCtx), NotUsed>(); - return new FlowWithContext(under); + return new FlowWithContext(under); } - + /// /// Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements. /// @@ -81,8 +81,8 @@ public static FlowWithContext Create() /// /// /// - public static FlowWithContext From( - Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) => - new FlowWithContext(flow); + public static FlowWithContext From( + Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) => + new FlowWithContext(flow); } } diff --git a/src/core/Akka.Streams/Dsl/FlowWithContextOperations.cs b/src/core/Akka.Streams/Dsl/FlowWithContextOperations.cs index 3c13979dd52..f014543a46b 100644 --- a/src/core/Akka.Streams/Dsl/FlowWithContextOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowWithContextOperations.cs @@ -8,7 +8,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Runtime.InteropServices; using System.Threading.Tasks; using Akka.Streams.Implementation.Fusing; @@ -17,20 +16,20 @@ namespace Akka.Streams.Dsl public static class FlowWithContextOperations { /// - /// Context-preserving variant of + /// Context-preserving variant of /// - public static FlowWithContext Select( - this FlowWithContext flow, Func fn) + public static FlowWithContext Select( + this FlowWithContext flow, Func fn) { var stage = new Select<(TOut, TCtx), (TOut2, TCtx)>(x => (fn(x.Item1), x.Item2)); return flow.Via(Flow.FromGraph(stage)); } /// - /// Context-preserving variant of + /// Context-preserving variant of /// - public static FlowWithContext SelectAsync( - this FlowWithContext flow, int parallelism, Func> fn) + public static FlowWithContext SelectAsync( + this FlowWithContext flow, int parallelism, Func> fn) { var stage = new SelectAsync<(TOut, TCtx), (TOut2, TCtx)>(parallelism, async x => (await fn(x.Item1), x.Item2)); @@ -38,10 +37,10 @@ public static FlowWithContext SelectAsync - /// Context-preserving variant of + /// Context-preserving variant of /// - public static FlowWithContext Collect( - this FlowWithContext flow, Func fn) where TOut2 : class + public static FlowWithContext Collect( + this FlowWithContext flow, Func fn) where TOut2 : class { var stage = new Collect<(TOut, TCtx), (TOut2, TCtx)>(func: x => { @@ -54,8 +53,8 @@ public static FlowWithContext Collect /// Context-preserving variant of /// - public static FlowWithContext Where( - this FlowWithContext flow, Func predicate) + public static FlowWithContext Where( + this FlowWithContext flow, Func predicate) { var stage = new Where<(TOut, TCtx)>(x => predicate(x.Item1)); return flow.Via(Flow.FromGraph(stage)); @@ -64,8 +63,8 @@ public static FlowWithContext Where /// Context-preserving variant of /// - public static FlowWithContext WhereNot( - this FlowWithContext flow, Func predicate) + public static FlowWithContext WhereNot( + this FlowWithContext flow, Func predicate) { var stage = new Where<(TOut, TCtx)>(x => !predicate(x.Item1)); return flow.Via(Flow.FromGraph(stage)); @@ -75,9 +74,8 @@ public static FlowWithContext WhereNot /// Each output group will be associated with a `Seq` of corresponding context elements. /// - public static FlowWithContext, IReadOnlyList, TMat> Grouped( - this FlowWithContext flow, int n) + public static FlowWithContext, IReadOnlyList, TMat> Grouped( + this FlowWithContext flow, int n) { var stage = new Grouped<(TOut, TCtx)>(n); return flow.Via(Flow.FromGraph(stage).Select(itemsWithContexts => @@ -99,9 +97,8 @@ public static FlowWithContext WhereNot /// Each output group will be associated with a `Seq` of corresponding context elements. /// - public static FlowWithContext, IReadOnlyList, TMat> Sliding( - this FlowWithContext flow, int n, int step = 1) + public static FlowWithContext, IReadOnlyList, TMat> Sliding( + this FlowWithContext flow, int n, int step = 1) { var stage = new Sliding<(TOut, TCtx)>(n, step); return flow.Via(Flow.FromGraph(stage).Select(itemsWithContexts => @@ -120,19 +117,19 @@ public static FlowWithContext WhereNot - /// Context-preserving variant of . + /// Context-preserving variant of . /// The context of the input element will be associated with each of the output elements calculated from /// this input element. /// - public static FlowWithContext SelectConcat( - this FlowWithContext flow, Func> fn) => + public static FlowWithContext SelectConcat( + this FlowWithContext flow, Func> fn) => StatefulSelectConcat(flow, () => fn); /// - /// Context-preserving variant of . + /// Context-preserving variant of . /// - public static FlowWithContext StatefulSelectConcat( - this FlowWithContext flow, Func>> fn) + public static FlowWithContext StatefulSelectConcat( + this FlowWithContext flow, Func>> fn) { var stage = new StatefulSelectMany<(TOut, TCtx), (TOut2, TCtx)>(() => { @@ -149,33 +146,32 @@ public static FlowWithContext StatefulSelectConcat /// /// Apply the given function to each context element (leaving the data elements unchanged). /// - public static FlowWithContext SelectContext( - this FlowWithContext flow, Func mapContext) + public static FlowWithContext SelectContext( + this FlowWithContext flow, Func mapContext) { var stage = new Select<(TOut, TCtx), (TOut, TCtx2)>(x => (x.Item1, mapContext(x.Item2))); return flow.Via(Flow.FromGraph(stage)); } } - - + public static class SourceWithContextOperations { /// - /// Context-preserving variant of + /// Context-preserving variant of /// - public static SourceWithContext Select( - this SourceWithContext flow, Func fn) + public static SourceWithContext Select( + this SourceWithContext flow, Func fn) { var stage = new Select<(TOut, TCtx), (TOut2, TCtx)>(x => (fn(x.Item1), x.Item2)); return flow.Via(Flow.FromGraph(stage)); } /// - /// Context-preserving variant of + /// Context-preserving variant of /// - public static SourceWithContext SelectAsync( - this SourceWithContext flow, int parallelism, Func> fn) + public static SourceWithContext SelectAsync( + this SourceWithContext flow, int parallelism, Func> fn) { var stage = new SelectAsync<(TOut, TCtx), (TOut2, TCtx)>(parallelism, async x => (await fn(x.Item1), x.Item2)); @@ -185,8 +181,8 @@ public static SourceWithContext SelectAsync /// Context-preserving variant of /// - public static SourceWithContext Collect( - this SourceWithContext flow, Func fn) where TOut2 : class + public static SourceWithContext Collect( + this SourceWithContext flow, Func fn) where TOut2 : class { var stage = new Collect<(TOut, TCtx), (TOut2, TCtx)>(func: x => { @@ -199,8 +195,8 @@ public static SourceWithContext Collect /// Context-preserving variant of /// - public static SourceWithContext Where( - this SourceWithContext flow, Func predicate) + public static SourceWithContext Where( + this SourceWithContext flow, Func predicate) { var stage = new Where<(TOut, TCtx)>(x => predicate(x.Item1)); return flow.Via(Flow.FromGraph(stage)); @@ -209,8 +205,8 @@ public static SourceWithContext Where( /// /// Context-preserving variant of /// - public static SourceWithContext WhereNot( - this SourceWithContext flow, Func predicate) + public static SourceWithContext WhereNot( + this SourceWithContext flow, Func predicate) { var stage = new Where<(TOut, TCtx)>(x => !predicate(x.Item1)); return flow.Via(Flow.FromGraph(stage)); @@ -220,8 +216,8 @@ public static SourceWithContext WhereNot( /// Context-preserving variant of /// Each output group will be associated with a `Seq` of corresponding context elements. /// - public static SourceWithContext, IReadOnlyList, TMat> Grouped( - this SourceWithContext flow, int n) + public static SourceWithContext, IReadOnlyList, TMat> Grouped( + this SourceWithContext flow, int n) { var stage = new Grouped<(TOut, TCtx)>(n); return flow.Via(Flow.FromGraph(stage).Select(itemsWithContexts => @@ -243,8 +239,8 @@ public static SourceWithContext, IReadOnlyList, TMat> /// Context-preserving variant of /// Each output group will be associated with a `Seq` of corresponding context elements. /// - public static SourceWithContext, IReadOnlyList, TMat> Sliding( - this SourceWithContext flow, int n, int step = 1) + public static SourceWithContext, IReadOnlyList, TMat> Sliding( + this SourceWithContext flow, int n, int step = 1) { var stage = new Sliding<(TOut, TCtx)>(n, step); return flow.Via(Flow.FromGraph(stage).Select(itemsWithContexts => @@ -267,15 +263,15 @@ public static SourceWithContext, IReadOnlyList, TMat> /// The context of the input element will be associated with each of the output elements calculated from /// this input element. /// - public static SourceWithContext SelectConcat( - this SourceWithContext flow, Func> fn) => + public static SourceWithContext SelectConcat( + this SourceWithContext flow, Func> fn) => StatefulSelectConcat(flow, () => fn); /// /// Context-preserving variant of . /// - public static SourceWithContext StatefulSelectConcat( - this SourceWithContext flow, Func>> fn) + public static SourceWithContext StatefulSelectConcat( + this SourceWithContext flow, Func>> fn) { var stage = new StatefulSelectMany<(TOut, TCtx), (TOut2, TCtx)>(() => { @@ -292,8 +288,8 @@ public static SourceWithContext StatefulSelectConcat /// Apply the given function to each context element (leaving the data elements unchanged). /// - public static SourceWithContext SelectContext( - this SourceWithContext flow, Func mapContext) + public static SourceWithContext SelectContext( + this SourceWithContext flow, Func mapContext) { var stage = new Select<(TOut, TCtx), (TOut, TCtx2)>(x => (x.Item1, mapContext(x.Item2))); diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index bd681d300c2..7c22a513630 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -2275,10 +2275,9 @@ public static Source OrElseMaterialized(this So /// Type of produced events. /// Type of materialized value. /// - public static SourceWithContext AsSourceWithContext( - this Source flow, Func fn) => - new SourceWithContext(flow.Select(x => (x, fn(x)))); - + public static SourceWithContext AsSourceWithContext(this Source flow, Func fn) => + new SourceWithContext(flow.Select(x => (x, fn(x)))); + /// /// The operator fails with an if the target actor is terminated. /// diff --git a/src/core/Akka.Streams/Dsl/SourceWithContext.cs b/src/core/Akka.Streams/Dsl/SourceWithContext.cs index 96ca92ebd7c..d3e011cc2a4 100644 --- a/src/core/Akka.Streams/Dsl/SourceWithContext.cs +++ b/src/core/Akka.Streams/Dsl/SourceWithContext.cs @@ -6,7 +6,7 @@ //----------------------------------------------------------------------- using System; -using Akka.Streams.Implementation; +using Akka.Annotations; namespace Akka.Streams.Dsl { @@ -15,17 +15,15 @@ namespace Akka.Streams.Dsl /// Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can /// use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported /// operations. - /// - /// API MAY CHANGE /// - public sealed class SourceWithContext : GraphDelegate, TMat> + [ApiMayChange] + public sealed class SourceWithContext : GraphDelegate, TMat> { public SourceWithContext(Source<(TOut, TCtx), TMat> source) - : base(source) + : base(source) { } - - + /// /// Transform this flow by the regular flow. The given flow must support manual context propagation by /// taking and producing tuples of (data, context). @@ -33,10 +31,9 @@ public SourceWithContext(Source<(TOut, TCtx), TMat> source) /// This can be used as an escape hatch for operations that are not (yet) provided with automatic /// context propagation here. /// - public SourceWithContext Via( - IGraph, TMat2> viaFlow) => - new SourceWithContext(Source.FromGraph(Inner).Via(viaFlow)); - + public SourceWithContext Via(IGraph, TMat2> viaFlow) => + new SourceWithContext(Source.FromGraph(Inner).Via(viaFlow)); + /// /// Transform this flow by the regular flow. The given flow must support manual context propagation by /// taking and producing tuples of (data, context). @@ -47,16 +44,25 @@ public SourceWithContext Via( /// The function is used to compose the materialized values of this flow and that /// flow into the materialized value of the resulting Flow. /// - public SourceWithContext ViaMaterialized( + public SourceWithContext ViaMaterialized( IGraph, TMat2> viaFlow, Func combine) => - new SourceWithContext(Source.FromGraph(Inner).ViaMaterialized(viaFlow, combine)); + new SourceWithContext(Source.FromGraph(Inner).ViaMaterialized(viaFlow, combine)); + + /// + /// Connect this to a Sink and run it. The returned value is the materialized value of the Sink. + /// Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream. + /// + /// + /// + /// + /// + public TMat2 RunWith(IGraph, TMat2> sink, IMaterializer materializer) + => Source.FromGraph(Inner).RunWith(sink, materializer); - /// ///Stops automatic context propagation from here and converts this to a regular ///stream of a pair of (data, context). /// - public Source<(TOut, TCtx), TMat> AsSource() => - Inner is Source<(TOut, TCtx), TMat> source ? source : Source.FromGraph(Inner); + public Source<(TOut, TCtx), TMat> AsSource() => Inner is Source<(TOut, TCtx), TMat> source ? source : Source.FromGraph(Inner); } }