Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

[Proposal] Pool WriteContext per thread #469

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context
_connectionId = Interlocked.Increment(ref _lastConnectionId);

_rawSocketInput = new SocketInput(Memory2, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool);
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
}

public void Start()
Expand Down
7 changes: 7 additions & 0 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public void Dispose()
{
var socket = (Listener)tcs2.Task.AsyncState;
socket.ListenSocket.Dispose();

var writeReqPool = socket.WriteReqPool;
while (writeReqPool.Count > 0)
{
writeReqPool.Dequeue().Dispose();
}

tcs2.SetResult(0);
}
catch (Exception ex)
Expand Down
7 changes: 6 additions & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using Microsoft.AspNet.Http;
using System.Collections.Generic;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.AspNet.Server.Kestrel.Networking;

namespace Microsoft.AspNet.Server.Kestrel.Http
{
Expand All @@ -17,6 +18,7 @@ public ListenerContext(ServiceContext serviceContext)
: base(serviceContext)
{
Memory2 = new MemoryPool2();
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
}

public ListenerContext(ListenerContext listenerContext)
Expand All @@ -25,6 +27,7 @@ public ListenerContext(ListenerContext listenerContext)
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Memory2 = listenerContext.Memory2;
WriteReqPool = listenerContext.WriteReqPool;
Log = listenerContext.Log;
}

Expand All @@ -33,5 +36,7 @@ public ListenerContext(ListenerContext listenerContext)
public KestrelThread Thread { get; set; }

public MemoryPool2 Memory2 { get; set; }

public Queue<UvWriteReq> WriteReqPool { get; set; }
}
}
144 changes: 112 additions & 32 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class SocketOutput : ISocketOutput
{
public const int MaxPooledWriteReqs = 1024;

private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxWriteContextsPooledPerThread = 128;

private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);

[ThreadStatic]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid of adding a ThreadStatic. We don't use ThreadStatics anywhere else in Kestrel that I'm aware of.

I just checked the default maximum number of worker threads on my machine using ThreadPool.GetMaxThreads, and it was 32,767. I don't like having potentially that many static queues. Considering that each queue could pool up to 128 WriteContexts (and you just calculated the size WriteContext being 88 bytes), that's up to ~370MB of WriteContexts that will never be freed. That's not even considering the size of the queues themselves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't hold off merging #466 based on this (no issues in that one; and this is straight on top)

I know what you mean about ThreadStatics, though — I've been doing some investigation.

So the ThreadStatic is GC'd after thread exit; assuming it has no other outstanding strong references - which should be ok for this. Objects with a finalizer aren't so good as they'll hang around till the finalizer runs - but the WriteContext that's been reset is just a POCO managed type; so the queue will be happily GC'd when the threadpool scales back down.

If you are up to 32k of live threadpool threads; then the 32GB of thread stack space they need might be more of an issue than the 370MB of pooled WriteContexts.

However... I'm not sure this PR is a great win; it doesn't reduce any contention, as there already isn't any; but it should reduce the memory for a very large number of connections - but that scenario will probably have all sorts of other stuff to look at.

Happy to close for now and come back to; if it is an issue?

private static Queue<WriteContext> _threadWriteContextPool;

private readonly KestrelThread _thread;
private readonly UvStreamHandle _socket;
private readonly Connection _connection;
Expand All @@ -42,12 +48,12 @@ public class SocketOutput : ISocketOutput
// The number of write operations that have been scheduled so far
// but have not completed.
private int _writesPending = 0;

private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
private readonly Queue<UvWriteReq> _writeReqPool;

public SocketOutput(
KestrelThread thread,
Expand All @@ -56,7 +62,8 @@ public SocketOutput(
Connection connection,
long connectionId,
IKestrelTrace log,
IThreadPool threadPool)
IThreadPool threadPool,
Queue<UvWriteReq> writeReqPool)
{
_thread = thread;
_socket = socket;
Expand All @@ -66,11 +73,25 @@ public SocketOutput(
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_writeReqPool = writeReqPool;

_head = memory.Lease();
_tail = _head;
}

/// <summary>
/// Per-thread pool of reusable <see cref="WriteContext"/> instances.
/// Backed by the [ThreadStatic] field, so the queue is created lazily on
/// first access from each thread and never shared across threads.
/// </summary>
private static Queue<WriteContext> WriteContextPool
{
    get
    {
        // Lazily materialize the thread-local queue on first use;
        // subsequent accesses on this thread return the same instance.
        return _threadWriteContextPool ??
            (_threadWriteContextPool = new Queue<WriteContext>(_maxWriteContextsPooledPerThread));
    }
}

public Task WriteAsync(
ArraySegment<byte> buffer,
bool immediate = true,
Expand All @@ -92,7 +113,15 @@ public Task WriteAsync(
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
if (WriteContextPool.Count > 0)
{
_nextWriteContext = WriteContextPool.Dequeue();
_nextWriteContext.Self = this;
}
else
{
_nextWriteContext = new WriteContext() { Self = this };
}
}

if (socketShutdownSend)
Expand Down Expand Up @@ -274,9 +303,14 @@ private void WriteAllPending()
}

// This is called on the libuv event loop
private void OnWriteCompleted(int bytesWritten, int status, Exception error)
private void OnWriteCompleted(WriteContext writeContext)
{
_log.ConnectionWriteCallback(_connectionId, status);
var bytesWritten = writeContext.ByteCount;
var status = writeContext.WriteStatus;
var error = writeContext.WriteError;

// Pool WriteContext on thread pool thread not libuv thread
ThreadPool.QueueUserWorkItem((c) => PoolWriteContext((WriteContext)c), writeContext);

if (error != null)
{
Expand Down Expand Up @@ -332,11 +366,11 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
}
}

_log.ConnectionWriteCallback(_connectionId, status);

if (scheduleWrite)
{
// ScheduleWrite();
// on right thread, fairness issues?
WriteAllPending();
ScheduleWrite();
}

_tasksCompleted.Clear();
Expand Down Expand Up @@ -367,6 +401,15 @@ private void ReturnAllBlocks()
}
}

