Skip to content

Commit

Permalink
[Obsolete][CS0618] AwaitResult > Use ShouldCompleteWithin instead (
Browse files Browse the repository at this point in the history
…#6498)

* [Obsolete][CS0618] `AwaitResult` > `Use ShouldCompleteWithin instead`

* fix

* more fixed

* [test] Within `2.Seconds()`

* [test][FileSinkSpec] change to `Async`s

* Fixes

* [revert] `ValveSpec`  seq.Invoking*

* [change] `await this.AssertAllStagesStoppedAsync(async() => { }, materializer);`

* improves `FlowAggregateAsyncSpec`, `FlowSelectAsyncSpec`

* fixed

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
3 people authored Mar 20, 2023
1 parent 7ca22af commit a1d663f
Show file tree
Hide file tree
Showing 24 changed files with 448 additions and 303 deletions.
13 changes: 8 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/AttributesSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
using Akka.Streams.Implementation;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;

namespace Akka.Streams.Tests.Dsl
Expand All @@ -30,26 +32,27 @@ public AttributesSpec()
Attributes.CreateName("a").And(Attributes.CreateName("b")).And(Attributes.CreateInputBuffer(1, 2));

[Fact]
public void Attributes_must_be_overridable_on_a_module_basis()
public async Task Attributes_must_be_overridable_on_a_module_basis()
{
var runnable =
Source.Empty<NotUsed>()
.ToMaterialized(AttributesSink.Create().WithAttributes(Attributes.CreateName("new-name")),
Keep.Right);
var task = runnable.Run(Materializer);

task.AwaitResult().GetAttribute<Attributes.Name>().Value.Should().Contain("new-name");
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.GetAttribute<Attributes.Name>().Value.Should().Contain("new-name");
}

[Fact]
public void Attributes_must_keep_the_outermost_attribute_as_the_least_specific()
public async Task Attributes_must_keep_the_outermost_attribute_as_the_least_specific()
{
var task = Source.Empty<NotUsed>()
.ToMaterialized(AttributesSink.Create(), Keep.Right)
.WithAttributes(Attributes.CreateName("new-name"))
.Run(Materializer);

task.AwaitResult().GetAttribute<Attributes.Name>().Value.Should().Contain("attributesSink");
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.GetAttribute<Attributes.Name>().Value.Should().Contain("attributesSink");
}

[Fact]
Expand Down
41 changes: 24 additions & 17 deletions src/core/Akka.Streams.Tests/Dsl/FlowAggregateAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,43 +56,47 @@ private static Sink<int, Task<int>> AggregateSink


[Fact]
public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync()
public async Task A_AggregateAsync_must_work_when_using_Source_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = AggregateSource.RunWith(Sink.First<int>(), Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_work_when_using_Sink_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_AggregateAsync_must_work_when_using_Flow_AggregateAsync()
{
var flowTimeout = TimeSpan.FromMilliseconds(FlowDelayInMs*Input.Count()) + TimeSpan.FromSeconds(3);
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
task.AwaitResult(flowTimeout).Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(flowTimeout);
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_work_when_using_Source_AggregateAsync_and_Flow_AggregateAsync_and_Sink_AggregateAsync()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

Expand Down Expand Up @@ -277,11 +281,11 @@ await this.AssertAllStagesStoppedAsync(async () =>
}

[Fact]
public void A_AggregateAsync_must_finish_after_task_failure()
public async Task A_AggregateAsync_must_finish_after_task_failure()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
Source.From(Enumerable.Range(1, 3)).AggregateAsync(1, (_, n) => Task.Run(() =>
var complete = await Source.From(Enumerable.Range(1, 3)).AggregateAsync(1, (_, n) => Task.Run(() =>
{
if (n == 3)
throw new Exception("err3b");
Expand All @@ -290,7 +294,8 @@ public void A_AggregateAsync_must_finish_after_task_failure()
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer)
.AwaitResult().Should().BeEquivalentTo(2);
.ShouldCompleteWithin(3.Seconds());
complete.Should().BeEquivalentTo(2);
}, Materializer);
}

Expand Down Expand Up @@ -417,22 +422,24 @@ public void A_AggregateAsync_must_handle_cancel_properly()
[Fact]
public void A_AggregateAsync_must_complete_task_and_return_zero_given_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = Source.From(Enumerable.Empty<int>())
.RunAggregateAsync(0, (acc, element) => Task.FromResult(acc + element), Materializer);
task.AwaitResult(RemainingOrDefault).ShouldBe(0);
var complete = await task.ShouldCompleteWithin(RemainingOrDefault);
complete.ShouldBe(0);
}, Materializer);
}

[Fact]
public void A_AggregateAsync_must_complete_task_and_return_zero_and_item_given_a_stream_of_one_item()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = Source.Single(100)
.RunAggregateAsync(5, (acc, element) => Task.FromResult(acc + element), Materializer);
task.AwaitResult(RemainingOrDefault).ShouldBe(105);
var complete = await task.ShouldCompleteWithin(RemainingOrDefault);
complete.ShouldBe(105);
}, Materializer);
}
}
Expand Down
42 changes: 26 additions & 16 deletions src/core/Akka.Streams.Tests/Dsl/FlowAggregateSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Akka.TestKit.Extensions;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -38,50 +40,55 @@ public FlowAggregateSpec(ITestOutputHelper helper) : base(helper)
[Fact]
public void A_Aggregate_must_work_when_using_Source_RunAggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.RunAggregate(0, (sum, i) => sum + i, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Source_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.RunWith(Sink.First<int>(), Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Sink_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Flow_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
task.AwaitResult().Should().Be(Expected);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected);
}, Materializer);
}

