Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new optional SocketPipe transport #1480

Closed
wants to merge 13 commits into from
3 changes: 2 additions & 1 deletion src/Benchmarks.ServerJob/WebHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public enum WebHost
IISInProcess,
IISOutOfProcess,
Docker,
CCore
CCore,
SocketPipe
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using System.IO.Pipelines;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;

namespace PlatformBenchmarks
{
Expand All @@ -19,18 +22,19 @@ public static IWebHostBuilder UseBenchmarksConfiguration(this IWebHostBuilder bu
// Handle the transport type
var webHost = builder.GetSetting("KestrelTransport");

Console.WriteLine($"KestrelTransport={webHost}");

if (string.Equals(webHost, "Sockets", StringComparison.OrdinalIgnoreCase))
{
builder.UseSockets(options =>
{
if (int.TryParse(builder.GetSetting("threadCount"), out int threadCount))
{
options.IOQueueCount = threadCount;
options.IOQueueCount = threadCount;
}

#if NETCOREAPP5_0
options.WaitForDataBeforeAllocatingBuffer = false;
#endif
typeof(SocketTransportOptions).GetProperty("WaitForDataBeforeAllocatingBuffer")?.SetValue(options, false);
#endif
});
}
else if (string.Equals(webHost, "LinuxTransport", StringComparison.OrdinalIgnoreCase))
Expand All @@ -40,6 +44,13 @@ public static IWebHostBuilder UseBenchmarksConfiguration(this IWebHostBuilder bu
options.ApplicationSchedulingMode = PipeScheduler.Inline;
});
}
else if (string.Equals(webHost, "SocketPipe", StringComparison.OrdinalIgnoreCase))
{
builder.ConfigureServices(services =>
{
services.AddSingleton<IConnectionListenerFactory, SocketPipeTransportFactory>();
});
}

