Skip to content

Commit

Permalink
Ensure continuations run with sync task completion (ARCH-98) (#2)
Browse files Browse the repository at this point in the history
Motivation
----------
There is currently a corner case when continuations are queued by a
synchronously completing task where those continuations are never
executed.

Modifications
-------------
When the task completes synchronously queue all remaining continuations
on the parent synchronization context.

Switch build/tests to .NET 6 and language to C# 10.

https://centeredge.atlassian.net/browse/ARCH-98
  • Loading branch information
brantburnett authored Mar 11, 2022
1 parent d9160bb commit 6f692d4
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 32 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Install GitVersion
uses: gittools/actions/gitversion/setup@v0.9.10
with:
versionSpec: "5.7.0"
versionSpec: "5.8.0"

- name: Determine Version
id: gitversion
Expand All @@ -31,7 +31,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@v1
with:
dotnet-version: 5.0.x
dotnet-version: 6.0.x

# Cache packages for faster subsequent runs
- uses: actions/cache@v2
Expand All @@ -53,14 +53,14 @@ jobs:

- name: Test
working-directory: ./src
run: dotnet test --no-build -c Release -f net5.0 -l 'trx;LogFileName=${{ runner.temp }}/results.trx' ./CenterEdge.Async.sln
run: dotnet test --no-build -c Release -f net6.0 -l 'trx;LogFileName=results.trx' ./CenterEdge.Async.sln

- name: Test Report
uses: dorny/test-reporter@v1
if: success() || failure() # run this step even if previous step failed
with:
name: Unit Tests
path: ${{ runner.temp }}/results.trx
path: src/**/results.trx
reporter: dotnet-trx

- name: Pack
Expand Down
20 changes: 20 additions & 0 deletions .github/workflows/cleanup-packages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: Cleanup packages

on:
schedule:
- cron: '21 0 * * 6'
workflow_dispatch:

jobs:
clean-pr-packages:

runs-on: ubuntu-latest
permissions:
packages: write

steps:
- uses: actions/delete-package-versions@v2
with:
package-name: CenterEdge.Async
min-versions-to-keep: 30
ignore-versions: ^(?!.*ci-pr).*$
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net48;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>net48;netcoreapp3.1;net6.0</TargetFrameworks>
<OutputType>Exe</OutputType>
<LangVersion>9</LangVersion>
<LangVersion>10</LangVersion>
</PropertyGroup>
<PropertyGroup>
<PlatformTarget>AnyCPU</PlatformTarget>
Expand Down
138 changes: 138 additions & 0 deletions src/CenterEdge.Async.UnitTests/AsyncHelperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,41 @@ public void RunSync_Task_DoesAllTasks()
Assert.Equal(3, i);
}

[Fact]
public async Task RunSync_StartsTasksAndCompletesSynchronously_DoesAllTasks()
{
// Replicates the case where continuations are queued but the main task completes synchronously
// so the work must be removed from the queue

// Arrange

var i = 0;

async Task IncrementAsync()
{
await Task.Yield();
Interlocked.Increment(ref i);
}

// Act
AsyncHelper.RunSync(() =>
{
#pragma warning disable CS4014
for (var j = 0; j < 3; j++)
{
var _ = IncrementAsync();
}
#pragma warning restore CS4014
return Task.CompletedTask;
});

// Assert

await Task.Delay(500);
Assert.Equal(3, i);
}

[Fact]
public void RunSync_Task_ConfigureAwaitFalse_DoesAllTasks()
{
Expand Down Expand Up @@ -157,7 +192,40 @@ public void RunSync_ValueTask_DoesAllTasks()
Assert.Equal(3, i);
}

[Fact]
public async Task RunSync_ValueTask_StartsTasksAndCompletesSynchronously_DoesAllTasks()
{
// Replicates the case where continuations are queued but the main task completes synchronously
// so the work must be removed from the queue

// Arrange

var i = 0;

async Task IncrementAsync()
{
await Task.Yield();
Interlocked.Increment(ref i);
}

// Act
AsyncHelper.RunSync(() =>
{
#pragma warning disable CS4014
for (var j = 0; j < 3; j++)
{
var _ = IncrementAsync();
}
#pragma warning restore CS4014
return new ValueTask();
});

// Assert

await Task.Delay(500);
Assert.Equal(3, i);
}

[Fact]
public void RunSync_ValueTask_ConfigureAwaitFalse_DoesAllTasks()
Expand Down Expand Up @@ -281,6 +349,41 @@ public void RunSync_TaskT_DoesAllTasks()
Assert.Equal(3, result);
}

[Fact]
public async Task RunSync_TaskT_StartsTasksAndCompletesSynchronously_DoesAllTasks()
{
// Replicates the case where continuations are queued but the main task completes synchronously
// so the work must be removed from the queue

// Arrange

var i = 0;

async Task IncrementAsync()
{
await Task.Yield();
Interlocked.Increment(ref i);
}

// Act
AsyncHelper.RunSync(() =>
{
#pragma warning disable CS4014
for (var j = 0; j < 3; j++)
{
var _ = IncrementAsync();
}
#pragma warning restore CS4014
return Task.FromResult(true);
});

// Assert

await Task.Delay(500);
Assert.Equal(3, i);
}