/// <summary>
/// Resets and returns a finished <see cref="WriteContext"/> to the calling
/// thread's pool, provided the pool has not reached its size cap.
/// </summary>
/// <param name="writeContext">The completed context to recycle.</param>
private static void PoolWriteContext(WriteContext writeContext)
{
    // Pool is at capacity: drop the instance and let the GC reclaim it.
    if (WriteContextPool.Count >= _maxWriteContextsPooledPerThread)
    {
        return;
    }

    writeContext.Reset();
    WriteContextPool.Enqueue(writeContext);
}

void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
var task = WriteAsync(buffer, immediate);
Expand Down Expand Up @@ -412,54 +455,62 @@ private class WriteContext
{
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);

public SocketOutput Self;
private UvWriteReq _writeReq;
private MemoryPoolIterator2 _lockedStart;
private MemoryPoolIterator2 _lockedEnd;
private int _bufferCount;
private int _byteCount;

public SocketOutput Self;

public int ByteCount;
public bool SocketShutdownSend;
public bool SocketDisconnect;

public int WriteStatus;
public Exception WriteError;

public int ShutdownSendStatus;

public WriteContext(SocketOutput self)
{
Self = self;
}

/// <summary>
/// First step: initiate async write if needed, otherwise go to next step
/// </summary>
public void DoWriteIfNeeded()
{
LockWrite();

if (_byteCount == 0 || Self._socket.IsClosed)
if (ByteCount == 0 || Self._socket.IsClosed)
{
DoShutdownIfNeeded();
return;
}

var writeReq = new UvWriteReq(Self._log);
writeReq.Init(Self._thread.Loop);
// Sample values locally in case write completes inline
// to allow block to be Reset and still complete this function
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;
var socketOutput = Self;

writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
if (socketOutput._writeReqPool.Count > 0)
{
_writeReq.Dispose();
var _this = (WriteContext)state;
_this.ScheduleReturnFullyWrittenBlocks();
_this.WriteStatus = status;
_this.WriteError = error;
_this.DoShutdownIfNeeded();
_writeReq = socketOutput._writeReqPool.Dequeue();
}
else
{
_writeReq = new UvWriteReq(socketOutput._log);
_writeReq.Init(socketOutput._thread.Loop);
}

_writeReq.Write(socketOutput._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
var writeContext = (WriteContext)state;
writeContext.PoolWriteReq(writeContext._writeReq);
writeContext._writeReq = null;
writeContext.ScheduleReturnFullyWrittenBlocks();
writeContext.WriteStatus = status;
writeContext.WriteError = error;
writeContext.DoShutdownIfNeeded();
}, this);

Self._head = _lockedEnd.Block;
Self._head.Start = _lockedEnd.Index;
socketOutput._head = lockedEndBlock;
socketOutput._head.Start = lockedEndIndex;
}

/// <summary>
Expand Down Expand Up @@ -506,9 +557,21 @@ public void DoDisconnectIfNeeded()

public void Complete()
{
Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError);
Self.OnWriteCompleted(this);
}


/// <summary>
/// Recycles a completed libuv write request into the listener-wide pool,
/// or disposes it when the pool is already at <see cref="MaxPooledWriteReqs"/>.
/// </summary>
/// <param name="writeReq">The write request that has finished its callback.</param>
private void PoolWriteReq(UvWriteReq writeReq)
{
    var pool = Self._writeReqPool;

    if (pool.Count >= MaxPooledWriteReqs)
    {
        // No room left — release the native handle instead of pooling.
        writeReq.Dispose();
        return;
    }

    pool.Enqueue(writeReq);
}

private void ScheduleReturnFullyWrittenBlocks()
{
var block = _lockedStart.Block;
Expand Down Expand Up @@ -556,7 +619,24 @@ private void LockWrite()
_lockedStart = new MemoryPoolIterator2(head, head.Start);
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);

BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount);
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
}

/// <summary>
/// Clears all per-write state so this instance can be returned to the
/// per-thread pool and safely reused for a later write operation.
/// </summary>
public void Reset()
{
    // Drop the back-reference to the owning SocketOutput so a pooled
    // instance does not keep the connection's object graph alive.
    Self = null;
    // Release references to the memory-pool blocks captured by LockWrite().
    _lockedStart = default(MemoryPoolIterator2);
    _lockedEnd = default(MemoryPoolIterator2);
    _bufferCount = 0;
    ByteCount = 0;

    SocketShutdownSend = false;
    SocketDisconnect = false;

    // Clear the libuv status/error captured by the previous write callback.
    WriteStatus = 0;
    WriteError = null;

    ShutdownSendStatus = 0;
}
}
}
Expand Down
Loading