Skip to content

Commit

Permalink
Merge branch 'dev' into FlowIdleInjectSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba authored Apr 4, 2023
2 parents 69e9cd7 + d925fd5 commit 097e925
Show file tree
Hide file tree
Showing 65 changed files with 2,026 additions and 1,872 deletions.
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,16 @@ You can start by taking the [Akka.NET Bootcamp](https://learnakka.net/), but the

## Build Status

| Stage | Status |
|------------------------------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Build | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=Windows%20Build)](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| NuGet Pack | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=NuGet%20Pack)](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Framework Unit Tests | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%20Framework%20Unit%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Framework MultiNode Tests | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%20Framework%20Multi-Node%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Core (Windows) Unit Tests | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%20Core%20Unit%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Core (Linux) Unit Tests | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%20Core%20Unit%20Tests%20(Linux))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Core (Windows) MultiNode Tests | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%20Core%20Multi-Node%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Core (Linux) MultiNode Tests | |
| Docs | [![Build Status](https://dev.azure.com/petabridge/akkadotnet-tools/_apis/build/status/Akka.NET%20Docs?branchName=dev)](https://dev.azure.com/petabridge/akkadotnet-tools/_build/latest?definitionId=82&branchName=dev) |
| Stage | Status |
|------------------------------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Build | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=Windows%20Build)](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| NuGet Pack | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=NuGet%20Pack)](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET Framework Unit Tests | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%20Framework%20Unit%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET 7 Unit Tests (Windows) | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%207%20Unit%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET 7 Unit Tests (Linux) | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%207%20Unit%20Tests%20(Linux))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET 7 MultiNode Tests (Windows) | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%207%20Multi-Node%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) |
| .NET 7 MultiNode Tests (Linux) | [![Build Status](https://dev.azure.com/dotnet/Akka.NET/_apis/build/status/akka.net/PR%20Validation?branchName=dev&jobName=.NET%207%20Multi-Node%20Tests%20(Windows))](https://dev.azure.com/dotnet/Akka.NET/_build/latest?definitionId=84&branchName=dev) | |
| Docs | [![Build Status](https://dev.azure.com/petabridge/akkadotnet-tools/_apis/build/status/Akka.NET%20Docs?branchName=dev)](https://dev.azure.com/petabridge/akkadotnet-tools/_build/latest?definitionId=82&branchName=dev) |


## Install Akka.NET via NuGet
Expand Down
18 changes: 14 additions & 4 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,11 @@ if more element are emitted the sink will cancel the stream

**cancels** If too many values are collected

### Foreach
### ForEach

Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure.

The sink materializes into a ``Task`` which completes when the
The sink materializes into a ``Task<Done>`` which completes when the
stream completes, or fails if the stream fails.

Note that it is not safe to mutate state from the procedure.
Expand All @@ -308,9 +308,19 @@ Note that it is not safe to mutate state from the procedure.

**backpressures** when the previous procedure invocation has not yet completed

### ForeachParallel
### ForEachASync

Like ``Foreach`` but allows up to ``parallellism`` procedure calls to happen in parallel.
Invoke a given procedure asynchronously for each element received. Note that if shared state is mutated from the procedure that must be done in a thread-safe way.

The sink materializes into a ``Task<Done>`` which completes when the stream completes, or fails if the stream fails.

**cancels** when a ``Task`` fails

**backpressures** when the number of ``Task``s reaches the configured parallelism

### ForEachParallel

Like ``ForEach`` but allows up to ``parallellism`` procedure calls to happen in parallel.

**cancels** never

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ namespace Akka.Actor
public sealed class SchedulerException : Akka.Actor.AkkaException
{
public SchedulerException(string message) { }
protected SchedulerException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public SchedulerException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class static SchedulerExtensions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ namespace Akka.Actor
public sealed class SchedulerException : Akka.Actor.AkkaException
{
public SchedulerException(string message) { }
protected SchedulerException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public SchedulerException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class static SchedulerExtensions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1960,6 +1960,8 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
[System.ObsoleteAttribute("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by " +
"calling some other API returning a Task or using Task.Run. Obsolete since 1.5.1")]
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Akka.Streams
public sealed class AbruptStageTerminationException : System.Exception
{
public AbruptStageTerminationException(Akka.Streams.Stage.GraphStageLogic logic) { }
protected AbruptStageTerminationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public AbruptStageTerminationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class AbruptTerminationException : System.Exception
{
Expand Down Expand Up @@ -1804,7 +1804,7 @@ namespace Akka.Streams.Dsl
public sealed class PartitionOutOfBoundsException : System.Exception
{
public PartitionOutOfBoundsException(string message) { }
protected PartitionOutOfBoundsException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public PartitionOutOfBoundsException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class PartitionWith<TIn, TOut0, TOut1> : Akka.Streams.Stage.GraphStage<Akka.Streams.FanOutShape<TIn, TOut0, TOut1>>
{
Expand Down Expand Up @@ -1960,6 +1960,8 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
[System.ObsoleteAttribute("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by " +
"calling some other API returning a Task or using Task.Run. Obsolete since 1.5.2")]
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Expand Down Expand Up @@ -2391,7 +2393,7 @@ namespace Akka.Streams.Dsl
public sealed class TcpIdleTimeoutException : System.TimeoutException
{
public TcpIdleTimeoutException(string message, System.TimeSpan duration) { }
protected TcpIdleTimeoutException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public TcpIdleTimeoutException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public System.TimeSpan Duration { get; }
}
public class static TcpStreamExtensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Akka.Streams
public sealed class AbruptStageTerminationException : System.Exception
{
public AbruptStageTerminationException(Akka.Streams.Stage.GraphStageLogic logic) { }
protected AbruptStageTerminationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public AbruptStageTerminationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class AbruptTerminationException : System.Exception
{
Expand Down Expand Up @@ -1804,7 +1804,7 @@ namespace Akka.Streams.Dsl
public sealed class PartitionOutOfBoundsException : System.Exception
{
public PartitionOutOfBoundsException(string message) { }
protected PartitionOutOfBoundsException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public PartitionOutOfBoundsException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class PartitionWith<TIn, TOut0, TOut1> : Akka.Streams.Stage.GraphStage<Akka.Streams.FanOutShape<TIn, TOut0, TOut1>>
{
Expand Down Expand Up @@ -1960,6 +1960,8 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<TIn>> FirstOrDefault<TIn>() { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEach<TIn>(System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachAsync<TIn>(int parallelism, System.Func<TIn, System.Threading.Tasks.Task> action) { }
[System.ObsoleteAttribute("Use `ForEachAsync` instead, it allows you to choose how to run the procedure, by " +
"calling some other API returning a Task or using Task.Run. Obsolete since 1.5.2")]
public static Akka.Streams.Dsl.Sink<TIn, System.Threading.Tasks.Task<Akka.Done>> ForEachParallel<TIn>(int parallelism, System.Action<TIn> action) { }
public static Akka.Streams.Dsl.Sink<TIn, TMat> FromGraph<TIn, TMat>(Akka.Streams.IGraph<Akka.Streams.SinkShape<TIn>, TMat> graph) { }
public static Akka.Streams.Dsl.Sink<TIn, Akka.NotUsed> FromSubscriber<TIn>(Reactive.Streams.ISubscriber<TIn> subscriber) { }
Expand Down Expand Up @@ -2391,7 +2393,7 @@ namespace Akka.Streams.Dsl
public sealed class TcpIdleTimeoutException : System.TimeoutException
{
public TcpIdleTimeoutException(string message, System.TimeSpan duration) { }
protected TcpIdleTimeoutException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public TcpIdleTimeoutException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public System.TimeSpan Duration { get; }
}
public class static TcpStreamExtensions
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public EndpointDisassociatedException(string message)
/// </summary>
/// <param name="info">The <see cref="SerializationInfo"/> that holds the serialized object data about the exception being thrown.</param>
/// <param name="context">The <see cref="StreamingContext"/> that contains contextual information about the source or destination.</param>
protected EndpointDisassociatedException(SerializationInfo info, StreamingContext context)
public EndpointDisassociatedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
Expand Down Expand Up @@ -361,7 +361,7 @@ public EndpointAssociationException(string message, Exception innerException) :
/// </summary>
/// <param name="info">The <see cref="SerializationInfo"/> that holds the serialized object data about the exception being thrown.</param>
/// <param name="context">The <see cref="StreamingContext"/> that contains contextual information about the source or destination.</param>
protected EndpointAssociationException(SerializationInfo info, StreamingContext context)
EndpointAssociationException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
Expand All @@ -386,7 +386,7 @@ public OversizedPayloadException(string message)
/// </summary>
/// <param name="info">The <see cref="SerializationInfo"/> that holds the serialized object data about the exception being thrown.</param>
/// <param name="context">The <see cref="StreamingContext"/> that contains contextual information about the source or destination.</param>
protected OversizedPayloadException(SerializationInfo info, StreamingContext context)
OversizedPayloadException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/AttributesSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public async Task Attributes_must_keep_the_outermost_attribute_as_the_least_spec

[Fact]
public void Attributes_must_give_access_to_first_attribute()
#pragma warning disable CS0618 // Type or member is obsolete
=> Attributes.GetFirstAttribute<Attributes.Name>().Value.Should().Be("a");
#pragma warning restore CS0618 // Type or member is obsolete

[Fact]
public void Attributes_must_give_access_to_attribute_by_type()
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/BidiFlowSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ public void A_BidiFlow_must_suitably_override_attribute_handling_methods()
var b = (BidiFlow<int, long, ByteString, string, NotUsed>)
Bidi().WithAttributes(Attributes.CreateName("")).Async().Named("name");

b.Module.Attributes.GetFirstAttribute<Attributes.Name>().Value.Should().Be("name");
b.Module.Attributes.GetFirstAttribute<Attributes.AsyncBoundary>()
b.Module.Attributes.GetAttribute<Attributes.Name>().Value.Should().Be("name");
b.Module.Attributes.GetAttribute<Attributes.AsyncBoundary>()
.Should()
.Be(Attributes.AsyncBoundary.Instance);
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void A_GroupedWithin_must_group_early()
}).ToArray());

RandomTestRange(Sys)
.ForEach(_ => RunScript(script, Settings, flow => flow.GroupedWithin(3, TimeSpan.FromMinutes(10))));
.Select(async _ => await RunScriptAsync(script, Settings, flow => flow.GroupedWithin(3, TimeSpan.FromMinutes(10))));
}

[Fact]
Expand All @@ -255,7 +255,7 @@ public void A_GroupedWithin_must_group_with_rest()
};

RandomTestRange(Sys)
.ForEach(_ => RunScript(script(), Settings, flow => flow.GroupedWithin(3, TimeSpan.FromMinutes(10))));
.Select(async _ => await RunScriptAsync(script(), Settings, flow => flow.GroupedWithin(3, TimeSpan.FromMinutes(10))));
}

[Fact(Skip = "Skipped for async_testkit conversion build")]
Expand Down
Loading

0 comments on commit 097e925

Please sign in to comment.