[Fact]
public void RunSync_TaskT_ConfigureAwaitFalse_DoesAllTasks()
{
Expand Down Expand Up @@ -402,6 +505,41 @@ public void RunSync_ValueTaskT_DoesAllTasks()
Assert.Equal(3, result);
}

[Fact]
public async Task RunSync_ValueTaskT_StartsTasksAndCompletesSynchronously_DoesAllTasks()
{
// Replicates the case where continuations are queued but the main task completes synchronously
// so the work must be removed from the queue

// Arrange

var i = 0;

async Task IncrementAsync()
{
await Task.Yield();
Interlocked.Increment(ref i);
}

// Act
AsyncHelper.RunSync(() =>
{
#pragma warning disable CS4014
for (var j = 0; j < 3; j++)
{
var _ = IncrementAsync();
}
#pragma warning restore CS4014
return new ValueTask<bool>(true);
});

// Assert

await Task.Delay(500);
Assert.Equal(3, i);
}

[Fact]
public void RunSync_ValueTaskT_ConfigureAwaitFalse_DoesAllTasks()
{
Expand Down
10 changes: 5 additions & 5 deletions src/CenterEdge.Async.UnitTests/CenterEdge.Async.UnitTests.csproj
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net48;net5.0</TargetFrameworks>
<TargetFrameworks>net48;net6.0</TargetFrameworks>

<LangVersion>9</LangVersion>
<LangVersion>10</LangVersion>
<Nullable>warnings</Nullable>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="coverlet.collector" Version="3.0.3">
<PackageReference Include="coverlet.collector" Version="3.1.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="Moq" Version="4.16.1" />
<PackageReference Include="Moq" Version="4.17.2" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
Expand Down
73 changes: 54 additions & 19 deletions src/CenterEdge.Async/AsyncHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public static void RunSync(Func<Task> task)
{
synch.Run(awaiter);
}
else
{
synch.RunAlreadyComplete();
}

// Throw any exception returned by the task
awaiter.GetResult();
Expand Down Expand Up @@ -68,6 +72,10 @@ public static void RunSync(Func<ValueTask> task)
{
synch.Run(awaiter);
}
else
{
synch.RunAlreadyComplete();
}

// Throw any exception returned by the task
awaiter.GetResult();
Expand Down Expand Up @@ -100,6 +108,10 @@ public static T RunSync<T>(Func<Task<T>> task)
{
synch.Run(awaiter);
}
else
{
synch.RunAlreadyComplete();
}

// Throw any exception returned by the task or return the result
return awaiter.GetResult();
Expand Down Expand Up @@ -132,6 +144,10 @@ public static T RunSync<T>(Func<ValueTask<T>> task)
{
synch.Run(awaiter);
}
else
{
synch.RunAlreadyComplete();
}

// Throw any exception returned by the task or return the result
return awaiter.GetResult();
Expand Down Expand Up @@ -178,25 +194,7 @@ public override void Post(SendOrPostCallback d, object? state)
// This can occur if the main task starts additional work which isn't completed
// before the main task completes.

if (_parentSynchronizationContext != null)
{
_parentSynchronizationContext.Post(d, state);
}
else
{
// There is no parent sync context, so use the default behavior from the default
// SynchronizationContext and post to the thread pool.

#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
ThreadPool.QueueUserWorkItem(static s => s.d(s.state), (d, state), preferLocal: false);
#else
ThreadPool.QueueUserWorkItem(static s =>
{
var state = ((SendOrPostCallback d, object? state))s;
state.d(state.state);
}, (d, state));
#endif
}
ExecuteOnParent(d, state);
}

private void EndMessageLoop()
Expand Down Expand Up @@ -228,6 +226,43 @@ public void Run(TAwaiter awaiter)
}
}

// Processes any remaining continuations in the queue
public void RunAlreadyComplete()
{
EndMessageLoop();

while (!_items.IsCompleted)
{
var task = _items.Take();

ExecuteOnParent(task.Callback, task.State);
}
}

// Executes a work item on the parent SynchronizationContext or on the thread pool if there is not one
private void ExecuteOnParent(SendOrPostCallback callback, object? state)
{
if (_parentSynchronizationContext != null)
{
_parentSynchronizationContext.Post(callback, state);
}
else
{
// There is no parent sync context, so use the default behavior from the default
// SynchronizationContext and post to the thread pool.

#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
ThreadPool.QueueUserWorkItem(static s => s.callback(s.state), (callback, state), preferLocal: false);
#else
ThreadPool.QueueUserWorkItem(static s =>
{
var state = ((SendOrPostCallback callback, object? state))s;
state.callback(state.state);
}, (callback, state));
#endif
}
}

public override SynchronizationContext CreateCopy()
{
return this;
Expand Down
2 changes: 1 addition & 1 deletion src/CenterEdge.Async/CenterEdge.Async.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net5.0</TargetFrameworks>

<LangVersion>9</LangVersion>
<LangVersion>10</LangVersion>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
Expand Down

0 comments on commit 6f692d4

Please sign in to comment.