Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not use expression based props for long lived streams #6807

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ public Children(IImmutableSet<IActorRef> refs)
/// <param name="haveShutdown">TBD</param>
/// <returns>TBD</returns>
public static Props Props(ActorMaterializerSettings settings, AtomicBoolean haveShutdown)
=> Actor.Props.Create(() => new StreamSupervisor(settings, haveShutdown)).WithDeploy(Deploy.Local);
=> Actor.Props.Create<StreamSupervisor>(settings, haveShutdown).WithDeploy(Deploy.Local);

/// <summary>
/// TBD
Expand All @@ -648,6 +648,7 @@ public static Props Props(ActorMaterializerSettings settings, AtomicBoolean have
/// </summary>
/// <param name="settings">TBD</param>
/// <param name="haveShutdown">TBD</param>
/// If this changes you must also change StreamSupervisor.Props as well!
public StreamSupervisor(ActorMaterializerSettings settings, AtomicBoolean haveShutdown)
{
Settings = settings;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/ActorRefSinkActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ActorRefSinkActor : ActorSubscriber
/// <param name="onCompleteMessage">TBD</param>
/// <returns>TBD</returns>
public static Props Props(IActorRef @ref, int highWatermark, object onCompleteMessage)
=> Actor.Props.Create(() => new ActorRefSinkActor(@ref, highWatermark, onCompleteMessage));
=> Actor.Props.Create<ActorRefSinkActor>(@ref, highWatermark, onCompleteMessage);

private ILoggingAdapter _log;

Expand All @@ -47,6 +47,7 @@ public static Props Props(IActorRef @ref, int highWatermark, object onCompleteMe
/// <param name="ref">TBD</param>
/// <param name="highWatermark">TBD</param>
/// <param name="onCompleteMessage">TBD</param>
/// If this gets changed you must change <see cref="ActorRefSinkActor.Props"/> as well!
public ActorRefSinkActor(IActorRef @ref, int highWatermark, object onCompleteMessage)
{
Ref = @ref;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
throw new NotSupportedException("Backpressure overflow strategy not supported");

var maxFixedBufferSize = settings.MaxFixedBufferSize;
return Actor.Props.Create(() => new ActorRefSourceActor<T>(bufferSize, overflowStrategy, maxFixedBufferSize));
return Actor.Props.Create<ActorRefSourceActor<T>>(bufferSize, overflowStrategy, maxFixedBufferSize);
}

/// <summary>
Expand All @@ -58,6 +58,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
/// <param name="bufferSize">TBD</param>
/// <param name="overflowStrategy">TBD</param>
/// <param name="maxFixedBufferSize">TBD</param>
/// If this changes you must also change <see cref="ActorRefSourceActor{T}.Props"/> as well!
public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, int maxFixedBufferSize)
{
BufferSize = bufferSize;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/FanOut.cs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ internal static class Unzip
/// <param name="settings">TBD</param>
/// <returns>TBD</returns>
public static Props Props<T>(ActorMaterializerSettings settings)
=> Actor.Props.Create(() => new Unzip<T>(settings, 2)).WithDeploy(Deploy.Local);
=> Actor.Props.Create<Unzip<T>>(settings, 2).WithDeploy(Deploy.Local);
}

/// <summary>
Expand All @@ -728,6 +728,7 @@ internal sealed class Unzip<T> : FanOut<T>
/// This exception is thrown when the elements in <see cref="Akka.Streams.Implementation.FanOut{T}.PrimaryInputs"/>
/// are of an unknown type.
/// </exception>>
/// If this gets changed you must change <see cref="Akka.Streams.Implementation.FanOut.Unzip{T}"/> as well!
public Unzip(ActorMaterializerSettings settings, int outputCount = 2) : base(settings, outputCount)
{
OutputBunch.MarkAllOutputs();
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/FanoutProcessorImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ internal sealed class FanoutProcessorImpl<T, TStreamBuffer> : ActorProcessorImpl
/// <param name="onTerminated">TBD</param>
/// <returns>TBD</returns>
public static Props Props(ActorMaterializerSettings settings, Action onTerminated = null)
=> Actor.Props.Create(() => new FanoutProcessorImpl<T, TStreamBuffer>(settings, onTerminated)).WithDeploy(Deploy.Local);
=> Actor.Props.Create<FanoutProcessorImpl<T, TStreamBuffer>>(settings, onTerminated).WithDeploy(Deploy.Local);

/// <summary>
/// TBD
Expand All @@ -239,6 +239,7 @@ public static Props Props(ActorMaterializerSettings settings, Action onTerminate
/// </summary>
/// <param name="settings">TBD</param>
/// <param name="onTerminated">TBD</param>
/// If this gets changed you must change <see cref="FanoutProcessorImpl{T,TStreamBuffer}.Props"/> as well!
public FanoutProcessorImpl(ActorMaterializerSettings settings, Action onTerminated) : base(settings)
{
PrimaryOutputs = new FanoutOutputs<T, TStreamBuffer>(settings.MaxInputBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,8 @@ private void Complete()
/// </summary>
/// <param name="shell">TBD</param>
/// <returns>TBD</returns>
public static Props Props(GraphInterpreterShell shell) => Actor.Props.Create(() => new ActorGraphInterpreter(shell)).WithDeploy(Deploy.Local);
public static Props Props(GraphInterpreterShell shell) => Actor.Props
.Create<ActorGraphInterpreter>(shell).WithDeploy(Deploy.Local);

private ISet<GraphInterpreterShell> _activeInterpreters = new HashSet<GraphInterpreterShell>();
private readonly Queue<GraphInterpreterShell> _newShells = new Queue<GraphInterpreterShell>();
Expand All @@ -1332,6 +1333,7 @@ private void Complete()
/// TBD
/// </summary>
/// <param name="shell">TBD</param>
/// If this ctor gets changed you -must- change <see cref="ActorGraphInterpreter.Props"/> as well!
public ActorGraphInterpreter(GraphInterpreterShell shell)
{
_initial = shell;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/IO/FilePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static Props Props(FileInfo f, TaskCompletionSource<IOResult> completionP
if (maxBuffer < initialBuffer)
throw new ArgumentException($"maxBuffer must be >= initialBuffer (was {maxBuffer})", nameof(maxBuffer));

return Actor.Props.Create(() => new FilePublisher(f, completionPromise, chunkSize, startPosition, maxBuffer))
return Actor.Props.Create<FilePublisher>( f, completionPromise, chunkSize, startPosition, maxBuffer)
.WithDeploy(Deploy.Local);
}

Expand Down Expand Up @@ -86,6 +86,7 @@ private struct Continue : IDeadLetterSuppression
/// <param name="chunkSize">TBD</param>
/// <param name="startPosition">TBD</param>
/// <param name="maxBuffer">TBD</param>
/// If this changes you must also change <see cref="FilePublisher.Props"/> as well!
public FilePublisher(FileInfo f, TaskCompletionSource<IOResult> completionPromise, int chunkSize, long startPosition, int maxBuffer)
{
_f = f;
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Props Props(
if (startPosition < 0)
throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition));

return Actor.Props.Create(() => new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand))
return Actor.Props.Create<FileSubscriber>(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand)
.WithDeploy(Deploy.Local);
}

Expand All @@ -72,6 +72,7 @@ public static Props Props(
/// <param name="fileMode">TBD</param>
/// <param name="autoFlush"></param>
/// <param name="flushSignaler"></param>
/// If this changes you must change <see cref="FileSubscriber.Props"/> as well!
public FileSubscriber(
FileInfo f,
TaskCompletionSource<IOResult> completionPromise,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static Props Props(Stream inputstream, TaskCompletionSource<IOResult> com
if (chunkSize <= 0)
throw new ArgumentException($"chunkSize must be > 0 was {chunkSize}", nameof(chunkSize));

return Actor.Props.Create(()=> new InputStreamPublisher(inputstream, completionSource, chunkSize)).WithDeploy(Deploy.Local);
return Actor.Props.Create<InputStreamPublisher>(inputstream, completionSource, chunkSize).WithDeploy(Deploy.Local);
}

private struct Continue : IDeadLetterSuppression
Expand All @@ -58,6 +58,7 @@ private struct Continue : IDeadLetterSuppression
/// <param name="inputstream">TBD</param>
/// <param name="completionSource">TBD</param>
/// <param name="chunkSize">TBD</param>
/// If this gets changed you must change <see cref="InputStreamPublisher.Props"/> as well!
public InputStreamPublisher(Stream inputstream, TaskCompletionSource<IOResult> completionSource, int chunkSize)
{
_inputstream = inputstream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static Props Props(Stream os, TaskCompletionSource<IOResult> completionPr
throw new ArgumentException("Buffer size must be > 0");

return
Actor.Props.Create(() => new OutputStreamSubscriber(os, completionPromise, bufferSize, autoFlush))
Actor.Props.Create<OutputStreamSubscriber>(os, completionPromise, bufferSize, autoFlush)
.WithDeploy(Deploy.Local);
}

Expand All @@ -53,6 +53,7 @@ public static Props Props(Stream os, TaskCompletionSource<IOResult> completionPr
/// <param name="completionPromise">TBD</param>
/// <param name="bufferSize">TBD</param>
/// <param name="autoFlush">TBD</param>
/// If this gets changed you must change <see cref="OutputStreamSubscriber.Props"/> as well!
public OutputStreamSubscriber(Stream outputStream, TaskCompletionSource<IOResult> completionPromise, int bufferSize, bool autoFlush)
{
_outputStream = outputStream;
Expand Down