Skip to content

Commit

Permalink
Use socketpair to implement Process.Start redirection (#34861)
Browse files Browse the repository at this point in the history
* Use socketpair to implement Process.Start redirection

Today on Unix, we create an anonymous pipe via pipe/pipe2 to be used for stdin/stdout/stderr on processes created by Process.Start.  We then wrap the resulting file descriptors with FileStreams to hand out via Process.StandardInput/Output/Error.  This has a few issues, however. Any async operations on the resulting stream (or wrapping stream reader) will actually be async-over-sync, and that in turn means that a) any async read will end up blocking a thread pool thread until it's satisified, and b) the operation isn't cancelable.  The implications of (b) are obvious, and the problem with (a) is that code which launches a bunch of processes and uses BeginOutput/ErrorReadLine or the like will end up blocking a bunch of thread pool threads.

This change replaces the pipe/pipe2 calls with socketpair calls, and instead of wrapping the resulting file descriptors with FileStream, wraps them in Sockets and NetworkStreams.  This gives us the full capabilities of the networking stack, which fully supports asynchronous and cancelable reads and writes.

* Try to fix macOS failures with socketpair
  • Loading branch information
stephentoub authored May 22, 2020
1 parent 1c66ad1 commit c44dc40
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 42 deletions.
50 changes: 37 additions & 13 deletions src/libraries/Native/Unix/System.Native/pal_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
#if HAVE_CRT_EXTERNS_H
#include <crt_externs.h>
#endif
#if HAVE_PIPE2
#include <fcntl.h>
#endif
#include <sys/socket.h>
#include <pthread.h>

#if HAVE_SCHED_SETAFFINITY || HAVE_SCHED_GETAFFINITY
Expand All @@ -48,7 +47,7 @@ c_static_assert(PAL_PRIO_PROCESS == (int)PRIO_PROCESS);
c_static_assert(PAL_PRIO_PGRP == (int)PRIO_PGRP);
c_static_assert(PAL_PRIO_USER == (int)PRIO_USER);

#if !HAVE_PIPE2
#ifndef SOCK_CLOEXEC
static pthread_mutex_t ProcessCreateLock = PTHREAD_MUTEX_INITIALIZER;
#endif

Expand Down Expand Up @@ -183,6 +182,31 @@ static int SetGroups(uint32_t* userGroups, int32_t userGroupsLength, uint32_t* p
return rv;
}

static int32_t SocketPair(int32_t sv[2])
{
int32_t result;

int type = SOCK_STREAM;
#ifdef SOCK_CLOEXEC
type |= SOCK_CLOEXEC;
#endif

while ((result = socketpair(AF_UNIX, type, 0, sv)) < 0 && errno == EINTR);

#ifndef SOCK_CLOEXEC
if (result == 0)
{
while ((result = fcntl(sv[READ_END_OF_PIPE], F_SETFD, FD_CLOEXEC)) < 0 && errno == EINTR);
if (result == 0)
{
while ((result = fcntl(sv[WRITE_END_OF_PIPE], F_SETFD, FD_CLOEXEC)) < 0 && errno == EINTR);
}
}
#endif

return result;
}