Expand Down Expand Up @@ -129,7 +136,7 @@ public void
[Fact]
public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var error = new Exception("boom");
var aggregate = Sink.Aggregate(0, (int x, int y) =>
Expand All @@ -142,14 +149,15 @@ public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregat
var task = InputSource.RunWith(
aggregate.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)),
Materializer);
task.AwaitResult().Should().Be(Expected - 50);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Expected - 50);
}, Materializer);
}

[Fact]
public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var error = new Exception("boom");
var aggregate = Sink.Aggregate(0, (int x, int y) =>
Expand All @@ -162,18 +170,20 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_fun
var task = InputSource.RunWith(
aggregate.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)),
Materializer);
task.AwaitResult().Should().Be(Enumerable.Range(51, 50).Sum());
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(Enumerable.Range(51, 50).Sum());
}, Materializer);
}

[Fact]
public void A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
this.AssertAllStagesStopped(async() =>
{
var task = Source.From(Enumerable.Empty<int>())
.RunAggregate(0, (acc, element) => acc + element, Materializer);
task.AwaitResult().ShouldBe(0);
var complete = await task.ShouldCompleteWithin(3.Seconds());
complete.Should().Be(0);
}, Materializer);
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/core/Akka.Streams.Tests/Dsl/FlowOrElseSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
using System;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit.Extensions;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using System.Threading.Tasks;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand All @@ -28,25 +31,27 @@ public FlowOrElseSpec(ITestOutputHelper helper) : base(helper)
private ActorMaterializer Materializer { get; }

[Fact]
public void An_OrElse_flow_should_pass_elements_from_the_first_input()
public async Task An_OrElse_flow_should_pass_elements_from_the_first_input()
{
var source1 = Source.From(new[] {1, 2, 3});
var source2 = Source.From(new[] {4, 5, 6});

var sink = Sink.Seq<int>();

source1.OrElse(source2).RunWith(sink, Materializer).AwaitResult().Should().BeEquivalentTo(new[] {1, 2, 3});
var complete = await source1.OrElse(source2).RunWith(sink, Materializer).ShouldCompleteWithin(3.Seconds());
complete.Should().BeEquivalentTo(new[] { 1, 2, 3 });
}

[Fact]
public void An_OrElse_flow_should_pass_elements_from_the_second_input_if_the_first_completes_with_no_elements_emitted()
public async Task An_OrElse_flow_should_pass_elements_from_the_second_input_if_the_first_completes_with_no_elements_emitted()
{
var source1 = Source.Empty<int>();
var source2 = Source.From(new[] { 4, 5, 6 });

var sink = Sink.Seq<int>();

source1.OrElse(source2).RunWith(sink, Materializer).AwaitResult().Should().BeEquivalentTo(new[] { 4, 5, 6 });
var complete = await source1.OrElse(source2).RunWith(sink, Materializer).ShouldCompleteWithin(3.Seconds());
complete.Should().BeEquivalentTo(new[] { 4, 5, 6 });
}

[Fact]
Expand Down
7 changes: 5 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowScanAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using Akka.TestKit.Extensions;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using Decider = Akka.Streams.Supervision.Decider;
using FluentAssertions.Extensions;

namespace Akka.Streams.Tests.Dsl
{
Expand Down Expand Up @@ -59,14 +61,15 @@ public void A_ScanAsync_must_work_with_a_single_source()
}

[Fact]
public void A_ScanAsync_must_work_with_a_large_source()
public async Task A_ScanAsync_must_work_with_a_large_source()
{
var elements = Enumerable.Range(1, 100000).Select(i => (long)i).ToList();
var expectedSum = elements.Sum();
var eventualActual = Source.From(elements)
.ScanAsync(0L, (l, l1) => Task.FromResult(l + l1))
.RunWith(Sink.Last<long>(), Materializer);
eventualActual.AwaitResult().ShouldBe(expectedSum);
var complete = await eventualActual.ShouldCompleteWithin(3.Seconds());
complete.ShouldBe(expectedSum);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
Expand Down
Loading

0 comments on commit a1d663f

Please sign in to comment.