From ab13c101c3202353f50518096215aea2f1eee134 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Fri, 13 Nov 2020 16:21:12 +0100 Subject: [PATCH 01/16] experiment: use read/write for socket operations instead of recvmsg/sendmsg --- .../Unix/System.Native/pal_networking.c | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/libraries/Native/Unix/System.Native/pal_networking.c b/src/libraries/Native/Unix/System.Native/pal_networking.c index 67d41f9a48d76..88cca514485a5 100644 --- a/src/libraries/Native/Unix/System.Native/pal_networking.c +++ b/src/libraries/Native/Unix/System.Native/pal_networking.c @@ -1504,7 +1504,14 @@ int32_t SystemNative_Receive(intptr_t socket, void* buffer, int32_t bufferLen, i } ssize_t res; - while ((res = recv(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); + if (socketFlags == 0) + { + while ((res = read(fd, buffer, (size_t)bufferLen)) < 0 && errno == EINTR); + } + else + { + while ((res = recv(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); + } if (res != -1) { @@ -1575,12 +1582,19 @@ int32_t SystemNative_Send(intptr_t socket, void* buffer, int32_t bufferLen, int3 } ssize_t res; + if (socketFlags == 0) + { + while ((res = write(fd, buffer, (size_t)bufferLen)) < 0 && errno == EINTR); + } + else + { #if defined(__APPLE__) && __APPLE__ - // possible OSX kernel bug: https://github.com/dotnet/runtime/issues/27221 - while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && (errno == EINTR || errno == EPROTOTYPE)); + // possible OSX kernel bug: https://github.com/dotnet/runtime/issues/27221 + while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && (errno == EINTR || errno == EPROTOTYPE)); #else - while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); + while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); #endif + } if (res != -1) { *sent = (int32_t)res; From 4ee7176e115f743a85213c429f5fa77ebd3e2c0f Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Mon, 16 Nov 2020 14:37:43 +0100 Subject: [PATCH 02/16] AnonymousPipeStreams: perform async I/O by using Socket implementation --- .../Unix/System.Native/pal_networking.c | 22 +-- .../Win32/SafeHandles/SafePipeHandle.Unix.cs | 128 ++++++++++---- .../SafeHandles/SafePipeHandle.Windows.cs | 5 + .../Win32/SafeHandles/SafePipeHandle.cs | 7 +- .../IO/Pipes/NamedPipeClientStream.Unix.cs | 4 +- .../IO/Pipes/NamedPipeServerStream.Unix.cs | 8 +- .../src/System/IO/Pipes/PipeStream.Unix.cs | 92 ++++------ .../src/System/IO/Pipes/PipeStream.Windows.cs | 12 +- .../src/System/IO/Pipes/PipeStream.cs | 4 +- .../ref/System.Net.Sockets.cs | 1 + .../src/System.Net.Sockets.csproj | 2 + .../Net/Sockets/SafeSocketHandle.Unix.cs | 88 +++++----- .../System/Net/Sockets/SafeSocketHandle.cs | 3 + .../src/System/Net/Sockets/Socket.Unix.cs | 38 +++-- .../src/System/Net/Sockets/Socket.Windows.cs | 3 +- .../src/System/Net/Sockets/Socket.cs | 157 +++++++++--------- .../src/System/Net/Sockets/SocketPal.Unix.cs | 86 +++++++++- 17 files changed, 394 insertions(+), 266 deletions(-) diff --git a/src/libraries/Native/Unix/System.Native/pal_networking.c b/src/libraries/Native/Unix/System.Native/pal_networking.c index 88cca514485a5..67d41f9a48d76 100644 --- a/src/libraries/Native/Unix/System.Native/pal_networking.c +++ b/src/libraries/Native/Unix/System.Native/pal_networking.c @@ -1504,14 +1504,7 @@ int32_t SystemNative_Receive(intptr_t socket, void* buffer, int32_t bufferLen, i } ssize_t res; - if (socketFlags == 0) - { - while ((res = read(fd, buffer, (size_t)bufferLen)) < 0 && errno == EINTR); - } - else - { - while ((res = recv(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); - } + while ((res = recv(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); if (res != -1) { @@ -1582,19 +1575,12 @@ int32_t SystemNative_Send(intptr_t socket, void* buffer, int32_t bufferLen, int3 } ssize_t res; - if (socketFlags == 0) - { - while ((res = write(fd, buffer, (size_t)bufferLen)) < 0 && errno == EINTR); - } - else - { #if defined(__APPLE__) && __APPLE__ - // possible OSX kernel bug: https://github.com/dotnet/runtime/issues/27221 - while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && (errno == EINTR || errno == EPROTOTYPE)); + // possible OSX kernel bug: https://github.com/dotnet/runtime/issues/27221 + while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && (errno == EINTR || errno == EPROTOTYPE)); #else - while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); + while ((res = send(fd, buffer, (size_t)bufferLen, socketFlags)) < 0 && errno == EINTR); #endif - } if (res != -1) { *sent = (int32_t)res; diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs index 98e9e10d816a8..0a70f513107c4 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs @@ -7,6 +7,7 @@ using System.Reflection; using System.Runtime.InteropServices; using System.Security; +using System.Threading; namespace Microsoft.Win32.SafeHandles { @@ -15,37 +16,35 @@ public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid private const int DefaultInvalidHandle = -1; // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe, and the - // _named* fields remain null. For named pipes, SafePipeHandle.handle is a copy of the file descriptor - // extracted from the Socket's SafeHandle, and the _named* fields are the socket and its safe handle. + // For named pipes, SafePipeHandle.handle is a copy of the file descriptor + // extracted from the Socket's SafeHandle. // This allows operations related to file descriptors to be performed directly on the SafePipeHandle, - // and operations that should go through the Socket to be done via _namedPipeSocket. We keep the + // and operations that should go through the Socket to be done via PipeSocket. We keep the // Socket's SafeHandle alive as long as this SafeHandle is alive. - private Socket? _namedPipeSocket; - private SafeHandle? _namedPipeSocketHandle; + private Socket? _pipeSocket; + private SafeHandle? _pipeSocketHandle; + private volatile int _disposed; internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true) { - Debug.Assert(namedPipeSocket != null); - _namedPipeSocket = namedPipeSocket; - - _namedPipeSocketHandle = namedPipeSocket.SafeHandle; - - bool ignored = false; - _namedPipeSocketHandle.DangerousAddRef(ref ignored); - SetHandle(_namedPipeSocketHandle.DangerousGetHandle()); + SetPipeSocket(namedPipeSocket, ownsHandle: true); + base.SetHandle(_pipeSocketHandle!.DangerousGetHandle()); } - internal Socket? NamedPipeSocket => _namedPipeSocket; - internal SafeHandle? NamedPipeSocketHandle => _namedPipeSocketHandle; + internal Socket? PipeSocket => _pipeSocket ?? CreatePipeSocket(); + + internal SafeHandle? PipeSocketHandle => _pipeSocketHandle; protected override void Dispose(bool disposing) { base.Dispose(disposing); // must be called before trying to Dispose the socket - if (disposing && _namedPipeSocket != null) + _disposed = 1; + Socket? socket; + if (disposing && (socket = Volatile.Read(ref _pipeSocket)) != null) { - _namedPipeSocket.Dispose(); - _namedPipeSocket = null; + socket.Dispose(); + _pipeSocket = null; } } @@ -53,24 +52,95 @@ protected override bool ReleaseHandle() { Debug.Assert(!IsInvalid); - // Clean up resources for named handles - if (_namedPipeSocketHandle != null) + if (_pipeSocketHandle != null) { - SetHandle(DefaultInvalidHandle); - _namedPipeSocketHandle.DangerousRelease(); - _namedPipeSocketHandle = null; + base.SetHandle((IntPtr)DefaultInvalidHandle); + _pipeSocketHandle.DangerousRelease(); + _pipeSocketHandle = null; return true; } - - // Clean up resources for anonymous handles - return (long)handle >= 0 ? - Interop.Sys.Close(handle) == 0 : - true; + else + { + return (long)handle >= 0 ? + Interop.Sys.Close(handle) == 0 : + true; + } } public override bool IsInvalid { - get { return (long)handle < 0 && _namedPipeSocket == null; } + get { return (long)handle < 0 && _pipeSocket == null; } + } + + private Socket? CreatePipeSocket(bool ownsHandle = true) + { + bool refAdded = false; + try + { + if (_disposed == 1) + { + return null; + } + + DangerousAddRef(ref refAdded); + + var socketHandle = new SafeSocketHandle(handle, ownsHandle); + socketHandle.IsPipe = true; + var socket = new Socket(socketHandle); + socket = SetPipeSocket(socket, ownsHandle); + + if (_disposed == 1) + { + Volatile.Write(ref _pipeSocket, null); + socket.Dispose(); + socket = null; + } + + return socket; + } + finally + { + if (refAdded) + { + DangerousRelease(); + } + } + } + + private Socket SetPipeSocket(Socket socket, bool ownsHandle) + { + Debug.Assert(socket != null); + + // Multiple threads may try to create the PipeSocket. + Socket? current = Interlocked.CompareExchange(ref _pipeSocket, socket, null); + if (current != null) + { + socket.Dispose(); + return current; + } + + // If we own the handle, defer ownership to the SocketHandle. + SafeSocketHandle socketHandle = _pipeSocket.SafeHandle; + if (ownsHandle) + { + _pipeSocketHandle = socketHandle; + + bool ignored = false; + socketHandle.DangerousAddRef(ref ignored); + } + + return socket; + } + + internal void SetHandle(IntPtr descriptor, bool ownsHandle = true) + { + base.SetHandle(descriptor); + + // Avoid throwing when we own the handle by defering pipe creation. + if (!ownsHandle) + { + _pipeSocket = CreatePipeSocket(ownsHandle); + } } } } diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs index 946f305c9c7f9..5a1d27149f944 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs @@ -15,5 +15,10 @@ protected override bool ReleaseHandle() { return Interop.Kernel32.CloseHandle(handle); } + + internal void SetHandle(IntPtr descriptor, bool ownsHandle = true) + { + base.SetHandle(descriptor); + } } } diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs index f19fa957d7910..13d9d4ae5acef 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs @@ -17,12 +17,7 @@ public SafePipeHandle() public SafePipeHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle) { - SetHandle(preexistingHandle); - } - - internal void SetHandle(int descriptor) - { - base.SetHandle((IntPtr)descriptor); + SetHandle(preexistingHandle, ownsHandle); } } } diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs index 42fe518e0012e..c0034f2f34b78 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs @@ -83,7 +83,7 @@ public override int InBufferSize { CheckPipePropertyOperations(); if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream); - return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? 0; + return InternalHandle?.PipeSocket?.ReceiveBufferSize ?? 0; } } @@ -93,7 +93,7 @@ public override int OutBufferSize { CheckPipePropertyOperations(); if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream); - return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? 0; + return InternalHandle?.PipeSocket?.SendBufferSize ?? 0; } } diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs index 7f0d62611bc44..5ce0d72ff2eec 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs @@ -134,7 +134,7 @@ public string GetImpersonationUserName() { CheckWriteOperations(); - SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle; + SafeHandle? handle = InternalHandle?.PipeSocketHandle; if (handle == null) { throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet); @@ -155,7 +155,7 @@ public override int InBufferSize { CheckPipePropertyOperations(); if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream); - return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? _inBufferSize; + return InternalHandle?.PipeSocket?.ReceiveBufferSize ?? _inBufferSize; } } @@ -165,7 +165,7 @@ public override int OutBufferSize { CheckPipePropertyOperations(); if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream); - return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? _outBufferSize; + return InternalHandle?.PipeSocket?.SendBufferSize ?? _outBufferSize; } } @@ -173,7 +173,7 @@ public override int OutBufferSize public void RunAsClient(PipeStreamImpersonationWorker impersonationWorker) { CheckWriteOperations(); - SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle; + SafeHandle? handle = InternalHandle?.PipeSocketHandle; if (handle == null) { throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet); diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs index 6e6fa8a5c1385..51805c66f0218 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs @@ -80,26 +80,22 @@ internal static string GetPipePath(string serverName, string pipeName) /// The handle to validate. internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle) { - if (safePipeHandle.NamedPipeSocket == null) + Interop.Sys.FileStatus status; + int result = CheckPipeCall(Interop.Sys.FStat(safePipeHandle, out status)); + if (result == 0) { - Interop.Sys.FileStatus status; - int result = CheckPipeCall(Interop.Sys.FStat(safePipeHandle, out status)); - if (result == 0) + if ((status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFIFO && + (status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFSOCK) { - if ((status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFIFO && - (status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFSOCK) - { - throw new IOException(SR.IO_InvalidPipeHandle); - } + throw new IOException(SR.IO_InvalidPipeHandle); } } } /// Initializes the handle to be used asynchronously. - /// The handle. - private void InitializeAsyncHandle(SafePipeHandle handle) + private void InitializeAsyncHandle(SafePipeHandle handle, bool isAsync) { - // nop + handle.PipeSocket!.Blocking = !isAsync; } internal virtual void DisposeCore(bool disposing) @@ -112,30 +108,17 @@ private unsafe int ReadCore(Span buffer) Debug.Assert(_handle != null); DebugAssertHandleValid(_handle); - // For named pipes, receive on the socket. - Socket? socket = _handle.NamedPipeSocket; - if (socket != null) + // For a blocking socket, we could simply use the same Read syscall as is done + // for reading an anonymous pipe. However, for a non-blocking socket, Read could + // end up returning EWOULDBLOCK rather than blocking waiting for data. Such a case + // is already handled by Socket.Receive, so we use it here. + try { - // For a blocking socket, we could simply use the same Read syscall as is done - // for reading an anonymous pipe. However, for a non-blocking socket, Read could - // end up returning EWOULDBLOCK rather than blocking waiting for data. Such a case - // is already handled by Socket.Receive, so we use it here. - try - { - return socket.Receive(buffer, SocketFlags.None); - } - catch (SocketException e) - { - throw GetIOExceptionForSocketException(e); - } + return _handle.PipeSocket!.Receive(buffer, SocketFlags.None); } - - // For anonymous pipes, read from the file descriptor. - fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer)) + catch (SocketException e) { - int result = CheckPipeCall(Interop.Sys.Read(_handle, bufPtr, buffer.Length)); - Debug.Assert(result <= buffer.Length); - return result; + throw GetIOExceptionForSocketException(e); } } @@ -144,37 +127,22 @@ private unsafe void WriteCore(ReadOnlySpan buffer) Debug.Assert(_handle != null); DebugAssertHandleValid(_handle); - // For named pipes, send to the socket. - Socket? socket = _handle.NamedPipeSocket; - if (socket != null) - { - // For a blocking socket, we could simply use the same Write syscall as is done - // for writing to anonymous pipe. However, for a non-blocking socket, Write could - // end up returning EWOULDBLOCK rather than blocking waiting for space available. - // Such a case is already handled by Socket.Send, so we use it here. - try - { - while (buffer.Length > 0) - { - int bytesWritten = socket.Send(buffer, SocketFlags.None); - buffer = buffer.Slice(bytesWritten); - } - } - catch (SocketException e) - { - throw GetIOExceptionForSocketException(e); - } - } - - // For anonymous pipes, write the file descriptor. - fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer)) + // For a blocking socket, we could simply use the same Write syscall as is done + // for writing to anonymous pipe. However, for a non-blocking socket, Write could + // end up returning EWOULDBLOCK rather than blocking waiting for space available. + // Such a case is already handled by Socket.Send, so we use it here. + try { while (buffer.Length > 0) { - int bytesWritten = CheckPipeCall(Interop.Sys.Write(_handle, bufPtr, buffer.Length)); + int bytesWritten = _handle.PipeSocket!.Send(buffer, SocketFlags.None); buffer = buffer.Slice(bytesWritten); } } + catch (SocketException e) + { + throw GetIOExceptionForSocketException(e); + } } private async ValueTask ReadAsyncCore(Memory destination, CancellationToken cancellationToken) @@ -183,7 +151,7 @@ private async ValueTask ReadAsyncCore(Memory destination, Cancellatio try { - return await InternalHandle!.NamedPipeSocket!.ReceiveAsync(destination, SocketFlags.None, cancellationToken).ConfigureAwait(false); + return await InternalHandle!.PipeSocket!.ReceiveAsync(destination, SocketFlags.None, cancellationToken).ConfigureAwait(false); } catch (SocketException e) { @@ -199,7 +167,7 @@ private async Task WriteAsyncCore(ReadOnlyMemory source, CancellationToken { while (source.Length > 0) { - int bytesWritten = await _handle!.NamedPipeSocket!.SendAsync(source, SocketFlags.None, cancellationToken).ConfigureAwait(false); + int bytesWritten = await _handle!.PipeSocket!.SendAsync(source, SocketFlags.None, cancellationToken).ConfigureAwait(false); Debug.Assert(bytesWritten > 0 && bytesWritten <= source.Length); source = source.Slice(bytesWritten); } @@ -350,8 +318,8 @@ internal static unsafe void CreateAnonymousPipe(out SafePipeHandle reader, out S Interop.CheckIo(Interop.Sys.Pipe(fds, Interop.Sys.PipeFlags.O_CLOEXEC)); // Store the file descriptors into our safe handles - reader.SetHandle(fds[Interop.Sys.ReadEndOfPipe]); - writer.SetHandle(fds[Interop.Sys.WriteEndOfPipe]); + reader.SetHandle(new IntPtr(fds[Interop.Sys.ReadEndOfPipe])); + writer.SetHandle(new IntPtr(fds[Interop.Sys.WriteEndOfPipe])); } internal int CheckPipeCall(int result) diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs index 252796cd1f199..34abeeb05ec40 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs @@ -38,12 +38,14 @@ internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle) } /// Initializes the handle to be used asynchronously. - /// The handle. - private void InitializeAsyncHandle(SafePipeHandle handle) + private void InitializeAsyncHandle(SafePipeHandle handle, bool isAsync) { - // If the handle is of async type, bind the handle to the ThreadPool so that we can use - // the async operations (it's needed so that our native callbacks get called). - _threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); + if (isAsync) + { + // If the handle is of async type, bind the handle to the ThreadPool so that we can use + // the async operations (it's needed so that our native callbacks get called). + _threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); + } } private void DisposeCore(bool disposing) diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs index 4dad917bd453f..7f0947116b091 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs @@ -94,9 +94,9 @@ private void Init(PipeDirection direction, PipeTransmissionMode transmissionMode // This method may also be called to uninitialize a handle, setting it to null. protected void InitializeHandle(SafePipeHandle? handle, bool isExposed, bool isAsync) { - if (isAsync && handle != null) + if (handle != null) { - InitializeAsyncHandle(handle); + InitializeAsyncHandle(handle, isAsync); } _handle = handle; diff --git a/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs b/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs index 2da5179682d37..193720d3d125a 100644 --- a/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs +++ b/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs @@ -223,6 +223,7 @@ public sealed partial class SafeSocketHandle : Microsoft.Win32.SafeHandles.SafeH public SafeSocketHandle() : base (default(bool)) { } public SafeSocketHandle(System.IntPtr preexistingHandle, bool ownsHandle) : base (default(bool)) { } protected override bool ReleaseHandle() { throw null; } + public bool IsPipe { get { throw null; } set { } } } public enum SelectMode { diff --git a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj index c6421ec4e1986..bbddbb391d54b 100644 --- a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj +++ b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj @@ -263,6 +263,8 @@ Link="Common\Interop\Unix\Interop.Poll.Structs.cs" /> + ExposedHandleOrUntrackedConfiguration = true; @@ -238,56 +239,63 @@ private unsafe SocketError DoCloseHandle(bool abortive) { Interop.Error errorCode = Interop.Error.SUCCESS; - // If abortive is not set, we're not running on the finalizer thread, so it's safe to block here. - // We can honor the linger options set on the socket. It also means closesocket() might return - // EWOULDBLOCK, in which case we need to do some recovery. - if (!abortive) + if (IsPipe) { - if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle} Following 'non-abortive' branch."); - - // Close, and if its errno is other than EWOULDBLOCK, there's nothing more to do - we either succeeded or failed. - errorCode = CloseHandle(handle); - if (errorCode != Interop.Error.EWOULDBLOCK) + return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); + } + else + { + // If abortive is not set, we're not running on the finalizer thread, so it's safe to block here. + // We can honor the linger options set on the socket. It also means closesocket() might return + // EWOULDBLOCK, in which case we need to do some recovery. + if (!abortive) { - return SocketPal.GetSocketErrorForErrorCode(errorCode); - } + if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle} Following 'non-abortive' branch."); - // The socket must be non-blocking with a linger timeout set. - // We have to set the socket to blocking. - if (Interop.Sys.Fcntl.DangerousSetIsNonBlocking(handle, 0) == 0) - { - // The socket successfully made blocking; retry the close(). - return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); + // Close, and if its errno is other than EWOULDBLOCK, there's nothing more to do - we either succeeded or failed. + errorCode = CloseHandle(handle); + if (errorCode != Interop.Error.EWOULDBLOCK) + { + return SocketPal.GetSocketErrorForErrorCode(errorCode); + } + + // The socket must be non-blocking with a linger timeout set. + // We have to set the socket to blocking. + if (Interop.Sys.Fcntl.DangerousSetIsNonBlocking(handle, 0) == 0) + { + // The socket successfully made blocking; retry the close(). + return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); + } + + // The socket could not be made blocking; fall through to the regular abortive close. } - // The socket could not be made blocking; fall through to the regular abortive close. - } + // By default or if the non-abortive path failed, set linger timeout to zero to get an abortive close (RST). + var linger = new Interop.Sys.LingerOption + { + OnOff = 1, + Seconds = 0 + }; - // By default or if the non-abortive path failed, set linger timeout to zero to get an abortive close (RST). - var linger = new Interop.Sys.LingerOption - { - OnOff = 1, - Seconds = 0 - }; + errorCode = Interop.Sys.SetLingerOption(handle, &linger); + #if DEBUG + _closeSocketLinger = SocketPal.GetSocketErrorForErrorCode(errorCode); + #endif + if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle}, setsockopt():{errorCode}"); - errorCode = Interop.Sys.SetLingerOption(handle, &linger); -#if DEBUG - _closeSocketLinger = SocketPal.GetSocketErrorForErrorCode(errorCode); -#endif - if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle}, setsockopt():{errorCode}"); + switch (errorCode) + { + case Interop.Error.SUCCESS: + case Interop.Error.EINVAL: + case Interop.Error.ENOPROTOOPT: + errorCode = CloseHandle(handle); + break; - switch (errorCode) - { - case Interop.Error.SUCCESS: - case Interop.Error.EINVAL: - case Interop.Error.ENOPROTOOPT: - errorCode = CloseHandle(handle); - break; + // For other errors, it's too dangerous to try closesocket() - it might block! + } - // For other errors, it's too dangerous to try closesocket() - it might block! + return SocketPal.GetSocketErrorForErrorCode(errorCode); } - - return SocketPal.GetSocketErrorForErrorCode(errorCode); } private Interop.Error CloseHandle(IntPtr handle) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs index bce056e31e2bb..9abb2b71aa1e9 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs @@ -31,6 +31,9 @@ public sealed partial class SafeSocketHandle : SafeHandleMinusOneIsInvalid #endif private int _ownClose; + // TODO: make this Unix internal. + public bool IsPipe { get; set; } // (ab)use Socket class for performing async I/O on pipes. + public SafeSocketHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle) { diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs index a9e08bd076c96..b984da05b9dca 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs @@ -75,21 +75,33 @@ partial void ValidateForMultiConnect(bool isMultiEndpoint) } private static unsafe void LoadSocketTypeFromHandle( - SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening) + SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening, out bool isSocket) { - // Validate that the supplied handle is indeed a socket. - if (Interop.Sys.FStat(handle, out Interop.Sys.FileStatus stat) == -1 || - (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) != Interop.Sys.FileTypes.S_IFSOCK) + if (handle.IsPipe) { - throw new SocketException((int)SocketError.NotSocket); + addressFamily = AddressFamily.Unknown; + socketType = SocketType.Unknown; + protocolType = ProtocolType.Unknown; + isListening = false; + isSocket = false; + } + else + { + // Validate that the supplied handle is indeed a socket. + if (Interop.Sys.FStat(handle, out Interop.Sys.FileStatus stat) == -1 || + (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) != Interop.Sys.FileTypes.S_IFSOCK) + { + throw new SocketException((int)SocketError.NotSocket); + } + isSocket = true; + + // On Linux, GetSocketType will be able to query SO_DOMAIN, SO_TYPE, and SO_PROTOCOL to get the + // address family, socket type, and protocol type, respectively. On macOS, this will only succeed + // in getting the socket type, and the others will be unknown. Subsequently the Socket ctor + // can use getsockname to retrieve the address family as part of trying to get the local end point. + Interop.Error e = Interop.Sys.GetSocketType(handle, out addressFamily, out socketType, out protocolType, out isListening); + Debug.Assert(e == Interop.Error.SUCCESS, e.ToString()); } - - // On Linux, GetSocketType will be able to query SO_DOMAIN, SO_TYPE, and SO_PROTOCOL to get the - // address family, socket type, and protocol type, respectively. On macOS, this will only succeed - // in getting the socket type, and the others will be unknown. Subsequently the Socket ctor - // can use getsockname to retrieve the address family as part of trying to get the local end point. - Interop.Error e = Interop.Sys.GetSocketType(handle, out addressFamily, out socketType, out protocolType, out isListening); - Debug.Assert(e == Interop.Error.SUCCESS, e.ToString()); // Get whether the socket is in non-blocking mode. On Unix, we automatically put the underlying // Socket into non-blocking mode whenever an async method is first invoked on the instance, but we @@ -101,7 +113,7 @@ private static unsafe void LoadSocketTypeFromHandle( bool nonBlocking; int rv = Interop.Sys.Fcntl.GetIsNonBlocking(handle, out nonBlocking); blocking = !nonBlocking; - Debug.Assert(rv == 0 || blocking, e.ToString()); // ignore failures + Debug.Assert(rv == 0 || blocking); // ignore failures } internal void ReplaceHandleIfNecessaryAfterFailedConnect() diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs index 267bfb0ab0aa5..11325495e0a31 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs @@ -94,7 +94,7 @@ public Socket(SocketInformation socketInformation) } private unsafe void LoadSocketTypeFromHandle( - SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening) + SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening, out bool isSocket) { // This can be called without winsock initialized. The handle is not going to be a valid socket handle in that case and the code will throw exception anyway. // Initializing winsock will ensure the error SocketError.NotSocket as opposed to SocketError.NotInitialized. @@ -121,6 +121,7 @@ private unsafe void LoadSocketTypeFromHandle( // This affects the result of querying Socket.Blocking, which will mostly only affect user code that happens to query // that property, though there are a few places we check it internally, e.g. as part of NetworkStream argument validation. blocking = true; + isSocket = true; } [SupportedOSPlatform("windows")] diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs index eb5277e8e01a2..e44a88a0d3dd4 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs @@ -136,95 +136,98 @@ private unsafe Socket(SafeSocketHandle handle, bool loadPropertiesFromHandle) try { // Get properties like address family and blocking mode from the OS. - LoadSocketTypeFromHandle(handle, out _addressFamily, out _socketType, out _protocolType, out _willBlockInternal, out _isListening); + LoadSocketTypeFromHandle(handle, out _addressFamily, out _socketType, out _protocolType, out _willBlockInternal, out _isListening, out bool isSocket); - // We should change stackalloc if this ever grows too big. - Debug.Assert(SocketPal.MaximumAddressSize <= 512); - // Try to get the address of the socket. - Span buffer = stackalloc byte[SocketPal.MaximumAddressSize]; - int bufferLength = buffer.Length; - fixed (byte* bufferPtr = buffer) + if (isSocket) { - if (SocketPal.GetSockName(handle, bufferPtr, &bufferLength) != SocketError.Success) + // We should change stackalloc if this ever grows too big. + Debug.Assert(SocketPal.MaximumAddressSize <= 512); + // Try to get the address of the socket. + Span buffer = stackalloc byte[SocketPal.MaximumAddressSize]; + int bufferLength = buffer.Length; + fixed (byte* bufferPtr = buffer) { - return; + if (SocketPal.GetSockName(handle, bufferPtr, &bufferLength) != SocketError.Success) + { + return; + } } - } - - Debug.Assert(bufferLength <= buffer.Length); - - // Try to get the local end point. That will in turn enable the remote - // end point to be retrieved on-demand when the property is accessed. - Internals.SocketAddress? socketAddress = null; - switch (_addressFamily) - { - case AddressFamily.InterNetwork: - _rightEndPoint = new IPEndPoint( - new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF), - SocketAddressPal.GetPort(buffer)); - break; - case AddressFamily.InterNetworkV6: - Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes]; - SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope); - _rightEndPoint = new IPEndPoint( - new IPAddress(address, scope), - SocketAddressPal.GetPort(buffer)); - break; + Debug.Assert(bufferLength <= buffer.Length); - case AddressFamily.Unix: - socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength)); - _rightEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress)); - break; - } + // Try to get the local end point. That will in turn enable the remote + // end point to be retrieved on-demand when the property is accessed. + Internals.SocketAddress? socketAddress = null; + switch (_addressFamily) + { + case AddressFamily.InterNetwork: + _rightEndPoint = new IPEndPoint( + new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF), + SocketAddressPal.GetPort(buffer)); + break; + + case AddressFamily.InterNetworkV6: + Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes]; + SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope); + _rightEndPoint = new IPEndPoint( + new IPAddress(address, scope), + SocketAddressPal.GetPort(buffer)); + break; + + case AddressFamily.Unix: + socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength)); + _rightEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress)); + break; + } - // Try to determine if we're connected, based on querying for a peer, just as we would in RemoteEndPoint, - // but ignoring any failures; this is best-effort (RemoteEndPoint also does a catch-all around the Create call). - if (_rightEndPoint != null) - { - try + // Try to determine if we're connected, based on querying for a peer, just as we would in RemoteEndPoint, + // but ignoring any failures; this is best-effort (RemoteEndPoint also does a catch-all around the Create call). + if (_rightEndPoint != null) { - // Local and remote end points may be different sizes for protocols like Unix Domain Sockets. - bufferLength = buffer.Length; - switch (SocketPal.GetPeerName(handle, buffer, ref bufferLength)) + try { - case SocketError.Success: - switch (_addressFamily) - { - case AddressFamily.InterNetwork: - _remoteEndPoint = new IPEndPoint( - new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF), - SocketAddressPal.GetPort(buffer)); - break; - - case AddressFamily.InterNetworkV6: - Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes]; - SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope); - _remoteEndPoint = new IPEndPoint( - new IPAddress(address, scope), - SocketAddressPal.GetPort(buffer)); - break; - - case AddressFamily.Unix: - socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength)); - _remoteEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress)); - break; - } + // Local and remote end points may be different sizes for protocols like Unix Domain Sockets. + bufferLength = buffer.Length; + switch (SocketPal.GetPeerName(handle, buffer, ref bufferLength)) + { + case SocketError.Success: + switch (_addressFamily) + { + case AddressFamily.InterNetwork: + _remoteEndPoint = new IPEndPoint( + new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF), + SocketAddressPal.GetPort(buffer)); + break; + + case AddressFamily.InterNetworkV6: + Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes]; + SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope); + _remoteEndPoint = new IPEndPoint( + new IPAddress(address, scope), + SocketAddressPal.GetPort(buffer)); + break; + + case AddressFamily.Unix: + socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength)); + _remoteEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress)); + break; + } - _isConnected = true; - break; - - case SocketError.InvalidArgument: - // On some OSes (e.g. macOS), EINVAL means the socket has been shut down. - // This can happen if, for example, socketpair was used and the parent - // process closed its copy of the child's socket. Since we don't know - // whether we're actually connected or not, err on the side of saying - // we're connected. - _isConnected = true; - break; + _isConnected = true; + break; + + case SocketError.InvalidArgument: + // On some OSes (e.g. macOS), EINVAL means the socket has been shut down. + // This can happen if, for example, socketpair was used and the parent + // process closed its copy of the child's socket. Since we don't know + // whether we're actually connected or not, err on the side of saying + // we're connected. + _isConnected = true; + break; + } } + catch { } } - catch { } } } catch diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs index 6f800064abad7..e33e320a2c301 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @@ -91,8 +91,23 @@ public static unsafe SocketError CreateSocket(AddressFamily addressFamily, Socke return errorCode; } + private static unsafe int SysRead(SafeSocketHandle socket, Span buffer, out Interop.Error errno) + { + int received = 0; + + fixed (byte* b = &MemoryMarshal.GetReference(buffer)) + { + received = Interop.Sys.Read(socket, b, buffer.Length); + errno = received != -1 ? Interop.Error.SUCCESS : Interop.Sys.GetLastError(); + } + + return received; + } + private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Span buffer, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); + int received = 0; fixed (byte* b = &MemoryMarshal.GetReference(buffer)) @@ -115,6 +130,8 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Span buffer, byte[]? socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); + Debug.Assert(socketAddress != null || socketAddressLen == 0, $"Unexpected values: socketAddress={socketAddress}, socketAddressLen={socketAddressLen}"); long received = 0; @@ -154,8 +171,32 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, return checked((int)received); } + private static unsafe int SysWrite(SafeSocketHandle socket, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno) + { + int sent; + + fixed (byte* b = &MemoryMarshal.GetReference(buffer)) + { + sent = Interop.Sys.Write(socket, &b[offset], buffer.Length); + if (sent == -1) + { + errno = Interop.Sys.GetLastError(); + } + else + { + errno = Interop.Error.SUCCESS; + offset += sent; + count -= sent; + } + } + + return sent; + } + private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); + int sent; fixed (byte* b = &MemoryMarshal.GetReference(buffer)) { @@ -179,6 +220,8 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, ReadOnlySpan buffer, ref int offset, ref int count, byte[] socketAddress, int socketAddressLen, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); + int sent; fixed (byte* sockAddr = socketAddress) fixed (byte* b = &MemoryMarshal.GetReference(buffer)) @@ -219,6 +262,8 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, IList> buffers, ref int bufferIndex, ref int offset, byte[]? socketAddress, int socketAddressLen, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); + // Pin buffers and set up iovecs. int startIndex = bufferIndex, startOffset = offset; @@ -313,6 +358,8 @@ private static unsafe long SendFile(SafeSocketHandle socket, SafeFileHandle file private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, IList> buffers, byte[]? socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); + int maxBuffers = buffers.Count; bool allocOnStack = maxBuffers <= IovStackThreshold; @@ -412,6 +459,7 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, private static unsafe int SysReceiveMessageFrom(SafeSocketHandle socket, SocketFlags flags, Span buffer, byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); Debug.Assert(socketAddress != null, "Expected non-null socketAddress"); int cmsgBufferLen = Interop.Sys.GetControlMessageBufferSize(Convert.ToInt32(isIPv4), Convert.ToInt32(isIPv6)); @@ -465,6 +513,7 @@ private static unsafe int SysReceiveMessageFrom( byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, out Interop.Error errno) { + Debug.Assert(!socket.IsPipe); Debug.Assert(socketAddress != null, "Expected non-null socketAddress"); int buffersCount = buffers.Count; @@ -678,7 +727,12 @@ public static unsafe bool TryCompleteReceive(SafeSocketHandle socket, Span Interop.Error errno; int received; - if (buffer.Length == 0) + if (socket.IsPipe) + { + Debug.Assert(flags == SocketFlags.None); + received = SysRead(socket, buffer, out errno); + } + else if (buffer.Length == 0) { // Special case a receive of 0 bytes into a single buffer. A common pattern is to ReceiveAsync 0 bytes in order // to be asynchronously notified when data is available, without needing to dedicate a buffer. Some platforms (e.g. macOS), @@ -732,7 +786,16 @@ public static unsafe bool TryCompleteReceiveFrom(SafeSocketHandle socket, Span), buffers, ref bufferIndex, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode); } - public static bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan buffer, IList>? buffers, ref int bufferIndex, ref int offset, ref int count, SocketFlags flags, byte[]? socketAddress, int socketAddressLen, ref int bytesSent, out SocketError errorCode) + public static unsafe bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan buffer, IList>? buffers, ref int bufferIndex, ref int offset, ref int count, SocketFlags flags, byte[]? socketAddress, int socketAddressLen, ref int bytesSent, out SocketError errorCode) { bool successfulSend = false; long start = socket.IsUnderlyingBlocking && socket.SendTimeout > 0 ? Environment.TickCount64 : 0; // Get ticks only if timeout is set and socket is blocking. @@ -853,10 +916,19 @@ public static bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan Interop.Error errno; try { - sent = buffers != null ? - SysSend(socket, flags, buffers, ref bufferIndex, ref offset, socketAddress, socketAddressLen, out errno) : - socketAddress == null ? SysSend(socket, flags, buffer, ref offset, ref count, out errno) : - SysSend(socket, flags, buffer, ref offset, ref count, socketAddress, socketAddressLen, out errno); + if (socket.IsPipe) + { + Debug.Assert(flags == SocketFlags.None); + Debug.Assert(buffers == null); + sent = SysWrite(socket, buffer, ref offset, ref count, out errno); + } + else + { + sent = buffers != null ? + SysSend(socket, flags, buffers, ref bufferIndex, ref offset, socketAddress, socketAddressLen, out errno) : + socketAddress == null ? SysSend(socket, flags, buffer, ref offset, ref count, out errno) : + SysSend(socket, flags, buffer, ref offset, ref count, socketAddress, socketAddressLen, out errno); + } } catch (ObjectDisposedException) { From 5d4e977e9a2e62080972d90a279db197774ca3e9 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Thu, 3 Dec 2020 09:05:01 +0100 Subject: [PATCH 03/16] PipeStream.Unix: use sync/async functions on Socket --- .../Win32/SafeHandles/SafePipeHandle.Unix.cs | 55 +++-- .../IO/Pipes/NamedPipeClientStream.Unix.cs | 4 +- .../IO/Pipes/NamedPipeServerStream.Unix.cs | 4 +- .../src/System/IO/Pipes/PipeStream.Unix.cs | 171 +++++++++++++- .../src/System/IO/Pipes/PipeStream.Windows.cs | 212 ++++++++++++++++++ .../src/System/IO/Pipes/PipeStream.cs | 212 ------------------ 6 files changed, 402 insertions(+), 256 deletions(-) diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs index 0a70f513107c4..760e8e09f137b 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs @@ -28,11 +28,11 @@ public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true) { - SetPipeSocket(namedPipeSocket, ownsHandle: true); + SetPipeSocketInterlocked(namedPipeSocket, ownsHandle: true); base.SetHandle(_pipeSocketHandle!.DangerousGetHandle()); } - internal Socket? PipeSocket => _pipeSocket ?? CreatePipeSocket(); + internal Socket PipeSocket => _pipeSocket ?? CreatePipeSocket(); internal SafeHandle? PipeSocketHandle => _pipeSocketHandle; @@ -72,42 +72,39 @@ public override bool IsInvalid get { return (long)handle < 0 && _pipeSocket == null; } } - private Socket? CreatePipeSocket(bool ownsHandle = true) + private Socket CreatePipeSocket(bool ownsHandle = true) { - bool refAdded = false; - try + Socket? socket = null; + if (_disposed == 0) { - if (_disposed == 1) + bool refAdded = false; + try { - return null; + DangerousAddRef(ref refAdded); + + var socketHandle = new SafeSocketHandle(handle, ownsHandle); + socketHandle.IsPipe = true; + socket = SetPipeSocketInterlocked(new Socket(socketHandle), ownsHandle); + + if (_disposed == 1) + { + Volatile.Write(ref _pipeSocket, null); + socket.Dispose(); + socket = null; + } } - - DangerousAddRef(ref refAdded); - - var socketHandle = new SafeSocketHandle(handle, ownsHandle); - socketHandle.IsPipe = true; - var socket = new Socket(socketHandle); - socket = SetPipeSocket(socket, ownsHandle); - - if (_disposed == 1) - { - Volatile.Write(ref _pipeSocket, null); - socket.Dispose(); - socket = null; - } - - return socket; - } - finally - { - if (refAdded) + finally { - DangerousRelease(); + if (refAdded) + { + DangerousRelease(); + } } } + return socket ?? throw new ObjectDisposedException(GetType().ToString());; } - private Socket SetPipeSocket(Socket socket, bool ownsHandle) + private Socket SetPipeSocketInterlocked(Socket socket, bool ownsHandle) { Debug.Assert(socket != null); diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs index c0034f2f34b78..3e07b15738cb6 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs @@ -83,7 +83,7 @@ public override int InBufferSize { CheckPipePropertyOperations(); if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream); - return InternalHandle?.PipeSocket?.ReceiveBufferSize ?? 0; + return InternalHandle?.PipeSocket.ReceiveBufferSize ?? 0; } } @@ -93,7 +93,7 @@ public override int OutBufferSize { CheckPipePropertyOperations(); if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream); - return InternalHandle?.PipeSocket?.SendBufferSize ?? 0; + return InternalHandle?.PipeSocket.SendBufferSize ?? 0; } } diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs index 5ce0d72ff2eec..4628fe4fce0b2 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs @@ -155,7 +155,7 @@ public override int InBufferSize { CheckPipePropertyOperations(); if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream); - return InternalHandle?.PipeSocket?.ReceiveBufferSize ?? _inBufferSize; + return InternalHandle?.PipeSocket.ReceiveBufferSize ?? _inBufferSize; } } @@ -165,7 +165,7 @@ public override int OutBufferSize { CheckPipePropertyOperations(); if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream); - return InternalHandle?.PipeSocket?.SendBufferSize ?? _outBufferSize; + return InternalHandle?.PipeSocket.SendBufferSize ?? _outBufferSize; } } diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs index 51805c66f0218..d0f7de7962a1e 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs @@ -32,6 +32,156 @@ public abstract partial class PipeStream : Stream /// Prefix to prepend to all pipe names. private static readonly string s_pipePrefix = Path.Combine(Path.GetTempPath(), "CoreFxPipe_"); + public override int Read(byte[] buffer, int offset, int count) + { + ValidateBufferArguments(buffer, offset, count); + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + CheckReadOperations(); + + return ReadCore(new Span(buffer, offset, count)); + } + + public override int Read(Span buffer) + { + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + CheckReadOperations(); + + return ReadCore(buffer); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValidateBufferArguments(buffer, offset, count); + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + CheckReadOperations(); + + if (count == 0) + { + UpdateMessageCompletion(false); + return Task.FromResult(0); + } + + return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken)) + { + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + CheckReadOperations(); + + if (buffer.Length == 0) + { + UpdateMessageCompletion(false); + return new ValueTask(0); + } + + return ReadAsyncCore(buffer, cancellationToken); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + => TaskToApm.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state); + + public override int EndRead(IAsyncResult asyncResult) + => TaskToApm.End(asyncResult); + + public override void Write(byte[] buffer, int offset, int count) + { + ValidateBufferArguments(buffer, offset, count); + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + CheckWriteOperations(); + + WriteCore(new ReadOnlySpan(buffer, offset, count)); + } + + public override void Write(ReadOnlySpan buffer) + { + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + CheckWriteOperations(); + + WriteCore(buffer); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValidateBufferArguments(buffer, offset, count); + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + CheckWriteOperations(); + + if (count == 0) + { + return Task.CompletedTask; + } + + return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken)) + { + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + CheckWriteOperations(); + + if (buffer.Length == 0) + { + return default; + } + + return new ValueTask(WriteAsyncCore(buffer, cancellationToken)); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + => TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state); + + public override void EndWrite(IAsyncResult asyncResult) + => TaskToApm.End(asyncResult); + internal static string GetPipePath(string serverName, string pipeName) { if (serverName != "." && serverName != Interop.Sys.GetHostName()) @@ -94,9 +244,7 @@ internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle) /// Initializes the handle to be used asynchronously. private void InitializeAsyncHandle(SafePipeHandle handle, bool isAsync) - { - handle.PipeSocket!.Blocking = !isAsync; - } + { } internal virtual void DisposeCore(bool disposing) { @@ -108,13 +256,18 @@ private unsafe int ReadCore(Span buffer) Debug.Assert(_handle != null); DebugAssertHandleValid(_handle); + if (buffer.Length == 0) + { + return 0; + } + // For a blocking socket, we could simply use the same Read syscall as is done // for reading an anonymous pipe. However, for a non-blocking socket, Read could // end up returning EWOULDBLOCK rather than blocking waiting for data. Such a case // is already handled by Socket.Receive, so we use it here. try { - return _handle.PipeSocket!.Receive(buffer, SocketFlags.None); + return _handle!.PipeSocket.Receive(buffer, SocketFlags.None); } catch (SocketException e) { @@ -135,7 +288,7 @@ private unsafe void WriteCore(ReadOnlySpan buffer) { while (buffer.Length > 0) { - int bytesWritten = _handle.PipeSocket!.Send(buffer, SocketFlags.None); + int bytesWritten = _handle!.PipeSocket.Send(buffer, SocketFlags.None); buffer = buffer.Slice(bytesWritten); } } @@ -147,11 +300,9 @@ private unsafe void WriteCore(ReadOnlySpan buffer) private async ValueTask ReadAsyncCore(Memory destination, CancellationToken cancellationToken) { - Debug.Assert(this is NamedPipeClientStream || this is NamedPipeServerStream, $"Expected a named pipe, got a {GetType()}"); - try { - return await InternalHandle!.PipeSocket!.ReceiveAsync(destination, SocketFlags.None, cancellationToken).ConfigureAwait(false); + return await InternalHandle!.PipeSocket.ReceiveAsync(destination, SocketFlags.None, cancellationToken).ConfigureAwait(false); } catch (SocketException e) { @@ -161,13 +312,11 @@ private async ValueTask ReadAsyncCore(Memory destination, Cancellatio private async Task WriteAsyncCore(ReadOnlyMemory source, CancellationToken cancellationToken) { - Debug.Assert(this is NamedPipeClientStream || this is NamedPipeServerStream, $"Expected a named pipe, got a {GetType()}"); - try { while (source.Length > 0) { - int bytesWritten = await _handle!.PipeSocket!.SendAsync(source, SocketFlags.None, cancellationToken).ConfigureAwait(false); + int bytesWritten = await _handle!.PipeSocket.SendAsync(source, SocketFlags.None, cancellationToken).ConfigureAwait(false); Debug.Assert(bytesWritten > 0 && bytesWritten <= source.Length); source = source.Slice(bytesWritten); } diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs index 34abeeb05ec40..78d4666ad4f2c 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs @@ -16,6 +16,218 @@ public abstract partial class PipeStream : Stream internal const bool CheckOperationsRequiresSetHandle = true; internal ThreadPoolBoundHandle? _threadPoolBinding; + public override int Read(byte[] buffer, int offset, int count) + { + if (_isAsync) + { + return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + } + + ValidateBufferArguments(buffer, offset, count); + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + CheckReadOperations(); + + return ReadCore(new Span(buffer, offset, count)); + } + + public override int Read(Span buffer) + { + if (_isAsync) + { + return base.Read(buffer); + } + + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + CheckReadOperations(); + + return ReadCore(buffer); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValidateBufferArguments(buffer, offset, count); + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + CheckReadOperations(); + + if (!_isAsync) + { + return base.ReadAsync(buffer, offset, count, cancellationToken); + } + + if (count == 0) + { + UpdateMessageCompletion(false); + return Task.FromResult(0); + } + + return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken)) + { + if (!_isAsync) + { + return base.ReadAsync(buffer, cancellationToken); + } + + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + CheckReadOperations(); + + if (buffer.Length == 0) + { + UpdateMessageCompletion(false); + return new ValueTask(0); + } + + return ReadAsyncCore(buffer, cancellationToken); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + if (_isAsync) + return TaskToApm.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state); + else + return base.BeginRead(buffer, offset, count, callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + if (_isAsync) + return TaskToApm.End(asyncResult); + else + return base.EndRead(asyncResult); + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (_isAsync) + { + WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + return; + } + + ValidateBufferArguments(buffer, offset, count); + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + CheckWriteOperations(); + + WriteCore(new ReadOnlySpan(buffer, offset, count)); + } + + public override void Write(ReadOnlySpan buffer) + { + if (_isAsync) + { + base.Write(buffer); + return; + } + + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + CheckWriteOperations(); + + WriteCore(buffer); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValidateBufferArguments(buffer, offset, count); + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + CheckWriteOperations(); + + if (!_isAsync) + { + return base.WriteAsync(buffer, offset, count, cancellationToken); + } + + if (count == 0) + { + return Task.CompletedTask; + } + + return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken)) + { + if (!_isAsync) + { + return base.WriteAsync(buffer, cancellationToken); + } + + if (!CanWrite) + { + throw Error.GetWriteNotSupported(); + } + + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + CheckWriteOperations(); + + if (buffer.Length == 0) + { + return default; + } + + return new ValueTask(WriteAsyncCore(buffer, cancellationToken)); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + if (_isAsync) + return TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state); + else + return base.BeginWrite(buffer, offset, count, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + if (_isAsync) + TaskToApm.End(asyncResult); + else + base.EndWrite(asyncResult); + } + internal static string GetPipePath(string serverName, string pipeName) { string normalizedPipePath = Path.GetFullPath(@"\\" + serverName + @"\pipe\" + pipeName); diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs index 7f0947116b091..a41125bb1dc85 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs @@ -107,218 +107,6 @@ protected void InitializeHandle(SafePipeHandle? handle, bool isExposed, bool isA _isFromExistingHandle = isExposed; } - public override int Read(byte[] buffer, int offset, int count) - { - if (_isAsync) - { - return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); - } - - ValidateBufferArguments(buffer, offset, count); - if (!CanRead) - { - throw Error.GetReadNotSupported(); - } - CheckReadOperations(); - - return ReadCore(new Span(buffer, offset, count)); - } - - public override int Read(Span buffer) - { - if (_isAsync) - { - return base.Read(buffer); - } - - if (!CanRead) - { - throw Error.GetReadNotSupported(); - } - CheckReadOperations(); - - return ReadCore(buffer); - } - - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - ValidateBufferArguments(buffer, offset, count); - if (!CanRead) - { - throw Error.GetReadNotSupported(); - } - - if (cancellationToken.IsCancellationRequested) - { - return Task.FromCanceled(cancellationToken); - } - - CheckReadOperations(); - - if (!_isAsync) - { - return base.ReadAsync(buffer, offset, count, cancellationToken); - } - - if (count == 0) - { - UpdateMessageCompletion(false); - return Task.FromResult(0); - } - - return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask(); - } - - public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken)) - { - if (!_isAsync) - { - return base.ReadAsync(buffer, cancellationToken); - } - - if (!CanRead) - { - throw Error.GetReadNotSupported(); - } - - if (cancellationToken.IsCancellationRequested) - { - return ValueTask.FromCanceled(cancellationToken); - } - - CheckReadOperations(); - - if (buffer.Length == 0) - { - UpdateMessageCompletion(false); - return new ValueTask(0); - } - - return ReadAsyncCore(buffer, cancellationToken); - } - - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - if (_isAsync) - return TaskToApm.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state); - else - return base.BeginRead(buffer, offset, count, callback, state); - } - - public override int EndRead(IAsyncResult asyncResult) - { - if (_isAsync) - return TaskToApm.End(asyncResult); - else - return base.EndRead(asyncResult); - } - - public override void Write(byte[] buffer, int offset, int count) - { - if (_isAsync) - { - WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); - return; - } - - ValidateBufferArguments(buffer, offset, count); - if (!CanWrite) - { - throw Error.GetWriteNotSupported(); - } - CheckWriteOperations(); - - WriteCore(new ReadOnlySpan(buffer, offset, count)); - } - - public override void Write(ReadOnlySpan buffer) - { - if (_isAsync) - { - base.Write(buffer); - return; - } - - if (!CanWrite) - { - throw Error.GetWriteNotSupported(); - } - CheckWriteOperations(); - - WriteCore(buffer); - } - - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - ValidateBufferArguments(buffer, offset, count); - if (!CanWrite) - { - throw Error.GetWriteNotSupported(); - } - - if (cancellationToken.IsCancellationRequested) - { - return Task.FromCanceled(cancellationToken); - } - - CheckWriteOperations(); - - if (!_isAsync) - { - return base.WriteAsync(buffer, offset, count, cancellationToken); - } - - if (count == 0) - { - return Task.CompletedTask; - } - - return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken); - } - - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken)) - { - if (!_isAsync) - { - return base.WriteAsync(buffer, cancellationToken); - } - - if (!CanWrite) - { - throw Error.GetWriteNotSupported(); - } - - if (cancellationToken.IsCancellationRequested) - { - return ValueTask.FromCanceled(cancellationToken); - } - - CheckWriteOperations(); - - if (buffer.Length == 0) - { - return default; - } - - return new ValueTask(WriteAsyncCore(buffer, cancellationToken)); - } - - public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - if (_isAsync) - return TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state); - else - return base.BeginWrite(buffer, offset, count, callback, state); - } - - public override void EndWrite(IAsyncResult asyncResult) - { - if (_isAsync) - TaskToApm.End(asyncResult); - else - base.EndWrite(asyncResult); - } - [Conditional("DEBUG")] private static void DebugAssertHandleValid(SafePipeHandle handle) { From 3912dd6fefb68c65aee52228d2589d160f0575ce Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Thu, 3 Dec 2020 10:34:13 +0100 Subject: [PATCH 04/16] Refactor Socket creation --- .../Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs | 7 ++++--- .../System.Net.Sockets/ref/System.Net.Sockets.cs | 2 +- .../src/System/Net/Sockets/SafeSocketHandle.Unix.cs | 1 + .../src/System/Net/Sockets/SafeSocketHandle.cs | 3 --- .../src/System/Net/Sockets/Socket.Unix.cs | 8 ++++++++ 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs index 760e8e09f137b..c4946fc1048ff 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs @@ -14,6 +14,7 @@ namespace Microsoft.Win32.SafeHandles public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid { private const int DefaultInvalidHandle = -1; + private static Func? s_createSocketForPipe; // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe, and the // For named pipes, SafePipeHandle.handle is a copy of the file descriptor @@ -82,9 +83,9 @@ private Socket CreatePipeSocket(bool ownsHandle = true) { DangerousAddRef(ref refAdded); - var socketHandle = new SafeSocketHandle(handle, ownsHandle); - socketHandle.IsPipe = true; - socket = SetPipeSocketInterlocked(new Socket(socketHandle), ownsHandle); + Func createSocketForPipe = s_createSocketForPipe ?? + (s_createSocketForPipe = (Func)typeof(Socket).GetMethod("CreateForPipeSafeHandle", BindingFlags.Static | BindingFlags.Public)!.CreateDelegate(typeof(Func))); + socket = SetPipeSocketInterlocked(createSocketForPipe(handle, ownsHandle), ownsHandle); if (_disposed == 1) { diff --git a/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs b/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs index 193720d3d125a..27efd4549d752 100644 --- a/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs +++ b/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs @@ -223,7 +223,6 @@ public sealed partial class SafeSocketHandle : Microsoft.Win32.SafeHandles.SafeH public SafeSocketHandle() : base (default(bool)) { } public SafeSocketHandle(System.IntPtr preexistingHandle, bool ownsHandle) : base (default(bool)) { } protected override bool ReleaseHandle() { throw null; } - public bool IsPipe { get { throw null; } set { } } } public enum SelectMode { @@ -257,6 +256,7 @@ public SendPacketsElement(string filepath, long offset, int count, bool endOfPac } public partial class Socket : System.IDisposable { + public static Socket CreateForPipeSafeHandle(System.IntPtr handle, bool ownsHandle) { throw null; } public Socket(System.Net.Sockets.SafeSocketHandle handle) { } public Socket(System.Net.Sockets.AddressFamily addressFamily, System.Net.Sockets.SocketType socketType, System.Net.Sockets.ProtocolType protocolType) { } [System.Runtime.Versioning.SupportedOSPlatformAttribute("windows")] diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs index dd6df4284416d..ec922acc4ecf5 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs @@ -23,6 +23,7 @@ public partial class SafeSocketHandle internal bool DualMode { get; set; } internal bool ExposedHandleOrUntrackedConfiguration { get; private set; } internal bool PreferInlineCompletions { get; set; } = SocketAsyncEngine.InlineSocketCompletionsEnabled; + internal bool IsPipe { get; set; } // (ab)use Socket class for performing async I/O on pipes. internal void RegisterConnectResult(SocketError error) { diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs index 9abb2b71aa1e9..bce056e31e2bb 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.cs @@ -31,9 +31,6 @@ public sealed partial class SafeSocketHandle : SafeHandleMinusOneIsInvalid #endif private int _ownClose; - // TODO: make this Unix internal. - public bool IsPipe { get; set; } // (ab)use Socket class for performing async I/O on pipes. - public SafeSocketHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle) { diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs index b984da05b9dca..1b0fd9ad1ba13 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs @@ -10,6 +10,14 @@ namespace System.Net.Sockets { public partial class Socket { + // TODO: make internal + public static Socket CreateForPipeSafeHandle(IntPtr handle, bool ownsHandle) + { + var socketHandle = new SafeSocketHandle(handle, ownsHandle); + socketHandle.IsPipe = true; + return new Socket(socketHandle); + } + [SupportedOSPlatform("windows")] public Socket(SocketInformation socketInformation) { From aa2711175e947950108da119bb21f671305d0fa6 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Wed, 9 Dec 2020 09:41:39 +0100 Subject: [PATCH 05/16] Cleanup --- .../src/System/IO/Pipes/PipeStream.Unix.cs | 3 ++- .../src/System/IO/Pipes/PipeStream.Windows.cs | 12 +++++------- .../src/System/IO/Pipes/PipeStream.cs | 4 ++-- .../src/System/Net/Sockets/SocketPal.Unix.cs | 2 +- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs index d0f7de7962a1e..2fa979b222a81 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs @@ -243,7 +243,8 @@ internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle) } /// Initializes the handle to be used asynchronously. - private void InitializeAsyncHandle(SafePipeHandle handle, bool isAsync) + /// The handle. + private void InitializeAsyncHandle(SafePipeHandle handle) { } internal virtual void DisposeCore(bool disposing) diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs index 78d4666ad4f2c..928cc5d1a081f 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs @@ -250,14 +250,12 @@ internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle) } /// Initializes the handle to be used asynchronously. - private void InitializeAsyncHandle(SafePipeHandle handle, bool isAsync) + /// The handle. + private void InitializeAsyncHandle(SafePipeHandle handle) { - if (isAsync) - { - // If the handle is of async type, bind the handle to the ThreadPool so that we can use - // the async operations (it's needed so that our native callbacks get called). - _threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); - } + // If the handle is of async type, bind the handle to the ThreadPool so that we can use + // the async operations (it's needed so that our native callbacks get called). + _threadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); } private void DisposeCore(bool disposing) diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs index a41125bb1dc85..6df150682be21 100644 --- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs +++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs @@ -94,9 +94,9 @@ private void Init(PipeDirection direction, PipeTransmissionMode transmissionMode // This method may also be called to uninitialize a handle, setting it to null. protected void InitializeHandle(SafePipeHandle? handle, bool isExposed, bool isAsync) { - if (handle != null) + if (isAsync && handle != null) { - InitializeAsyncHandle(handle, isAsync); + InitializeAsyncHandle(handle); } _handle = handle; diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs index e33e320a2c301..804a1b5c3ff3c 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @@ -905,7 +905,7 @@ public static bool TryCompleteSendTo(SafeSocketHandle socket, IList), buffers, ref bufferIndex, ref offset, ref count, flags, socketAddress, socketAddressLen, ref bytesSent, out errorCode); } - public static unsafe bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan buffer, IList>? buffers, ref int bufferIndex, ref int offset, ref int count, SocketFlags flags, byte[]? socketAddress, int socketAddressLen, ref int bytesSent, out SocketError errorCode) + public static bool TryCompleteSendTo(SafeSocketHandle socket, ReadOnlySpan buffer, IList>? buffers, ref int bufferIndex, ref int offset, ref int count, SocketFlags flags, byte[]? socketAddress, int socketAddressLen, ref int bytesSent, out SocketError errorCode) { bool successfulSend = false; long start = socket.IsUnderlyingBlocking && socket.SendTimeout > 0 ? Environment.TickCount64 : 0; // Get ticks only if timeout is set and socket is blocking. From 99f4a01bcd5faffc2000debd63f4fce88ffa62d2 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Tue, 2 Feb 2021 11:10:06 +0100 Subject: [PATCH 06/16] Fix broken comment --- .../src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs index c4946fc1048ff..bb277c17806fa 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs @@ -16,7 +16,7 @@ public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid private const int DefaultInvalidHandle = -1; private static Func? s_createSocketForPipe; - // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe, and the + // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe. // For named pipes, SafePipeHandle.handle is a copy of the file descriptor // extracted from the Socket's SafeHandle. // This allows operations related to file descriptors to be performed directly on the SafePipeHandle, From b2d9433629a84337f945fdd734476c5a5fd09206 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Wed, 3 Feb 2021 15:52:21 +0100 Subject: [PATCH 07/16] Allow passing pipe fd to Socket/SafeSocketHandle public ctor --- .../Win32/SafeHandles/SafePipeHandle.Unix.cs | 5 +- .../ref/System.Net.Sockets.cs | 1 - .../Net/Sockets/SafeSocketHandle.Unix.cs | 1 + .../src/System/Net/Sockets/Socket.Unix.cs | 26 ++++------ .../FunctionalTests/CreateSocketTests.cs | 49 +++++++++++++++---- 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs index bb277c17806fa..ea4c98560cb11 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs @@ -14,7 +14,6 @@ namespace Microsoft.Win32.SafeHandles public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid { private const int DefaultInvalidHandle = -1; - private static Func? s_createSocketForPipe; // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe. // For named pipes, SafePipeHandle.handle is a copy of the file descriptor @@ -83,9 +82,7 @@ private Socket CreatePipeSocket(bool ownsHandle = true) { DangerousAddRef(ref refAdded); - Func createSocketForPipe = s_createSocketForPipe ?? - (s_createSocketForPipe = (Func)typeof(Socket).GetMethod("CreateForPipeSafeHandle", BindingFlags.Static | BindingFlags.Public)!.CreateDelegate(typeof(Func))); - socket = SetPipeSocketInterlocked(createSocketForPipe(handle, ownsHandle), ownsHandle); + socket = SetPipeSocketInterlocked(new Socket(new SafeSocketHandle(handle, ownsHandle)), ownsHandle); if (_disposed == 1) { diff --git a/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs b/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs index 27efd4549d752..2da5179682d37 100644 --- a/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs +++ b/src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs @@ -256,7 +256,6 @@ public SendPacketsElement(string filepath, long offset, int count, bool endOfPac } public partial class Socket : System.IDisposable { - public static Socket CreateForPipeSafeHandle(System.IntPtr handle, bool ownsHandle) { throw null; } public Socket(System.Net.Sockets.SafeSocketHandle handle) { } public Socket(System.Net.Sockets.AddressFamily addressFamily, System.Net.Sockets.SocketType socketType, System.Net.Sockets.ProtocolType protocolType) { } [System.Runtime.Versioning.SupportedOSPlatformAttribute("windows")] diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs index ec922acc4ecf5..8c840e263218b 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs @@ -289,6 +289,7 @@ private unsafe SocketError DoCloseHandle(bool abortive) case Interop.Error.SUCCESS: case Interop.Error.EINVAL: case Interop.Error.ENOPROTOOPT: + case Interop.Error.ENOTSOCK: errorCode = CloseHandle(handle); break; diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs index 1b0fd9ad1ba13..bf8d12ac15b25 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs @@ -10,14 +10,6 @@ namespace System.Net.Sockets { public partial class Socket { - // TODO: make internal - public static Socket CreateForPipeSafeHandle(IntPtr handle, bool ownsHandle) - { - var socketHandle = new SafeSocketHandle(handle, ownsHandle); - socketHandle.IsPipe = true; - return new Socket(socketHandle); - } - [SupportedOSPlatform("windows")] public Socket(SocketInformation socketInformation) { @@ -85,24 +77,24 @@ partial void ValidateForMultiConnect(bool isMultiEndpoint) private static unsafe void LoadSocketTypeFromHandle( SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening, out bool isSocket) { + if (Interop.Sys.FStat(handle, out Interop.Sys.FileStatus stat) == -1) + { + throw new SocketException((int)SocketError.NotSocket); + } + isSocket = (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) == Interop.Sys.FileTypes.S_IFSOCK; + + // If it is not a socket, treat it like a pipe. + handle.IsPipe = !isSocket; + if (handle.IsPipe) { addressFamily = AddressFamily.Unknown; socketType = SocketType.Unknown; protocolType = ProtocolType.Unknown; isListening = false; - isSocket = false; } else { - // Validate that the supplied handle is indeed a socket. - if (Interop.Sys.FStat(handle, out Interop.Sys.FileStatus stat) == -1 || - (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) != Interop.Sys.FileTypes.S_IFSOCK) - { - throw new SocketException((int)SocketError.NotSocket); - } - isSocket = true; - // On Linux, GetSocketType will be able to query SO_DOMAIN, SO_TYPE, and SO_PROTOCOL to get the // address family, socket type, and protocol type, respectively. On macOS, this will only succeed // in getting the socket type, and the others will be unknown. Subsequently the Socket ctor diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs index c291583fd54d7..6445b6bf08317 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.ComponentModel; using System.Diagnostics; using System.IO; using System.IO.Pipes; @@ -244,17 +245,28 @@ public void Ctor_Netcoreapp_Success(AddressFamily addressFamily) s.Close(); } - [Fact] - public void Ctor_SafeHandle_Invalid_ThrowsException() + [Theory] + [InlineData(true)] + [InlineData(false)] + [PlatformSpecific(TestPlatforms.Linux)] + public void Ctor_SafeHandle_FromPipeHandle_Ctor_Dispose_Success(bool ownsHandle) { - AssertExtensions.Throws("handle", () => new Socket(null)); - AssertExtensions.Throws("handle", () => new Socket(new SafeSocketHandle((IntPtr)(-1), false))); + (int fd1, int fd2) = pipe2(); + close(fd2); - using (var pipe = new AnonymousPipeServerStream()) - { - SocketException se = Assert.Throws(() => new Socket(new SafeSocketHandle(pipe.ClientSafePipeHandle.DangerousGetHandle(), false))); - Assert.Equal(SocketError.NotSocket, se.SocketErrorCode); - } + using var _ = new SafeSocketHandle(new IntPtr(fd1), ownsHandle); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + [PlatformSpecific(TestPlatforms.Linux)] + public void Ctor_Socket_FromPipeHandle_Ctor_Dispose_Success(bool ownsHandle) + { + (int fd1, int fd2) = pipe2(); + close(fd2); + + using var _ = new Socket(new SafeSocketHandle(new IntPtr(fd1), ownsHandle)); } [Theory] @@ -624,6 +636,25 @@ public unsafe void Ctor_SafeHandle_UnknownSocket_Success() [DllImport("libc")] private static extern int close(int fd); + [DllImport("libc", SetLastError = true)] + private static unsafe extern int pipe2(int* pipefd, int flags); + + private static unsafe (int, int) pipe2(int flags = 0) + { + Span pipefd = stackalloc int[2]; + fixed (int* ptr = pipefd) + { + if (pipe2(ptr, flags) == 0) + { + return (pipefd[0], pipefd[1]); + } + else + { + throw new Win32Exception(); + } + } + } + [Fact] [PlatformSpecific(TestPlatforms.AnyUnix)] public unsafe void Ctor_SafeHandle_SocketPair_Success() From 6ccc342417b23fbfa3a946913aa2ce5e4db6f1b0 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Thu, 18 Feb 2021 16:18:50 +0100 Subject: [PATCH 08/16] macOS: Handle EPIPE on registration --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 29 ++++++++++++++++--- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 18 ++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 4d15435d3567e..a9f5e9cdc2f55 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -789,7 +789,18 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation if (!context.IsRegistered) { - context.Register(); + // TryRegister throws for fatal errors. + // When it returns false the registration failed because the peer has closed. + // We'll retry the operation, and abort when it fails to complete. + if (!context.TryRegister()) + { + if (!operation.TryComplete(context)) + { + operation.DoAbort(); + } + Trace(context, $"Leave, not registered"); + return false; + } } while (true) @@ -1224,7 +1235,7 @@ public bool PreferInlineCompletions get => _socket.PreferInlineCompletions; } - private void Register() + private bool TryRegister() { Debug.Assert(_nonBlockingSet); lock (_registerLock) @@ -1236,9 +1247,18 @@ private void Register() { _socket.DangerousAddRef(ref addedRef); IntPtr handle = _socket.DangerousGetHandle(); - Volatile.Write(ref _asyncEngine, SocketAsyncEngine.RegisterSocket(handle, this)); + if (SocketAsyncEngine.TryRegisterSocket(handle, this, out SocketAsyncEngine? engine)) + { + Volatile.Write(ref _asyncEngine, engine); - Trace("Registered"); + Trace("Registered"); + return true; + } + else + { + Trace("Registration failed"); + return false; + } } finally { @@ -1248,6 +1268,7 @@ private void Register() } } } + return true; } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 28e7c3a092ef3..532298c03c9f4 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -97,15 +97,16 @@ private static SocketAsyncEngine[] CreateEngines() // // Registers the Socket with a SocketAsyncEngine, and returns the associated engine. // - public static SocketAsyncEngine RegisterSocket(IntPtr socketHandle, SocketAsyncContext context) + public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine) { int engineIndex = Math.Abs(Interlocked.Increment(ref s_allocateFromEngine) % s_engines.Length); - SocketAsyncEngine engine = s_engines[engineIndex]; - engine.RegisterCore(socketHandle, context); - return engine; + SocketAsyncEngine nextEngine = s_engines[engineIndex]; + bool registered = nextEngine.TryRegisterCore(socketHandle, context); + engine = registered ? nextEngine : null; + return registered; } - private void RegisterCore(IntPtr socketHandle, SocketAsyncContext context) + private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context) { bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context)); if (!added) @@ -119,10 +120,15 @@ private void RegisterCore(IntPtr socketHandle, SocketAsyncContext context) Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle); if (error == Interop.Error.SUCCESS) { - return; + return true; } _handleToContextMap.TryRemove(socketHandle, out _); + // macOS: kevent returns EPIPE when adding pipe fd for which the other end is closed. + if (error == Interop.Error.EPIPE) + { + return false; + } if (error == Interop.Error.ENOMEM || error == Interop.Error.ENOSPC) { throw new OutOfMemoryException(); From a2625f0553f5623d54d13dd6cf06e092ed3b84c3 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Mon, 22 Feb 2021 09:01:35 +0100 Subject: [PATCH 09/16] Refactor IsPipe to IsSocket --- .../Net/Sockets/SafeSocketHandle.Unix.cs | 6 ++--- .../src/System/Net/Sockets/Socket.Unix.cs | 19 ++++++++-------- .../src/System/Net/Sockets/SocketPal.Unix.cs | 22 +++++++++---------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs index 8c840e263218b..852b25f9c0f3e 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs @@ -23,7 +23,7 @@ public partial class SafeSocketHandle internal bool DualMode { get; set; } internal bool ExposedHandleOrUntrackedConfiguration { get; private set; } internal bool PreferInlineCompletions { get; set; } = SocketAsyncEngine.InlineSocketCompletionsEnabled; - internal bool IsPipe { get; set; } // (ab)use Socket class for performing async I/O on pipes. + internal bool IsSocket { get; set; } = true; // (ab)use Socket class for performing async I/O on non-socket fds. internal void RegisterConnectResult(SocketError error) { @@ -44,7 +44,7 @@ internal void TransferTrackedState(SafeSocketHandle target) target.LastConnectFailed = LastConnectFailed; target.DualMode = DualMode; target.ExposedHandleOrUntrackedConfiguration = ExposedHandleOrUntrackedConfiguration; - target.IsPipe = IsPipe; + target.IsSocket = IsSocket; } internal void SetExposed() => ExposedHandleOrUntrackedConfiguration = true; @@ -240,7 +240,7 @@ private unsafe SocketError DoCloseHandle(bool abortive) { Interop.Error errorCode = Interop.Error.SUCCESS; - if (IsPipe) + if (!IsSocket) { return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs index bf8d12ac15b25..79ba1e95c1d3f 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs @@ -83,17 +83,9 @@ private static unsafe void LoadSocketTypeFromHandle( } isSocket = (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) == Interop.Sys.FileTypes.S_IFSOCK; - // If it is not a socket, treat it like a pipe. - handle.IsPipe = !isSocket; + handle.IsSocket = isSocket; - if (handle.IsPipe) - { - addressFamily = AddressFamily.Unknown; - socketType = SocketType.Unknown; - protocolType = ProtocolType.Unknown; - isListening = false; - } - else + if (isSocket) { // On Linux, GetSocketType will be able to query SO_DOMAIN, SO_TYPE, and SO_PROTOCOL to get the // address family, socket type, and protocol type, respectively. On macOS, this will only succeed @@ -102,6 +94,13 @@ private static unsafe void LoadSocketTypeFromHandle( Interop.Error e = Interop.Sys.GetSocketType(handle, out addressFamily, out socketType, out protocolType, out isListening); Debug.Assert(e == Interop.Error.SUCCESS, e.ToString()); } + else + { + addressFamily = AddressFamily.Unknown; + socketType = SocketType.Unknown; + protocolType = ProtocolType.Unknown; + isListening = false; + } // Get whether the socket is in non-blocking mode. On Unix, we automatically put the underlying // Socket into non-blocking mode whenever an async method is first invoked on the instance, but we diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs index 804a1b5c3ff3c..20c421440ae39 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @@ -106,7 +106,7 @@ private static unsafe int SysRead(SafeSocketHandle socket, Span buffer, ou private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Span buffer, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); int received = 0; @@ -130,7 +130,7 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Span buffer, byte[]? socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); Debug.Assert(socketAddress != null || socketAddressLen == 0, $"Unexpected values: socketAddress={socketAddress}, socketAddressLen={socketAddressLen}"); @@ -195,7 +195,7 @@ private static unsafe int SysWrite(SafeSocketHandle socket, ReadOnlySpan b private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); int sent; fixed (byte* b = &MemoryMarshal.GetReference(buffer)) @@ -220,7 +220,7 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, ReadOnlySpan buffer, ref int offset, ref int count, byte[] socketAddress, int socketAddressLen, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); int sent; fixed (byte* sockAddr = socketAddress) @@ -262,7 +262,7 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, IList> buffers, ref int bufferIndex, ref int offset, byte[]? socketAddress, int socketAddressLen, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); // Pin buffers and set up iovecs. int startIndex = bufferIndex, startOffset = offset; @@ -358,7 +358,7 @@ private static unsafe long SendFile(SafeSocketHandle socket, SafeFileHandle file private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, IList> buffers, byte[]? socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); int maxBuffers = buffers.Count; bool allocOnStack = maxBuffers <= IovStackThreshold; @@ -459,7 +459,7 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, private static unsafe int SysReceiveMessageFrom(SafeSocketHandle socket, SocketFlags flags, Span buffer, byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); Debug.Assert(socketAddress != null, "Expected non-null socketAddress"); int cmsgBufferLen = Interop.Sys.GetControlMessageBufferSize(Convert.ToInt32(isIPv4), Convert.ToInt32(isIPv6)); @@ -513,7 +513,7 @@ private static unsafe int SysReceiveMessageFrom( byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, out Interop.Error errno) { - Debug.Assert(!socket.IsPipe); + Debug.Assert(socket.IsSocket); Debug.Assert(socketAddress != null, "Expected non-null socketAddress"); int buffersCount = buffers.Count; @@ -727,7 +727,7 @@ public static unsafe bool TryCompleteReceive(SafeSocketHandle socket, Span Interop.Error errno; int received; - if (socket.IsPipe) + if (!socket.IsSocket) { Debug.Assert(flags == SocketFlags.None); received = SysRead(socket, buffer, out errno); @@ -786,7 +786,7 @@ public static unsafe bool TryCompleteReceiveFrom(SafeSocketHandle socket, Span Interop.Error errno; try { - if (socket.IsPipe) + if (!socket.IsSocket) { Debug.Assert(flags == SocketFlags.None); Debug.Assert(buffers == null); From 25dc01489fc45d5520e716a7bcd7f5fd7201a679 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Mon, 22 Feb 2021 09:20:40 +0100 Subject: [PATCH 10/16] Move registration error handling in to single method --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 40 ++++++++++++++----- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 22 +++------- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index a9f5e9cdc2f55..61c0783da9bc7 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -789,15 +789,10 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation if (!context.IsRegistered) { - // TryRegister throws for fatal errors. - // When it returns false the registration failed because the peer has closed. - // We'll retry the operation, and abort when it fails to complete. - if (!context.TryRegister()) + if (!context.TryRegister(out Interop.Error error)) { - if (!operation.TryComplete(context)) - { - operation.DoAbort(); - } + HandleFailedRegistration(context, operation, error); + Trace(context, $"Leave, not registered"); return false; } @@ -878,6 +873,30 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation return false; } } + + static void HandleFailedRegistration(SocketAsyncContext context, TOperation operation, Interop.Error error) + { + Debug.Assert(error != Interop.Error.SUCCESS); + + // macOS: kevent returns EPIPE when adding pipe fd for which the other end is closed. + if (error == Interop.Error.EPIPE) + { + // Retry the operation. + if (operation.TryComplete(context)) + { + return; + } + } + + if (error == Interop.Error.ENOMEM || error == Interop.Error.ENOSPC) + { + throw new OutOfMemoryException(); + } + else + { + throw new InternalException(error); + } + } } public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context, bool skipAsyncEvents = false) @@ -1235,7 +1254,7 @@ public bool PreferInlineCompletions get => _socket.PreferInlineCompletions; } - private bool TryRegister() + private bool TryRegister(out Interop.Error error) { Debug.Assert(_nonBlockingSet); lock (_registerLock) @@ -1247,7 +1266,7 @@ private bool TryRegister() { _socket.DangerousAddRef(ref addedRef); IntPtr handle = _socket.DangerousGetHandle(); - if (SocketAsyncEngine.TryRegisterSocket(handle, this, out SocketAsyncEngine? engine)) + if (SocketAsyncEngine.TryRegisterSocket(handle, this, out SocketAsyncEngine? engine, out error)) { Volatile.Write(ref _asyncEngine, engine); @@ -1268,6 +1287,7 @@ private bool TryRegister() } } } + error = Interop.Error.SUCCESS; return true; } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 532298c03c9f4..983ca51ed27ee 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -97,16 +97,16 @@ private static SocketAsyncEngine[] CreateEngines() // // Registers the Socket with a SocketAsyncEngine, and returns the associated engine. // - public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine) + public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine, out Interop.Error error) { int engineIndex = Math.Abs(Interlocked.Increment(ref s_allocateFromEngine) % s_engines.Length); SocketAsyncEngine nextEngine = s_engines[engineIndex]; - bool registered = nextEngine.TryRegisterCore(socketHandle, context); + bool registered = nextEngine.TryRegisterCore(socketHandle, context, out error); engine = registered ? nextEngine : null; return registered; } - private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context) + private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error) { bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context)); if (!added) @@ -116,7 +116,7 @@ private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context) throw new InvalidOperationException(SR.net_sockets_handle_already_used); } - Interop.Error error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None, + error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None, Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle); if (error == Interop.Error.SUCCESS) { @@ -124,19 +124,7 @@ private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context) } _handleToContextMap.TryRemove(socketHandle, out _); - // macOS: kevent returns EPIPE when adding pipe fd for which the other end is closed. - if (error == Interop.Error.EPIPE) - { - return false; - } - if (error == Interop.Error.ENOMEM || error == Interop.Error.ENOSPC) - { - throw new OutOfMemoryException(); - } - else - { - throw new InternalException(error); - } + return false; } public void UnregisterSocket(IntPtr socketHandle) From 4811590cf9b668eb695013b0371a3870a7a90174 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Wed, 3 Mar 2021 17:15:03 +0100 Subject: [PATCH 11/16] Process: use AnonymousPipeClientStream for redirected streams. --- .../src/System.Diagnostics.Process.csproj | 1 + .../src/System/Diagnostics/Process.Unix.cs | 15 +++++++-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj index 8896c6b0bfd16..5fc815d5dd868 100644 --- a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj +++ b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj @@ -342,6 +342,7 @@ + diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs index fb52afebdf2a2..f3212ab9e607a 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.ComponentModel; using System.IO; +using System.IO.Pipes; using System.Security; using System.Text; using System.Threading; @@ -445,20 +446,20 @@ private bool StartCore(ProcessStartInfo startInfo) if (startInfo.RedirectStandardInput) { Debug.Assert(stdinFd >= 0); - _standardInput = new StreamWriter(OpenStream(stdinFd, FileAccess.Write), + _standardInput = new StreamWriter(OpenStream(stdinFd, PipeDirection.Out), startInfo.StandardInputEncoding ?? Encoding.Default, StreamBufferSize) { AutoFlush = true }; } if (startInfo.RedirectStandardOutput) { Debug.Assert(stdoutFd >= 0); - _standardOutput = new StreamReader(OpenStream(stdoutFd, FileAccess.Read), + _standardOutput = new StreamReader(OpenStream(stdoutFd, PipeDirection.In), startInfo.StandardOutputEncoding ?? Encoding.Default, true, StreamBufferSize); } if (startInfo.RedirectStandardError) { Debug.Assert(stderrFd >= 0); - _standardError = new StreamReader(OpenStream(stderrFd, FileAccess.Read), + _standardError = new StreamReader(OpenStream(stderrFd, PipeDirection.In), startInfo.StandardErrorEncoding ?? Encoding.Default, true, StreamBufferSize); } @@ -753,14 +754,12 @@ internal static TimeSpan TicksToTimeSpan(double ticks) /// Opens a stream around the specified file descriptor and with the specified access. /// The file descriptor. - /// The access mode. + /// The pipe direction. /// The opened stream. - private static FileStream OpenStream(int fd, FileAccess access) + private static Stream OpenStream(int fd, PipeDirection direction) { Debug.Assert(fd >= 0); - return new FileStream( - new SafeFileHandle((IntPtr)fd, ownsHandle: true), - access, StreamBufferSize, isAsync: false); + return new BufferedStream(new AnonymousPipeClientStream(direction, fd.ToString()), StreamBufferSize); } /// Parses a command-line argument string into a list of arguments. From fdbd7f01f7ab8f36730e2fe82740549f76b87809 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Wed, 3 Mar 2021 22:20:14 +0100 Subject: [PATCH 12/16] Remove unneeded BufferedStream --- .../src/System/Diagnostics/Process.Unix.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs index f3212ab9e607a..7024eb6c548da 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs @@ -4,6 +4,7 @@ using Microsoft.Win32.SafeHandles; using System.Collections.Generic; using System.ComponentModel; +using System.Globalization; using System.IO; using System.IO.Pipes; using System.Security; @@ -759,7 +760,7 @@ internal static TimeSpan TicksToTimeSpan(double ticks) private static Stream OpenStream(int fd, PipeDirection direction) { Debug.Assert(fd >= 0); - return new BufferedStream(new AnonymousPipeClientStream(direction, fd.ToString()), StreamBufferSize); + return new AnonymousPipeClientStream(direction, fd.ToString(CultureInfo.InvariantCulture)); } /// Parses a command-line argument string into a list of arguments. From f4e6480decbe3bee7526422596078f70b1f8cd26 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Thu, 4 Mar 2021 10:36:25 +0100 Subject: [PATCH 13/16] PR feedback --- .../src/System/Diagnostics/Process.Unix.cs | 3 +- .../tests/ProcessStreamReadTests.cs | 1 - .../Win32/SafeHandles/SafePipeHandle.Unix.cs | 5 +- .../Net/Sockets/SafeSocketHandle.Unix.cs | 88 +++++++++---------- .../Net/Sockets/SocketAsyncContext.Unix.cs | 14 ++- .../src/System/Net/Sockets/SocketPal.Unix.cs | 6 +- 6 files changed, 56 insertions(+), 61 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs index 7024eb6c548da..f6508b0b8a26e 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs @@ -4,7 +4,6 @@ using Microsoft.Win32.SafeHandles; using System.Collections.Generic; using System.ComponentModel; -using System.Globalization; using System.IO; using System.IO.Pipes; using System.Security; @@ -760,7 +759,7 @@ internal static TimeSpan TicksToTimeSpan(double ticks) private static Stream OpenStream(int fd, PipeDirection direction) { Debug.Assert(fd >= 0); - return new AnonymousPipeClientStream(direction, fd.ToString(CultureInfo.InvariantCulture)); + return new AnonymousPipeClientStream(direction, new SafePipeHandle((IntPtr)fd, ownsHandle: true)); } /// Parses a command-line argument string into a list of arguments. diff --git a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs index 577a9f1a06137..39e068d409dd6 100644 --- a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs +++ b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs @@ -344,7 +344,6 @@ async private Task WaitPipeSignal(PipeStream pipe, int millisecond) } } - [ActiveIssue("https://github.com/dotnet/runtime/issues/44329")] [PlatformSpecific(~TestPlatforms.Windows)] // currently on Windows these operations async-over-sync on Windows [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] public async Task ReadAsync_OutputStreams_Cancel_RespondsQuickly() diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs index ea4c98560cb11..e52957f40aa69 100644 --- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs +++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs @@ -40,8 +40,7 @@ protected override void Dispose(bool disposing) { base.Dispose(disposing); // must be called before trying to Dispose the socket _disposed = 1; - Socket? socket; - if (disposing && (socket = Volatile.Read(ref _pipeSocket)) != null) + if (disposing && Volatile.Read(ref _pipeSocket) is Socket socket) { socket.Dispose(); _pipeSocket = null; @@ -84,6 +83,8 @@ private Socket CreatePipeSocket(bool ownsHandle = true) socket = SetPipeSocketInterlocked(new Socket(new SafeSocketHandle(handle, ownsHandle)), ownsHandle); + // Double check if we haven't Disposed in the meanwhile, and ensure + // the Socket is disposed, in case Dispose() missed the _pipeSocket assignment. if (_disposed == 1) { Volatile.Write(ref _pipeSocket, null); diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs index 852b25f9c0f3e..1cef1b1d39f46 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SafeSocketHandle.Unix.cs @@ -244,60 +244,58 @@ private unsafe SocketError DoCloseHandle(bool abortive) { return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); } - else - { - // If abortive is not set, we're not running on the finalizer thread, so it's safe to block here. - // We can honor the linger options set on the socket. It also means closesocket() might return - // EWOULDBLOCK, in which case we need to do some recovery. - if (!abortive) - { - if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle} Following 'non-abortive' branch."); - - // Close, and if its errno is other than EWOULDBLOCK, there's nothing more to do - we either succeeded or failed. - errorCode = CloseHandle(handle); - if (errorCode != Interop.Error.EWOULDBLOCK) - { - return SocketPal.GetSocketErrorForErrorCode(errorCode); - } - // The socket must be non-blocking with a linger timeout set. - // We have to set the socket to blocking. - if (Interop.Sys.Fcntl.DangerousSetIsNonBlocking(handle, 0) == 0) - { - // The socket successfully made blocking; retry the close(). - return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); - } + // If abortive is not set, we're not running on the finalizer thread, so it's safe to block here. + // We can honor the linger options set on the socket. It also means closesocket() might return + // EWOULDBLOCK, in which case we need to do some recovery. + if (!abortive) + { + if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle} Following 'non-abortive' branch."); - // The socket could not be made blocking; fall through to the regular abortive close. + // Close, and if its errno is other than EWOULDBLOCK, there's nothing more to do - we either succeeded or failed. + errorCode = CloseHandle(handle); + if (errorCode != Interop.Error.EWOULDBLOCK) + { + return SocketPal.GetSocketErrorForErrorCode(errorCode); } - // By default or if the non-abortive path failed, set linger timeout to zero to get an abortive close (RST). - var linger = new Interop.Sys.LingerOption + // The socket must be non-blocking with a linger timeout set. + // We have to set the socket to blocking. + if (Interop.Sys.Fcntl.DangerousSetIsNonBlocking(handle, 0) == 0) { - OnOff = 1, - Seconds = 0 - }; + // The socket successfully made blocking; retry the close(). + return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle)); + } - errorCode = Interop.Sys.SetLingerOption(handle, &linger); - #if DEBUG - _closeSocketLinger = SocketPal.GetSocketErrorForErrorCode(errorCode); - #endif - if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle}, setsockopt():{errorCode}"); + // The socket could not be made blocking; fall through to the regular abortive close. + } - switch (errorCode) - { - case Interop.Error.SUCCESS: - case Interop.Error.EINVAL: - case Interop.Error.ENOPROTOOPT: - case Interop.Error.ENOTSOCK: - errorCode = CloseHandle(handle); - break; - - // For other errors, it's too dangerous to try closesocket() - it might block! - } + // By default or if the non-abortive path failed, set linger timeout to zero to get an abortive close (RST). + var linger = new Interop.Sys.LingerOption + { + OnOff = 1, + Seconds = 0 + }; + + errorCode = Interop.Sys.SetLingerOption(handle, &linger); +#if DEBUG + _closeSocketLinger = SocketPal.GetSocketErrorForErrorCode(errorCode); +#endif + if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"handle:{handle}, setsockopt():{errorCode}"); + + switch (errorCode) + { + case Interop.Error.SUCCESS: + case Interop.Error.EINVAL: + case Interop.Error.ENOPROTOOPT: + case Interop.Error.ENOTSOCK: + errorCode = CloseHandle(handle); + break; - return SocketPal.GetSocketErrorForErrorCode(errorCode); + // For other errors, it's too dangerous to try closesocket() - it might block! } + + return SocketPal.GetSocketErrorForErrorCode(errorCode); } private Interop.Error CloseHandle(IntPtr handle) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 61c0783da9bc7..bf57c9311602c 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -787,15 +787,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation { Trace(context, $"Enter"); - if (!context.IsRegistered) + if (!context.IsRegistered && !context.TryRegister(out Interop.Error error)) { - if (!context.TryRegister(out Interop.Error error)) - { - HandleFailedRegistration(context, operation, error); + HandleFailedRegistration(context, operation, error); - Trace(context, $"Leave, not registered"); - return false; - } + Trace(context, "Leave, not registered"); + return false; } while (true) @@ -881,7 +878,8 @@ static void HandleFailedRegistration(SocketAsyncContext context, TOperation oper // macOS: kevent returns EPIPE when adding pipe fd for which the other end is closed. if (error == Interop.Error.EPIPE) { - // Retry the operation. + // Because the other end close, we expect the operation to complete when we retry it. + // If it doesn't, we fall through and throw an Exception. if (operation.TryComplete(context)) { return; diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs index 20c421440ae39..3c1b8863ea5bf 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @@ -177,7 +177,7 @@ private static unsafe int SysWrite(SafeSocketHandle socket, ReadOnlySpan b fixed (byte* b = &MemoryMarshal.GetReference(buffer)) { - sent = Interop.Sys.Write(socket, &b[offset], buffer.Length); + sent = Interop.Sys.Write(socket, b + offset, count); if (sent == -1) { errno = Interop.Sys.GetLastError(); @@ -202,7 +202,7 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re { errno = Interop.Sys.Send( socket, - &b[offset], + b + offset, count, flags, &sent); @@ -228,7 +228,7 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re { var iov = new Interop.Sys.IOVector { - Base = &b[offset], + Base = b + offset, Count = (UIntPtr)count }; From 382323d6ed6bc41b7ce1516b1bdec8d2f8b5124b Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Thu, 4 Mar 2021 14:20:30 +0100 Subject: [PATCH 14/16] PipeStream.Unix: enable cancelation tests --- .../System.IO.Pipes/tests/PipeStreamConformanceTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs b/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs index 444ea87431c06..b4962b60d11be 100644 --- a/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs +++ b/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs @@ -20,7 +20,7 @@ public static string GetUniquePipeName() => protected override Type UnsupportedConcurrentExceptionType => null; protected override bool UsableAfterCanceledReads => false; protected override bool CansReturnFalseAfterDispose => false; - protected override bool FullyCancelableOperations => false; + protected override bool FullyCancelableOperations => !OperatingSystem.IsWindows(); [PlatformSpecific(TestPlatforms.Windows)] // WaitForPipeDrain isn't supported on Unix [Fact] From 42a32da6db87e8d5d1a205dc20f9cb745eb3a554 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Fri, 12 Mar 2021 14:33:23 +0100 Subject: [PATCH 15/16] Add back Ctor_SafeHandle_Invalid_ThrowsException test --- .../tests/FunctionalTests/CreateSocketTests.cs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs index 6445b6bf08317..c678cb3431bf0 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs @@ -245,16 +245,11 @@ public void Ctor_Netcoreapp_Success(AddressFamily addressFamily) s.Close(); } - [Theory] - [InlineData(true)] - [InlineData(false)] - [PlatformSpecific(TestPlatforms.Linux)] - public void Ctor_SafeHandle_FromPipeHandle_Ctor_Dispose_Success(bool ownsHandle) + [Fact] + public void Ctor_SafeHandle_Invalid_ThrowsException() { - (int fd1, int fd2) = pipe2(); - close(fd2); - - using var _ = new SafeSocketHandle(new IntPtr(fd1), ownsHandle); + AssertExtensions.Throws("handle", () => new Socket(null)); + AssertExtensions.Throws("handle", () => new Socket(new SafeSocketHandle((IntPtr)(-1), false))); } [Theory] From 94583f3f6da762e926d2b7c7d9bb08dae5e0aae6 Mon Sep 17 00:00:00 2001 From: Tom Deseyn Date: Mon, 15 Mar 2021 21:07:44 +0100 Subject: [PATCH 16/16] SysRead/SysWrite: Assert not used for socket-type handle --- .../src/System/Net/Sockets/SocketPal.Unix.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs index 3c1b8863ea5bf..69bcb16e9f3f5 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs @@ -91,13 +91,15 @@ public static unsafe SocketError CreateSocket(AddressFamily addressFamily, Socke return errorCode; } - private static unsafe int SysRead(SafeSocketHandle socket, Span buffer, out Interop.Error errno) + private static unsafe int SysRead(SafeSocketHandle handle, Span buffer, out Interop.Error errno) { + Debug.Assert(!handle.IsSocket); + int received = 0; fixed (byte* b = &MemoryMarshal.GetReference(buffer)) { - received = Interop.Sys.Read(socket, b, buffer.Length); + received = Interop.Sys.Read(handle, b, buffer.Length); errno = received != -1 ? Interop.Error.SUCCESS : Interop.Sys.GetLastError(); } @@ -171,13 +173,15 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, return checked((int)received); } - private static unsafe int SysWrite(SafeSocketHandle socket, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno) + private static unsafe int SysWrite(SafeSocketHandle handle, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno) { + Debug.Assert(!handle.IsSocket); + int sent; fixed (byte* b = &MemoryMarshal.GetReference(buffer)) { - sent = Interop.Sys.Write(socket, b + offset, count); + sent = Interop.Sys.Write(handle, b + offset, count); if (sent == -1) { errno = Interop.Sys.GetLastError();