From 32e7d7df3b33915e4d14d61c596df50ccc8d1c4f Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Tue, 27 Oct 2015 11:59:26 +0000 Subject: [PATCH] Go faster stripes MkII Timings Lower alloc MemoryPool removal, MemoryPool2 rename Memory pool SocketOutput Output buffer packing Release all the locks Ascii headers; MemoryTextWriter removal Pool write requests --- .../Filter/FilteredStreamAdapter.cs | 2 +- .../Http/Connection.cs | 6 +- .../Http/Frame.cs | 122 ++-- .../Http/ListenerContext.cs | 13 +- .../Http/ListenerPrimary.cs | 9 +- .../Http/MemoryPool.cs | 125 ---- .../Http/MemoryPoolTextWriter.cs | 155 ----- .../Http/SocketInput.cs | 22 +- .../Http/SocketOutput.cs | 538 ++++++++++-------- .../Http/UrlPathDecoder.cs | 12 +- .../Infrastructure/KestrelThread.cs | 134 +++-- .../{MemoryPool2.cs => MemoryPool.cs} | 62 +- ...MemoryPoolBlock2.cs => MemoryPoolBlock.cs} | 26 +- ...PoolIterator2.cs => MemoryPoolIterator.cs} | 22 +- .../{MemoryPoolSlab2.cs => MemoryPoolSlab.cs} | 8 +- .../KestrelEngine.cs | 1 + .../Networking/UvHandle.cs | 2 +- .../Networking/UvShutdownReq.cs | 4 + .../Networking/UvWriteReq.cs | 116 ++-- .../ServiceContext.cs | 4 - .../ResponseTests.cs | 15 +- .../FrameTests.cs | 2 +- ...Block2Tests.cs => MemoryPoolBlockTests.cs} | 16 +- .../MemoryPoolExtensions.cs | 2 +- ...or2Tests.cs => MemoryPoolIteratorTests.cs} | 10 +- .../MultipleLoopTests.cs | 16 +- .../NetworkingTests.cs | 10 +- .../SocketOutputTests.cs | 100 ++-- .../TestInput.cs | 4 +- .../UrlPathDecoder.cs | 6 +- .../Program.cs | 2 +- 31 files changed, 730 insertions(+), 836 deletions(-) delete mode 100644 src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs delete mode 100644 src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs rename src/Microsoft.AspNet.Server.Kestrel/Infrastructure/{MemoryPool2.cs => MemoryPool.cs} (79%) rename src/Microsoft.AspNet.Server.Kestrel/Infrastructure/{MemoryPoolBlock2.cs => MemoryPoolBlock.cs} (92%) rename src/Microsoft.AspNet.Server.Kestrel/Infrastructure/{MemoryPoolIterator2.cs => MemoryPoolIterator.cs} (97%) rename src/Microsoft.AspNet.Server.Kestrel/Infrastructure/{MemoryPoolSlab2.cs => MemoryPoolSlab.cs} (95%) rename test/Microsoft.AspNet.Server.KestrelTests/{MemoryPoolBlock2Tests.cs => MemoryPoolBlockTests.cs} (92%) rename test/Microsoft.AspNet.Server.KestrelTests/{MemoryPoolIterator2Tests.cs => MemoryPoolIteratorTests.cs} (95%) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs index 2e0872f70..c5efbd016 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/FilteredStreamAdapter.cs @@ -18,7 +18,7 @@ public class FilteredStreamAdapter public FilteredStreamAdapter( Stream filteredStream, - MemoryPool2 memory, + MemoryPool memory, IKestrelTrace logger) { SocketInput = new SocketInput(memory); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index ac0540d18..3c8c05514 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -39,8 +39,8 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context _connectionId = Interlocked.Increment(ref _lastConnectionId); - _rawSocketInput = new SocketInput(Memory2); - _rawSocketOutput = new SocketOutput(Thread, _socket, _connectionId, Log); + _rawSocketInput = new SocketInput(InputMemory); + _rawSocketOutput = new SocketOutput(OutputMemory, Thread, _socket, _connectionId, Log); } public void Start() @@ -100,7 +100,7 @@ public void Start() private void ApplyConnectionFilter() { - var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log); + var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, InputMemory, Log); SocketInput = filteredStreamAdapter.SocketInput; SocketOutput = filteredStreamAdapter.SocketOutput; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index 36f1d9231..b63710be9 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -22,16 +22,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public partial class Frame : FrameContext, IFrameControl { private static readonly Encoding _ascii = Encoding.ASCII; - private static readonly ArraySegment _endChunkBytes = CreateAsciiByteArraySegment("\r\n"); + private static readonly ArraySegment _endLineBytes = CreateAsciiByteArraySegment("\r\n"); + private static readonly ArraySegment _endChunkBytes = _endLineBytes; + private static readonly ArraySegment _headerDelimiterBytes = CreateAsciiByteArraySegment(": "); + private static readonly ArraySegment _spaceBytes = CreateAsciiByteArraySegment(" "); private static readonly ArraySegment _endChunkedResponseBytes = CreateAsciiByteArraySegment("0\r\n\r\n"); private static readonly ArraySegment _continueBytes = CreateAsciiByteArraySegment("HTTP/1.1 100 Continue\r\n\r\n"); + private static readonly ArraySegment _contentLengthZeroBytes = CreateAsciiByteArraySegment("Content-Length: 0\r\n"); + private static readonly ArraySegment _transferEncodingChunkedBytes = CreateAsciiByteArraySegment("Transfer-Encoding: chunked\r\n"); + private static readonly ArraySegment _connectionCloseBytes = CreateAsciiByteArraySegment("Connection: close\r\n\r\n"); + private static readonly ArraySegment _connectionKeepAliveBytes = CreateAsciiByteArraySegment("Connection: keep-alive\r\n\r\n"); private static readonly ArraySegment _emptyData = new ArraySegment(new byte[0]); private static readonly byte[] _hex = Encoding.ASCII.GetBytes("0123456789abcdef"); - private readonly object _onStartingSync = new Object(); - private readonly object _onCompletedSync = new Object(); + private readonly object _onStartingSync = new object(); + private readonly object _onCompletedSync = new object(); private readonly FrameRequestHeaders _requestHeaders = new FrameRequestHeaders(); - private readonly byte[] _nullBuffer = new byte[4096]; + private readonly byte[] _scratchBuffer = new byte[4096]; private readonly FrameResponseHeaders _responseHeaders = new FrameResponseHeaders(); private List, object>> _onStarting; @@ -232,7 +239,7 @@ public async Task RequestProcessingAsync() await ProduceEnd(); - while (await RequestBody.ReadAsync(_nullBuffer, 0, _nullBuffer.Length) != 0) + while (await RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length) != 0) { // Finish reading the request body in case the app did not. } @@ -471,19 +478,14 @@ public async Task ProduceStartAndFireOnStarting(bool immediate = true) await ProduceStart(immediate, appCompleted: false); } - private async Task ProduceStart(bool immediate, bool appCompleted) + private Task ProduceStart(bool immediate, bool appCompleted) { - if (_responseStarted) return; + if (_responseStarted) return TaskUtilities.CompletedTask; _responseStarted = true; var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase); - var responseHeader = CreateResponseHeader(status, appCompleted); - - using (responseHeader.Item2) - { - await SocketOutput.WriteAsync(responseHeader.Item1, immediate: immediate); - } + return CreateResponseHeader(status, appCompleted, immediate); } private async Task ProduceEnd() @@ -521,16 +523,54 @@ private async Task ProduceEnd() } } - private Tuple, IDisposable> CreateResponseHeader( + ArraySegment ShortAsciiToBytes(string input) + { + + var scratch = _scratchBuffer; + var len = input.Length; + + var i = 0; + for (; i < scratch.Length; i++) + { + if (i >= len) + { + break; + } + scratch[i] = (byte)input[i]; + } + var buffer = new ArraySegment(scratch, 0, i); + return buffer; + } + bool LongAsciiToBytes(string input, int offset, out int newOffset, out ArraySegment buffer) + { + var scratch = _scratchBuffer; + var len = input.Length; + + newOffset = offset; + var i = 0; + for (; i < scratch.Length; i++) + { + if (newOffset >= len) + { + break; + } + scratch[i] = (byte)input[newOffset]; + newOffset++; + } + + buffer = new ArraySegment(scratch, 0, i); + return newOffset < len; + } + + private Task CreateResponseHeader( string status, - bool appCompleted) + bool appCompleted, + bool immediate) { - var writer = new MemoryPoolTextWriter(Memory); - writer.Write(HttpVersion); - writer.Write(' '); - writer.Write(status); - writer.Write('\r'); - writer.Write('\n'); + SocketOutput.Write(ShortAsciiToBytes(HttpVersion), immediate: false); + SocketOutput.Write(_spaceBytes, immediate: false); + SocketOutput.Write(ShortAsciiToBytes(status), immediate: false); + SocketOutput.Write(_endLineBytes, immediate: false); var hasConnection = false; var hasTransferEncoding = false; @@ -555,21 +595,33 @@ private Tuple, IDisposable> CreateResponseHeader( hasContentLength = true; } + ArraySegment buffer; + int inputOffset; foreach (var value in header.Value) { - writer.Write(header.Key); - writer.Write(':'); - writer.Write(' '); - writer.Write(value); - writer.Write('\r'); - writer.Write('\n'); + inputOffset = 0; + while (LongAsciiToBytes(header.Key, inputOffset, out inputOffset, out buffer)) + { + SocketOutput.Write(buffer, immediate: false); + } + SocketOutput.Write(buffer, immediate: false); + + SocketOutput.Write(_headerDelimiterBytes, immediate: false); + + inputOffset = 0; + while (LongAsciiToBytes(value, inputOffset, out inputOffset, out buffer)) + { + SocketOutput.Write(buffer, immediate: false); + } + SocketOutput.Write(buffer, immediate: false); + + SocketOutput.Write(_endLineBytes, immediate: false); if (isConnection && value.IndexOf("close", StringComparison.OrdinalIgnoreCase) != -1) { _keepAlive = false; } } - } if (_keepAlive && !hasTransferEncoding && !hasContentLength) @@ -582,7 +634,7 @@ private Tuple, IDisposable> CreateResponseHeader( { // Since the app has completed and we are only now generating // the headers we can safely set the Content-Length to 0. - writer.Write("Content-Length: 0\r\n"); + SocketOutput.Write(_contentLengthZeroBytes, immediate: false); } } else @@ -590,7 +642,7 @@ private Tuple, IDisposable> CreateResponseHeader( if (HttpVersion == "HTTP/1.1") { _autoChunk = true; - writer.Write("Transfer-Encoding: chunked\r\n"); + SocketOutput.Write(_transferEncodingChunkedBytes, immediate: false); } else { @@ -601,19 +653,17 @@ private Tuple, IDisposable> CreateResponseHeader( if (_keepAlive == false && hasConnection == false && HttpVersion == "HTTP/1.1") { - writer.Write("Connection: close\r\n\r\n"); + return SocketOutput.WriteAsync(_connectionCloseBytes, immediate: immediate); } else if (_keepAlive && hasConnection == false && HttpVersion == "HTTP/1.0") { - writer.Write("Connection: keep-alive\r\n\r\n"); + return SocketOutput.WriteAsync(_connectionKeepAliveBytes, immediate: immediate); } else { - writer.Write('\r'); - writer.Write('\n'); + return SocketOutput.WriteAsync(_endLineBytes, immediate: immediate); } - writer.Flush(); - return new Tuple, IDisposable>(writer.Buffer, writer); + } private bool TakeStartLine(SocketInput input) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs index 581720041..92ea33758 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs @@ -10,13 +10,15 @@ public class ListenerContext : ServiceContext { public ListenerContext() { - Memory2 = new MemoryPool2(); + InputMemory = new MemoryPool(); + OutputMemory = new MemoryPool(); } public ListenerContext(ServiceContext serviceContext) : base(serviceContext) { - Memory2 = new MemoryPool2(); + InputMemory = new MemoryPool(); + OutputMemory = new MemoryPool(); } public ListenerContext(ListenerContext listenerContext) @@ -25,7 +27,8 @@ public ListenerContext(ListenerContext listenerContext) ServerAddress = listenerContext.ServerAddress; Thread = listenerContext.Thread; Application = listenerContext.Application; - Memory2 = listenerContext.Memory2; + InputMemory = listenerContext.InputMemory; + OutputMemory = listenerContext.OutputMemory; Log = listenerContext.Log; } @@ -35,6 +38,8 @@ public ListenerContext(ListenerContext listenerContext) public RequestDelegate Application { get; set; } - public MemoryPool2 Memory2 { get; set; } + public MemoryPool InputMemory { get; set; } + + public MemoryPool OutputMemory { get; set; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs index 21c6c5518..7bf741a0a 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs @@ -22,7 +22,7 @@ abstract public class ListenerPrimary : Listener // this message is passed to write2 because it must be non-zero-length, // but it has no other functional significance - private readonly ArraySegment> _dummyMessage = new ArraySegment>(new[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }); + private readonly byte[] _dummyBuffer = new byte[] { 1, 2, 3, 4 }; protected ListenerPrimary(ServiceContext serviceContext) : base(serviceContext) { @@ -80,14 +80,17 @@ protected override void DispatchConnection(UvStreamHandle socket) } else { + var msg = MemoryPoolBlock.Create(new ArraySegment(_dummyBuffer), IntPtr.Zero, null, null); + msg.End = msg.Start + _dummyBuffer.Length; + var dispatchPipe = _dispatchPipes[index]; var write = new UvWriteReq(Log); write.Init(Thread.Loop); write.Write2( dispatchPipe, - _dummyMessage, + new ArraySegment(new[] { msg }), socket, - (write2, status, error, state) => + (write2, status, error, bytesWritten, state) => { write2.Dispose(); ((UvStreamHandle)state).Dispose(); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs deleted file mode 100644 index 0fc5c390e..000000000 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; - -namespace Microsoft.AspNet.Server.Kestrel.Http -{ - public class MemoryPool : IMemoryPool - { - private static readonly byte[] EmptyArray = new byte[0]; - - private readonly Pool _pool1 = new Pool(); - private readonly Pool _pool2 = new Pool(); - private readonly Pool _pool3 = new Pool(); - - public byte[] Empty - { - get - { - return EmptyArray; - } - } - - public byte[] AllocByte(int minimumSize) - { - if (minimumSize == 0) - { - return EmptyArray; - } - if (minimumSize <= 1024) - { - return _pool1.Alloc(1024); - } - if (minimumSize <= 2048) - { - return _pool2.Alloc(2048); - } - return new byte[minimumSize]; - } - - public void FreeByte(byte[] memory) - { - if (memory == null) - { - return; - } - switch (memory.Length) - { - case 1024: - _pool1.Free(memory, 256); - break; - case 2048: - _pool2.Free(memory, 64); - break; - } - } - - public char[] AllocChar(int minimumSize) - { - if (minimumSize == 0) - { - return new char[0]; - } - if (minimumSize <= 128) - { - return _pool3.Alloc(128); - } - return new char[minimumSize]; - } - - public void FreeChar(char[] memory) - { - if (memory == null) - { - return; - } - switch (memory.Length) - { - case 128: - _pool3.Free(memory, 256); - break; - } - } - - public ArraySegment AllocSegment(int minimumSize) - { - return new ArraySegment(AllocByte(minimumSize)); - } - - public void FreeSegment(ArraySegment segment) - { - FreeByte(segment.Array); - } - - class Pool - { - private readonly Stack _stack = new Stack(); - private readonly object _sync = new object(); - - public T[] Alloc(int size) - { - lock (_sync) - { - if (_stack.Count != 0) - { - return _stack.Pop(); - } - } - return new T[size]; - } - - public void Free(T[] value, int limit) - { - lock (_sync) - { - if (_stack.Count < limit) - { - _stack.Push(value); - } - } - } - } - } -} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs deleted file mode 100644 index 2f549dc15..000000000 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO; -using System.Text; - -namespace Microsoft.AspNet.Server.Kestrel.Http -{ - public class MemoryPoolTextWriter : TextWriter - { - private readonly IMemoryPool _memory; - - private char[] _textArray; - private int _textBegin; - private int _textEnd; - // ReSharper disable InconsistentNaming - private const int _textLength = 128; - // ReSharper restore InconsistentNaming - - private byte[] _dataArray; - private int _dataEnd; - - private readonly Encoder _encoder; - - public MemoryPoolTextWriter(IMemoryPool memory) - { - _memory = memory; - _textArray = _memory.AllocChar(_textLength); - _dataArray = _memory.Empty; - _encoder = Encoding.UTF8.GetEncoder(); - } - - public ArraySegment Buffer - { - get - { - return new ArraySegment(_dataArray, 0, _dataEnd); - } - } - - public override Encoding Encoding - { - get - { - return Encoding.UTF8; - } - } - - protected override void Dispose(bool disposing) - { - try - { - if (disposing) - { - if (_textArray != null) - { - _memory.FreeChar(_textArray); - _textArray = null; - } - if (_dataArray != null) - { - _memory.FreeByte(_dataArray); - _dataArray = null; - } - } - } - finally - { - base.Dispose(disposing); - } - } - - private void Encode(bool flush) - { - var bytesNeeded = _encoder.GetByteCount( - _textArray, - _textBegin, - _textEnd - _textBegin, - flush); - - Grow(bytesNeeded); - - var bytesUsed = _encoder.GetBytes( - _textArray, - _textBegin, - _textEnd - _textBegin, - _dataArray, - _dataEnd, - flush); - - _textBegin = _textEnd = 0; - _dataEnd += bytesUsed; - } - - private void Grow(int minimumAvailable) - { - if (_dataArray.Length - _dataEnd >= minimumAvailable) - { - return; - } - - var newLength = _dataArray.Length + Math.Max(_dataArray.Length, minimumAvailable); - var newArray = _memory.AllocByte(newLength); - Array.Copy(_dataArray, 0, newArray, 0, _dataEnd); - _memory.FreeByte(_dataArray); - _dataArray = newArray; - } - - public override void Write(char value) - { - if (_textLength == _textEnd) - { - Encode(false); - if (_textLength == _textEnd) - { - throw new InvalidOperationException("Unexplainable failure to encode text"); - } - } - - _textArray[_textEnd++] = value; - } - - public override void Write(string value) - { - var sourceIndex = 0; - var sourceLength = value.Length; - while (sourceIndex < sourceLength) - { - if (_textLength == _textEnd) - { - Encode(false); - } - - var count = sourceLength - sourceIndex; - if (count > _textLength - _textEnd) - { - count = _textLength - _textEnd; - } - - value.CopyTo(sourceIndex, _textArray, _textEnd, count); - sourceIndex += count; - _textEnd += count; - } - } - - public override void Flush() - { - while (_textBegin != _textEnd) - { - Encode(true); - } - } - } -} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs index 0eb74bb90..ba1b4e06b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs @@ -15,18 +15,18 @@ public class SocketInput : ICriticalNotifyCompletion private static readonly Action _awaitableIsCompleted = () => { }; private static readonly Action _awaitableIsNotCompleted = () => { }; - private readonly MemoryPool2 _memory; + private readonly MemoryPool _memory; private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false); private Action _awaitableState; private Exception _awaitableError; - private MemoryPoolBlock2 _head; - private MemoryPoolBlock2 _tail; - private MemoryPoolBlock2 _pinned; + private MemoryPoolBlock _head; + private MemoryPoolBlock _tail; + private MemoryPoolBlock _pinned; private readonly object _sync = new Object(); - public SocketInput(MemoryPool2 memory) + public SocketInput(MemoryPool memory) { _memory = memory; _awaitableState = _awaitableIsNotCompleted; @@ -132,20 +132,20 @@ public void IncomingComplete(int count, Exception error) } } - public MemoryPoolIterator2 ConsumingStart() + public MemoryPoolIterator ConsumingStart() { lock (_sync) { - return new MemoryPoolIterator2(_head); + return new MemoryPoolIterator(_head); } } public void ConsumingComplete( - MemoryPoolIterator2 consumed, - MemoryPoolIterator2 examined) + MemoryPoolIterator consumed, + MemoryPoolIterator examined) { - MemoryPoolBlock2 returnStart = null; - MemoryPoolBlock2 returnEnd = null; + MemoryPoolBlock returnStart = null; + MemoryPoolBlock returnEnd = null; lock (_sync) { if (!consumed.IsDefault) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index d0d218ac5..65aec4678 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -2,8 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; -using System.Diagnostics; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNet.Server.Kestrel.Infrastructure; @@ -13,104 +12,145 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { - private const int _maxPendingWrites = 3; - private const int _maxBytesPreCompleted = 65536; + // ~64k; act=64512 + internal const int MaxBytesPreCompleted = 2 * (MemoryPool.NativeBlockSize * UvWriteReq.BUFFER_COUNT) - 1; + + private static MemoryPoolBlock[] _emptyBlocks = new MemoryPoolBlock[0]; private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; private readonly long _connectionId; private readonly IKestrelTrace _log; - // This locks access to to all of the below fields - private readonly object _lockObj = new object(); + private readonly MemoryPool _memory; + + private WriteBlock _currentWriteBlock; + private ConcurrentQueue _memoryBlocks; - // The number of write operations that have been scheduled so far - // but have not completed. - private int _writesPending = 0; + private long _bytesQueued; + private long _bytesWritten; private int _numBytesPreCompleted = 0; private Exception _lastWriteError; - private WriteContext _nextWriteContext; - private readonly Queue _callbacksPending; + private int _lastStatus = 0; + private readonly ConcurrentQueue _callbacksPending; + + public int ShutdownSendStatus; - public SocketOutput(KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log) + private volatile int pendingWriteBitFlag = 0; + private bool SocketShutdownSent; + + public SocketOutput(MemoryPool memory, KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log) { _thread = thread; _socket = socket; _connectionId = connectionId; _log = log; - _callbacksPending = new Queue(); + _callbacksPending = new ConcurrentQueue(); + _memory = memory; + _memoryBlocks = new ConcurrentQueue(); } - public void Write( + internal void Write( ArraySegment buffer, - Action callback, + Action callback, object state, bool immediate = true, bool socketShutdownSend = false, bool socketDisconnect = false) { - //TODO: need buffering that works - if (buffer.Array != null) - { - var copy = new byte[buffer.Count]; - Array.Copy(buffer.Array, buffer.Offset, copy, 0, buffer.Count); - buffer = new ArraySegment(copy); - _log.ConnectionWrite(_connectionId, buffer.Count); - } + bool triggerCallbackNow; + var queuedBytes = _bytesQueued; + bool blockQueued = false; - bool triggerCallbackNow = false; + var inputLength = buffer.Array != null ? buffer.Count : 0; + MemoryPoolBlock memoryBlock; - lock (_lockObj) + if (inputLength > 0) { - if (_nextWriteContext == null) - { - _nextWriteContext = new WriteContext(this); - } + memoryBlock = Interlocked.Exchange(ref _currentWriteBlock.Block, null); - if (buffer.Array != null) - { - _nextWriteContext.Buffers.Enqueue(buffer); - } - if (socketShutdownSend) - { - _nextWriteContext.SocketShutdownSend = true; - } - if (socketDisconnect) - { - _nextWriteContext.SocketDisconnect = true; - } - // Complete the write task immediately if all previous write tasks have been completed, - // the buffers haven't grown too large, and the last write to the socket succeeded. - triggerCallbackNow = _lastWriteError == null && - _callbacksPending.Count == 0 && - _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted; - if (triggerCallbackNow) - { - _numBytesPreCompleted += buffer.Count; - } - else + _log.ConnectionWrite(_connectionId, inputLength); + + int blockRemaining = memoryBlock != null ? memoryBlock.Data.Count - (memoryBlock.End - memoryBlock.Start) : 0; + + var remaining = inputLength; + var offset = buffer.Offset; + + while (remaining > 0) { - _callbacksPending.Enqueue(new CallbackContext + if (memoryBlock == null) { - Callback = callback, - State = state, - BytesToWrite = buffer.Count - }); + memoryBlock = _memory.Lease(MemoryPool.NativeBlockSize); + blockRemaining = memoryBlock.Data.Count; + } + + var copyAmount = blockRemaining >= remaining ? remaining : blockRemaining; + Buffer.BlockCopy(buffer.Array, offset, memoryBlock.Array, memoryBlock.End, copyAmount); + + remaining -= copyAmount; + blockRemaining -= copyAmount; + memoryBlock.End += copyAmount; + offset += copyAmount; + + if (blockRemaining == 0) + { + _memoryBlocks.Enqueue(new WriteBlock() { Block = memoryBlock }); + memoryBlock = null; + blockQueued = true; + } } - if (_writesPending < _maxPendingWrites && immediate) + Interlocked.Exchange(ref _currentWriteBlock.Block, memoryBlock); + } + + CallbackContext callbackContext; + var nextPendingBytes = inputLength + _numBytesPreCompleted; + if (_lastWriteError == null && + _callbacksPending.TryPeek(out callbackContext) && + nextPendingBytes <= MaxBytesPreCompleted) + { + triggerCallbackNow = true; + } + else + { + triggerCallbackNow = queuedBytes <= _bytesWritten; + } + + if (!triggerCallbackNow) + { + callbackContext = new CallbackContext { - ScheduleWrite(); - _writesPending++; - } + Callback = callback, + State = state, + BytesWrittenThreshold = queuedBytes + }; + _callbacksPending.Enqueue(callbackContext); + } + + if (immediate) + { + _currentWriteBlock.SocketDisconnect |= socketDisconnect; + _currentWriteBlock.SocketShutdownSend |= socketShutdownSend; + } + if (immediate || blockQueued) + { + SendBufferedData(); } - // Make sure we call user code outside of the lock. if (triggerCallbackNow) { - // callback(error, state, calledInline) - callback(null, state, true); + callback(null, state, 0, true); + } + + Interlocked.Add(ref _numBytesPreCompleted, inputLength); + } + + private void SendBufferedData() + { + if (Interlocked.CompareExchange(ref pendingWriteBitFlag, 1, 0) == 0) + { + _thread.Post(so => so.DoWriteIfNeeded(), this); } } @@ -119,13 +159,13 @@ public void End(ProduceEndType endType) switch (endType) { case ProduceEndType.SocketShutdownSend: - Write(default(ArraySegment), (error, state, calledInline) => { }, null, + Write(default(ArraySegment), (error, state, status, calledInline) => { }, null, immediate: true, socketShutdownSend: true, socketDisconnect: false); break; case ProduceEndType.SocketDisconnect: - Write(default(ArraySegment), (error, state, calledInline) => { }, null, + Write(default(ArraySegment), (error, state, status, calledInline) => { }, null, immediate: true, socketShutdownSend: false, socketDisconnect: true); @@ -133,90 +173,22 @@ public void End(ProduceEndType endType) } } - private void ScheduleWrite() - { - _thread.Post(_this => _this.WriteAllPending(), this); - } - - // This is called on the libuv event loop - private void WriteAllPending() - { - WriteContext writingContext; - - lock (_lockObj) - { - if (_nextWriteContext != null) - { - writingContext = _nextWriteContext; - _nextWriteContext = null; - } - else - { - _writesPending--; - return; - } - } - - try - { - writingContext.DoWriteIfNeeded(); - } - catch - { - lock (_lockObj) - { - // Lock instead of using Interlocked.Decrement so _writesSending - // doesn't change in the middle of executing other synchronized code. - _writesPending--; - } - - throw; - } - } - // This is called on the libuv event loop - private void OnWriteCompleted(Queue> writtenBuffers, int status, Exception error) + private void OnWriteCompleted(int status, Exception error) { - _log.ConnectionWriteCallback(_connectionId, status); + _lastWriteError = error; + _lastStatus = status; - lock (_lockObj) + CallbackContext callbackContext; + while (_callbacksPending.TryPeek(out callbackContext) && callbackContext.BytesWrittenThreshold <= _bytesWritten) { - _lastWriteError = error; - - if (_nextWriteContext != null) - { - ScheduleWrite(); - } - else - { - _writesPending--; - } - - foreach (var writeBuffer in writtenBuffers) - { - // _numBytesPreCompleted can temporarily go negative in the event there are - // completed writes that we haven't triggered callbacks for yet. - _numBytesPreCompleted -= writeBuffer.Count; - } - + _log.ConnectionWriteCallback(_connectionId, status); - // bytesLeftToBuffer can be greater than _maxBytesPreCompleted - // This allows large writes to complete once they've actually finished. - var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; - while (_callbacksPending.Count > 0 && - _callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer) + if (_callbacksPending.TryDequeue(out callbackContext)) { - var callbackContext = _callbacksPending.Dequeue(); - - _numBytesPreCompleted += callbackContext.BytesToWrite; - - // callback(error, state, calledInline) - callbackContext.Callback(_lastWriteError, callbackContext.State, false); + //Callback(error, state, calledInline) + callbackContext.Callback(error, callbackContext.State, status, false); } - - // Now that the while loop has completed the following invariants should hold true: - Debug.Assert(_numBytesPreCompleted >= 0); - Debug.Assert(_numBytesPreCompleted <= _maxBytesPreCompleted); } } @@ -228,9 +200,10 @@ void ISocketOutput.Write(ArraySegment buffer, bool immediate) // to be a subsequent immediate==true call which will go down the following code-path Write( buffer, - (error, state, calledInline) => { }, + (error, state, status, calledInline) => { }, null, immediate: false); + return; } @@ -239,15 +212,16 @@ void ISocketOutput.Write(ArraySegment buffer, bool immediate) Write( buffer, - (error, state, calledInline) => + (error, state, status, calledInline) => { + var cs = (TaskCompletionSource)state; if (error != null) { - tcs.SetException(error); + cs.SetException(error); } else { - tcs.SetResult(0); + cs.SetResult(0); } }, tcs, @@ -267,9 +241,10 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell // to be a subsequent immediate==true call which will go down the following code-path Write( buffer, - (error, state, calledInline) => { }, + (error, state, status, calledInline) => { }, null, immediate: false); + return TaskUtilities.CompletedTask; } @@ -278,33 +253,50 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell Write( buffer, - (error, state, calledInline) => + (error, state, status, calledInline) => { - if (!calledInline) + if (status < 0) { - ThreadPool.QueueUserWorkItem(state2 => + if (!calledInline) + { + ThreadPool.QueueUserWorkItem((state2) => + { + var tcs2 = (TaskCompletionSource)state2; + if (error != null) + { + tcs2.SetException(error); + } + else + { + tcs2.SetResult(0); + } + }, tcs); + } + else { - var tcs2 = (TaskCompletionSource)state2; if (error != null) { - tcs2.SetException(error); + tcs.SetException(error); } else { - tcs2.SetResult(0); + tcs.SetResult(0); } - }, state); + } } else { - var tcs2 = (TaskCompletionSource)state; - if (error != null) + if (!calledInline) { - tcs2.SetException(error); + ThreadPool.QueueUserWorkItem((state2) => + { + var tcs2 = (TaskCompletionSource)state2; + tcs.SetResult(0); + }, tcs); } else { - tcs2.SetResult(0); + tcs.SetResult(0); } } }, @@ -314,118 +306,182 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell return tcs.Task; } - private class CallbackContext - { - // callback(error, state, calledInline) - public Action Callback; - public object State; - public int BytesToWrite; - } - - private class WriteContext + private WriteContext GetContext() { - public SocketOutput Self; - - public Queue> Buffers; - public bool SocketShutdownSend; - public bool SocketDisconnect; - - public int WriteStatus; - public Exception WriteError; + MemoryPoolBlock[] data = null; - public int ShutdownSendStatus; + var count = 0; + var dataLength = 0; - public WriteContext(SocketOutput self) - { - Self = self; - Buffers = new Queue>(); - } - - /// - /// Perform any actions needed by this work item. The individual tasks are non-blocking and - /// will continue through to each other in order. - /// - public void Execute() - { - DoWriteIfNeeded(); - } + bool socketDisconnect = false; + bool socketShutdownSend = false; - /// - /// First step: initiate async write if needed, otherwise go to next step - /// - public void DoWriteIfNeeded() + WriteBlock writeBlock; + while (_memoryBlocks.TryDequeue(out writeBlock)) { - if (Buffers.Count == 0 || Self._socket.IsClosed) + var block = writeBlock.Block; + if (block != null) { - DoShutdownIfNeeded(); - return; + if (data == null) + { + data = new MemoryPoolBlock[UvWriteReq.BUFFER_COUNT]; + } + var length = block.End - block.Start; + data[count] = block; + dataLength += length; + count++; } - var buffers = new ArraySegment[Buffers.Count]; + socketDisconnect |= writeBlock.SocketDisconnect; + socketShutdownSend |= writeBlock.SocketShutdownSend; - var i = 0; - foreach (var buffer in Buffers) + if (count == UvWriteReq.BUFFER_COUNT) { - buffers[i++] = buffer; + break; } - - var writeReq = new UvWriteReq(Self._log); - writeReq.Init(Self._thread.Loop); - writeReq.Write(Self._socket, new ArraySegment>(buffers), (_writeReq, status, error, state) => - { - _writeReq.Dispose(); - var _this = (WriteContext)state; - _this.WriteStatus = status; - _this.WriteError = error; - DoShutdownIfNeeded(); - }, this); } - /// - /// Second step: initiate async shutdown if needed, otherwise go to next step - /// - public void DoShutdownIfNeeded() + if (count < UvWriteReq.BUFFER_COUNT) { - if (SocketShutdownSend == false || Self._socket.IsClosed) + var block = Interlocked.Exchange(ref _currentWriteBlock.Block, null); ; + + if (block != null) { - DoDisconnectIfNeeded(); - return; + if (data == null) + { + data = new MemoryPoolBlock[UvWriteReq.BUFFER_COUNT]; + } + var length = block.End - block.Start; + data[count] = block; + dataLength += length; + count++; } + socketDisconnect |= _currentWriteBlock.SocketDisconnect; + socketShutdownSend |= _currentWriteBlock.SocketShutdownSend; + } - var shutdownReq = new UvShutdownReq(Self._log); - shutdownReq.Init(Self._thread.Loop); - shutdownReq.Shutdown(Self._socket, (_shutdownReq, status, state) => - { - _shutdownReq.Dispose(); - var _this = (WriteContext)state; - _this.ShutdownSendStatus = status; + Interlocked.Add(ref _bytesQueued, dataLength); - Self._log.ConnectionWroteFin(Self._connectionId, status); + return new WriteContext() + { + Data = new ArraySegment(data ?? _emptyBlocks, 0, count), + SocketDisconnect = socketDisconnect, + SocketShutdownSend = socketShutdownSend + }; + } - DoDisconnectIfNeeded(); - }, this); - } + /// + /// First step: initiate async write if needed, otherwise go to next step + /// + public void DoWriteIfNeeded() + { + pendingWriteBitFlag = 0; + Interlocked.MemoryBarrier(); - /// - /// Third step: disconnect socket if needed, otherwise this work item is complete - /// - public void DoDisconnectIfNeeded() + WriteContext context; + while (true) { - if (SocketDisconnect == false || Self._socket.IsClosed) + context = GetContext(); + var data = context.Data; + + if (data.Count == 0 || _socket.IsClosed) { - Complete(); + DoShutdownIfNeeded(context.SocketDisconnect, context.SocketShutdownSend, 0, null); return; } + + var writeReq = _thread.LeaseWriteRequest(); + + writeReq.SocketDisconnect = context.SocketDisconnect; + writeReq.SocketShutdownSend = context.SocketShutdownSend; + writeReq.Write(_socket, + data, + (_writeReq, status, error, bytesWritten, state) => WriteCallback(_writeReq, status, error, bytesWritten, state), + this); + } + } + + private static void WriteCallback(UvWriteReq writeReq, int status, Exception error, int bytesWritten, object state) + { + var socketOutput = (SocketOutput)state; + socketOutput._thread.ReturnWriteRequest(writeReq); + + Interlocked.Add(ref socketOutput._bytesWritten, bytesWritten); + Interlocked.Add(ref socketOutput._numBytesPreCompleted, -bytesWritten); + + socketOutput.DoShutdownIfNeeded(writeReq.SocketDisconnect, writeReq.SocketShutdownSend, status, error); + } - Self._socket.Dispose(); - Self._log.ConnectionStop(Self._connectionId); - Complete(); + /// + /// Second step: initiate async shutdown if needed, otherwise go to next step + /// + private void DoShutdownIfNeeded(bool socketDisconnect, bool socketShutdownSend, int status, Exception error) + { + if (socketShutdownSend == false || SocketShutdownSent == true || _socket.IsClosed) + { + DoDisconnectIfNeeded(socketDisconnect, status, error); + return; } + SocketShutdownSent = true; + + var shutdownReq = new UvShutdownReq(_log); + shutdownReq.Init(_thread.Loop); + + shutdownReq.SocketDisconnect = socketDisconnect; + shutdownReq.SocketStatus = status; + shutdownReq.SocketException = error; + + shutdownReq.Shutdown(_socket, + (shutdownReq2, status2, state) => ShutdownCallback(shutdownReq2, status2, state), this); + } + + private static void ShutdownCallback(UvShutdownReq shutdownReq, int status, object state) + { + shutdownReq.Dispose(); + var socketOutput = (SocketOutput)state; + socketOutput.ShutdownSendStatus = status; + + socketOutput._log.ConnectionWroteFin(socketOutput._connectionId, status); - public void Complete() + socketOutput.DoDisconnectIfNeeded(shutdownReq.SocketDisconnect, shutdownReq.SocketStatus, shutdownReq.SocketException); + } + + /// + /// Third step: disconnect socket if needed, otherwise this work item is complete + /// + private void DoDisconnectIfNeeded(bool socketDisconnect, int status, Exception error) + { + if (socketDisconnect == false || _socket.IsClosed) { - Self.OnWriteCompleted(Buffers, WriteStatus, WriteError); + OnWriteCompleted(status, error); + return; } + + _socket.Dispose(); + _log.ConnectionStop(_connectionId); + OnWriteCompleted(status, error); + } + + private struct CallbackContext + { + //Callback(error, state, calledInline) + public Action Callback; + public object State; + public long BytesWrittenThreshold; + } + + private struct WriteContext + { + public ArraySegment Data; + public bool SocketShutdownSend; + public bool SocketDisconnect; + } + + private struct WriteBlock + { + public MemoryPoolBlock Block; + public bool SocketShutdownSend; + public bool SocketDisconnect; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/UrlPathDecoder.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/UrlPathDecoder.cs index 66608a4a2..34d2008eb 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/UrlPathDecoder.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/UrlPathDecoder.cs @@ -13,7 +13,7 @@ public class UrlPathDecoder /// The iterator points to the beginning of the sequence. /// The iterator points to the byte behind the end of the sequence. /// The iterator points to the byte behind the end of the processed sequence. - public static MemoryPoolIterator2 Unescape(MemoryPoolIterator2 start, MemoryPoolIterator2 end) + public static MemoryPoolIterator Unescape(MemoryPoolIterator start, MemoryPoolIterator end) { // the slot to read the input var reader = start; @@ -59,7 +59,7 @@ public static MemoryPoolIterator2 Unescape(MemoryPoolIterator2 start, MemoryPool /// The iterator point to the first % char /// The place to write to /// The end of the sequence - private static bool DecodeCore(ref MemoryPoolIterator2 reader, ref MemoryPoolIterator2 writer, MemoryPoolIterator2 end) + private static bool DecodeCore(ref MemoryPoolIterator reader, ref MemoryPoolIterator writer, MemoryPoolIterator end) { // preserves the original head. if the percent-encodings cannot be interpreted as sequence of UTF-8 octets, // bytes from this till the last scanned one will be copied to the memory pointed by writer. @@ -189,7 +189,7 @@ private static bool DecodeCore(ref MemoryPoolIterator2 reader, ref MemoryPoolIte return true; } - private static void Copy(MemoryPoolIterator2 head, MemoryPoolIterator2 tail, ref MemoryPoolIterator2 writer) + private static void Copy(MemoryPoolIterator head, MemoryPoolIterator tail, ref MemoryPoolIterator writer) { while (!CompareIterators(ref head, ref tail)) { @@ -216,7 +216,7 @@ private static void Copy(MemoryPoolIterator2 head, MemoryPoolIterator2 tail, ref /// The value to read /// The end of the sequence /// The unescaped byte if success. Otherwise return -1. - private static int UnescapePercentEncoding(ref MemoryPoolIterator2 scan, MemoryPoolIterator2 end) + private static int UnescapePercentEncoding(ref MemoryPoolIterator scan, MemoryPoolIterator end) { if (scan.Take() != '%') { @@ -255,7 +255,7 @@ private static int UnescapePercentEncoding(ref MemoryPoolIterator2 scan, MemoryP /// The value to read /// The end of the sequence /// The hexadecimal value if successes, otherwise -1. - private static int ReadHex(ref MemoryPoolIterator2 scan, MemoryPoolIterator2 end) + private static int ReadHex(ref MemoryPoolIterator scan, MemoryPoolIterator end) { if (CompareIterators(ref scan, ref end)) { @@ -297,7 +297,7 @@ private static bool SkipUnescape(int value1, int value2) return false; } - private static bool CompareIterators(ref MemoryPoolIterator2 lhs, ref MemoryPoolIterator2 rhs) + private static bool CompareIterators(ref MemoryPoolIterator lhs, ref MemoryPoolIterator rhs) { // uses ref parameter to save cost of copying return (lhs.Block == rhs.Block) && (lhs.Index == rhs.Index); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 85a4b5062..92b37acf5 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -2,7 +2,6 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -10,6 +9,7 @@ using Microsoft.AspNet.Server.Kestrel.Infrastructure; using Microsoft.AspNet.Server.Kestrel.Networking; using Microsoft.Extensions.Logging; +using System.Collections.Concurrent; namespace Microsoft.AspNet.Server.Kestrel { @@ -18,21 +18,21 @@ namespace Microsoft.AspNet.Server.Kestrel /// public class KestrelThread { + private const int _maxPooledWriteRequests = 64; + private static Action _objectCallbackAdapter = (callback, state) => ((Action)callback).Invoke(state); private KestrelEngine _engine; private readonly IApplicationLifetime _appLifetime; private Thread _thread; private UvLoopHandle _loop; private UvAsyncHandle _post; - private Queue _workAdding = new Queue(); - private Queue _workRunning = new Queue(); - private Queue _closeHandleAdding = new Queue(); - private Queue _closeHandleRunning = new Queue(); - private object _workSync = new Object(); + private ConcurrentQueue _workAdding = new ConcurrentQueue(); + private ConcurrentQueue _closeHandleAdding = new ConcurrentQueue(); private bool _stopImmediate = false; private bool _initCompleted = false; private ExceptionDispatchInfo _closeError; private IKestrelTrace _log; + private ConcurrentQueue _writeRequests = new ConcurrentQueue(); public KestrelThread(KestrelEngine engine) { @@ -46,6 +46,7 @@ public KestrelThread(KestrelEngine engine) } public UvLoopHandle Loop { get { return _loop; } } + public ExceptionDispatchInfo FatalError { get { return _closeError; } } public Action, IntPtr> QueueCloseHandle { get; internal set; } @@ -87,6 +88,17 @@ public void Stop(TimeSpan timeout) private void OnStop(object obj) { + if (_writeRequests != null) + { + var writeRequests = _writeRequests; + _writeRequests = null; + + UvWriteReq writeReq; + while (writeRequests.TryDequeue(out writeReq)) + { + writeReq.Dispose(); + } + } _post.Unreference(); } @@ -113,40 +125,36 @@ private void OnStopImmediate(object obj) public void Post(Action callback, object state) { - lock (_workSync) - { - _workAdding.Enqueue(new Work { CallbackAdapter = _objectCallbackAdapter, Callback = callback, State = state }); - } + _workAdding.Enqueue(new Work { CallbackAdapter = _objectCallbackAdapter, Callback = callback, State = state }); + _post.Send(); } public void Post(Action callback, T state) { - lock (_workSync) + + _workAdding.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = (callback2, state2) => ((Action)callback2).Invoke((T)state2), - Callback = callback, - State = state - }); - } + CallbackAdapter = (callback2, state2) => ((Action)callback2).Invoke((T)state2), + Callback = callback, + State = state + }); + _post.Send(); } public Task PostAsync(Action callback, object state) { var tcs = new TaskCompletionSource(); - lock (_workSync) + + _workAdding.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = _objectCallbackAdapter, - Callback = callback, - State = state, - Completion = tcs - }); - } + CallbackAdapter = _objectCallbackAdapter, + Callback = callback, + State = state, + Completion = tcs + }); + _post.Send(); return tcs.Task; } @@ -154,16 +162,15 @@ public Task PostAsync(Action callback, object state) public Task PostAsync(Action callback, T state) { var tcs = new TaskCompletionSource(); - lock (_workSync) + + _workAdding.Enqueue(new Work { - _workAdding.Enqueue(new Work - { - CallbackAdapter = (state1, state2) => ((Action)state1).Invoke((T)state2), - Callback = callback, - State = state, - Completion = tcs - }); - } + CallbackAdapter = (state1, state2) => ((Action)state1).Invoke((T)state2), + Callback = callback, + State = state, + Completion = tcs + }); + _post.Send(); return tcs.Task; } @@ -182,10 +189,8 @@ public void Send(Action callback, object state) private void PostCloseHandle(Action callback, IntPtr handle) { - lock (_workSync) - { - _closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); - } + _closeHandleAdding.Enqueue(new CloseHandle { Callback = callback, Handle = handle }); + _post.Send(); } @@ -253,16 +258,9 @@ private void OnPost() private void DoPostWork() { - Queue queue; - lock (_workSync) - { - queue = _workAdding; - _workAdding = _workRunning; - _workRunning = queue; - } - while (queue.Count != 0) + Work work; + while (_workAdding.TryDequeue(out work)) { - var work = queue.Dequeue(); try { work.CallbackAdapter(work.Callback, work.State); @@ -292,16 +290,9 @@ private void DoPostWork() } private void DoPostCloseHandle() { - Queue queue; - lock (_workSync) - { - queue = _closeHandleAdding; - _closeHandleAdding = _closeHandleRunning; - _closeHandleRunning = queue; - } - while (queue.Count != 0) + CloseHandle closeHandle; + while (_closeHandleAdding.TryDequeue(out closeHandle)) { - var closeHandle = queue.Dequeue(); try { closeHandle.Callback(closeHandle.Handle); @@ -314,6 +305,33 @@ private void DoPostCloseHandle() } } + public UvWriteReq LeaseWriteRequest() + { + UvWriteReq writeReq; + + var writeRequests = _writeRequests; + if (writeRequests == null || !writeRequests.TryDequeue(out writeReq)) + { + writeReq = new UvWriteReq(_log); + writeReq.Init(_loop); + } + + return writeReq; + } + + public void ReturnWriteRequest(UvWriteReq writeReq) + { + if ((_writeRequests?.Count ?? _maxPooledWriteRequests) < _maxPooledWriteRequests) + { + writeReq.Reset(); + _writeRequests.Enqueue(writeReq); + } + else + { + writeReq.Dispose(); + } + } + private struct Work { public Action CallbackAdapter; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool.cs similarity index 79% rename from src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs rename to src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool.cs index 2ee01c0d1..854f835bb 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool.cs @@ -6,7 +6,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure /// /// Used to allocate and distribute re-usable blocks of memory. /// - public class MemoryPool2 : IDisposable + public class MemoryPool : IDisposable { /// /// The gap between blocks' starting address. 4096 is chosen because most operating systems are 4k pages in size and alignment. @@ -31,6 +31,8 @@ public class MemoryPool2 : IDisposable /// private const int _blockLength = _blockStride - _blockUnused; + public const int NativeBlockSize = _blockLength; + /// /// 4096 * 32 gives you a slabLength of 128k contiguous bytes allocated per slab /// @@ -40,13 +42,13 @@ public class MemoryPool2 : IDisposable /// Thread-safe collection of blocks which are currently in the pool. A slab will pre-allocate all of the block tracking objects /// and add them to this collection. When memory is requested it is taken from here first, and when it is returned it is re-added. /// - private readonly ConcurrentStack _blocks = new ConcurrentStack(); + private readonly ConcurrentQueue _blocks = new ConcurrentQueue(); /// /// Thread-safe collection of slabs which have been allocated by this pool. As long as a slab is in this collection and slab.IsActive, /// the blocks will be added to _blocks when returned. /// - private readonly ConcurrentStack _slabs = new ConcurrentStack(); + private readonly ConcurrentQueue _slabs = new ConcurrentQueue(); /// /// This is part of implementing the IDisposable pattern. @@ -59,7 +61,7 @@ public class MemoryPool2 : IDisposable /// The block returned must be at least this size. It may be larger than this minimum size, and if so, /// the caller may write to the block's entire size rather than being limited to the minumumSize requested. /// The block that is reserved for the called. It must be passed to Return when it is no longer being used. - public MemoryPoolBlock2 Lease(int minimumSize) + public MemoryPoolBlock Lease(int minimumSize) { if (minimumSize > _blockLength) { @@ -67,49 +69,59 @@ public MemoryPoolBlock2 Lease(int minimumSize) // Because this is the degenerate case, a one-time-use byte[] array and tracking object are allocated. // When this block tracking object is returned it is not added to the pool - instead it will be // allowed to be garbage collected normally. - return MemoryPoolBlock2.Create( + return MemoryPoolBlock.Create( new ArraySegment(new byte[minimumSize]), dataPtr: IntPtr.Zero, pool: null, slab: null); } - while (true) + MemoryPoolBlock block; + if (_blocks.TryDequeue(out block)) { - MemoryPoolBlock2 block; - if (_blocks.TryPop(out block)) - { - // block successfully taken from the stack - return it - return block; - } - // no blocks available - grow the pool and try again - AllocateSlab(); + // block successfully taken from the stack - return it + return block; } + // no blocks available - grow the pool + return AllocateSlab(); + } /// /// Internal method called when a block is requested and the pool is empty. It allocates one additional slab, creates all of the /// block tracking objects, and adds them all to the pool. /// - private void AllocateSlab() + private MemoryPoolBlock AllocateSlab() { - var slab = MemoryPoolSlab2.Create(_slabLength); - _slabs.Push(slab); + var slab = MemoryPoolSlab.Create(_slabLength); + _slabs.Enqueue(slab); var basePtr = slab.ArrayPtr; var firstOffset = (int)((_blockStride - 1) - ((ulong)(basePtr + _blockStride - 1) % _blockStride)); - for (var offset = firstOffset; - offset + _blockLength <= _slabLength; + var poolAllocationLength = _slabLength - (_blockLength + _blockStride); + + var offset = firstOffset; + for (; + offset < poolAllocationLength; offset += _blockStride) { - var block = MemoryPoolBlock2.Create( + var block = MemoryPoolBlock.Create( new ArraySegment(slab.Array, offset, _blockLength), basePtr, this, slab); Return(block); } + + // return last block rather than adding to pool + var newBlock = MemoryPoolBlock.Create( + new ArraySegment(slab.Array, offset, _blockLength), + basePtr, + this, + slab); + + return newBlock; } /// @@ -120,10 +132,10 @@ private void AllocateSlab() /// leaving "dead zones" in the slab due to lost block tracking objects. /// /// The block to return. It must have been acquired by calling Lease on the same memory pool instance. - public void Return(MemoryPoolBlock2 block) + public void Return(MemoryPoolBlock block) { block.Reset(); - _blocks.Push(block); + _blocks.Enqueue(block); } protected virtual void Dispose(bool disposing) @@ -132,8 +144,8 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - MemoryPoolSlab2 slab; - while (_slabs.TryPop(out slab)) + MemoryPoolSlab slab; + while (_slabs.TryDequeue(out slab)) { // dispose managed state (managed objects). slab.Dispose(); @@ -149,7 +161,7 @@ protected virtual void Dispose(bool disposing) } // N/A: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources. - // ~MemoryPool2() { + // ~MemoryPool() { // // Do not change this code. Put cleanup code in Dispose(bool disposing) above. // Dispose(false); // } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock.cs similarity index 92% rename from src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs rename to src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock.cs index a93e03186..6b8b415f2 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolBlock.cs @@ -9,7 +9,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure /// Block tracking object used by the byte buffer memory pool. A slab is a large allocation which is divided into smaller blocks. The /// individual blocks are then treated as independant array segments. /// - public class MemoryPoolBlock2 + public class MemoryPoolBlock { /// /// If this block represents a one-time-use memory object, this GCHandle will hold that memory object at a fixed address @@ -33,19 +33,19 @@ public class MemoryPoolBlock2 /// /// This object cannot be instantiated outside of the static Create method /// - protected MemoryPoolBlock2() + protected MemoryPoolBlock() { } /// /// Back-reference to the memory pool which this block was allocated from. It may only be returned to this pool. /// - public MemoryPool2 Pool { get; private set; } + public MemoryPool Pool { get; private set; } /// /// Back-reference to the slab from which this block was taken, or null if it is one-time-use memory. /// - public MemoryPoolSlab2 Slab { get; private set; } + public MemoryPoolSlab Slab { get; private set; } /// /// Convenience accessor @@ -72,9 +72,9 @@ protected MemoryPoolBlock2() /// working memory. The "active" memory is grown when bytes are copied in, End is increased, and Next is assigned. The "active" /// memory is shrunk when bytes are consumed, Start is increased, and blocks are returned to the pool. /// - public MemoryPoolBlock2 Next { get; set; } + public MemoryPoolBlock Next { get; set; } - ~MemoryPoolBlock2() + ~MemoryPoolBlock() { Debug.Assert(!_pinHandle.IsAllocated, "Ad-hoc memory block wasn't unpinned"); // Debug.Assert(Slab == null || !Slab.IsActive, "Block being garbage collected instead of returned to pool"); @@ -87,7 +87,7 @@ protected MemoryPoolBlock2() if (Slab != null && Slab.IsActive) { - Pool.Return(new MemoryPoolBlock2 + Pool.Return(new MemoryPoolBlock { _dataArrayPtr = _dataArrayPtr, Data = Data, @@ -129,13 +129,13 @@ public void Unpin() } } - public static MemoryPoolBlock2 Create( + public static MemoryPoolBlock Create( ArraySegment data, IntPtr dataPtr, - MemoryPool2 pool, - MemoryPoolSlab2 slab) + MemoryPool pool, + MemoryPoolSlab slab) { - return new MemoryPoolBlock2 + return new MemoryPoolBlock { Data = data, _dataArrayPtr = dataPtr, @@ -169,9 +169,9 @@ public override string ToString() /// acquires a cursor pointing into this block at the Start of "active" byte information /// /// - public MemoryPoolIterator2 GetIterator() + public MemoryPoolIterator GetIterator() { - return new MemoryPoolIterator2(this); + return new MemoryPoolIterator(this); } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator.cs similarity index 97% rename from src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs rename to src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator.cs index 7de5c7fb1..8f47c4086 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator.cs @@ -8,7 +8,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure { - public struct MemoryPoolIterator2 + public struct MemoryPoolIterator { /// /// Array of "minus one" bytes of the length of SIMD operations on the current hardware. Used as an argument in the @@ -24,15 +24,15 @@ public struct MemoryPoolIterator2 private static Encoding _utf8 = Encoding.UTF8; - private MemoryPoolBlock2 _block; + private MemoryPoolBlock _block; private int _index; - public MemoryPoolIterator2(MemoryPoolBlock2 block) + public MemoryPoolIterator(MemoryPoolBlock block) { _block = block; _index = _block?.Start ?? 0; } - public MemoryPoolIterator2(MemoryPoolBlock2 block, int index) + public MemoryPoolIterator(MemoryPoolBlock block, int index) { _block = block; _index = index; @@ -68,7 +68,7 @@ public bool IsEnd } } - public MemoryPoolBlock2 Block => _block; + public MemoryPoolBlock Block => _block; public int Index => _index; @@ -459,7 +459,7 @@ public bool Put(byte data) } } - public int GetLength(MemoryPoolIterator2 end) + public int GetLength(MemoryPoolIterator end) { if (IsDefault || end.IsDefault) { @@ -488,7 +488,7 @@ public int GetLength(MemoryPoolIterator2 end) } } - public string GetString(MemoryPoolIterator2 end) + public string GetString(MemoryPoolIterator end) { if (IsDefault || end.IsDefault) { @@ -566,7 +566,7 @@ public string GetString(MemoryPoolIterator2 end) } } - public ArraySegment GetArraySegment(MemoryPoolIterator2 end) + public ArraySegment GetArraySegment(MemoryPoolIterator end) { if (IsDefault || end.IsDefault) { @@ -583,7 +583,7 @@ public ArraySegment GetArraySegment(MemoryPoolIterator2 end) return new ArraySegment(array, 0, length); } - public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int actual) + public MemoryPoolIterator CopyTo(byte[] array, int offset, int count, out int actual) { if (IsDefault) { @@ -601,13 +601,13 @@ public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int a { actual = count; Buffer.BlockCopy(block.Array, index, array, offset, remaining); - return new MemoryPoolIterator2(block, index + remaining); + return new MemoryPoolIterator(block, index + remaining); } else if (block.Next == null) { actual = count - remaining + following; Buffer.BlockCopy(block.Array, index, array, offset, following); - return new MemoryPoolIterator2(block, index + following); + return new MemoryPoolIterator(block, index + following); } else { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolSlab2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolSlab.cs similarity index 95% rename from src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolSlab2.cs rename to src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolSlab.cs index 134bc147d..34c105a98 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolSlab2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolSlab.cs @@ -7,7 +7,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Infrastructure /// Slab tracking object used by the byte buffer memory pool. A slab is a large allocation which is divided into smaller blocks. The /// individual blocks are then treated as independant array segments. /// - public class MemoryPoolSlab2 : IDisposable + public class MemoryPoolSlab : IDisposable { /// /// This handle pins the managed array in memory until the slab is disposed. This prevents it from being @@ -41,14 +41,14 @@ public class MemoryPoolSlab2 : IDisposable /// private bool _disposedValue = false; // To detect redundant calls - public static MemoryPoolSlab2 Create(int length) + public static MemoryPoolSlab Create(int length) { // allocate and pin requested memory length var array = new byte[length]; var gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned); // allocate and return slab tracking object - return new MemoryPoolSlab2 + return new MemoryPoolSlab { Array = array, _gcHandle = gcHandle, @@ -78,7 +78,7 @@ protected virtual void Dispose(bool disposing) } // override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources. - ~MemoryPoolSlab2() + ~MemoryPoolSlab() { // Do not change this code. Put cleanup code in Dispose(bool disposing) above. Dispose(false); diff --git a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs index b0150cad1..0089100ce 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs @@ -6,6 +6,7 @@ using Microsoft.AspNet.Http; using Microsoft.AspNet.Server.Kestrel.Http; using Microsoft.AspNet.Server.Kestrel.Networking; +using Microsoft.Extensions.PlatformAbstractions; namespace Microsoft.AspNet.Server.Kestrel { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs index 4b90ecfd5..0f85bc4ce 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvHandle.cs @@ -42,7 +42,7 @@ protected override bool ReleaseHandle() // This can be called from the finalizer. // Ensure the closure doesn't reference "this". var uv = _uv; - _queueCloseHandle(memory2 => uv.close(memory2, _destroyMemory), memory); + _queueCloseHandle(mem => uv.close(mem, _destroyMemory), memory); } } return true; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs index d6aa2c65a..f8b2a4d5a 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs @@ -16,6 +16,10 @@ public class UvShutdownReq : UvRequest private Action _callback; private object _state; + public bool SocketDisconnect; + public int SocketStatus; + public Exception SocketException; + public UvShutdownReq(IKestrelTrace logger) : base (logger) { } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 103ddfc6e..eb5b63742 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -14,15 +14,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvWriteReq : UvRequest { - private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + private readonly static Libuv.uv_write_cb _uv_write_cb = (ptr, status) => UvWriteCb(ptr, status); - private IntPtr _bufs; + private IntPtr _nativePointers; + private ArraySegment _segments; - private Action _callback; + private Action _callback; private object _state; - private const int BUFFER_COUNT = 4; - - private List _pins = new List(); + public const int BUFFER_COUNT = 8; + + public bool SocketShutdownSend; + public bool SocketDisconnect; public UvWriteReq(IKestrelTrace logger) : base(logger) { @@ -32,45 +34,37 @@ public void Init(UvLoopHandle loop) { var requestSize = loop.Libuv.req_size(Libuv.RequestType.WRITE); var bufferSize = Marshal.SizeOf() * BUFFER_COUNT; + CreateMemory( loop.Libuv, loop.ThreadId, requestSize + bufferSize); - _bufs = handle + requestSize; + _nativePointers = handle + requestSize; } public unsafe void Write( UvStreamHandle handle, - ArraySegment> bufs, - Action callback, + ArraySegment segments, + Action callback, object state) { try { + _segments = segments; // add GCHandle to keeps this SafeHandle alive while request processing - _pins.Add(GCHandle.Alloc(this, GCHandleType.Normal)); + Pin(); - var pBuffers = (Libuv.uv_buf_t*)_bufs; - var nBuffers = bufs.Count; - if (nBuffers > BUFFER_COUNT) - { - // create and pin buffer array when it's larger than the pre-allocated one - var bufArray = new Libuv.uv_buf_t[nBuffers]; - var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned); - _pins.Add(gcHandle); - pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject(); - } + var pBuffers = (Libuv.uv_buf_t*)_nativePointers; + var nBuffers = segments.Count; for (var index = 0; index < nBuffers; index++) { + var buf = segments.Array[segments.Offset + index]; + var len = buf.End - buf.Start; // create and pin each segment being written - var buf = bufs.Array[bufs.Offset + index]; - - var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned); - _pins.Add(gcHandle); pBuffers[index] = Libuv.buf_init( - gcHandle.AddrOfPinnedObject() + buf.Offset, - buf.Count); + buf.Pin() - len, + buf.End - buf.Start); } _callback = callback; @@ -81,44 +75,37 @@ public unsafe void Write( { _callback = null; _state = null; - Unpin(this); + Unpin(); + ProcessBlocks(); throw; } } public unsafe void Write2( UvStreamHandle handle, - ArraySegment> bufs, + ArraySegment segments, UvStreamHandle sendHandle, - Action callback, + Action callback, object state) { try { + _segments = segments; // add GCHandle to keeps this SafeHandle alive while request processing - _pins.Add(GCHandle.Alloc(this, GCHandleType.Normal)); + Pin(); - var pBuffers = (Libuv.uv_buf_t*)_bufs; - var nBuffers = bufs.Count; - if (nBuffers > BUFFER_COUNT) - { - // create and pin buffer array when it's larger than the pre-allocated one - var bufArray = new Libuv.uv_buf_t[nBuffers]; - var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned); - _pins.Add(gcHandle); - pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject(); - } + var pBuffers = (Libuv.uv_buf_t*)_nativePointers; + var nBuffers = segments.Count; for (var index = 0; index < nBuffers; index++) { - // create and pin each segment being written - var buf = bufs.Array[bufs.Offset + index]; + var buf = segments.Array[segments.Offset + index]; + var len = buf.End - buf.Start; - var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned); - _pins.Add(gcHandle); + // create and pin each segment being written pBuffers[index] = Libuv.buf_init( - gcHandle.AddrOfPinnedObject() + buf.Offset, - buf.Count); + buf.Pin() - len, + buf.End - buf.Start); } _callback = callback; @@ -129,24 +116,36 @@ public unsafe void Write2( { _callback = null; _state = null; - Unpin(this); + Unpin(); + ProcessBlocks(); throw; } } - private static void Unpin(UvWriteReq req) + private int ProcessBlocks() { - foreach (var pin in req._pins) + var bytesWritten = 0; + var end = _segments.Offset + _segments.Count; + for (var i = _segments.Offset; i < end; i++) { - pin.Free(); + var block = _segments.Array[i]; + bytesWritten += block.End - block.Start; + + block.Unpin(); + + if (block.Pool != null) + { + block.Pool.Return(block); + } } - req._pins.Clear(); + + return bytesWritten; } private static void UvWriteCb(IntPtr ptr, int status) { var req = FromIntPtr(ptr); - Unpin(req); + var bytesWritten = req.ProcessBlocks(); var callback = req._callback; req._callback = null; @@ -162,13 +161,24 @@ private static void UvWriteCb(IntPtr ptr, int status) try { - callback(req, status, error, state); + callback(req, status, error, bytesWritten, state); } catch (Exception ex) { req._log.LogError("UvWriteCb", ex); throw; } + finally + { + req.Unpin(); + } + } + + public void Reset() + { + _callback = null; + _state = null; + _segments = default(ArraySegment); } } } \ No newline at end of file diff --git a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs index d3d1539a5..095f37f85 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs @@ -13,13 +13,11 @@ public class ServiceContext { public ServiceContext() { - Memory = new MemoryPool(); } public ServiceContext(ServiceContext context) { AppLifetime = context.AppLifetime; - Memory = context.Memory; Log = context.Log; HttpContextFactory = context.HttpContextFactory; DateHeaderValueManager = context.DateHeaderValueManager; @@ -29,8 +27,6 @@ public ServiceContext(ServiceContext context) public IApplicationLifetime AppLifetime { get; set; } - public IMemoryPool Memory { get; set; } - public IKestrelTrace Log { get; set; } public IHttpContextFactory HttpContextFactory { get; set; } diff --git a/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs b/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs index 245729d94..ea7f35f41 100644 --- a/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs +++ b/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs @@ -13,6 +13,10 @@ namespace Microsoft.AspNet.Server.Kestrel.FunctionalTests { public class ResponseTests { + const int BufferSize = 1024; + const int SendCount = 1024; + const int TotalSize = BufferSize * SendCount; + [Fact] public async Task LargeDownload() { @@ -29,20 +33,20 @@ public async Task LargeDownload() { app.Run(async context => { - var bytes = new byte[1024]; + var bytes = new byte[BufferSize]; for (int i = 0; i < bytes.Length; i++) { bytes[i] = (byte)i; } - context.Response.ContentLength = bytes.Length * 1024; + context.Response.ContentLength = bytes.Length * SendCount; - for (int i = 0; i < 1024; i++) + for (int i = 0; i < SendCount; i++) { await context.Response.Body.WriteAsync(bytes, 0, bytes.Length); } }); - }); + }); using (var app = hostBuilder.Build().Start()) { @@ -54,7 +58,7 @@ public async Task LargeDownload() // Read the full response body var total = 0; - var bytes = new byte[1024]; + var bytes = new byte[BufferSize]; var count = await responseBody.ReadAsync(bytes, 0, bytes.Length); while (count > 0) { @@ -65,6 +69,7 @@ public async Task LargeDownload() } count = await responseBody.ReadAsync(bytes, 0, bytes.Length); } + Assert.Equal(total, TotalSize); } } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs index 8266a1c2e..25fcf46a6 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs @@ -48,7 +48,7 @@ public void ChunkedPrefixMustBeHexCrLfWithoutLeadingZeros(int dataCount, string [InlineData("Connection:\r\n \r\nCookie \r\n", 1)] public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders) { - var socketInput = new SocketInput(new MemoryPool2()); + var socketInput = new SocketInput(new MemoryPool()); var headerCollection = new FrameRequestHeaders(); var headerArray = Encoding.ASCII.GetBytes(rawHeaders); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlockTests.cs similarity index 92% rename from test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs rename to test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlockTests.cs index db3b03802..944cce973 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlock2Tests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolBlockTests.cs @@ -4,12 +4,12 @@ namespace Microsoft.AspNet.Server.KestrelTests { - public class MemoryPoolBlock2Tests + public class MemoryPoolBlockTests { [Fact] public void SeekWorks() { - using (var pool = new MemoryPool2()) + using (var pool = new MemoryPool()) { var block = pool.Lease(256); foreach (var ch in Enumerable.Range(0, 256).Select(x => (byte)x)) @@ -37,7 +37,7 @@ public void SeekWorks() [Fact] public void GetLengthBetweenIteratorsWorks() { - using (var pool = new MemoryPool2()) + using (var pool = new MemoryPool()) { var block = pool.Lease(256); block.End += 256; @@ -64,7 +64,7 @@ public void GetLengthBetweenIteratorsWorks() } } - private void TestAllLengths(MemoryPoolBlock2 block, int lengths) + private void TestAllLengths(MemoryPoolBlock block, int lengths) { for (var firstIndex = 0; firstIndex <= lengths; ++firstIndex) { @@ -82,7 +82,7 @@ private void TestAllLengths(MemoryPoolBlock2 block, int lengths) [Fact] public void AddDoesNotAdvanceAtEndOfCurrentBlock() { - using (var pool = new MemoryPool2()) + using (var pool = new MemoryPool()) { var block1 = pool.Lease(256); var block2 = block1.Next = pool.Lease(256); @@ -120,7 +120,7 @@ public void AddDoesNotAdvanceAtEndOfCurrentBlock() [Fact] public void CopyToCorrectlyTraversesBlocks() { - using (var pool = new MemoryPool2()) + using (var pool = new MemoryPool()) { var block1 = pool.Lease(128); var block2 = block1.Next = pool.Lease(128); @@ -155,7 +155,7 @@ public void CopyToCorrectlyTraversesBlocks() [Fact] public void IsEndCorrectlyTraversesBlocks() { - using (var pool = new MemoryPool2()) + using (var pool = new MemoryPool()) { var block1 = pool.Lease(128); var block2 = block1.Next = pool.Lease(128); @@ -176,7 +176,7 @@ public void IsEndCorrectlyTraversesBlocks() } } - private void AssertIterator(MemoryPoolIterator2 iter, MemoryPoolBlock2 block, int index) + private void AssertIterator(MemoryPoolIterator iter, MemoryPoolBlock block, int index) { Assert.Same(block, iter.Block); Assert.Equal(index, iter.Index); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolExtensions.cs b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolExtensions.cs index 1eb683088..19b15c3f7 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolExtensions.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolExtensions.cs @@ -4,7 +4,7 @@ namespace Microsoft.AspNet.Server.KestrelTests { public static class MemoryPoolExtensions { - public static MemoryPoolIterator2 Add(this MemoryPoolIterator2 iterator, int count) + public static MemoryPoolIterator Add(this MemoryPoolIterator iterator, int count) { int actual; return iterator.CopyTo(new byte[count], 0, count, out actual); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolIterator2Tests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolIteratorTests.cs similarity index 95% rename from test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolIterator2Tests.cs rename to test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolIteratorTests.cs index b7d101c28..5c2828753 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolIterator2Tests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MemoryPoolIteratorTests.cs @@ -5,13 +5,13 @@ namespace Microsoft.AspNet.Server.KestrelTests { - public class MemoryPoolIterator2Tests : IDisposable + public class MemoryPoolIteratorTests : IDisposable { - private readonly MemoryPool2 _pool; + private readonly MemoryPool _pool; - public MemoryPoolIterator2Tests() + public MemoryPoolIteratorTests() { - _pool = new MemoryPool2(); + _pool = new MemoryPool(); } public void Dispose() @@ -74,7 +74,7 @@ public void MemorySeek(string raw, string search, char expectResult, int expectI [Fact] public void Put() { - var blocks = new MemoryPoolBlock2[4]; + var blocks = new MemoryPoolBlock[4]; for (var i = 0; i < 4; ++i) { blocks[i] = _pool.Lease(16); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs index 8293ad09b..05f1c4be8 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs @@ -62,12 +62,16 @@ public void ServerPipeListenForConnections() return; } + var buffer = new byte[] { 1, 2, 3, 4 }; + var msg = MemoryPoolBlock.Create(new ArraySegment(buffer), IntPtr.Zero, null, null); + msg.End = msg.Start + buffer.Length; + var writeRequest = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); writeRequest.Init(loop); writeRequest.Write( serverConnectionPipe, - new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), - (_3, status2, error2, _4) => + new ArraySegment(new[] { msg }), + (socket2, status2, error2, bytesWritten, state) => { writeRequest.Dispose(); serverConnectionPipe.Dispose(); @@ -156,13 +160,17 @@ public void ServerPipeDispatchConnections() serverConnectionPipeAcceptedEvent.WaitOne(); + var buffer = new byte[] { 1, 2, 3, 4 }; + var msg = MemoryPoolBlock.Create(new ArraySegment(buffer), IntPtr.Zero, null, null); + msg.End = msg.Start + buffer.Length; + var writeRequest = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); writeRequest.Init(loop); writeRequest.Write2( serverConnectionPipe, - new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), + new ArraySegment(new[] { msg }), serverConnectionTcp, - (_3, status2, error2, _4) => + (socket2, status2, error2, bytesWritten, state) => { writeRequest.Dispose(); serverConnectionTcp.Dispose(); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs index fa8a3ada3..e817d33f2 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs @@ -199,14 +199,16 @@ public async Task SocketCanReadAndWrite() { for (var x = 0; x < 2; x++) { + var buffer = new byte[] { 65, 66, 67, 68, 69 }; + var msg = MemoryPoolBlock.Create(new ArraySegment(buffer), IntPtr.Zero, null, null); + msg.End = msg.Start + buffer.Length; + var req = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); req.Init(loop); req.Write( tcp2, - new ArraySegment>( - new[] { new ArraySegment(new byte[] { 65, 66, 67, 68, 69 }) } - ), - (_1, _2, _3, _4) => { }, + new ArraySegment(new[] { msg }), + (_1, _2, _3, _4, _5) => { }, null); } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index 8e0425ce5..a4e115388 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -16,7 +16,7 @@ namespace Microsoft.AspNet.Server.KestrelTests public class SocketOutputTests { [Fact] - public void CanWrite1MB() + public void CanWriteOver1MB() { // This test was added because when initially implementing write-behind buffering in // SocketOutput, the write callback would never be invoked for writes larger than @@ -39,24 +39,27 @@ public void CanWrite1MB() var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, 0, trace); - - // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. - var bufferSize = 1048576; - var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); - var completedWh = new ManualResetEventSlim(); - Action onCompleted = (ex, state, calledInline) => + using (var memory = new MemoryPool()) { - Assert.Null(ex); - Assert.Null(state); - completedWh.Set(); - }; - - // Act - socketOutput.Write(buffer, onCompleted, null); - - // Assert - Assert.True(completedWh.Wait(1000)); + var socketOutput = new SocketOutput(memory, kestrelThread, socket, 0, trace); + + // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. + var bufferSize = 1048576 + SocketOutput.MaxBytesPreCompleted; + var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); + var completedWh = new ManualResetEventSlim(); + Action onCompleted = (ex, state, status, calledInline) => + { + Assert.Null(ex); + Assert.Null(state); + completedWh.Set(); + }; + + // Act + socketOutput.Write(buffer, onCompleted, null); + + // Assert + Assert.True(completedWh.Wait(1000)); + } } } @@ -64,7 +67,7 @@ public void CanWrite1MB() public void WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyPreCompleted() { // This should match _maxBytesPreCompleted in SocketOutput - var maxBytesPreCompleted = 65536; + var maxBytesPreCompleted = SocketOutput.MaxBytesPreCompleted; var completeQueue = new Queue>(); // Arrange @@ -84,35 +87,38 @@ public void WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyPreCompleted( var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, 0, trace); - - var bufferSize = maxBytesPreCompleted; - var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); - var completedWh = new ManualResetEventSlim(); - Action onCompleted = (ex, state, calledInline) => + using (var memory = new MemoryPool()) { - Assert.Null(ex); - Assert.Null(state); - completedWh.Set(); - }; - - // Act - socketOutput.Write(buffer, onCompleted, null); - // Assert - // The first write should pre-complete since it is <= _maxBytesPreCompleted. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); - // Act - socketOutput.Write(buffer, onCompleted, null); - // Assert - // Too many bytes are already pre-completed for the second write to pre-complete. - Assert.False(completedWh.Wait(1000)); - // Act - completeQueue.Dequeue()(0); - // Assert - // Finishing the first write should allow the second write to pre-complete. - Assert.True(completedWh.Wait(1000)); + var socketOutput = new SocketOutput(memory, kestrelThread, socket, 0, trace); + + var bufferSize = maxBytesPreCompleted; + var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); + var completedWh = new ManualResetEventSlim(); + Action onCompleted = (ex, state, status, calledInline) => + { + Assert.Null(ex); + Assert.Null(state); + completedWh.Set(); + }; + + // Act + socketOutput.Write(buffer, onCompleted, null); + // Assert + // The first write should pre-complete since it is <= _maxBytesPreCompleted. + Assert.True(completedWh.Wait(200)); + // Arrange + completedWh.Reset(); + // Act + socketOutput.Write(buffer, onCompleted, null); + // Assert + // Too many bytes are already pre-completed for the second write to pre-complete. + Assert.False(completedWh.Wait(200)); + // Act + completeQueue.Dequeue()(0); + // Assert + // Finishing the first write should allow the second write to pre-complete. + Assert.True(completedWh.Wait(200)); + } } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs index 3db112560..575091968 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs @@ -14,11 +14,9 @@ class TestInput : IConnectionControl, IFrameControl public TestInput() { var memory = new MemoryPool(); - var memory2 = new MemoryPool2(); FrameContext = new FrameContext { - SocketInput = new SocketInput(memory2), - Memory = memory, + SocketInput = new SocketInput(memory), ConnectionControl = this, FrameControl = this }; diff --git a/test/Microsoft.AspNet.Server.KestrelTests/UrlPathDecoder.cs b/test/Microsoft.AspNet.Server.KestrelTests/UrlPathDecoder.cs index 928fc84a0..a8dbd3455 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/UrlPathDecoder.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/UrlPathDecoder.cs @@ -121,16 +121,16 @@ public void DecodeWithBoundary(string raw, int rawLength, string expect, int exp Assert.Equal(expect, result); } - private MemoryPoolIterator2 BuildSample(string data) + private MemoryPoolIterator BuildSample(string data) { var store = data.Select(c => (byte)c).ToArray(); - var mem = MemoryPoolBlock2.Create(new ArraySegment(store), IntPtr.Zero, null, null); + var mem = MemoryPoolBlock.Create(new ArraySegment(store), IntPtr.Zero, null, null); mem.End = store.Length; return mem.GetIterator(); } - private MemoryPoolIterator2 GetIterator(MemoryPoolIterator2 begin, int displacement) + private MemoryPoolIterator GetIterator(MemoryPoolIterator begin, int displacement) { var result = begin; for (int i = 0; i < displacement; ++i) diff --git a/tools/Microsoft.AspNet.Server.Kestrel.LibuvCopier/Program.cs b/tools/Microsoft.AspNet.Server.Kestrel.LibuvCopier/Program.cs index 9ee4d03ad..b76dd16fe 100644 --- a/tools/Microsoft.AspNet.Server.Kestrel.LibuvCopier/Program.cs +++ b/tools/Microsoft.AspNet.Server.Kestrel.LibuvCopier/Program.cs @@ -66,7 +66,7 @@ private string GetHome() #if DNX451 return Environment.GetFolderPath(Environment.SpecialFolder.UserProfile); #else - var runtimeEnv = PlatformServices.Default.Runtime; + var runtimeEnv = Extensions.PlatformAbstractions.PlatformServices.Default.Runtime; if (runtimeEnv.OperatingSystem == "Windows") { return Environment.GetEnvironmentVariable("USERPROFILE") ??