diff --git a/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.CancelSynchronousIo.cs b/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.CancelSynchronousIo.cs new file mode 100644 index 0000000000000..78984484ddfe5 --- /dev/null +++ b/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.CancelSynchronousIo.cs @@ -0,0 +1,17 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using Microsoft.Win32.SafeHandles; + +internal static partial class Interop +{ + internal static partial class Kernel32 + { + [LibraryImport(Libraries.Kernel32, SetLastError = true)] + [return: MarshalAs(UnmanagedType.Bool)] + internal static unsafe partial bool CancelSynchronousIo(SafeThreadHandle hThread); + } +} diff --git a/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.OpenThread.cs b/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.OpenThread.cs index f608aaecd77a6..7009218ef1783 100644 --- a/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.OpenThread.cs +++ b/src/libraries/Common/src/Interop/Windows/Kernel32/Interop.OpenThread.cs @@ -8,7 +8,9 @@ internal static partial class Interop { internal static partial class Kernel32 { + internal const int THREAD_TERMINATE = 0x0001; + [LibraryImport(Libraries.Kernel32, SetLastError = true)] - internal static partial SafeThreadHandle OpenThread(int access, [MarshalAs(UnmanagedType.Bool)] bool inherit, int threadId); + internal static partial SafeThreadHandle OpenThread(int dwDesiredAccess, [MarshalAs(UnmanagedType.Bool)] bool bInheritHandle, int dwThreadId); } } diff --git a/src/libraries/Common/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs b/src/libraries/Common/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs new file mode 100644 index 0000000000000..6c8e58fb618c8 --- /dev/null +++ b/src/libraries/Common/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs @@ -0,0 +1,17 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Runtime.InteropServices; + +namespace Microsoft.Win32.SafeHandles +{ + internal sealed class SafeThreadHandle : SafeHandle + { + public SafeThreadHandle() : base(invalidHandleValue: 0, ownsHandle: true) { } + + public override bool IsInvalid => handle is 0 or -1; + + protected override bool ReleaseHandle() => Interop.Kernel32.CloseHandle(handle); + } +} diff --git a/src/libraries/Common/tests/StreamConformanceTests/System/IO/StreamConformanceTests.cs b/src/libraries/Common/tests/StreamConformanceTests/System/IO/StreamConformanceTests.cs index 93339e7732cd7..4b4be02959467 100644 --- a/src/libraries/Common/tests/StreamConformanceTests/System/IO/StreamConformanceTests.cs +++ b/src/libraries/Common/tests/StreamConformanceTests/System/IO/StreamConformanceTests.cs @@ -529,7 +529,7 @@ protected async Task ValidatePrecanceledOperations_ThrowsCancellationException(S } } - protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(Stream stream) + protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay) { if (!stream.CanRead || !FullyCancelableOperations) { @@ -537,12 +537,14 @@ protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCance } var cts = new CancellationTokenSource(); + Task t = stream.ReadAsync(new byte[1], 0, 1, cts.Token); - cts.Cancel(); + + cts.CancelAfter(cancellationDelay); await AssertCanceledAsync(cts.Token, () => t); } - protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream) + protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay) { if (!stream.CanRead || !FullyCancelableOperations) { @@ -550,8 +552,10 @@ protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_Throws } var cts = new CancellationTokenSource(); + Task t = stream.ReadAsync(new byte[1], cts.Token).AsTask(); - cts.Cancel(); + + cts.CancelAfter(cancellationDelay); await AssertCanceledAsync(cts.Token, () => t); } @@ -1671,26 +1675,30 @@ public virtual async Task ReadWriteAsync_PrecanceledOperations_ThrowsCancellatio } } - [Fact] + [Theory] + [InlineData(0)] + [InlineData(100)] [ActiveIssue("https://github.com/dotnet/runtime/issues/67853", TestPlatforms.tvOS)] [SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")] - public virtual async Task ReadAsync_CancelPendingTask_ThrowsCancellationException() + public virtual async Task ReadAsync_CancelPendingTask_ThrowsCancellationException(int cancellationDelay) { using StreamPair streams = await CreateConnectedStreamsAsync(); (Stream writeable, Stream readable) = GetReadWritePair(streams); - await ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(readable); + await ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(readable, cancellationDelay); } - [Fact] + [Theory] + [InlineData(0)] + [InlineData(100)] [ActiveIssue("https://github.com/dotnet/runtime/issues/67853", TestPlatforms.tvOS)] [SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")] - public virtual async Task ReadAsync_CancelPendingValueTask_ThrowsCancellationException() + public virtual async Task ReadAsync_CancelPendingValueTask_ThrowsCancellationException(int cancellationDelay) { using StreamPair streams = await CreateConnectedStreamsAsync(); (Stream writeable, Stream readable) = GetReadWritePair(streams); - await ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(readable); + await ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(readable, cancellationDelay); } [Fact] diff --git a/src/libraries/System.Diagnostics.Process/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs b/src/libraries/System.Diagnostics.Process/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs deleted file mode 100644 index 34f3df7885f96..0000000000000 --- a/src/libraries/System.Diagnostics.Process/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs +++ /dev/null @@ -1,38 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -/*============================================================ -** -** Class: SafeThreadHandle -** -** -** A wrapper for a thread handle -** -** -===========================================================*/ - -using System; -using System.Diagnostics; -using System.Runtime.InteropServices; -using System.Security; - -namespace Microsoft.Win32.SafeHandles -{ - internal sealed class SafeThreadHandle : SafeHandle - { - public SafeThreadHandle() - : base(new IntPtr(0), true) - { - } - - public override bool IsInvalid - { - get { return handle == IntPtr.Zero || handle == new IntPtr(-1); } - } - - protected override bool ReleaseHandle() - { - return Interop.Kernel32.CloseHandle(handle); - } - } -} diff --git a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj index b605a138a1d7d..2c273820afb4c 100644 --- a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj +++ b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj @@ -211,8 +211,9 @@ Link="Common\Interop\Windows\Interop.MAX_PATH.cs" /> + - diff --git a/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj b/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj index 8a69c548ea170..17d78c6923f8b 100644 --- a/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj +++ b/src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj @@ -28,6 +28,8 @@ Link="Common\System\Threading\Tasks\TaskToApm.cs" /> + + + + + + + @@ -108,14 +121,6 @@ - - - - - - @@ -176,6 +181,7 @@ + diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs index 142a1b1921cde..c9bdc35c09aaf 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Security.AccessControl; @@ -168,8 +169,7 @@ public void WaitForConnection() if (IsAsync) { - ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None); - vt.AsTask().GetAwaiter().GetResult(); + WaitForConnectionCoreAsync(CancellationToken.None).AsTask().GetAwaiter().GetResult(); } else { @@ -183,33 +183,58 @@ public void WaitForConnection() } // pipe already connected - if (errorCode == Interop.Errors.ERROR_PIPE_CONNECTED && State == PipeState.Connected) + if (State == PipeState.Connected) { throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected); } + // If we reach here then a connection has been established. This can happen if a client // connects in the interval between the call to CreateNamedPipe and the call to ConnectNamedPipe. // In this situation, there is still a good connection between client and server, even though // ConnectNamedPipe returns zero. } + State = PipeState.Connected; } } - public Task WaitForConnectionAsync(CancellationToken cancellationToken) + public Task WaitForConnectionAsync(CancellationToken cancellationToken) => + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + IsAsync ? WaitForConnectionCoreAsync(cancellationToken).AsTask() : + AsyncOverSyncWaitForConnection(cancellationToken); + + private async Task AsyncOverSyncWaitForConnection(CancellationToken cancellationToken) { - if (cancellationToken.IsCancellationRequested) - { - return Task.FromCanceled(cancellationToken); - } + // Create the work item state object. This is used to pass around state through various APIs, + // while also serving double duty as the work item used to queue the operation to the thread pool. + var workItem = new SyncAsyncWorkItem(); + + // Queue the work to the thread pool. This is implemented as a custom awaiter that queues the + // awaiter itself to the thread pool. + await workItem; - if (!IsAsync) + // Register for cancellation. + using (workItem.RegisterCancellation(cancellationToken)) { - return Task.Factory.StartNew(s => ((NamedPipeServerStream)s!).WaitForConnection(), - this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + try + { + // Perform the wait. + WaitForConnection(); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // If the write fails because of cancellation, it will have been a Win32 error code + // that WriteCore translated into an OperationCanceledException without a stored + // CancellationToken. We want to ensure the token is stored. + throw new OperationCanceledException(cancellationToken); + } + finally + { + // Prior to calling Dispose on the CancellationTokenRegistration, we need to tell + // the registration callback to exit if it's currently running; otherwise, we could deadlock. + workItem.ContinueTryingToCancel = false; + } } - - return WaitForConnectionCoreAsync(cancellationToken).AsTask(); } public void Disconnect() diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs index 043ad40323515..1aef6b5566306 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Runtime.Versioning; @@ -66,27 +67,19 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel CheckReadOperations(); - if (!_isAsync) - { - return base.ReadAsync(buffer, offset, count, cancellationToken); - } - if (count == 0) { UpdateMessageCompletion(false); return Task.FromResult(0); } - return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask(); + return _isAsync ? + ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask() : + AsyncOverSyncRead(new Memory(buffer, offset, count), cancellationToken).AsTask(); } public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken)) { - if (!_isAsync) - { - return base.ReadAsync(buffer, cancellationToken); - } - if (!CanRead) { throw Error.GetReadNotSupported(); @@ -105,7 +98,9 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel return new ValueTask(0); } - return ReadAsyncCore(buffer, cancellationToken); + return _isAsync ? + ReadAsyncCore(buffer, cancellationToken) : + AsyncOverSyncRead(buffer, cancellationToken); } public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) @@ -174,26 +169,14 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati CheckWriteOperations(); - if (!_isAsync) - { - return base.WriteAsync(buffer, offset, count, cancellationToken); - } - - if (count == 0) - { - return Task.CompletedTask; - } - - return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); + return + count == 0 ? Task.CompletedTask : + _isAsync ? WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask() : + AsyncOverSyncWrite(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); } public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken)) { - if (!_isAsync) - { - return base.WriteAsync(buffer, cancellationToken); - } - if (!CanWrite) { throw Error.GetWriteNotSupported(); @@ -206,12 +189,10 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati CheckWriteOperations(); - if (buffer.Length == 0) - { - return default; - } - - return WriteAsyncCore(buffer, cancellationToken); + return + buffer.Length == 0 ? default : + _isAsync ? WriteAsyncCore(buffer, cancellationToken) : + AsyncOverSyncWrite(buffer, cancellationToken); } public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) @@ -230,6 +211,191 @@ public override void EndWrite(IAsyncResult asyncResult) base.EndWrite(asyncResult); } + /// Initiates an async-over-sync read for a pipe opened for non-overlapped I/O. + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + private async ValueTask AsyncOverSyncRead(Memory buffer, CancellationToken cancellationToken) + { + // Create the work item state object. This is used to pass around state through various APIs, + // while also serving double duty as the work item used to queue the operation to the thread pool. + var workItem = new SyncAsyncWorkItem(); + + // Queue the work to the thread pool. This is implemented as a custom awaiter that queues the + // awaiter itself to the thread pool. + await workItem; + + // Register for cancellation. + using (workItem.RegisterCancellation(cancellationToken)) + { + try + { + // Perform the read. + return ReadCore(buffer.Span); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // If the read fails because of cancellation, it will have been a Win32 error code + // that ReadCore translated into an OperationCanceledException without a stored + // CancellationToken. We want to ensure the token is stored. + throw new OperationCanceledException(cancellationToken); + } + finally + { + // Prior to calling Dispose on the CancellationTokenRegistration, we need to tell + // the registration callback to exit if it's currently running; otherwise, we could deadlock. + workItem.ContinueTryingToCancel = false; + } + } + } + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask AsyncOverSyncWrite(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + // Create the work item state object. This is used to pass around state through various APIs, + // while also serving double duty as the work item used to queue the operation to the thread pool. + var workItem = new SyncAsyncWorkItem(); + + // Queue the work to the thread pool. This is implemented as a custom awaiter that queues the + // awaiter itself to the thread pool. + await workItem; + + // Register for cancellation. + using (workItem.RegisterCancellation(cancellationToken)) + { + try + { + // Perform the write. + WriteCore(buffer.Span); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // If the write fails because of cancellation, it will have been a Win32 error code + // that WriteCore translated into an OperationCanceledException without a stored + // CancellationToken. We want to ensure the token is stored. + throw new OperationCanceledException(cancellationToken); + } + finally + { + // Prior to calling Dispose on the CancellationTokenRegistration, we need to tell + // the registration callback to exit if it's currently running; otherwise, we could deadlock. + workItem.ContinueTryingToCancel = false; + } + } + } + + /// + /// State object used for implementing async pipe operations as async-over-sync + /// (queueing a work item to invoke a synchronous operation). + /// + private protected sealed class SyncAsyncWorkItem : IThreadPoolWorkItem, ICriticalNotifyCompletion + { + /// A thread handle for the current OS thread. + /// This is lazily-initialized for the current OS thread. We rely on finalization to clean up after it when the thread goes away. + [ThreadStatic] + private static SafeThreadHandle? t_currentThreadHandle; + + /// The OS handle of the thread performing the I/O. + public SafeThreadHandle? ThreadHandle; + + /// Whether the call to CancellationToken.UnsafeRegister completed. + public volatile bool FinishedCancellationRegistration; + /// Whether the I/O operation has finished (successfully or unsuccessfully) and is requesting cancellation attempts stop. + public volatile bool ContinueTryingToCancel = true; + /// The Action continuation object handed to this instance when used as an awaiter to scheduler work to the thread pool. + private Action? _continuation; + + // awaitable / awaiter implementation that enables this instance to be awaited in order to queue + // execution to the thread pool. This is purely a cost-saving measure in order to reuse this + // object we already need as the queued work item. + public SyncAsyncWorkItem GetAwaiter() => this; + public bool IsCompleted => false; + public void GetResult() { } + public void OnCompleted(Action continuation) => throw new NotSupportedException(); + public void UnsafeOnCompleted(Action continuation) + { + Debug.Assert(_continuation is null); + _continuation = continuation; + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true); + } + void IThreadPoolWorkItem.Execute() => _continuation!(); + + /// Registers for cancellation with the specified token. + /// Upon cancellation being requested, the implementation will attempt to CancelSynchronousIo for the thread calling RegisterCancellation. + public CancellationTokenRegistration RegisterCancellation(CancellationToken cancellationToken) + { + // If the token can't be canceled, there's nothing to register. + if (!cancellationToken.CanBeCanceled) + { + return default; + } + + // Get a handle for the current thread. This is stored and used to cancel the I/O on this thread + // in response to the cancellation token having cancellation requested. If the handle is invalid, + // which could happen if OpenThread fails, skip attempts at cancellation. The handle needs to be + // opened with THREAD_TERMINATE in order to be able to call CancelSynchronousIo. + ThreadHandle = t_currentThreadHandle ??= Interop.Kernel32.OpenThread(Interop.Kernel32.THREAD_TERMINATE, bInheritHandle: false, Interop.Kernel32.GetCurrentThreadId()); + if (ThreadHandle.IsInvalid) + { + return default; + } + + // Register with the token. + CancellationTokenRegistration reg = cancellationToken.UnsafeRegister(static s => + { + var state = (SyncAsyncWorkItem)s!; + + // If cancellation was already requested when UnsafeRegister was called, it'll invoke + // the callback immediately. If we allowed that to loop until cancellation was successful, + // we'd deadlock, as we'd never perform the very I/O it was waiting for. As such, if + // the callback is invoked prior to be ready for it, we ignore the callback. + if (!state.FinishedCancellationRegistration) + { + return; + } + + // Cancel the I/O. If the cancellation happens too early and we haven't yet initiated + // the synchronous operation, CancelSynchronousIo will fail with ERROR_NOT_FOUND, and + // we'll loop to try again. + SpinWait sw = default; + while (state.ContinueTryingToCancel) + { + if (Interop.Kernel32.CancelSynchronousIo(state.ThreadHandle!)) + { + // Successfully canceled I/O. + break; + } + + if (Marshal.GetLastPInvokeError() != Interop.Errors.ERROR_NOT_FOUND) + { + // Failed to cancel even though there may have been I/O to cancel. + // Attempting to keep trying could result in an infinite loop, so + // give up on trying to cancel. + break; + } + + sw.SpinOnce(); + } + }, this); + + // Now that we've registered with the token, tell the callback it's safe to enter + // its cancellation loop if the callback is invoked. + FinishedCancellationRegistration = true; + + // And now since cancellation may have been requested and we may have suppressed it + // until the previous line, check to see if cancellation has now been requested, and + // if it has, stop any callback, remove the registration, and throw. + if (cancellationToken.IsCancellationRequested) + { + ContinueTryingToCancel = false; + reg.Dispose(); + throw new OperationCanceledException(cancellationToken); + } + + // Return the registration. Now and moving forward, a cancellation request could come in, + // and the callback will end up spinning until we reach the actual I/O. + return reg; + } + } + internal static string GetPipePath(string serverName, string pipeName) { string normalizedPipePath = Path.GetFullPath(@"\\" + serverName + @"\pipe\" + pipeName); diff --git a/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs b/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs index 6db6334dde260..dd68ad0b4867d 100644 --- a/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs +++ b/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.IO.Tests; using System.Linq; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -20,7 +21,6 @@ public static string GetUniquePipeName() => protected override Type UnsupportedConcurrentExceptionType => null; protected override bool UsableAfterCanceledReads => false; protected override bool CansReturnFalseAfterDispose => false; - protected override bool FullyCancelableOperations => !OperatingSystem.IsWindows(); [PlatformSpecific(TestPlatforms.Windows)] // WaitForPipeDrain isn't supported on Unix [Fact] @@ -65,6 +65,7 @@ public abstract class NamedPipeStreamConformanceTests : PipeStreamConformanceTes protected abstract NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1); protected abstract NamedPipeClientStream CreateClientStream(string pipeName); + protected abstract PipeOptions Options { get; } protected (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams() { @@ -215,9 +216,11 @@ public async Task WaitForConnectionOnAlreadyConnectedServer_Throws_InvalidOperat Assert.Throws(() => server.WaitForConnection()); } - [Fact] + [Theory] + [InlineData(0)] + [InlineData(100)] [SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")] - public async Task CancelTokenOn_ServerWaitForConnectionAsync_Throws_OperationCanceledException() + public async Task CancelTokenOn_ServerWaitForConnectionAsync_Throws_OperationCanceledException(int cancellationDelay) { (NamedPipeServerStream server, NamedPipeClientStream client) = CreateServerAndClientStreams(); using StreamPair streams = (server, client); @@ -225,7 +228,8 @@ public async Task CancelTokenOn_ServerWaitForConnectionAsync_Throws_OperationCan var ctx = new CancellationTokenSource(); Task serverWaitTimeout = server.WaitForConnectionAsync(ctx.Token); - ctx.Cancel(); + + ctx.CancelAfter(cancellationDelay); await Assert.ThrowsAnyAsync(() => serverWaitTimeout); Assert.True(server.WaitForConnectionAsync(ctx.Token).IsCanceled); @@ -235,6 +239,12 @@ public async Task CancelTokenOn_ServerWaitForConnectionAsync_Throws_OperationCan [PlatformSpecific(TestPlatforms.Windows)] // P/Invoking to Win32 functions public async Task CancelTokenOff_ServerWaitForConnectionAsyncWithOuterCancellation_Throws_OperationCanceledException() { + if ((Options & PipeOptions.Asynchronous) == 0) + { + // Test depends on PipeOptions.Asynchronous, as CancelIoEx is for overlapped I/O + return; + } + (NamedPipeServerStream server, NamedPipeClientStream client) = CreateServerAndClientStreams(); using StreamPair streams = (server, client); @@ -249,6 +259,12 @@ public async Task CancelTokenOff_ServerWaitForConnectionAsyncWithOuterCancellati [PlatformSpecific(TestPlatforms.Windows)] // P/Invoking to Win32 functions public async Task CancelTokenOn_ServerWaitForConnectionAsyncWithOuterCancellation_Throws_IOException() { + if ((Options & PipeOptions.Asynchronous) == 0) + { + // Test depends on PipeOptions.Asynchronous, as CancelIoEx is for overlapped I/O + return; + } + (NamedPipeServerStream server, NamedPipeClientStream client) = CreateServerAndClientStreams(); using StreamPair streams = (server, client); @@ -523,6 +539,12 @@ public async Task Server_ReadWriteCancelledToken_Throws_OperationCanceledExcepti [PlatformSpecific(TestPlatforms.Windows)] // P/Invoking to Win32 functions public async Task CancelTokenOff_Server_ReadWriteCancelledToken_Throws_OperationCanceledException() { + if ((Options & PipeOptions.Asynchronous) == 0) + { + // Test depends on PipeOptions.Asynchronous, as CancelIoEx is for overlapped I/O + return; + } + using StreamPair streams = await CreateConnectedStreamsAsync(); (NamedPipeServerStream server, NamedPipeClientStream client) = GetClientAndServer(streams); @@ -551,6 +573,12 @@ public async Task CancelTokenOff_Server_ReadWriteCancelledToken_Throws_Operation [PlatformSpecific(TestPlatforms.Windows)] // P/Invoking to Win32 functions public async Task CancelTokenOn_Server_ReadWriteCancelledToken_Throws_OperationCanceledException() { + if ((Options & PipeOptions.Asynchronous) == 0) + { + // Test depends on PipeOptions.Asynchronous, as CancelIoEx is for overlapped I/O + return; + } + using StreamPair streams = await CreateConnectedStreamsAsync(); (NamedPipeServerStream server, NamedPipeClientStream client) = GetClientAndServer(streams); @@ -612,6 +640,12 @@ public async Task Client_ReadWriteCancelledToken_Throws_OperationCanceledExcepti [PlatformSpecific(TestPlatforms.Windows)] // P/Invoking to Win32 functions public async Task CancelTokenOff_Client_ReadWriteCancelledToken_Throws_OperationCanceledException() { + if ((Options & PipeOptions.Asynchronous) == 0) + { + // Test depends on PipeOptions.Asynchronous, as CancelIoEx is for overlapped I/O + return; + } + using StreamPair streams = await CreateConnectedStreamsAsync(); (NamedPipeServerStream server, NamedPipeClientStream client) = GetClientAndServer(streams); @@ -640,6 +674,12 @@ public async Task CancelTokenOff_Client_ReadWriteCancelledToken_Throws_Operation [PlatformSpecific(TestPlatforms.Windows)] // P/Invoking to Win32 functions public async Task CancelTokenOn_Client_ReadWriteCancelledToken_Throws_OperationCanceledException() { + if ((Options & PipeOptions.Asynchronous) == 0) + { + // Test depends on PipeOptions.Asynchronous, as CancelIoEx is for overlapped I/O + return; + } + using StreamPair streams = await CreateConnectedStreamsAsync(); (NamedPipeServerStream server, NamedPipeClientStream client) = GetClientAndServer(streams); @@ -680,7 +720,7 @@ public async Task TwoServerInstances_OnceDisposed_Throws() using NamedPipeClientStream client = CreateClientStream(pipeName); await client.ConnectAsync(); - await Assert.ThrowsAsync(() => wait1); + await Assert.ThrowsAnyAsync(() => wait1); await wait2; @@ -717,30 +757,65 @@ protected override (AnonymousPipeServerStream Server, AnonymousPipeClientStream } } - public sealed class NamedPipeTest_ServerOut_ClientIn : NamedPipeStreamConformanceTests + public abstract class NamedPipeTest_ServerOut_ClientIn : NamedPipeStreamConformanceTests { protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) => - new NamedPipeServerStream(pipeName, PipeDirection.Out, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); + new NamedPipeServerStream(pipeName, PipeDirection.Out, maxInstances, PipeTransmissionMode.Byte, Options); protected override NamedPipeClientStream CreateClientStream(string pipeName) => - new NamedPipeClientStream(".", pipeName, PipeDirection.In, PipeOptions.Asynchronous); + new NamedPipeClientStream(".", pipeName, PipeDirection.In, Options); } - public sealed class NamedPipeTest_ServerIn_ClientOut : NamedPipeStreamConformanceTests + public abstract class NamedPipeTest_ServerIn_ClientOut : NamedPipeStreamConformanceTests { protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) => - new NamedPipeServerStream(pipeName, PipeDirection.In, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); + new NamedPipeServerStream(pipeName, PipeDirection.In, maxInstances, PipeTransmissionMode.Byte, Options); protected override NamedPipeClientStream CreateClientStream(string pipeName) => - new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous); + new NamedPipeClientStream(".", pipeName, PipeDirection.Out, Options); } - public sealed class NamedPipeTest_ServerInOut_ClientInOut : NamedPipeStreamConformanceTests + public abstract class NamedPipeTest_ServerInOut_ClientInOut : NamedPipeStreamConformanceTests { protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) => - new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); + new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxInstances, PipeTransmissionMode.Byte, Options); protected override NamedPipeClientStream CreateClientStream(string pipeName) => - new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous); + new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, Options); + } + + public sealed class NamedPipeTest_ServerOut_ClientIn_Synchronous : NamedPipeTest_ServerOut_ClientIn + { + protected override PipeOptions Options => PipeOptions.None; + } + + public sealed class NamedPipeTest_ServerOut_ClientIn_Asynchronous : NamedPipeTest_ServerOut_ClientIn + { + protected override PipeOptions Options => PipeOptions.Asynchronous; + } + + public sealed class NamedPipeTest_ServerIn_ClientOut_Synchronous : NamedPipeTest_ServerIn_ClientOut + { + protected override PipeOptions Options => PipeOptions.None; + } + + public sealed class NamedPipeTest_ServerIn_ClientOut_Asynchronous : NamedPipeTest_ServerIn_ClientOut + { + protected override PipeOptions Options => PipeOptions.Asynchronous; + } + + public sealed class NamedPipeTest_ServerInOut_ClientInOut_Synchronous : NamedPipeTest_ServerInOut_ClientInOut + { + protected override PipeOptions Options => PipeOptions.None; + + // TODO https://github.com/dotnet/runtime/issues/72526: + // The ConcurrentBidirectionalReadsWrites_Success test hangs on Windows with PipeOptions.None and InOut named pipes. + // Disabling for now. + protected override bool SupportsConcurrentBidirectionalUse => !OperatingSystem.IsWindows(); + } + + public sealed class NamedPipeTest_ServerInOut_ClientInOut_Asynchronous : NamedPipeTest_ServerInOut_ClientInOut + { + protected override PipeOptions Options => PipeOptions.Asynchronous; } }