diff --git a/src/Benchmarks.ServerJob/WebHost.cs b/src/Benchmarks.ServerJob/WebHost.cs index 6b6b2234a..af459de1e 100644 --- a/src/Benchmarks.ServerJob/WebHost.cs +++ b/src/Benchmarks.ServerJob/WebHost.cs @@ -11,6 +11,7 @@ public enum WebHost IISInProcess, IISOutOfProcess, Docker, - CCore + CCore, + SocketPipe } } diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/BenchmarkConfigurationHelpers.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/BenchmarkConfigurationHelpers.cs index 5be93e840..b4e213344 100644 --- a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/BenchmarkConfigurationHelpers.cs +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/BenchmarkConfigurationHelpers.cs @@ -7,6 +7,9 @@ using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Configuration; using System.IO.Pipelines; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; namespace PlatformBenchmarks { @@ -19,18 +22,19 @@ public static IWebHostBuilder UseBenchmarksConfiguration(this IWebHostBuilder bu // Handle the transport type var webHost = builder.GetSetting("KestrelTransport"); + Console.WriteLine($"KestrelTransport={webHost}"); + if (string.Equals(webHost, "Sockets", StringComparison.OrdinalIgnoreCase)) { builder.UseSockets(options => { if (int.TryParse(builder.GetSetting("threadCount"), out int threadCount)) { - options.IOQueueCount = threadCount; + options.IOQueueCount = threadCount; } - #if NETCOREAPP5_0 - options.WaitForDataBeforeAllocatingBuffer = false; -#endif + typeof(SocketTransportOptions).GetProperty("WaitForDataBeforeAllocatingBuffer")?.SetValue(options, false); +#endif }); } else if (string.Equals(webHost, "LinuxTransport", StringComparison.OrdinalIgnoreCase)) @@ -40,6 +44,13 @@ public static IWebHostBuilder UseBenchmarksConfiguration(this IWebHostBuilder bu options.ApplicationSchedulingMode = PipeScheduler.Inline; }); } + else if (string.Equals(webHost, "SocketPipe", StringComparison.OrdinalIgnoreCase)) + { + builder.ConfigureServices(services => + { + services.AddSingleton(); + }); + } return builder; } diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/Program.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/Program.cs index d2faf2d43..069048e63 100644 --- a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/Program.cs +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/Program.cs @@ -2,7 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Net; +using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; #if DATABASE diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketAwaitableEventArgs.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketAwaitableEventArgs.cs new file mode 100644 index 000000000..f4a6ce273 --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketAwaitableEventArgs.cs @@ -0,0 +1,59 @@ +using System; +using System.Net.Sockets; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace PlatformBenchmarks +{ + // more or less a copy of https://github.com/dotnet/aspnetcore/blob/master/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs + // but without PipeScheduler (always inlining) + internal sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion + { + private static readonly Action _callbackCompleted = () => { }; + + private Action _callback; + + public SocketAwaitableEventArgs() +#if NETCOREAPP5_0 + : base(unsafeSuppressExecutionContextFlow: true) +#endif + { + } + + public SocketAwaitableEventArgs GetAwaiter() => this; + public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); + + public int GetResult() + { + _callback = null; + + if (SocketError != SocketError.Success) + { + ThrowSocketException(SocketError); + } + + return BytesTransferred; + + static void ThrowSocketException(SocketError e) + { + throw new SocketException((int)e); + } + } + + public void OnCompleted(Action continuation) + { + if (ReferenceEquals(_callback, _callbackCompleted) || + ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) + { + Task.Run(continuation); + } + } + + public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation); + + public void Complete() => OnCompleted(this); + + protected override void OnCompleted(SocketAsyncEventArgs _) => Interlocked.Exchange(ref _callback, _callbackCompleted)?.Invoke(); + } +} \ No newline at end of file diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipe.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipe.cs new file mode 100644 index 000000000..8cf8ccae1 --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipe.cs @@ -0,0 +1,18 @@ +using System.IO.Pipelines; +using System.Net.Sockets; + +namespace PlatformBenchmarks +{ + internal sealed class SocketPipe : IDuplexPipe + { + internal SocketPipe(Socket socket, SocketAwaitableEventArgs awaitableEventArgs) + { + Input = new SocketPipeReader(socket, awaitableEventArgs); + Output = new SocketPipeWriter(socket); + } + + public PipeReader Input { get; } + + public PipeWriter Output { get; } + } +} diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeConnection.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeConnection.cs new file mode 100644 index 000000000..5c2d11d42 --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeConnection.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Http.Features; + +namespace PlatformBenchmarks +{ + public class SocketPipeConnection : ConnectionContext + { + public override string ConnectionId { get; set; } + public override IFeatureCollection Features { get; } + public override IDictionary Items { get; set; } + public override IDuplexPipe Transport { get; set; } + private Socket Socket { get; } + private SocketAwaitableEventArgs AwaitableEventArgs { get; } + + public SocketPipeConnection(Socket socket) + { + Socket = socket; + AwaitableEventArgs = new SocketAwaitableEventArgs(); + Features = new FeatureCollection(); + Transport = new SocketPipe(socket, AwaitableEventArgs); + LocalEndPoint = socket.LocalEndPoint; + RemoteEndPoint = socket.RemoteEndPoint; + ConnectionId = Guid.NewGuid().ToString(); + } + + public override ValueTask DisposeAsync() + { + AwaitableEventArgs.Dispose(); + Socket.Dispose(); + + return default; + } + + } +} \ No newline at end of file diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeConnectionListener.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeConnectionListener.cs new file mode 100644 index 000000000..db36ecf95 --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeConnectionListener.cs @@ -0,0 +1,117 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; + +namespace PlatformBenchmarks +{ + // copy of https://github.com/dotnet/aspnetcore/blob/master/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs + // difference: creates SocketPipeConnection instead of SocketConnection + internal sealed class SocketPipeConnectionListener : IConnectionListener + { + private readonly SocketTransportOptions _options; + + private Socket _listenSocket; + + public SocketPipeConnectionListener(EndPoint endpoint, SocketTransportOptions options) + { + _options = options; + EndPoint = endpoint; + } + + public EndPoint EndPoint { get; private set; } + + public ValueTask UnbindAsync(CancellationToken cancellationToken = new CancellationToken()) => DisposeAsync(); + + public ValueTask DisposeAsync() + { + _listenSocket?.Dispose(); + return default; + } + + public async ValueTask AcceptAsync(CancellationToken cancellationToken = new CancellationToken()) + { + while (true) + { + try + { + var acceptSocket = await _listenSocket.AcceptAsync(); + + // Only apply no delay to Tcp based endpoints + if (acceptSocket.LocalEndPoint is IPEndPoint) + { + acceptSocket.NoDelay = _options.NoDelay; + } + + return new SocketPipeConnection(acceptSocket); + } + catch (ObjectDisposedException) + { + // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done + return null; + } + catch (SocketException e) when (e.SocketErrorCode == SocketError.OperationAborted) + { + // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done + return null; + } + catch (SocketException se) + { + Console.WriteLine($"Socket exception! {se}"); + // The connection got reset while it was in the backlog, so we try again. + // _trace.ConnectionReset(connectionId: "(null)"); + } + } + } + + internal void Bind() + { + if (_listenSocket != null) + { + throw new InvalidOperationException("TransportAlreadyBound"); + } + + // Check if EndPoint is a FileHandleEndpoint before attempting to access EndPoint.AddressFamily + // since that will throw an NotImplementedException. + if (EndPoint is FileHandleEndPoint) + { + throw new NotSupportedException("FileHandleEndPointNotSupported"); + } + + Socket listenSocket; + + // Unix domain sockets are unspecified + var protocolType = EndPoint is UnixDomainSocketEndPoint ? ProtocolType.Unspecified : ProtocolType.Tcp; + + listenSocket = new Socket(EndPoint.AddressFamily, SocketType.Stream, protocolType); + + // Kestrel expects IPv6Any to bind to both IPv6 and IPv4 + if (EndPoint is IPEndPoint ip && ip.Address == IPAddress.IPv6Any) + { + listenSocket.DualMode = true; + } + + try + { + listenSocket.Bind(EndPoint); + } + catch (SocketException e) when (e.SocketErrorCode == SocketError.AddressAlreadyInUse) + { + throw new AddressInUseException(e.Message, e); + } + + EndPoint = listenSocket.LocalEndPoint; + +#if NETCOREAPP5_0 + listenSocket.Listen(_options.Backlog); +#else + listenSocket.Listen(512); // default backlog value +#endif + + _listenSocket = listenSocket; + } + } +} \ No newline at end of file diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeReader.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeReader.cs new file mode 100644 index 000000000..4d60d99cb --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeReader.cs @@ -0,0 +1,78 @@ +using System; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace PlatformBenchmarks +{ + internal sealed class SocketPipeReader : PipeReader + { + // the biggest request that we can get for TechEmpower Plaintext is around 3k + // it's simply cheaper to allocate small array and reuse it compared to pooling + private const int BufferSize = 1024 * 4; + + private Socket _socket; + private byte[] _array; + private int _offset; + private int _length; + private SocketAwaitableEventArgs _awaitableEventArgs; + + public SocketPipeReader(Socket socket, SocketAwaitableEventArgs awaitableEventArgs) + { + _socket = socket; + _array = new byte[BufferSize]; + _offset = 0; + _length = 0; + _awaitableEventArgs = awaitableEventArgs; + } + + public override void AdvanceTo(SequencePosition consumed) => _offset += consumed.GetInteger(); + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _offset += consumed.GetInteger(); + + public override void CancelPendingRead() { } // nop + + public override bool TryRead(out ReadResult result) + { + result = default; + + return false; + } + + public override void Complete(Exception exception = null) { } // nop + + public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + if (_offset == _length) + { + // previously entire array was parsed (100% of cases for TechEmpower) + _offset = 0; + _length = 0; + } + else + { + // in theory it's possible + Array.Resize(ref _array, _array.Length * 2); + } + + var array = _array; + var args = _awaitableEventArgs; + args.SetBuffer(new Memory(array, _length, array.Length - _length)); + + if (_socket.ReceiveAsync(args)) + { + // ReceiveAsync "returns true if the I/O operation is pending" + // so we await only in that case (it's ugly but gives nice perf boost in JSON benchmark + await args; + } + + _length += args.GetResult(); + + return new ReadResult( + new System.Buffers.ReadOnlySequence(array, _offset, _length - _offset), + isCanceled: false, + isCompleted: _length == 0); // FIN + } + } +} diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeTransportFactory.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeTransportFactory.cs new file mode 100644 index 000000000..f5370ec4c --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeTransportFactory.cs @@ -0,0 +1,34 @@ +using System; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.Options; + +namespace PlatformBenchmarks +{ + // a copy of https://github.com/dotnet/aspnetcore/blob/master/src/Servers/Kestrel/Transport.Sockets/src/SocketTransportFactory.cs + // the difference: using different connection listener + internal sealed class SocketPipeTransportFactory : IConnectionListenerFactory + { + private readonly SocketTransportOptions _options; + + public SocketPipeTransportFactory(IOptions options) + { + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + + _options = options.Value; + } + + public ValueTask BindAsync(EndPoint endpoint, CancellationToken cancellationToken = default) + { + var transport = new SocketPipeConnectionListener(endpoint, _options); + transport.Bind(); + return new ValueTask(transport); + } + } +} \ No newline at end of file diff --git a/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeWriter.cs b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeWriter.cs new file mode 100644 index 000000000..cefff1fc9 --- /dev/null +++ b/src/BenchmarksApps/Kestrel/PlatformBenchmarks/SocketPipes/SocketPipeWriter.cs @@ -0,0 +1,91 @@ +using System; +using System.IO.Pipelines; +using System.Net.Sockets; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace PlatformBenchmarks +{ + internal sealed class SocketPipeWriter : PipeWriter + { + // the biggest response that we can create for TechEmpower Plaintext is around 1200 bytes + // it's simply cheaper to allocate small array and reuse it compared to pooling + private const int BufferSize = 2 * 1024; + + private Socket _socket; + private byte[] _array; + private int _offset; + + public SocketPipeWriter(Socket socket) + { + _socket = socket; + _array = new byte[BufferSize]; + _offset = 0; + } + + public override void Advance(int bytes) => _offset += bytes; + + public override void CancelPendingFlush() { } // nop + + public override void Complete(Exception exception = null) => _offset = 0; + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + => _offset <= BufferSize ? SendSync() : SendAsync(cancellationToken); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask SendSync() + { + // we take advantage of the fact that all writes in TE are always small and non-blocking + // so we perform a SYNC send on purpose + + int start = 0; + int toSent = _offset; + + do + { + int bytesSent = _socket.Send(new ReadOnlySpan(_array, start, toSent), SocketFlags.None); + + start += bytesSent; + toSent -= bytesSent; + } + while (toSent > 0); + + _offset = 0; + + return new ValueTask(new FlushResult(isCanceled: false, isCompleted: true)); + } + + private async ValueTask SendAsync(CancellationToken cancellationToken) + { + var isCompleted = await _socket.SendAsync(new ReadOnlyMemory(_array, 0, _offset), SocketFlags.None) == _offset; + + _offset = 0; + + return new FlushResult(isCanceled: cancellationToken.IsCancellationRequested, isCompleted: isCompleted); + } + + public override Memory GetMemory(int sizeHint = 0) + { + ResizeIfNeeded(sizeHint); + + return new Memory(_array, _offset, _array.Length - _offset); + } + + public override Span GetSpan(int sizeHint = 0) + { + ResizeIfNeeded(sizeHint); + + return new Span(_array, _offset, _array.Length - _offset); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ResizeIfNeeded(int sizeHint) + { + if (sizeHint >= _array.Length) + { + Array.Resize(ref _array, Math.Max(sizeHint, _array.Length * 2)); + } + } + } +} diff --git a/src/BenchmarksDriver/Program.cs b/src/BenchmarksDriver/Program.cs index cd5f05e56..c5435c355 100644 --- a/src/BenchmarksDriver/Program.cs +++ b/src/BenchmarksDriver/Program.cs @@ -205,7 +205,7 @@ public static int Main(string[] args) "Scheme (http, https, h2, h2c). Default is http.", CommandOptionType.SingleValue); var webHostOption = app.Option( "-w|--webHost", - "WebHost (e.g., KestrelLibuv, KestrelSockets, HttpSys). Default is KestrelSockets.", + "WebHost (e.g., KestrelLibuv, KestrelSockets, HttpSys, SocketPipe). Default is KestrelSockets.", CommandOptionType.SingleValue); var monoOption = app.Option("--mono-runtime", "Use the mono runtime.", CommandOptionType.NoValue);