Skip to content

Commit

Permalink
Enable cancellation for anonymous pipes and non-async named pipes on …
Browse files Browse the repository at this point in the history
…Windows (dotnet#72503)

* Enable cancellation for anonymous pipes and non-async named pipes on Windows

Although ReadAsync, WriteAsync, and WaitForConnectionAsync on pipes all accept a CancellationToken, that token is only usable on Windows for canceling an in-flight operation when the pipe is using overlapped I/O.  If the pipe was created for non-overlapped I/O, as is the case for anonymous pipes and can be the case for named pipes, the token stops being useful for anything other than an up-front cancellation check.

This change fixes that by using CancelSynchronousIo to cancel the synchronous I/O performed as part of these async operations, which are implemented as async-over-sync (queueing to the thread pool a work item that performs the synchronous I/O).

(The Unix implementation already supports cancellation in these situations.)

* Address PR feedback (tweak comments)
  • Loading branch information
stephentoub authored Jul 20, 2022
1 parent c046354 commit 73c2ce3
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

internal static partial class Interop
{
internal static partial class Kernel32
{
[LibraryImport(Libraries.Kernel32, SetLastError = true)]
[return: MarshalAs(UnmanagedType.Bool)]
internal static unsafe partial bool CancelSynchronousIo(SafeThreadHandle hThread);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ internal static partial class Interop
{
internal static partial class Kernel32
{
internal const int THREAD_TERMINATE = 0x0001;

[LibraryImport(Libraries.Kernel32, SetLastError = true)]
internal static partial SafeThreadHandle OpenThread(int access, [MarshalAs(UnmanagedType.Bool)] bool inherit, int threadId);
internal static partial SafeThreadHandle OpenThread(int dwDesiredAccess, [MarshalAs(UnmanagedType.Bool)] bool bInheritHandle, int dwThreadId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Runtime.InteropServices;

namespace Microsoft.Win32.SafeHandles
{
internal sealed class SafeThreadHandle : SafeHandle
{
public SafeThreadHandle() : base(invalidHandleValue: 0, ownsHandle: true) { }

public override bool IsInvalid => handle is 0 or -1;

protected override bool ReleaseHandle() => Interop.Kernel32.CloseHandle(handle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,29 +529,33 @@ protected async Task ValidatePrecanceledOperations_ThrowsCancellationException(S
}
}

protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(Stream stream)
protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay)
{
if (!stream.CanRead || !FullyCancelableOperations)
{
return;
}

var cts = new CancellationTokenSource();

Task<int> t = stream.ReadAsync(new byte[1], 0, 1, cts.Token);
cts.Cancel();

cts.CancelAfter(cancellationDelay);
await AssertCanceledAsync(cts.Token, () => t);
}

protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream)
protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay)
{
if (!stream.CanRead || !FullyCancelableOperations)
{
return;
}

var cts = new CancellationTokenSource();

Task<int> t = stream.ReadAsync(new byte[1], cts.Token).AsTask();
cts.Cancel();

cts.CancelAfter(cancellationDelay);
await AssertCanceledAsync(cts.Token, () => t);
}

Expand Down Expand Up @@ -1671,26 +1675,30 @@ public virtual async Task ReadWriteAsync_PrecanceledOperations_ThrowsCancellatio
}
}

[Fact]
[Theory]
[InlineData(0)]
[InlineData(100)]
[ActiveIssue("https://github.com/dotnet/runtime/issues/67853", TestPlatforms.tvOS)]
[SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")]
public virtual async Task ReadAsync_CancelPendingTask_ThrowsCancellationException()
public virtual async Task ReadAsync_CancelPendingTask_ThrowsCancellationException(int cancellationDelay)
{
using StreamPair streams = await CreateConnectedStreamsAsync();
(Stream writeable, Stream readable) = GetReadWritePair(streams);

await ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(readable);
await ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(readable, cancellationDelay);
}

[Fact]
[Theory]
[InlineData(0)]
[InlineData(100)]
[ActiveIssue("https://github.com/dotnet/runtime/issues/67853", TestPlatforms.tvOS)]
[SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")]
public virtual async Task ReadAsync_CancelPendingValueTask_ThrowsCancellationException()
public virtual async Task ReadAsync_CancelPendingValueTask_ThrowsCancellationException(int cancellationDelay)
{
using StreamPair streams = await CreateConnectedStreamsAsync();
(Stream writeable, Stream readable) = GetReadWritePair(streams);

await ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(readable);
await ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(readable, cancellationDelay);
}

[Fact]
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@
Link="Common\Interop\Windows\Interop.MAX_PATH.cs" />
<Compile Include="$(CommonPath)System\HResults.cs"
Link="Common\System\HResults.cs" />
<Compile Include="$(CommonPath)Microsoft\Win32\SafeHandles\SafeThreadHandle.cs"
Link="Microsoft\Win32\SafeHandles\SafeThreadHandle.cs" />
<Compile Include="Microsoft\Win32\SafeHandles\SafeProcessHandle.Windows.cs" />
<Compile Include="Microsoft\Win32\SafeHandles\SafeThreadHandle.cs" />
<Compile Include="System\Diagnostics\PerformanceCounterLib.cs" />
<Compile Include="System\Diagnostics\Process.Windows.cs" />
<Compile Include="System\Diagnostics\ProcessManager.Windows.cs" />
Expand Down
22 changes: 14 additions & 8 deletions src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
Link="Common\System\Threading\Tasks\TaskToApm.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'windows'">
<Compile Include="$(CommonPath)Microsoft\Win32\SafeHandles\SafeThreadHandle.cs"
Link="Microsoft\Win32\SafeHandles\SafeThreadHandle.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Interop.Libraries.cs"
Link="Common\Interop\Windows\Interop.Libraries.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CloseHandle.cs"
Expand Down Expand Up @@ -94,12 +96,23 @@
Link="Common\Interop\Windows\Interop.ImpersonateNamedPipeClient.cs" />
<Compile Include="$(CommonPath)System\IO\Win32Marshal.cs"
Link="Common\CoreLib\System\IO\Win32Marshal.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CreateNamedPipeClient.cs"
Link="Common\Interop\Windows\Interop.CreateNamedPipeClient.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.LoadLibraryEx_IntPtr.cs"
Link="Common\Interop\Windows\Interop.LoadLibraryEx_IntPtr.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.OpenThread.cs"
Link="Common\Interop\Windows\Interop.OpenThread.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CancelSynchronousIo.cs"
Link="Common\Interop\Windows\Interop.CancelSynchronousIo.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetCurrentThreadId.cs"
Link="Common\Interop\Windows\Interop.GetCurrentThreadId.cs" />
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Windows.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Win32.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRights.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRule.cs" />
<Compile Include="System\IO\Pipes\PipeAuditRule.cs" />
Expand All @@ -108,14 +121,6 @@
<Compile Include="System\IO\Pipes\PipeStream.ValueTaskSource.cs" />
<Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
</ItemGroup>
<!-- Windows : Win32 only -->
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'windows'">
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CreateNamedPipeClient.cs"
Link="Common\Interop\Windows\Interop.CreateNamedPipeClient.cs" />
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.LoadLibraryEx_IntPtr.cs"
Link="Common\Interop\Windows\Interop.LoadLibraryEx_IntPtr.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Win32.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'Unix'">
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Unix.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Unix.cs" />
Expand Down Expand Up @@ -176,6 +181,7 @@
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'windows'">
<Reference Include="System.Collections.NonGeneric" />
<Reference Include="System.Security.Claims" />
<Reference Include="System.Threading.ThreadPool" />
</ItemGroup>
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'Unix'">
<Reference Include="Microsoft.Win32.Primitives" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Security.AccessControl;
Expand Down Expand Up @@ -168,8 +169,7 @@ public void WaitForConnection()

if (IsAsync)
{
ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None);
vt.AsTask().GetAwaiter().GetResult();
WaitForConnectionCoreAsync(CancellationToken.None).AsTask().GetAwaiter().GetResult();
}
else
{
Expand All @@ -183,33 +183,58 @@ public void WaitForConnection()
}

// pipe already connected
if (errorCode == Interop.Errors.ERROR_PIPE_CONNECTED && State == PipeState.Connected)
if (State == PipeState.Connected)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected);
}

// If we reach here then a connection has been established. This can happen if a client
// connects in the interval between the call to CreateNamedPipe and the call to ConnectNamedPipe.
// In this situation, there is still a good connection between client and server, even though
// ConnectNamedPipe returns zero.
}

State = PipeState.Connected;
}
}

