Skip to content

Commit

Permalink
Akka.Streams: fix race conditions with synchronous file sink specs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored Mar 27, 2023
1 parent 091b817 commit d1299d0
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Akka.Streams.Tests.IO
public class FileSinkSpec : AkkaSpec
{
private readonly ActorMaterializer _materializer;
private readonly List<string> _testLines = new List<string>();
private readonly List<string> _testLines = new();
private readonly List<ByteString> _testByteStrings;
private readonly TimeSpan _expectTimeout = TimeSpan.FromSeconds(10);

Expand Down Expand Up @@ -263,6 +263,7 @@ await TargetFileAsync(f =>

// haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever
Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name);
return Task.CompletedTask;
}
finally
{
Expand Down Expand Up @@ -296,6 +297,7 @@ await TargetFileAsync(f =>
((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor);
var actorRef = ExpectMsg<StreamSupervisor.Children>().Refs.First(@ref => @ref.Path.ToString().Contains("File"));
Utils.AssertDispatcher(actorRef, "akka.actor.default-dispatcher");
return Task.CompletedTask;
}
finally
{
Expand Down Expand Up @@ -331,7 +333,7 @@ await AwaitAssertAsync(
[Fact]
public async Task SynchronousFileSink_should_complete_materialized_task_with_an_exception_when_upstream_fails()
{
await TargetFileAsync(f =>
await TargetFileAsync(async f =>
{
var completion = Source.From(_testByteStrings)
.Select(bytes =>
Expand All @@ -344,18 +346,24 @@ await TargetFileAsync(f =>
var ex = Intercept<AbruptIOTerminationException>(() => completion.Wait(TimeSpan.FromSeconds(3)));
ex.IoResult.Count.ShouldBe(1001);
CheckFileContent(f, string.Join("", _testLines.TakeWhile(s => !s.Contains('b'))));
await Task.CompletedTask;
}, _materializer);
}

[Fact]
public async Task SynchronousFileSink_should_complete_with_failure_when_file_cannot_be_open()
{
await TargetFileAsync(f =>
await TargetFileAsync(async f =>
{
var completion = Source.Single(ByteString.FromString("42"))
.RunWith(FileIO.ToFile(new FileInfo("I-hope-this-file-doesnt-exist.txt"), FileMode.Open), _materializer);

AssertThrows<FileNotFoundException>(completion.Wait);
async Task Exec()
{
await completion;
}

await Exec().ShouldThrowWithin<FileNotFoundException>(RemainingOrDefault);
}, _materializer);
}

Expand All @@ -377,9 +385,10 @@ await TargetFileAsync(async f =>
actor.Tell("a\n");
actor.Tell("b\n");

await AwaitAssertAsync(async() =>
await AwaitAssertAsync(() =>
{
CheckFileContent(f, "a\nb\n");
return Task.CompletedTask;
}, Remaining);

actor.Tell("a\n");
Expand All @@ -390,20 +399,20 @@ await AwaitAssertAsync(async() =>
// We still have to wait for the task to complete, because the signal
// came from the FileSink actor, not the source actor.
await task.ShouldCompleteWithin(Remaining);
ExpectTerminated(actor, Remaining);
await ExpectTerminatedAsync(actor, Remaining);

f.Length.ShouldBe(8);
CheckFileContent(f, "a\nb\na\nb\n");
}, _materializer);
});
}

[Fact(Skip = "Skipped for async_testkit conversion build")]
[Fact]
public void SynchronousFileSink_should_write_buffered_element_if_manual_flush_is_called()
{
this.AssertAllStagesStopped(async() =>
{
await TargetFileAsync(f =>
await TargetFileAsync(async f =>
{
var flusher = new FlushSignaler();
var (actor, task) = Source.ActorRef<string>(64, OverflowStrategy.DropNew)
Expand All @@ -412,23 +421,23 @@ await TargetFileAsync(f =>
FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: 0, flushSignaler:flusher),
(a, t) => (a, t))
.Run(_materializer);
Thread.Sleep(100); // wait for stream to catch up
await Task.Delay(100); // wait for stream to catch up

actor.Tell("a\n");
actor.Tell("b\n");
Thread.Sleep(200); // wait for stream to catch up
await Task.Delay(200); // wait for stream to catch up

flusher.Flush();
Thread.Sleep(100); // wait for flush
await Task.Delay(100); // wait for flush
CheckFileContent(f, "a\nb\n"); // file should be flushed

actor.Tell("c\n");
actor.Tell("d\n");
Thread.Sleep(200); // wait for stream to catch up
await Task.Delay(200); // wait for stream to catch up
CheckFileContent(f, "a\nb\n"); // file content should not change

flusher.Flush();
Thread.Sleep(100); // wait for flush
await Task.Delay(100); // wait for flush
CheckFileContent(f, "a\nb\nc\nd\n"); // file content should all be flushed

actor.Tell(new Status.Success(NotUsed.Instance));
Expand All @@ -440,11 +449,11 @@ await TargetFileAsync(f =>
}

private async Task TargetFileAsync(
Action<FileInfo> block,
Func<FileInfo, Task> block,
ActorMaterializer materializer,
bool create = true)
{
var targetFile = new FileInfo(Path.Combine(Path.GetTempPath(), "synchronous-file-sink.tmp"));
var targetFile = new FileInfo(Path.Combine(Path.GetTempPath(), $"synchronous-file-sink-{Guid.NewGuid()}.tmp"));

if (!create)
targetFile.Delete();
Expand All @@ -453,7 +462,7 @@ private async Task TargetFileAsync(

try
{
block(targetFile);
await block(targetFile);
}
finally
{
Expand Down

0 comments on commit d1299d0

Please sign in to comment.