return builder;
}
Expand Down
2 changes: 1 addition & 1 deletion src/BenchmarksApps/Kestrel/PlatformBenchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Net;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
#if DATABASE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace PlatformBenchmarks
{
// more or less a copy of https://github.com/dotnet/aspnetcore/blob/master/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs
// but without PipeScheduler (always inlining)
internal sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this at all? You can use the SocketTask APIs that do this internally. Kestrel only does this because it's trying to schedule directly to the IOQueue from continuations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cost of awaiting socket.ReadAsync is quite big, by using this custom class I was able to get +15k RPS. I am going to create an issue in runtime repo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@stephentoub stephentoub Apr 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cost of awaiting socket.ReadAsync is quite big, by using this custom class I was able to get +15k RPS.

"quite big" is ~15K out of ~1065k in a benchmark that's basically seeing how fast it can call this method? To me that's not worth all the extra code and complexity and diverging from how we want developers to use Socket.

I am going to create an issue in runtime repo

Recommending what? If you have concrete ideas for improving it, great, I'd love to see it get faster, but it has to account for a bunch of things you're not here, like the possibility it might be used concurrently, being used via an interface via ValueTask, that there might be a custom SynchronizationContext/TaskScheduler to be used, possible flowing of ExecutionContext, etc.

{
private static readonly Action _callbackCompleted = () => { };

private Action _callback;

public SocketAwaitableEventArgs()
#if NETCOREAPP5_0
: base(unsafeSuppressExecutionContextFlow: true)
#endif
{
}

public SocketAwaitableEventArgs GetAwaiter() => this;
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);

public int GetResult()
{
_callback = null;

if (SocketError != SocketError.Success)
{
ThrowSocketException(SocketError);
}

return BytesTransferred;

static void ThrowSocketException(SocketError e)
{
throw new SocketException((int)e);
}
}

public void OnCompleted(Action continuation)
{
if (ReferenceEquals(_callback, _callbackCompleted) ||
ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
{
Task.Run(continuation);
}
}

public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);

public void Complete() => OnCompleted(this);

protected override void OnCompleted(SocketAsyncEventArgs _) => Interlocked.Exchange(ref _callback, _callbackCompleted)?.Invoke();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.IO.Pipelines;
using System.Net.Sockets;

namespace PlatformBenchmarks
{
internal sealed class SocketPipe : IDuplexPipe
{
internal SocketPipe(Socket socket, SocketAwaitableEventArgs awaitableEventArgs)
{
Input = new SocketPipeReader(socket, awaitableEventArgs);
Output = new SocketPipeWriter(socket);
}

public PipeReader Input { get; }

public PipeWriter Output { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;

namespace PlatformBenchmarks
{
public class SocketPipeConnection : ConnectionContext
{
public override string ConnectionId { get; set; }
public override IFeatureCollection Features { get; }
public override IDictionary<object, object> Items { get; set; }
public override IDuplexPipe Transport { get; set; }
private Socket Socket { get; }
private SocketAwaitableEventArgs AwaitableEventArgs { get; }

public SocketPipeConnection(Socket socket)
{
Socket = socket;
AwaitableEventArgs = new SocketAwaitableEventArgs();
Features = new FeatureCollection();
Transport = new SocketPipe(socket, AwaitableEventArgs);
LocalEndPoint = socket.LocalEndPoint;
RemoteEndPoint = socket.RemoteEndPoint;
ConnectionId = Guid.NewGuid().ToString();
}

public override ValueTask DisposeAsync()
{
AwaitableEventArgs.Dispose();
Socket.Dispose();

return default;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;

namespace PlatformBenchmarks
{
// copy of https://github.com/dotnet/aspnetcore/blob/master/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs
// difference: creates SocketPipeConnection instead of SocketConnection
internal sealed class SocketPipeConnectionListener : IConnectionListener
{
private readonly SocketTransportOptions _options;

private Socket _listenSocket;

public SocketPipeConnectionListener(EndPoint endpoint, SocketTransportOptions options)
{
_options = options;
EndPoint = endpoint;
}

public EndPoint EndPoint { get; private set; }

public ValueTask UnbindAsync(CancellationToken cancellationToken = new CancellationToken()) => DisposeAsync();

public ValueTask DisposeAsync()
{
_listenSocket?.Dispose();
return default;
}

public async ValueTask<ConnectionContext> AcceptAsync(CancellationToken cancellationToken = new CancellationToken())
{
while (true)
{
try
{
var acceptSocket = await _listenSocket.AcceptAsync();

// Only apply no delay to Tcp based endpoints
if (acceptSocket.LocalEndPoint is IPEndPoint)
{
acceptSocket.NoDelay = _options.NoDelay;
}

return new SocketPipeConnection(acceptSocket);
}
catch (ObjectDisposedException)
{
// A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
return null;
}
catch (SocketException e) when (e.SocketErrorCode == SocketError.OperationAborted)
{
// A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
return null;
}
catch (SocketException se)
{
Console.WriteLine($"Socket exception! {se}");
// The connection got reset while it was in the backlog, so we try again.
// _trace.ConnectionReset(connectionId: "(null)");
}
}
}

internal void Bind()
{
if (_listenSocket != null)
{
throw new InvalidOperationException("TransportAlreadyBound");
}

// Check if EndPoint is a FileHandleEndpoint before attempting to access EndPoint.AddressFamily
// since that will throw an NotImplementedException.
if (EndPoint is FileHandleEndPoint)
{
throw new NotSupportedException("FileHandleEndPointNotSupported");
}

Socket listenSocket;

// Unix domain sockets are unspecified
var protocolType = EndPoint is UnixDomainSocketEndPoint ? ProtocolType.Unspecified : ProtocolType.Tcp;

listenSocket = new Socket(EndPoint.AddressFamily, SocketType.Stream, protocolType);

// Kestrel expects IPv6Any to bind to both IPv6 and IPv4
if (EndPoint is IPEndPoint ip && ip.Address == IPAddress.IPv6Any)
{
listenSocket.DualMode = true;
}

try
{
listenSocket.Bind(EndPoint);
}
catch (SocketException e) when (e.SocketErrorCode == SocketError.AddressAlreadyInUse)
{
throw new AddressInUseException(e.Message, e);
}

EndPoint = listenSocket.LocalEndPoint;

#if NETCOREAPP5_0
listenSocket.Listen(_options.Backlog);
#else
listenSocket.Listen(512); // default backlog value
#endif

_listenSocket = listenSocket;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace PlatformBenchmarks
{
internal sealed class SocketPipeReader : PipeReader
{
// the biggest request that we can get for TechEmpower Plaintext is around 3k
// it's simply cheaper to allocate small array and reuse it compared to pooling
private const int BufferSize = 1024 * 4;

private Socket _socket;
private byte[] _array;
private int _offset;
private int _length;
private SocketAwaitableEventArgs _awaitableEventArgs;

public SocketPipeReader(Socket socket, SocketAwaitableEventArgs awaitableEventArgs)
{
_socket = socket;
_array = new byte[BufferSize];
_offset = 0;
_length = 0;
_awaitableEventArgs = awaitableEventArgs;
}

public override void AdvanceTo(SequencePosition consumed) => _offset += consumed.GetInteger();

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _offset += consumed.GetInteger();

public override void CancelPendingRead() { } // nop

public override bool TryRead(out ReadResult result)
{
result = default;

return false;
}

public override void Complete(Exception exception = null) { } // nop

public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
if (_offset == _length)
{
// previously entire array was parsed (100% of cases for TechEmpower)
_offset = 0;
_length = 0;
}
else
{
// in theory it's possible
Array.Resize(ref _array, _array.Length * 2);
}

var array = _array;
var args = _awaitableEventArgs;
args.SetBuffer(new Memory<byte>(array, _length, array.Length - _length));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to call _socket.ReadAsync here directly instead of using SocketAsyncEventArgs (see https://github.com/dotnet/runtime/blob/e22cf553e11d500c1523034f2c7ff745014e0629/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs#L18).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, but it's slow. I am going to create an issue in runtime repo


if (_socket.ReceiveAsync(args))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't wait to receive more data if everything hasn't been examined yet. There may be no more data coming.

{
// ReceiveAsync "returns true if the I/O operation is pending"
// so we await only in that case (it's ugly but gives nice perf boost in JSON benchmark
await args;
}

_length += args.GetResult();

return new ReadResult(
new System.Buffers.ReadOnlySequence<byte>(array, _offset, _length - _offset),
isCanceled: false,
isCompleted: _length == 0); // FIN
}
}
}
Loading