public Task WaitForConnectionAsync(CancellationToken cancellationToken)
public Task WaitForConnectionAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) :
IsAsync ? WaitForConnectionCoreAsync(cancellationToken).AsTask() :
AsyncOverSyncWaitForConnection(cancellationToken);

private async Task AsyncOverSyncWaitForConnection(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}
// Create the work item state object. This is used to pass around state through various APIs,
// while also serving double duty as the work item used to queue the operation to the thread pool.
var workItem = new SyncAsyncWorkItem();

// Queue the work to the thread pool. This is implemented as a custom awaiter that queues the
// awaiter itself to the thread pool.
await workItem;

if (!IsAsync)
// Register for cancellation.
using (workItem.RegisterCancellation(cancellationToken))
{
return Task.Factory.StartNew(s => ((NamedPipeServerStream)s!).WaitForConnection(),
this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
try
{
// Perform the wait.
WaitForConnection();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// If the write fails because of cancellation, it will have been a Win32 error code
// that WriteCore translated into an OperationCanceledException without a stored
// CancellationToken. We want to ensure the token is stored.
throw new OperationCanceledException(cancellationToken);
}
finally
{
// Prior to calling Dispose on the CancellationTokenRegistration, we need to tell
// the registration callback to exit if it's currently running; otherwise, we could deadlock.
workItem.ContinueTryingToCancel = false;
}
}

return WaitForConnectionCoreAsync(cancellationToken).AsTask();
}

public void Disconnect()
Expand Down
Loading

0 comments on commit 73c2ce3

Please sign in to comment.