diff --git a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj
index f37055179859c..f0c7d8eb136bf 100644
--- a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj
+++ b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj
@@ -337,6 +337,7 @@
+
diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs
index 55c5a615a605f..704fab865eb0f 100644
--- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs
+++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Unix.cs
@@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
+using System.IO.Pipes;
using System.Security;
using System.Text;
using System.Threading;
@@ -439,20 +440,20 @@ private bool StartCore(ProcessStartInfo startInfo)
if (startInfo.RedirectStandardInput)
{
Debug.Assert(stdinFd >= 0);
- _standardInput = new StreamWriter(OpenStream(stdinFd, FileAccess.Write),
+ _standardInput = new StreamWriter(OpenStream(stdinFd, PipeDirection.Out),
startInfo.StandardInputEncoding ?? Encoding.Default, StreamBufferSize)
{ AutoFlush = true };
}
if (startInfo.RedirectStandardOutput)
{
Debug.Assert(stdoutFd >= 0);
- _standardOutput = new StreamReader(OpenStream(stdoutFd, FileAccess.Read),
+ _standardOutput = new StreamReader(OpenStream(stdoutFd, PipeDirection.In),
startInfo.StandardOutputEncoding ?? Encoding.Default, true, StreamBufferSize);
}
if (startInfo.RedirectStandardError)
{
Debug.Assert(stderrFd >= 0);
- _standardError = new StreamReader(OpenStream(stderrFd, FileAccess.Read),
+ _standardError = new StreamReader(OpenStream(stderrFd, PipeDirection.In),
startInfo.StandardErrorEncoding ?? Encoding.Default, true, StreamBufferSize);
}
@@ -747,14 +748,12 @@ internal static TimeSpan TicksToTimeSpan(double ticks)
/// Opens a stream around the specified file descriptor and with the specified access.
/// The file descriptor.
- /// The access mode.
+ /// The pipe direction.
/// The opened stream.
- private static FileStream OpenStream(int fd, FileAccess access)
+ private static Stream OpenStream(int fd, PipeDirection direction)
{
Debug.Assert(fd >= 0);
- return new FileStream(
- new SafeFileHandle((IntPtr)fd, ownsHandle: true),
- access, StreamBufferSize, isAsync: false);
+ return new AnonymousPipeClientStream(direction, new SafePipeHandle((IntPtr)fd, ownsHandle: true));
}
/// Parses a command-line argument string into a list of arguments.
diff --git a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs
index 577a9f1a06137..39e068d409dd6 100644
--- a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs
+++ b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamReadTests.cs
@@ -344,7 +344,6 @@ async private Task WaitPipeSignal(PipeStream pipe, int millisecond)
}
}
- [ActiveIssue("https://github.com/dotnet/runtime/issues/44329")]
[PlatformSpecific(~TestPlatforms.Windows)] // currently on Windows these operations async-over-sync on Windows
[ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
public async Task ReadAsync_OutputStreams_Cancel_RespondsQuickly()
diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs
index 98e9e10d816a8..e52957f40aa69 100644
--- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs
+++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs
@@ -7,6 +7,7 @@
using System.Reflection;
using System.Runtime.InteropServices;
using System.Security;
+using System.Threading;
namespace Microsoft.Win32.SafeHandles
{
@@ -14,38 +15,35 @@ public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private const int DefaultInvalidHandle = -1;
- // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe, and the
- // _named* fields remain null. For named pipes, SafePipeHandle.handle is a copy of the file descriptor
- // extracted from the Socket's SafeHandle, and the _named* fields are the socket and its safe handle.
+ // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe.
+ // For named pipes, SafePipeHandle.handle is a copy of the file descriptor
+ // extracted from the Socket's SafeHandle.
// This allows operations related to file descriptors to be performed directly on the SafePipeHandle,
- // and operations that should go through the Socket to be done via _namedPipeSocket. We keep the
+ // and operations that should go through the Socket to be done via PipeSocket. We keep the
// Socket's SafeHandle alive as long as this SafeHandle is alive.
- private Socket? _namedPipeSocket;
- private SafeHandle? _namedPipeSocketHandle;
+ private Socket? _pipeSocket;
+ private SafeHandle? _pipeSocketHandle;
+ private volatile int _disposed;
internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true)
{
- Debug.Assert(namedPipeSocket != null);
- _namedPipeSocket = namedPipeSocket;
-
- _namedPipeSocketHandle = namedPipeSocket.SafeHandle;
-
- bool ignored = false;
- _namedPipeSocketHandle.DangerousAddRef(ref ignored);
- SetHandle(_namedPipeSocketHandle.DangerousGetHandle());
+ SetPipeSocketInterlocked(namedPipeSocket, ownsHandle: true);
+ base.SetHandle(_pipeSocketHandle!.DangerousGetHandle());
}
- internal Socket? NamedPipeSocket => _namedPipeSocket;
- internal SafeHandle? NamedPipeSocketHandle => _namedPipeSocketHandle;
+ internal Socket PipeSocket => _pipeSocket ?? CreatePipeSocket();
+
+ internal SafeHandle? PipeSocketHandle => _pipeSocketHandle;
protected override void Dispose(bool disposing)
{
base.Dispose(disposing); // must be called before trying to Dispose the socket
- if (disposing && _namedPipeSocket != null)
+ _disposed = 1;
+ if (disposing && Volatile.Read(ref _pipeSocket) is Socket socket)
{
- _namedPipeSocket.Dispose();
- _namedPipeSocket = null;
+ socket.Dispose();
+ _pipeSocket = null;
}
}
@@ -53,24 +51,92 @@ protected override bool ReleaseHandle()
{
Debug.Assert(!IsInvalid);
- // Clean up resources for named handles
- if (_namedPipeSocketHandle != null)
+ if (_pipeSocketHandle != null)
{
- SetHandle(DefaultInvalidHandle);
- _namedPipeSocketHandle.DangerousRelease();
- _namedPipeSocketHandle = null;
+ base.SetHandle((IntPtr)DefaultInvalidHandle);
+ _pipeSocketHandle.DangerousRelease();
+ _pipeSocketHandle = null;
return true;
}
-
- // Clean up resources for anonymous handles
- return (long)handle >= 0 ?
- Interop.Sys.Close(handle) == 0 :
- true;
+ else
+ {
+ return (long)handle >= 0 ?
+ Interop.Sys.Close(handle) == 0 :
+ true;
+ }
}
public override bool IsInvalid
{
- get { return (long)handle < 0 && _namedPipeSocket == null; }
+ get { return (long)handle < 0 && _pipeSocket == null; }
+ }
+
+ private Socket CreatePipeSocket(bool ownsHandle = true)
+ {
+ Socket? socket = null;
+ if (_disposed == 0)
+ {
+ bool refAdded = false;
+ try
+ {
+ DangerousAddRef(ref refAdded);
+
+ socket = SetPipeSocketInterlocked(new Socket(new SafeSocketHandle(handle, ownsHandle)), ownsHandle);
+
+ // Double check if we haven't Disposed in the meanwhile, and ensure
+ // the Socket is disposed, in case Dispose() missed the _pipeSocket assignment.
+ if (_disposed == 1)
+ {
+ Volatile.Write(ref _pipeSocket, null);
+ socket.Dispose();
+ socket = null;
+ }
+ }
+ finally
+ {
+ if (refAdded)
+ {
+ DangerousRelease();
+ }
+ }
+ }
+ return socket ?? throw new ObjectDisposedException(GetType().ToString());;
+ }
+
+ private Socket SetPipeSocketInterlocked(Socket socket, bool ownsHandle)
+ {
+ Debug.Assert(socket != null);
+
+ // Multiple threads may try to create the PipeSocket.
+ Socket? current = Interlocked.CompareExchange(ref _pipeSocket, socket, null);
+ if (current != null)
+ {
+ socket.Dispose();
+ return current;
+ }
+
+ // If we own the handle, defer ownership to the SocketHandle.
+ SafeSocketHandle socketHandle = _pipeSocket.SafeHandle;
+ if (ownsHandle)
+ {
+ _pipeSocketHandle = socketHandle;
+
+ bool ignored = false;
+ socketHandle.DangerousAddRef(ref ignored);
+ }
+
+ return socket;
+ }
+
+ internal void SetHandle(IntPtr descriptor, bool ownsHandle = true)
+ {
+ base.SetHandle(descriptor);
+
+ // Avoid throwing when we own the handle by defering pipe creation.
+ if (!ownsHandle)
+ {
+ _pipeSocket = CreatePipeSocket(ownsHandle);
+ }
}
}
}
diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs
index 946f305c9c7f9..5a1d27149f944 100644
--- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs
+++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Windows.cs
@@ -15,5 +15,10 @@ protected override bool ReleaseHandle()
{
return Interop.Kernel32.CloseHandle(handle);
}
+
+ internal void SetHandle(IntPtr descriptor, bool ownsHandle = true)
+ {
+ base.SetHandle(descriptor);
+ }
}
}
diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs
index f19fa957d7910..13d9d4ae5acef 100644
--- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs
+++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.cs
@@ -17,12 +17,7 @@ public SafePipeHandle()
public SafePipeHandle(IntPtr preexistingHandle, bool ownsHandle)
: base(ownsHandle)
{
- SetHandle(preexistingHandle);
- }
-
- internal void SetHandle(int descriptor)
- {
- base.SetHandle((IntPtr)descriptor);
+ SetHandle(preexistingHandle, ownsHandle);
}
}
}
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs
index 42fe518e0012e..3e07b15738cb6 100644
--- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs
+++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeClientStream.Unix.cs
@@ -83,7 +83,7 @@ public override int InBufferSize
{
CheckPipePropertyOperations();
if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream);
- return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? 0;
+ return InternalHandle?.PipeSocket.ReceiveBufferSize ?? 0;
}
}
@@ -93,7 +93,7 @@ public override int OutBufferSize
{
CheckPipePropertyOperations();
if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream);
- return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? 0;
+ return InternalHandle?.PipeSocket.SendBufferSize ?? 0;
}
}
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs
index 7f0d62611bc44..4628fe4fce0b2 100644
--- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs
+++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Unix.cs
@@ -134,7 +134,7 @@ public string GetImpersonationUserName()
{
CheckWriteOperations();
- SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle;
+ SafeHandle? handle = InternalHandle?.PipeSocketHandle;
if (handle == null)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet);
@@ -155,7 +155,7 @@ public override int InBufferSize
{
CheckPipePropertyOperations();
if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream);
- return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? _inBufferSize;
+ return InternalHandle?.PipeSocket.ReceiveBufferSize ?? _inBufferSize;
}
}
@@ -165,7 +165,7 @@ public override int OutBufferSize
{
CheckPipePropertyOperations();
if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream);
- return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? _outBufferSize;
+ return InternalHandle?.PipeSocket.SendBufferSize ?? _outBufferSize;
}
}
@@ -173,7 +173,7 @@ public override int OutBufferSize
public void RunAsClient(PipeStreamImpersonationWorker impersonationWorker)
{
CheckWriteOperations();
- SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle;
+ SafeHandle? handle = InternalHandle?.PipeSocketHandle;
if (handle == null)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet);
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs
index 6e6fa8a5c1385..2fa979b222a81 100644
--- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs
+++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Unix.cs
@@ -32,6 +32,156 @@ public abstract partial class PipeStream : Stream
/// Prefix to prepend to all pipe names.
private static readonly string s_pipePrefix = Path.Combine(Path.GetTempPath(), "CoreFxPipe_");
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+ CheckReadOperations();
+
+ return ReadCore(new Span(buffer, offset, count));
+ }
+
+ public override int Read(Span buffer)
+ {
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+ CheckReadOperations();
+
+ return ReadCore(buffer);
+ }
+
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ CheckReadOperations();
+
+ if (count == 0)
+ {
+ UpdateMessageCompletion(false);
+ return Task.FromResult(0);
+ }
+
+ return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask();
+ }
+
+ public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ CheckReadOperations();
+
+ if (buffer.Length == 0)
+ {
+ UpdateMessageCompletion(false);
+ return new ValueTask(0);
+ }
+
+ return ReadAsyncCore(buffer, cancellationToken);
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ => TaskToApm.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state);
+
+ public override int EndRead(IAsyncResult asyncResult)
+ => TaskToApm.End(asyncResult);
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+ CheckWriteOperations();
+
+ WriteCore(new ReadOnlySpan(buffer, offset, count));
+ }
+
+ public override void Write(ReadOnlySpan buffer)
+ {
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+ CheckWriteOperations();
+
+ WriteCore(buffer);
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ CheckWriteOperations();
+
+ if (count == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken);
+ }
+
+ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ CheckWriteOperations();
+
+ if (buffer.Length == 0)
+ {
+ return default;
+ }
+
+ return new ValueTask(WriteAsyncCore(buffer, cancellationToken));
+ }
+
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ => TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state);
+
+ public override void EndWrite(IAsyncResult asyncResult)
+ => TaskToApm.End(asyncResult);
+
internal static string GetPipePath(string serverName, string pipeName)
{
if (serverName != "." && serverName != Interop.Sys.GetHostName())
@@ -80,17 +230,14 @@ internal static string GetPipePath(string serverName, string pipeName)
/// The handle to validate.
internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle)
{
- if (safePipeHandle.NamedPipeSocket == null)
+ Interop.Sys.FileStatus status;
+ int result = CheckPipeCall(Interop.Sys.FStat(safePipeHandle, out status));
+ if (result == 0)
{
- Interop.Sys.FileStatus status;
- int result = CheckPipeCall(Interop.Sys.FStat(safePipeHandle, out status));
- if (result == 0)
+ if ((status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFIFO &&
+ (status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFSOCK)
{
- if ((status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFIFO &&
- (status.Mode & Interop.Sys.FileTypes.S_IFMT) != Interop.Sys.FileTypes.S_IFSOCK)
- {
- throw new IOException(SR.IO_InvalidPipeHandle);
- }
+ throw new IOException(SR.IO_InvalidPipeHandle);
}
}
}
@@ -98,9 +245,7 @@ internal void ValidateHandleIsPipe(SafePipeHandle safePipeHandle)
/// Initializes the handle to be used asynchronously.
/// The handle.
private void InitializeAsyncHandle(SafePipeHandle handle)
- {
- // nop
- }
+ { }
internal virtual void DisposeCore(bool disposing)
{
@@ -112,30 +257,22 @@ private unsafe int ReadCore(Span buffer)
Debug.Assert(_handle != null);
DebugAssertHandleValid(_handle);
- // For named pipes, receive on the socket.
- Socket? socket = _handle.NamedPipeSocket;
- if (socket != null)
+ if (buffer.Length == 0)
{
- // For a blocking socket, we could simply use the same Read syscall as is done
- // for reading an anonymous pipe. However, for a non-blocking socket, Read could
- // end up returning EWOULDBLOCK rather than blocking waiting for data. Such a case
- // is already handled by Socket.Receive, so we use it here.
- try
- {
- return socket.Receive(buffer, SocketFlags.None);
- }
- catch (SocketException e)
- {
- throw GetIOExceptionForSocketException(e);
- }
+ return 0;
}
- // For anonymous pipes, read from the file descriptor.
- fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
+ // For a blocking socket, we could simply use the same Read syscall as is done
+ // for reading an anonymous pipe. However, for a non-blocking socket, Read could
+ // end up returning EWOULDBLOCK rather than blocking waiting for data. Such a case
+ // is already handled by Socket.Receive, so we use it here.
+ try
{
- int result = CheckPipeCall(Interop.Sys.Read(_handle, bufPtr, buffer.Length));
- Debug.Assert(result <= buffer.Length);
- return result;
+ return _handle!.PipeSocket.Receive(buffer, SocketFlags.None);
+ }
+ catch (SocketException e)
+ {
+ throw GetIOExceptionForSocketException(e);
}
}
@@ -144,46 +281,29 @@ private unsafe void WriteCore(ReadOnlySpan buffer)
Debug.Assert(_handle != null);
DebugAssertHandleValid(_handle);
- // For named pipes, send to the socket.
- Socket? socket = _handle.NamedPipeSocket;
- if (socket != null)
- {
- // For a blocking socket, we could simply use the same Write syscall as is done
- // for writing to anonymous pipe. However, for a non-blocking socket, Write could
- // end up returning EWOULDBLOCK rather than blocking waiting for space available.
- // Such a case is already handled by Socket.Send, so we use it here.
- try
- {
- while (buffer.Length > 0)
- {
- int bytesWritten = socket.Send(buffer, SocketFlags.None);
- buffer = buffer.Slice(bytesWritten);
- }
- }
- catch (SocketException e)
- {
- throw GetIOExceptionForSocketException(e);
- }
- }
-
- // For anonymous pipes, write the file descriptor.
- fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
+ // For a blocking socket, we could simply use the same Write syscall as is done
+ // for writing to anonymous pipe. However, for a non-blocking socket, Write could
+ // end up returning EWOULDBLOCK rather than blocking waiting for space available.
+ // Such a case is already handled by Socket.Send, so we use it here.
+ try
{
while (buffer.Length > 0)
{
- int bytesWritten = CheckPipeCall(Interop.Sys.Write(_handle, bufPtr, buffer.Length));
+ int bytesWritten = _handle!.PipeSocket.Send(buffer, SocketFlags.None);
buffer = buffer.Slice(bytesWritten);
}
}
+ catch (SocketException e)
+ {
+ throw GetIOExceptionForSocketException(e);
+ }
}
private async ValueTask ReadAsyncCore(Memory destination, CancellationToken cancellationToken)
{
- Debug.Assert(this is NamedPipeClientStream || this is NamedPipeServerStream, $"Expected a named pipe, got a {GetType()}");
-
try
{
- return await InternalHandle!.NamedPipeSocket!.ReceiveAsync(destination, SocketFlags.None, cancellationToken).ConfigureAwait(false);
+ return await InternalHandle!.PipeSocket.ReceiveAsync(destination, SocketFlags.None, cancellationToken).ConfigureAwait(false);
}
catch (SocketException e)
{
@@ -193,13 +313,11 @@ private async ValueTask ReadAsyncCore(Memory destination, Cancellatio
private async Task WriteAsyncCore(ReadOnlyMemory source, CancellationToken cancellationToken)
{
- Debug.Assert(this is NamedPipeClientStream || this is NamedPipeServerStream, $"Expected a named pipe, got a {GetType()}");
-
try
{
while (source.Length > 0)
{
- int bytesWritten = await _handle!.NamedPipeSocket!.SendAsync(source, SocketFlags.None, cancellationToken).ConfigureAwait(false);
+ int bytesWritten = await _handle!.PipeSocket.SendAsync(source, SocketFlags.None, cancellationToken).ConfigureAwait(false);
Debug.Assert(bytesWritten > 0 && bytesWritten <= source.Length);
source = source.Slice(bytesWritten);
}
@@ -350,8 +468,8 @@ internal static unsafe void CreateAnonymousPipe(out SafePipeHandle reader, out S
Interop.CheckIo(Interop.Sys.Pipe(fds, Interop.Sys.PipeFlags.O_CLOEXEC));
// Store the file descriptors into our safe handles
- reader.SetHandle(fds[Interop.Sys.ReadEndOfPipe]);
- writer.SetHandle(fds[Interop.Sys.WriteEndOfPipe]);
+ reader.SetHandle(new IntPtr(fds[Interop.Sys.ReadEndOfPipe]));
+ writer.SetHandle(new IntPtr(fds[Interop.Sys.WriteEndOfPipe]));
}
internal int CheckPipeCall(int result)
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs
index 252796cd1f199..928cc5d1a081f 100644
--- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs
+++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.Windows.cs
@@ -16,6 +16,218 @@ public abstract partial class PipeStream : Stream
internal const bool CheckOperationsRequiresSetHandle = true;
internal ThreadPoolBoundHandle? _threadPoolBinding;
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (_isAsync)
+ {
+ return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+ }
+
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+ CheckReadOperations();
+
+ return ReadCore(new Span(buffer, offset, count));
+ }
+
+ public override int Read(Span buffer)
+ {
+ if (_isAsync)
+ {
+ return base.Read(buffer);
+ }
+
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+ CheckReadOperations();
+
+ return ReadCore(buffer);
+ }
+
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ CheckReadOperations();
+
+ if (!_isAsync)
+ {
+ return base.ReadAsync(buffer, offset, count, cancellationToken);
+ }
+
+ if (count == 0)
+ {
+ UpdateMessageCompletion(false);
+ return Task.FromResult(0);
+ }
+
+ return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask();
+ }
+
+ public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (!_isAsync)
+ {
+ return base.ReadAsync(buffer, cancellationToken);
+ }
+
+ if (!CanRead)
+ {
+ throw Error.GetReadNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ CheckReadOperations();
+
+ if (buffer.Length == 0)
+ {
+ UpdateMessageCompletion(false);
+ return new ValueTask(0);
+ }
+
+ return ReadAsyncCore(buffer, cancellationToken);
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ if (_isAsync)
+ return TaskToApm.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state);
+ else
+ return base.BeginRead(buffer, offset, count, callback, state);
+ }
+
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ if (_isAsync)
+ return TaskToApm.End(asyncResult);
+ else
+ return base.EndRead(asyncResult);
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ if (_isAsync)
+ {
+ WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+ return;
+ }
+
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+ CheckWriteOperations();
+
+ WriteCore(new ReadOnlySpan(buffer, offset, count));
+ }
+
+ public override void Write(ReadOnlySpan buffer)
+ {
+ if (_isAsync)
+ {
+ base.Write(buffer);
+ return;
+ }
+
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+ CheckWriteOperations();
+
+ WriteCore(buffer);
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ ValidateBufferArguments(buffer, offset, count);
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ CheckWriteOperations();
+
+ if (!_isAsync)
+ {
+ return base.WriteAsync(buffer, offset, count, cancellationToken);
+ }
+
+ if (count == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken);
+ }
+
+ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ if (!_isAsync)
+ {
+ return base.WriteAsync(buffer, cancellationToken);
+ }
+
+ if (!CanWrite)
+ {
+ throw Error.GetWriteNotSupported();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ CheckWriteOperations();
+
+ if (buffer.Length == 0)
+ {
+ return default;
+ }
+
+ return new ValueTask(WriteAsyncCore(buffer, cancellationToken));
+ }
+
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ if (_isAsync)
+ return TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state);
+ else
+ return base.BeginWrite(buffer, offset, count, callback, state);
+ }
+
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ if (_isAsync)
+ TaskToApm.End(asyncResult);
+ else
+ base.EndWrite(asyncResult);
+ }
+
internal static string GetPipePath(string serverName, string pipeName)
{
string normalizedPipePath = Path.GetFullPath(@"\\" + serverName + @"\pipe\" + pipeName);
diff --git a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs
index 4dad917bd453f..6df150682be21 100644
--- a/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs
+++ b/src/libraries/System.IO.Pipes/src/System/IO/Pipes/PipeStream.cs
@@ -107,218 +107,6 @@ protected void InitializeHandle(SafePipeHandle? handle, bool isExposed, bool isA
_isFromExistingHandle = isExposed;
}
- public override int Read(byte[] buffer, int offset, int count)
- {
- if (_isAsync)
- {
- return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
- }
-
- ValidateBufferArguments(buffer, offset, count);
- if (!CanRead)
- {
- throw Error.GetReadNotSupported();
- }
- CheckReadOperations();
-
- return ReadCore(new Span(buffer, offset, count));
- }
-
- public override int Read(Span buffer)
- {
- if (_isAsync)
- {
- return base.Read(buffer);
- }
-
- if (!CanRead)
- {
- throw Error.GetReadNotSupported();
- }
- CheckReadOperations();
-
- return ReadCore(buffer);
- }
-
- public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- ValidateBufferArguments(buffer, offset, count);
- if (!CanRead)
- {
- throw Error.GetReadNotSupported();
- }
-
- if (cancellationToken.IsCancellationRequested)
- {
- return Task.FromCanceled(cancellationToken);
- }
-
- CheckReadOperations();
-
- if (!_isAsync)
- {
- return base.ReadAsync(buffer, offset, count, cancellationToken);
- }
-
- if (count == 0)
- {
- UpdateMessageCompletion(false);
- return Task.FromResult(0);
- }
-
- return ReadAsyncCore(new Memory(buffer, offset, count), cancellationToken).AsTask();
- }
-
- public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default(CancellationToken))
- {
- if (!_isAsync)
- {
- return base.ReadAsync(buffer, cancellationToken);
- }
-
- if (!CanRead)
- {
- throw Error.GetReadNotSupported();
- }
-
- if (cancellationToken.IsCancellationRequested)
- {
- return ValueTask.FromCanceled(cancellationToken);
- }
-
- CheckReadOperations();
-
- if (buffer.Length == 0)
- {
- UpdateMessageCompletion(false);
- return new ValueTask(0);
- }
-
- return ReadAsyncCore(buffer, cancellationToken);
- }
-
- public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
- {
- if (_isAsync)
- return TaskToApm.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state);
- else
- return base.BeginRead(buffer, offset, count, callback, state);
- }
-
- public override int EndRead(IAsyncResult asyncResult)
- {
- if (_isAsync)
- return TaskToApm.End(asyncResult);
- else
- return base.EndRead(asyncResult);
- }
-
- public override void Write(byte[] buffer, int offset, int count)
- {
- if (_isAsync)
- {
- WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
- return;
- }
-
- ValidateBufferArguments(buffer, offset, count);
- if (!CanWrite)
- {
- throw Error.GetWriteNotSupported();
- }
- CheckWriteOperations();
-
- WriteCore(new ReadOnlySpan(buffer, offset, count));
- }
-
- public override void Write(ReadOnlySpan buffer)
- {
- if (_isAsync)
- {
- base.Write(buffer);
- return;
- }
-
- if (!CanWrite)
- {
- throw Error.GetWriteNotSupported();
- }
- CheckWriteOperations();
-
- WriteCore(buffer);
- }
-
- public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- ValidateBufferArguments(buffer, offset, count);
- if (!CanWrite)
- {
- throw Error.GetWriteNotSupported();
- }
-
- if (cancellationToken.IsCancellationRequested)
- {
- return Task.FromCanceled(cancellationToken);
- }
-
- CheckWriteOperations();
-
- if (!_isAsync)
- {
- return base.WriteAsync(buffer, offset, count, cancellationToken);
- }
-
- if (count == 0)
- {
- return Task.CompletedTask;
- }
-
- return WriteAsyncCore(new ReadOnlyMemory(buffer, offset, count), cancellationToken);
- }
-
- public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default(CancellationToken))
- {
- if (!_isAsync)
- {
- return base.WriteAsync(buffer, cancellationToken);
- }
-
- if (!CanWrite)
- {
- throw Error.GetWriteNotSupported();
- }
-
- if (cancellationToken.IsCancellationRequested)
- {
- return ValueTask.FromCanceled(cancellationToken);
- }
-
- CheckWriteOperations();
-
- if (buffer.Length == 0)
- {
- return default;
- }
-
- return new ValueTask(WriteAsyncCore(buffer, cancellationToken));
- }
-
- public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
- {
- if (_isAsync)
- return TaskToApm.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state);
- else
- return base.BeginWrite(buffer, offset, count, callback, state);
- }
-
- public override void EndWrite(IAsyncResult asyncResult)
- {
- if (_isAsync)
- TaskToApm.End(asyncResult);
- else
- base.EndWrite(asyncResult);
- }
-
[Conditional("DEBUG")]
private static void DebugAssertHandleValid(SafePipeHandle handle)
{
diff --git a/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs b/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs
index 444ea87431c06..b4962b60d11be 100644
--- a/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs
+++ b/src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs
@@ -20,7 +20,7 @@ public static string GetUniquePipeName() =>
protected override Type UnsupportedConcurrentExceptionType => null;
protected override bool UsableAfterCanceledReads => false;
protected override bool CansReturnFalseAfterDispose => false;
- protected override bool FullyCancelableOperations => false;
+ protected override bool FullyCancelableOperations => !OperatingSystem.IsWindows();
[PlatformSpecific(TestPlatforms.Windows)] // WaitForPipeDrain isn't supported on Unix
[Fact]
diff --git a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj
index c6421ec4e1986..bbddbb391d54b 100644
--- a/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj
+++ b/src/libraries/System.Net.Sockets/src/System.Net.Sockets.csproj
@@ -263,6 +263,8 @@
Link="Common\Interop\Unix\Interop.Poll.Structs.cs" />
+
ExposedHandleOrUntrackedConfiguration = true;
@@ -238,6 +240,11 @@ private unsafe SocketError DoCloseHandle(bool abortive)
{
Interop.Error errorCode = Interop.Error.SUCCESS;
+ if (!IsSocket)
+ {
+ return SocketPal.GetSocketErrorForErrorCode(CloseHandle(handle));
+ }
+
// If abortive is not set, we're not running on the finalizer thread, so it's safe to block here.
// We can honor the linger options set on the socket. It also means closesocket() might return
// EWOULDBLOCK, in which case we need to do some recovery.
@@ -281,6 +288,7 @@ private unsafe SocketError DoCloseHandle(bool abortive)
case Interop.Error.SUCCESS:
case Interop.Error.EINVAL:
case Interop.Error.ENOPROTOOPT:
+ case Interop.Error.ENOTSOCK:
errorCode = CloseHandle(handle);
break;
diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs
index a9e08bd076c96..79ba1e95c1d3f 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Unix.cs
@@ -75,21 +75,32 @@ partial void ValidateForMultiConnect(bool isMultiEndpoint)
}
private static unsafe void LoadSocketTypeFromHandle(
- SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening)
+ SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening, out bool isSocket)
{
- // Validate that the supplied handle is indeed a socket.
- if (Interop.Sys.FStat(handle, out Interop.Sys.FileStatus stat) == -1 ||
- (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) != Interop.Sys.FileTypes.S_IFSOCK)
+ if (Interop.Sys.FStat(handle, out Interop.Sys.FileStatus stat) == -1)
{
throw new SocketException((int)SocketError.NotSocket);
}
+ isSocket = (stat.Mode & Interop.Sys.FileTypes.S_IFSOCK) == Interop.Sys.FileTypes.S_IFSOCK;
- // On Linux, GetSocketType will be able to query SO_DOMAIN, SO_TYPE, and SO_PROTOCOL to get the
- // address family, socket type, and protocol type, respectively. On macOS, this will only succeed
- // in getting the socket type, and the others will be unknown. Subsequently the Socket ctor
- // can use getsockname to retrieve the address family as part of trying to get the local end point.
- Interop.Error e = Interop.Sys.GetSocketType(handle, out addressFamily, out socketType, out protocolType, out isListening);
- Debug.Assert(e == Interop.Error.SUCCESS, e.ToString());
+ handle.IsSocket = isSocket;
+
+ if (isSocket)
+ {
+ // On Linux, GetSocketType will be able to query SO_DOMAIN, SO_TYPE, and SO_PROTOCOL to get the
+ // address family, socket type, and protocol type, respectively. On macOS, this will only succeed
+ // in getting the socket type, and the others will be unknown. Subsequently the Socket ctor
+ // can use getsockname to retrieve the address family as part of trying to get the local end point.
+ Interop.Error e = Interop.Sys.GetSocketType(handle, out addressFamily, out socketType, out protocolType, out isListening);
+ Debug.Assert(e == Interop.Error.SUCCESS, e.ToString());
+ }
+ else
+ {
+ addressFamily = AddressFamily.Unknown;
+ socketType = SocketType.Unknown;
+ protocolType = ProtocolType.Unknown;
+ isListening = false;
+ }
// Get whether the socket is in non-blocking mode. On Unix, we automatically put the underlying
// Socket into non-blocking mode whenever an async method is first invoked on the instance, but we
@@ -101,7 +112,7 @@ private static unsafe void LoadSocketTypeFromHandle(
bool nonBlocking;
int rv = Interop.Sys.Fcntl.GetIsNonBlocking(handle, out nonBlocking);
blocking = !nonBlocking;
- Debug.Assert(rv == 0 || blocking, e.ToString()); // ignore failures
+ Debug.Assert(rv == 0 || blocking); // ignore failures
}
internal void ReplaceHandleIfNecessaryAfterFailedConnect()
diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs
index 267bfb0ab0aa5..11325495e0a31 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Windows.cs
@@ -94,7 +94,7 @@ public Socket(SocketInformation socketInformation)
}
private unsafe void LoadSocketTypeFromHandle(
- SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening)
+ SafeSocketHandle handle, out AddressFamily addressFamily, out SocketType socketType, out ProtocolType protocolType, out bool blocking, out bool isListening, out bool isSocket)
{
// This can be called without winsock initialized. The handle is not going to be a valid socket handle in that case and the code will throw exception anyway.
// Initializing winsock will ensure the error SocketError.NotSocket as opposed to SocketError.NotInitialized.
@@ -121,6 +121,7 @@ private unsafe void LoadSocketTypeFromHandle(
// This affects the result of querying Socket.Blocking, which will mostly only affect user code that happens to query
// that property, though there are a few places we check it internally, e.g. as part of NetworkStream argument validation.
blocking = true;
+ isSocket = true;
}
[SupportedOSPlatform("windows")]
diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs
index eb5277e8e01a2..e44a88a0d3dd4 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs
@@ -136,95 +136,98 @@ private unsafe Socket(SafeSocketHandle handle, bool loadPropertiesFromHandle)
try
{
// Get properties like address family and blocking mode from the OS.
- LoadSocketTypeFromHandle(handle, out _addressFamily, out _socketType, out _protocolType, out _willBlockInternal, out _isListening);
+ LoadSocketTypeFromHandle(handle, out _addressFamily, out _socketType, out _protocolType, out _willBlockInternal, out _isListening, out bool isSocket);
- // We should change stackalloc if this ever grows too big.
- Debug.Assert(SocketPal.MaximumAddressSize <= 512);
- // Try to get the address of the socket.
- Span buffer = stackalloc byte[SocketPal.MaximumAddressSize];
- int bufferLength = buffer.Length;
- fixed (byte* bufferPtr = buffer)
+ if (isSocket)
{
- if (SocketPal.GetSockName(handle, bufferPtr, &bufferLength) != SocketError.Success)
+ // We should change stackalloc if this ever grows too big.
+ Debug.Assert(SocketPal.MaximumAddressSize <= 512);
+ // Try to get the address of the socket.
+ Span buffer = stackalloc byte[SocketPal.MaximumAddressSize];
+ int bufferLength = buffer.Length;
+ fixed (byte* bufferPtr = buffer)
{
- return;
+ if (SocketPal.GetSockName(handle, bufferPtr, &bufferLength) != SocketError.Success)
+ {
+ return;
+ }
}
- }
-
- Debug.Assert(bufferLength <= buffer.Length);
-
- // Try to get the local end point. That will in turn enable the remote
- // end point to be retrieved on-demand when the property is accessed.
- Internals.SocketAddress? socketAddress = null;
- switch (_addressFamily)
- {
- case AddressFamily.InterNetwork:
- _rightEndPoint = new IPEndPoint(
- new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF),
- SocketAddressPal.GetPort(buffer));
- break;
- case AddressFamily.InterNetworkV6:
- Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes];
- SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope);
- _rightEndPoint = new IPEndPoint(
- new IPAddress(address, scope),
- SocketAddressPal.GetPort(buffer));
- break;
+ Debug.Assert(bufferLength <= buffer.Length);
- case AddressFamily.Unix:
- socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength));
- _rightEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress));
- break;
- }
+ // Try to get the local end point. That will in turn enable the remote
+ // end point to be retrieved on-demand when the property is accessed.
+ Internals.SocketAddress? socketAddress = null;
+ switch (_addressFamily)
+ {
+ case AddressFamily.InterNetwork:
+ _rightEndPoint = new IPEndPoint(
+ new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF),
+ SocketAddressPal.GetPort(buffer));
+ break;
+
+ case AddressFamily.InterNetworkV6:
+ Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes];
+ SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope);
+ _rightEndPoint = new IPEndPoint(
+ new IPAddress(address, scope),
+ SocketAddressPal.GetPort(buffer));
+ break;
+
+ case AddressFamily.Unix:
+ socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength));
+ _rightEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress));
+ break;
+ }
- // Try to determine if we're connected, based on querying for a peer, just as we would in RemoteEndPoint,
- // but ignoring any failures; this is best-effort (RemoteEndPoint also does a catch-all around the Create call).
- if (_rightEndPoint != null)
- {
- try
+ // Try to determine if we're connected, based on querying for a peer, just as we would in RemoteEndPoint,
+ // but ignoring any failures; this is best-effort (RemoteEndPoint also does a catch-all around the Create call).
+ if (_rightEndPoint != null)
{
- // Local and remote end points may be different sizes for protocols like Unix Domain Sockets.
- bufferLength = buffer.Length;
- switch (SocketPal.GetPeerName(handle, buffer, ref bufferLength))
+ try
{
- case SocketError.Success:
- switch (_addressFamily)
- {
- case AddressFamily.InterNetwork:
- _remoteEndPoint = new IPEndPoint(
- new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF),
- SocketAddressPal.GetPort(buffer));
- break;
-
- case AddressFamily.InterNetworkV6:
- Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes];
- SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope);
- _remoteEndPoint = new IPEndPoint(
- new IPAddress(address, scope),
- SocketAddressPal.GetPort(buffer));
- break;
-
- case AddressFamily.Unix:
- socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength));
- _remoteEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress));
- break;
- }
+ // Local and remote end points may be different sizes for protocols like Unix Domain Sockets.
+ bufferLength = buffer.Length;
+ switch (SocketPal.GetPeerName(handle, buffer, ref bufferLength))
+ {
+ case SocketError.Success:
+ switch (_addressFamily)
+ {
+ case AddressFamily.InterNetwork:
+ _remoteEndPoint = new IPEndPoint(
+ new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF),
+ SocketAddressPal.GetPort(buffer));
+ break;
+
+ case AddressFamily.InterNetworkV6:
+ Span address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes];
+ SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope);
+ _remoteEndPoint = new IPEndPoint(
+ new IPAddress(address, scope),
+ SocketAddressPal.GetPort(buffer));
+ break;
+
+ case AddressFamily.Unix:
+ socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength));
+ _remoteEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress));
+ break;
+ }
- _isConnected = true;
- break;
-
- case SocketError.InvalidArgument:
- // On some OSes (e.g. macOS), EINVAL means the socket has been shut down.
- // This can happen if, for example, socketpair was used and the parent
- // process closed its copy of the child's socket. Since we don't know
- // whether we're actually connected or not, err on the side of saying
- // we're connected.
- _isConnected = true;
- break;
+ _isConnected = true;
+ break;
+
+ case SocketError.InvalidArgument:
+ // On some OSes (e.g. macOS), EINVAL means the socket has been shut down.
+ // This can happen if, for example, socketpair was used and the parent
+ // process closed its copy of the child's socket. Since we don't know
+ // whether we're actually connected or not, err on the side of saying
+ // we're connected.
+ _isConnected = true;
+ break;
+ }
}
+ catch { }
}
- catch { }
}
}
catch
diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs
index 4d15435d3567e..bf57c9311602c 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs
@@ -787,9 +787,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
{
Trace(context, $"Enter");
- if (!context.IsRegistered)
+ if (!context.IsRegistered && !context.TryRegister(out Interop.Error error))
{
- context.Register();
+ HandleFailedRegistration(context, operation, error);
+
+ Trace(context, "Leave, not registered");
+ return false;
}
while (true)
@@ -867,6 +870,31 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
return false;
}
}
+
+ static void HandleFailedRegistration(SocketAsyncContext context, TOperation operation, Interop.Error error)
+ {
+ Debug.Assert(error != Interop.Error.SUCCESS);
+
+ // macOS: kevent returns EPIPE when adding pipe fd for which the other end is closed.
+ if (error == Interop.Error.EPIPE)
+ {
+ // Because the other end close, we expect the operation to complete when we retry it.
+ // If it doesn't, we fall through and throw an Exception.
+ if (operation.TryComplete(context))
+ {
+ return;
+ }
+ }
+
+ if (error == Interop.Error.ENOMEM || error == Interop.Error.ENOSPC)
+ {
+ throw new OutOfMemoryException();
+ }
+ else
+ {
+ throw new InternalException(error);
+ }
+ }
}
public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context, bool skipAsyncEvents = false)
@@ -1224,7 +1252,7 @@ public bool PreferInlineCompletions
get => _socket.PreferInlineCompletions;
}
- private void Register()
+ private bool TryRegister(out Interop.Error error)
{
Debug.Assert(_nonBlockingSet);
lock (_registerLock)
@@ -1236,9 +1264,18 @@ private void Register()
{
_socket.DangerousAddRef(ref addedRef);
IntPtr handle = _socket.DangerousGetHandle();
- Volatile.Write(ref _asyncEngine, SocketAsyncEngine.RegisterSocket(handle, this));
+ if (SocketAsyncEngine.TryRegisterSocket(handle, this, out SocketAsyncEngine? engine, out error))
+ {
+ Volatile.Write(ref _asyncEngine, engine);
- Trace("Registered");
+ Trace("Registered");
+ return true;
+ }
+ else
+ {
+ Trace("Registration failed");
+ return false;
+ }
}
finally
{
@@ -1248,6 +1285,8 @@ private void Register()
}
}
}
+ error = Interop.Error.SUCCESS;
+ return true;
}
}
diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs
index 28e7c3a092ef3..983ca51ed27ee 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs
@@ -97,15 +97,16 @@ private static SocketAsyncEngine[] CreateEngines()
//
// Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
//
- public static SocketAsyncEngine RegisterSocket(IntPtr socketHandle, SocketAsyncContext context)
+ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext context, out SocketAsyncEngine? engine, out Interop.Error error)
{
int engineIndex = Math.Abs(Interlocked.Increment(ref s_allocateFromEngine) % s_engines.Length);
- SocketAsyncEngine engine = s_engines[engineIndex];
- engine.RegisterCore(socketHandle, context);
- return engine;
+ SocketAsyncEngine nextEngine = s_engines[engineIndex];
+ bool registered = nextEngine.TryRegisterCore(socketHandle, context, out error);
+ engine = registered ? nextEngine : null;
+ return registered;
}
- private void RegisterCore(IntPtr socketHandle, SocketAsyncContext context)
+ private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error)
{
bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context));
if (!added)
@@ -115,22 +116,15 @@ private void RegisterCore(IntPtr socketHandle, SocketAsyncContext context)
throw new InvalidOperationException(SR.net_sockets_handle_already_used);
}
- Interop.Error error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None,
+ error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle);
if (error == Interop.Error.SUCCESS)
{
- return;
+ return true;
}
_handleToContextMap.TryRemove(socketHandle, out _);
- if (error == Interop.Error.ENOMEM || error == Interop.Error.ENOSPC)
- {
- throw new OutOfMemoryException();
- }
- else
- {
- throw new InternalException(error);
- }
+ return false;
}
public void UnregisterSocket(IntPtr socketHandle)
diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs
index 6f800064abad7..69bcb16e9f3f5 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketPal.Unix.cs
@@ -91,8 +91,25 @@ public static unsafe SocketError CreateSocket(AddressFamily addressFamily, Socke
return errorCode;
}
+ private static unsafe int SysRead(SafeSocketHandle handle, Span buffer, out Interop.Error errno)
+ {
+ Debug.Assert(!handle.IsSocket);
+
+ int received = 0;
+
+ fixed (byte* b = &MemoryMarshal.GetReference(buffer))
+ {
+ received = Interop.Sys.Read(handle, b, buffer.Length);
+ errno = received != -1 ? Interop.Error.SUCCESS : Interop.Sys.GetLastError();
+ }
+
+ return received;
+ }
+
private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Span buffer, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
+
int received = 0;
fixed (byte* b = &MemoryMarshal.GetReference(buffer))
@@ -115,6 +132,8 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags,
private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, Span buffer, byte[]? socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
+
Debug.Assert(socketAddress != null || socketAddressLen == 0, $"Unexpected values: socketAddress={socketAddress}, socketAddressLen={socketAddressLen}");
long received = 0;
@@ -154,14 +173,40 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags,
return checked((int)received);
}
+ private static unsafe int SysWrite(SafeSocketHandle handle, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno)
+ {
+ Debug.Assert(!handle.IsSocket);
+
+ int sent;
+
+ fixed (byte* b = &MemoryMarshal.GetReference(buffer))
+ {
+ sent = Interop.Sys.Write(handle, b + offset, count);
+ if (sent == -1)
+ {
+ errno = Interop.Sys.GetLastError();
+ }
+ else
+ {
+ errno = Interop.Error.SUCCESS;
+ offset += sent;
+ count -= sent;
+ }
+ }
+
+ return sent;
+ }
+
private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, ReadOnlySpan buffer, ref int offset, ref int count, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
+
int sent;
fixed (byte* b = &MemoryMarshal.GetReference(buffer))
{
errno = Interop.Sys.Send(
socket,
- &b[offset],
+ b + offset,
count,
flags,
&sent);
@@ -179,13 +224,15 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re
private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, ReadOnlySpan buffer, ref int offset, ref int count, byte[] socketAddress, int socketAddressLen, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
+
int sent;
fixed (byte* sockAddr = socketAddress)
fixed (byte* b = &MemoryMarshal.GetReference(buffer))
{
var iov = new Interop.Sys.IOVector
{
- Base = &b[offset],
+ Base = b + offset,
Count = (UIntPtr)count
};
@@ -219,6 +266,8 @@ private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, Re
private static unsafe int SysSend(SafeSocketHandle socket, SocketFlags flags, IList> buffers, ref int bufferIndex, ref int offset, byte[]? socketAddress, int socketAddressLen, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
+
// Pin buffers and set up iovecs.
int startIndex = bufferIndex, startOffset = offset;
@@ -313,6 +362,8 @@ private static unsafe long SendFile(SafeSocketHandle socket, SafeFileHandle file
private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags, IList> buffers, byte[]? socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
+
int maxBuffers = buffers.Count;
bool allocOnStack = maxBuffers <= IovStackThreshold;
@@ -412,6 +463,7 @@ private static unsafe int SysReceive(SafeSocketHandle socket, SocketFlags flags,
private static unsafe int SysReceiveMessageFrom(SafeSocketHandle socket, SocketFlags flags, Span buffer, byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
Debug.Assert(socketAddress != null, "Expected non-null socketAddress");
int cmsgBufferLen = Interop.Sys.GetControlMessageBufferSize(Convert.ToInt32(isIPv4), Convert.ToInt32(isIPv6));
@@ -465,6 +517,7 @@ private static unsafe int SysReceiveMessageFrom(
byte[] socketAddress, ref int socketAddressLen, bool isIPv4, bool isIPv6,
out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, out Interop.Error errno)
{
+ Debug.Assert(socket.IsSocket);
Debug.Assert(socketAddress != null, "Expected non-null socketAddress");
int buffersCount = buffers.Count;
@@ -678,7 +731,12 @@ public static unsafe bool TryCompleteReceive(SafeSocketHandle socket, Span
Interop.Error errno;
int received;
- if (buffer.Length == 0)
+ if (!socket.IsSocket)
+ {
+ Debug.Assert(flags == SocketFlags.None);
+ received = SysRead(socket, buffer, out errno);
+ }
+ else if (buffer.Length == 0)
{
// Special case a receive of 0 bytes into a single buffer. A common pattern is to ReceiveAsync 0 bytes in order
// to be asynchronously notified when data is available, without needing to dedicate a buffer. Some platforms (e.g. macOS),
@@ -732,7 +790,16 @@ public static unsafe bool TryCompleteReceiveFrom(SafeSocketHandle socket, Span
Interop.Error errno;
try
{
- sent = buffers != null ?
- SysSend(socket, flags, buffers, ref bufferIndex, ref offset, socketAddress, socketAddressLen, out errno) :
- socketAddress == null ? SysSend(socket, flags, buffer, ref offset, ref count, out errno) :
- SysSend(socket, flags, buffer, ref offset, ref count, socketAddress, socketAddressLen, out errno);
+ if (!socket.IsSocket)
+ {
+ Debug.Assert(flags == SocketFlags.None);
+ Debug.Assert(buffers == null);
+ sent = SysWrite(socket, buffer, ref offset, ref count, out errno);
+ }
+ else
+ {
+ sent = buffers != null ?
+ SysSend(socket, flags, buffers, ref bufferIndex, ref offset, socketAddress, socketAddressLen, out errno) :
+ socketAddress == null ? SysSend(socket, flags, buffer, ref offset, ref count, out errno) :
+ SysSend(socket, flags, buffer, ref offset, ref count, socketAddress, socketAddressLen, out errno);
+ }
}
catch (ObjectDisposedException)
{
diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs
index c291583fd54d7..c678cb3431bf0 100644
--- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs
+++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/CreateSocketTests.cs
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
@@ -249,12 +250,18 @@ public void Ctor_SafeHandle_Invalid_ThrowsException()
{
AssertExtensions.Throws("handle", () => new Socket(null));
AssertExtensions.Throws("handle", () => new Socket(new SafeSocketHandle((IntPtr)(-1), false)));
+ }
- using (var pipe = new AnonymousPipeServerStream())
- {
- SocketException se = Assert.Throws(() => new Socket(new SafeSocketHandle(pipe.ClientSafePipeHandle.DangerousGetHandle(), false)));
- Assert.Equal(SocketError.NotSocket, se.SocketErrorCode);
- }
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ [PlatformSpecific(TestPlatforms.Linux)]
+ public void Ctor_Socket_FromPipeHandle_Ctor_Dispose_Success(bool ownsHandle)
+ {
+ (int fd1, int fd2) = pipe2();
+ close(fd2);
+
+ using var _ = new Socket(new SafeSocketHandle(new IntPtr(fd1), ownsHandle));
}
[Theory]
@@ -624,6 +631,25 @@ public unsafe void Ctor_SafeHandle_UnknownSocket_Success()
[DllImport("libc")]
private static extern int close(int fd);
+ [DllImport("libc", SetLastError = true)]
+ private static unsafe extern int pipe2(int* pipefd, int flags);
+
+ private static unsafe (int, int) pipe2(int flags = 0)
+ {
+ Span pipefd = stackalloc int[2];
+ fixed (int* ptr = pipefd)
+ {
+ if (pipe2(ptr, flags) == 0)
+ {
+ return (pipefd[0], pipefd[1]);
+ }
+ else
+ {
+ throw new Win32Exception();
+ }
+ }
+ }
+
[Fact]
[PlatformSpecific(TestPlatforms.AnyUnix)]
public unsafe void Ctor_SafeHandle_SocketPair_Success()