Skip to content

Commit

Permalink
Split the HttpConnectionPool into multiple files (#100614)
Browse files Browse the repository at this point in the history
* Split HttpConnectionPool into multiple files

* Un-nest the connection waiter and request queue

* More shuffles

* Revert minor H3 change
  • Loading branch information
MihaZupan authored Apr 8, 2024
1 parent bc2bd2b commit 70e3218
Show file tree
Hide file tree
Showing 9 changed files with 2,868 additions and 2,799 deletions.
7 changes: 6 additions & 1 deletion src/libraries/System.Net.Http/src/System.Net.Http.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@
<Compile Include="System\Net\Http\Headers\AltSvcHeaderParser.cs" />
<Compile Include="System\Net\Http\Headers\AltSvcHeaderValue.cs" />
<Compile Include="System\Net\Http\Headers\KnownHeader.Http2And3.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\ConnectionPool\HttpConnectionPool.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\ConnectionPool\HttpConnectionPool.Http1.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\ConnectionPool\HttpConnectionPool.Http2.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\ConnectionPool\HttpConnectionPool.Http3.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\ConnectionPool\HttpConnectionWaiter.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\ConnectionPool\RequestQueue.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\AuthenticationHelper.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\AuthenticationHelper.Digest.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\AuthenticationHelper.NtAuth.cs" />
Expand All @@ -199,7 +205,6 @@
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionBase.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionHandler.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionKind.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionPool.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionPoolManager.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionResponseContent.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpConnectionSettings.cs" />
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net.Http
{
internal sealed class HttpConnectionWaiter<T> : TaskCompletionSourceWithCancellation<T>
where T : HttpConnectionBase?
{
// When a connection attempt is pending, reference the connection's CTS, so we can tear it down if the initiating request is cancelled
// or completes on a different connection.
public CancellationTokenSource? ConnectionCancellationTokenSource;

// Distinguish connection cancellation that happens because the initiating request is cancelled or completed on a different connection.
public bool CancelledByOriginatingRequestCompletion { get; set; }

public ValueTask<T> WaitForConnectionAsync(HttpRequestMessage request, HttpConnectionPool pool, bool async, CancellationToken requestCancellationToken)
{
return HttpTelemetry.Log.IsEnabled() || pool.Settings._metrics!.RequestsQueueDuration.Enabled
? WaitForConnectionWithTelemetryAsync(request, pool, async, requestCancellationToken)
: WaitWithCancellationAsync(async, requestCancellationToken);
}

private async ValueTask<T> WaitForConnectionWithTelemetryAsync(HttpRequestMessage request, HttpConnectionPool pool, bool async, CancellationToken requestCancellationToken)
{
Debug.Assert(typeof(T) == typeof(HttpConnection) || typeof(T) == typeof(Http2Connection));

long startingTimestamp = Stopwatch.GetTimestamp();
try
{
return await WaitWithCancellationAsync(async, requestCancellationToken).ConfigureAwait(false);
}
finally
{
TimeSpan duration = Stopwatch.GetElapsedTime(startingTimestamp);
int versionMajor = typeof(T) == typeof(HttpConnection) ? 1 : 2;

pool.Settings._metrics!.RequestLeftQueue(request, pool, duration, versionMajor);

if (HttpTelemetry.Log.IsEnabled())
{
HttpTelemetry.Log.RequestLeftQueue(versionMajor, duration);
}
}
}

public bool TrySignal(T connection)
{
Debug.Assert(connection is not null);

if (TrySetResult(connection))
{
if (NetEventSource.Log.IsEnabled()) connection.Trace("Dequeued waiting request.");
return true;
}
else
{
if (NetEventSource.Log.IsEnabled())
{
connection.Trace(Task.IsCanceled
? "Discarding canceled request from queue."
: "Discarding signaled request waiter from queue.");
}
return false;
}
}

public void CancelIfNecessary(HttpConnectionPool pool, bool requestCancelled)
{
int timeout = GlobalHttpSettings.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion;
if (ConnectionCancellationTokenSource is null ||
timeout == Timeout.Infinite ||
pool.Settings._connectTimeout != Timeout.InfiniteTimeSpan && timeout > (int)pool.Settings._connectTimeout.TotalMilliseconds) // Do not override shorter ConnectTimeout
{
return;
}

lock (this)
{
if (ConnectionCancellationTokenSource is null)
{
return;
}

if (NetEventSource.Log.IsEnabled())
{
pool.Trace($"Initiating cancellation of a pending connection attempt with delay of {timeout} ms, " +
$"Reason: {(requestCancelled ? "Request cancelled" : "Request served by another connection")}.");
}

CancelledByOriginatingRequestCompletion = true;
if (timeout > 0)
{
// Cancel after the specified timeout. This cancellation will not fire if the connection
// succeeds within the delay and the CTS becomes disposed.
ConnectionCancellationTokenSource.CancelAfter(timeout);
}
else
{
// Cancel immediately if no timeout specified.
ConnectionCancellationTokenSource.Cancel();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace System.Net.Http
{
internal struct RequestQueue<T>
where T : HttpConnectionBase?
{
public struct QueueItem
{
public HttpRequestMessage Request;
public HttpConnectionWaiter<T> Waiter;
}

// This implementation mimics that of Queue<T>, but without version checks and with an extra head pointer
// https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Collections/Generic/Queue.cs
private QueueItem[] _array;
private int _head; // The index from which to dequeue if the queue isn't empty.
private int _tail; // The index at which to enqueue if the queue isn't full.
private int _size; // Number of elements.
private int _attemptedConnectionsOffset; // The offset from head where we should next peek for a request without a connection attempt

public RequestQueue()
{
_array = Array.Empty<QueueItem>();
_head = 0;
_tail = 0;
_size = 0;
_attemptedConnectionsOffset = 0;
}

private void Enqueue(QueueItem queueItem)
{
if (_size == _array.Length)
{
Grow();
}

_array[_tail] = queueItem;
MoveNext(ref _tail);

_size++;
}

private QueueItem Dequeue()
{
Debug.Assert(_size > 0);

int head = _head;
QueueItem[] array = _array;

QueueItem queueItem = array[head];
array[head] = default;

MoveNext(ref _head);

if (_attemptedConnectionsOffset > 0)
{
_attemptedConnectionsOffset--;
}

_size--;
return queueItem;
}

private bool TryPeek(out QueueItem queueItem)
{
if (_size == 0)
{
queueItem = default!;
return false;
}

queueItem = _array[_head];
return true;
}

private void MoveNext(ref int index)
{
int tmp = index + 1;
if (tmp == _array.Length)
{
tmp = 0;
}
index = tmp;
}

private void Grow()
{
var newArray = new QueueItem[Math.Max(4, _array.Length * 2)];

if (_size != 0)
{
if (_head < _tail)
{
Array.Copy(_array, _head, newArray, 0, _size);
}
else
{
Array.Copy(_array, _head, newArray, 0, _array.Length - _head);
Array.Copy(_array, 0, newArray, _array.Length - _head, _tail);
}
}

_array = newArray;
_head = 0;
_tail = _size;
}


public HttpConnectionWaiter<T> EnqueueRequest(HttpRequestMessage request)
{
var waiter = new HttpConnectionWaiter<T>();
EnqueueRequest(request, waiter);
return waiter;
}


public void EnqueueRequest(HttpRequestMessage request, HttpConnectionWaiter<T> waiter)
{
Enqueue(new QueueItem { Request = request, Waiter = waiter });
}

public void PruneCompletedRequestsFromHeadOfQueue(HttpConnectionPool pool)
{
while (TryPeek(out QueueItem queueItem) && queueItem.Waiter.Task.IsCompleted)
{
if (NetEventSource.Log.IsEnabled())
{
pool.Trace(queueItem.Waiter.Task.IsCanceled
? "Discarding canceled request from queue."
: "Discarding signaled request waiter from queue.");
}

Dequeue();
}
}

public bool TryDequeueWaiter(HttpConnectionPool pool, [MaybeNullWhen(false)] out HttpConnectionWaiter<T> waiter)
{
PruneCompletedRequestsFromHeadOfQueue(pool);

if (Count != 0)
{
waiter = Dequeue().Waiter;
return true;
}

waiter = null;
return false;
}

public void TryDequeueSpecificWaiter(HttpConnectionWaiter<T> waiter)
{
if (TryPeek(out QueueItem queueItem) && queueItem.Waiter == waiter)
{
Dequeue();
}
}

public QueueItem PeekNextRequestForConnectionAttempt()
{
Debug.Assert(_attemptedConnectionsOffset >= 0);
Debug.Assert(_attemptedConnectionsOffset < _size, $"{_attemptedConnectionsOffset} < {_size}");

int index = _head + _attemptedConnectionsOffset;
_attemptedConnectionsOffset++;

if (index >= _array.Length)
{
index -= _array.Length;
}

return _array[index];
}

public int Count => _size;

public int RequestsWithoutAConnectionAttempt => _size - _attemptedConnectionsOffset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,41 @@ static void LogFaulted(HttpConnectionBase connection, Task task)
}

public abstract void Dispose();

/// <summary>
/// Called by <see cref="HttpConnectionPool.CleanCacheAndDisposeIfUnused"/> while holding the lock.
/// </summary>
public bool IsUsable(long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout)
{
// Validate that the connection hasn't been idle in the pool for longer than is allowed.
if (pooledConnectionIdleTimeout != Timeout.InfiniteTimeSpan)
{
long idleTicks = GetIdleTicks(nowTicks);
if (idleTicks > pooledConnectionIdleTimeout.TotalMilliseconds)
{
if (NetEventSource.Log.IsEnabled()) Trace($"Scavenging connection. Idle {TimeSpan.FromMilliseconds(idleTicks)} > {pooledConnectionIdleTimeout}.");
return false;
}
}

// Validate that the connection lifetime has not been exceeded.
if (pooledConnectionLifetime != Timeout.InfiniteTimeSpan)
{
long lifetimeTicks = GetLifetimeTicks(nowTicks);
if (lifetimeTicks > pooledConnectionLifetime.TotalMilliseconds)
{
if (NetEventSource.Log.IsEnabled()) Trace($"Scavenging connection. Lifetime {TimeSpan.FromMilliseconds(lifetimeTicks)} > {pooledConnectionLifetime}.");
return false;
}
}

if (!CheckUsabilityOnScavenge())
{
if (NetEventSource.Log.IsEnabled()) Trace($"Scavenging connection. Keep-Alive timeout exceeded, unexpected data or EOF received.");
return false;
}

return true;
}
}
}
Loading

0 comments on commit 70e3218

Please sign in to comment.