int32_t SystemNative_ForkAndExecProcess(const char* filename,
char* const argv[],
char* const envp[],
Expand All @@ -201,7 +225,7 @@ int32_t SystemNative_ForkAndExecProcess(const char* filename,
int32_t* stderrFd)
{
#if HAVE_FORK
#if !HAVE_PIPE2
#ifndef SOCK_CLOEXEC
bool haveProcessCreateLock = false;
#endif
bool success = true;
Expand Down Expand Up @@ -257,11 +281,11 @@ int32_t SystemNative_ForkAndExecProcess(const char* filename,
goto done;
}

#if !HAVE_PIPE2
// We do not have pipe2(); take the lock to emulate it race free.
// If another process were to be launched between the pipe creation and the fcntl call to set CLOEXEC on it, that
// file descriptor will be inherited into the other child process, eventually causing a deadlock either in the loop
// below that waits for that pipe to be closed or in StreamReader.ReadToEnd() in the calling code.
#ifndef SOCK_CLOEXEC
// We do not have SOCK_CLOEXEC; take the lock to emulate it race free.
// If another process were to be launched between the socket creation and the fcntl call to set CLOEXEC on it, that
// file descriptor would be inherited into the other child process, eventually causing a deadlock either in the loop
// below that waits for that socket to be closed or in StreamReader.ReadToEnd() in the calling code.
if (pthread_mutex_lock(&ProcessCreateLock) != 0)
{
// This check is pretty much just checking for trashed memory.
Expand All @@ -273,9 +297,9 @@ int32_t SystemNative_ForkAndExecProcess(const char* filename,

// Open pipes for any requests to redirect stdin/stdout/stderr and set the
// close-on-exec flag to the pipe file descriptors.
if ((redirectStdin && SystemNative_Pipe(stdinFds, PAL_O_CLOEXEC) != 0) ||
(redirectStdout && SystemNative_Pipe(stdoutFds, PAL_O_CLOEXEC) != 0) ||
(redirectStderr && SystemNative_Pipe(stderrFds, PAL_O_CLOEXEC) != 0))
if ((redirectStdin && SocketPair(stdinFds) != 0) ||
(redirectStdout && SocketPair(stdoutFds) != 0) ||
(redirectStderr && SocketPair(stderrFds) != 0))
{
success = false;
goto done;
Expand Down Expand Up @@ -426,7 +450,7 @@ int32_t SystemNative_ForkAndExecProcess(const char* filename,
*stderrFd = stderrFds[READ_END_OF_PIPE];

done:;
#if !HAVE_PIPE2
#ifndef SOCK_CLOEXEC
if (haveProcessCreateLock)
{
pthread_mutex_unlock(&ProcessCreateLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@
<Reference Include="System.IO.FileSystem.DriveInfo" />
<Reference Include="System.Linq" />
<Reference Include="System.Memory" />
<Reference Include="System.Net.Sockets" />
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Net.Sockets;
using System.Security;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -761,16 +762,15 @@ internal static TimeSpan TicksToTimeSpan(double ticks)
return TimeSpan.FromSeconds(ticks / (double)ticksPerSecond);
}

/// <summary>Opens a stream around the specified file descriptor and with the specified access.</summary>
/// <param name="fd">The file descriptor.</param>
/// <summary>Opens a stream around the specified socket file descriptor and with the specified access.</summary>
/// <param name="fd">The socket file descriptor.</param>
/// <param name="access">The access mode.</param>
/// <returns>The opened stream.</returns>
private static FileStream OpenStream(int fd, FileAccess access)
private static Stream OpenStream(int fd, FileAccess access)
{
Debug.Assert(fd >= 0);
return new FileStream(
new SafeFileHandle((IntPtr)fd, ownsHandle: true),
access, StreamBufferSize, isAsync: false);
var socket = new Socket(new SafeSocketHandle((IntPtr)fd, ownsHandle: true));
return new NetworkStream(socket, access, ownsSocket: true);
}

/// <summary>Parses a command-line argument string into a list of arguments.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,42 @@ async private Task<bool> WaitPipeSignal(PipeStream pipe, int millisecond)
}
}

[PlatformSpecific(~TestPlatforms.Windows)] // currently on Windows these operations async-over-sync on Windows
[Fact]
public async Task ReadAsync_OutputStreams_Cancel_RespondsQuickly()
{
Process p = CreateProcessLong();
try
{
p.StartInfo.RedirectStandardOutput = true;
p.StartInfo.RedirectStandardError = true;
Assert.True(p.Start());

using (var cts = new CancellationTokenSource())
{
ValueTask<int> vt = p.StandardOutput.ReadAsync(new char[1].AsMemory(), cts.Token);
await Task.Delay(1);
Assert.False(vt.IsCompleted);
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
}

using (var cts = new CancellationTokenSource())
{
ValueTask<int> vt = p.StandardError.ReadAsync(new char[1].AsMemory(), cts.Token);
await Task.Delay(1);
Assert.False(vt.IsCompleted);
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
}
}
finally
{
p.Kill();
p.Dispose();
}
}

[Fact]
public void TestSyncStreams()
{
Expand Down
55 changes: 32 additions & 23 deletions src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,36 +188,45 @@ private unsafe Socket(SafeSocketHandle handle, bool loadPropertiesFromHandle)
{
try
{
// Local and Remote EP may be different sizes for something like UDS.
// Local and remote end points may be different sizes for protocols like Unix Domain Sockets.
bufferLength = buffer.Length;
if (SocketPal.GetPeerName(handle, buffer, ref bufferLength) != SocketError.Success)
switch (SocketPal.GetPeerName(handle, buffer, ref bufferLength))
{
return;
}
case SocketError.Success:
switch (_addressFamily)
{
case AddressFamily.InterNetwork:
_remoteEndPoint = new IPEndPoint(
new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF),
SocketAddressPal.GetPort(buffer));
break;

case AddressFamily.InterNetworkV6:
Span<byte> address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes];
SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope);
_remoteEndPoint = new IPEndPoint(
new IPAddress(address, scope),
SocketAddressPal.GetPort(buffer));
break;

case AddressFamily.Unix:
socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength));
_remoteEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress));
break;
}

switch (_addressFamily)
{
case AddressFamily.InterNetwork:
_remoteEndPoint = new IPEndPoint(
new IPAddress((long)SocketAddressPal.GetIPv4Address(buffer.Slice(0, bufferLength)) & 0x0FFFFFFFF),
SocketAddressPal.GetPort(buffer));
break;

case AddressFamily.InterNetworkV6:
Span<byte> address = stackalloc byte[IPAddressParserStatics.IPv6AddressBytes];
SocketAddressPal.GetIPv6Address(buffer.Slice(0, bufferLength), address, out uint scope);
_remoteEndPoint = new IPEndPoint(
new IPAddress(address, scope),
SocketAddressPal.GetPort(buffer));
_isConnected = true;
break;

case AddressFamily.Unix:
socketAddress = new Internals.SocketAddress(_addressFamily, buffer.Slice(0, bufferLength));
_remoteEndPoint = new UnixDomainSocketEndPoint(IPEndPointExtensions.GetNetSocketAddress(socketAddress));
case SocketError.InvalidArgument:
// On some OSes (e.g. macOS), EINVAL means the socket has been shut down.
// This can happen if, for example, socketpair was used and the parent
// process closed its copy of the child's socket. Since we don't know
// whether we're actually connected or not, err on the side of saying
// we're connected.
_isConnected = true;
break;
}

_isConnected = true;
}
catch { }
}
Expand Down

0 comments on commit c44dc40

Please sign in to comment.