Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUIC] Copy managed memory instead of pinning #72746

Merged
merged 2 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, Htt
await Task.WhenAny(task, Task.Delay(_random.Next(0, 60), cts.Token));
}

cts.Cancel();
IsCancellationRequested = true;
cts.Cancel();
return WithVersionValidation(await task);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using Microsoft.Extensions.Primitives;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Serilog;
using Microsoft.Extensions.ObjectPool;

namespace HttpStress
{
Expand All @@ -36,6 +37,7 @@ public class StressServer : IDisposable
public const string ExpectedResponseContentLength = "Expected-Response-Content-Length";

private readonly IWebHost _webHost;
private readonly LogQuicEventListener? _listener;

public string ServerUri { get; }

Expand Down Expand Up @@ -155,6 +157,10 @@ void ConfigureListenOptions(ListenOptions listenOptions)
.WriteTo.Console(Serilog.Events.LogEventLevel.Warning);
}
Log.Logger = loggerConfiguration.CreateLogger();
if (configuration.Trace)
{
_listener = new LogQuicEventListener(Log.Logger);
}

host = host
.UseSerilog()
Expand Down Expand Up @@ -333,6 +339,7 @@ private static void AppendChecksumHeader(IHeaderDictionary headers, ulong checks
public void Dispose()
{
_webHost.Dispose();
_listener?.Dispose();
}

private static (string scheme, string hostname, int port) ParseServerUri(string serverUri)
Expand Down Expand Up @@ -397,4 +404,51 @@ public static bool IsValidServerContent(string input)
return true;
}
}

public class LogQuicEventListener : EventListener
{
private DefaultObjectPool<StringBuilder> _stringBuilderPool = new DefaultObjectPool<StringBuilder>(new StringBuilderPooledObjectPolicy());
private readonly Serilog.ILogger _logger;

public LogQuicEventListener(Serilog.ILogger logger)
{
_logger = logger;
}

protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource.Name == "Private.InternalDiagnostics.System.Net.Quic")
{
EnableEvents(eventSource, EventLevel.LogAlways);
}
}

protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
StringBuilder sb = _stringBuilderPool.Get();
sb.Append($"{eventData.TimeStamp:HH:mm:ss.fffffff}[{eventData.EventName}] ");
for (int i = 0; i < eventData.Payload?.Count; i++)
{
if (i > 0)
{
sb.Append(", ");
}
sb.Append(eventData.PayloadNames?[i]).Append(": ").Append(eventData.Payload[i]);
}
if (eventData.Level > EventLevel.Error)
{
_logger.Debug(sb.ToString());
}
else
{
_logger.Error(sb.ToString());
}
_stringBuilderPool.Return(sb);
}

public override void Dispose()
{
base.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Microsoft.Quic;
Expand All @@ -15,16 +14,13 @@ namespace System.Net.Quic;
/// </summary>
internal unsafe struct MsQuicBuffers : IDisposable
{
// Handles to pinned memory blocks from the user.
private MemoryHandle[] _handles;
// Native memory block which holds the pinned memory pointers from _handles and can be passed to MsQuic as QUIC_BUFFER*.
private QUIC_BUFFER* _buffers;
// Number of QUIC_BUFFER instance currently allocated in _buffers, so that we can reuse the memory instead of reallocating.
private int _count;

public MsQuicBuffers()
{
_handles = Array.Empty<MemoryHandle>();
_buffers = null;
_count = 0;
}
Expand All @@ -37,44 +33,39 @@ private void FreeNativeMemory()
QUIC_BUFFER* buffers = _buffers;
_buffers = null;
NativeMemory.Free(buffers);
_count = 0;
}

private void Reserve(int count)
{
if (_handles.Length < count)
if (count > _count)
{
_handles = new MemoryHandle[count];
FreeNativeMemory();
_buffers = (QUIC_BUFFER*)NativeMemory.Alloc((nuint)count, (nuint)sizeof(QUIC_BUFFER));
_buffers = (QUIC_BUFFER*)NativeMemory.AllocZeroed((nuint)count, (nuint)sizeof(QUIC_BUFFER));
_count = count;
}

_count = count;
}

private void SetBuffer(int index, ReadOnlyMemory<byte> buffer)
{
MemoryHandle handle = buffer.Pin();

_handles[index] = handle;
_buffers[index].Buffer = (byte*)handle.Pointer;
_buffers[index].Buffer = (byte*)NativeMemory.Alloc((nuint)buffer.Length, (nuint)sizeof(byte));
_buffers[index].Length = (uint)buffer.Length;
buffer.Span.CopyTo(_buffers[index].Span);
}

/// <summary>
/// Initializes QUIC_BUFFER* with data from inputs, converted via toBuffer.
/// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset.
/// </summary>
/// <param name="inputs">Inputs to get their byte array, pin them and pepare them to be passed to MsQuic as QUIC_BUFFER*.</param>
/// <param name="inputs">Inputs to get their byte array, copy them to be passed to MsQuic as QUIC_BUFFER*.</param>
/// <param name="toBuffer">Method extracting byte array from the inputs, e.g. applicationProtocol.Protocol.</param>
/// <typeparam name="T">The type of the inputs.</typeparam>
public void Initialize<T>(IList<T> inputs, Func<T, ReadOnlyMemory<byte>> toBuffer)
{
Reserve(inputs.Count);

for (int i = 0; i < inputs.Count; ++i)
{
ReadOnlyMemory<byte> buffer = toBuffer(inputs[i]);
SetBuffer(i, buffer);
SetBuffer(i, toBuffer(inputs[i]));
}
}

Expand All @@ -90,19 +81,25 @@ public void Initialize(ReadOnlyMemory<byte> buffer)
}

/// <summary>
/// Unpins the managed memory and allows reuse of this struct.
/// Release the native memory of individual buffers and allows reuse of this struct.
/// </summary>
public void Reset()
{
for (int i = 0; i < _count; ++i)
{
_handles[i].Dispose();
if (_buffers[i].Buffer is null)
{
break;
}
byte* buffer = _buffers[i].Buffer;
_buffers[i].Buffer = null;
NativeMemory.Free(buffer);
_buffers[i].Length = 0;
}
}

/// <summary>
/// Apart from unpinning the managed memory, it returns the shared buffer,
/// but most importantly it releases the unmanaged memory.
/// Releases all the native memory.
/// </summary>
public void Dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public sealed partial class QuicStream
}
}
};
// [ActiveIssue("https://github.com/dotnet/roslyn-analyzers/issues/5750")] Structs can have parameterless ctor now and thus the behavior differs from just defaulting the struct to zeros.
#pragma warning disable CA1805
private ReceiveBuffers _receiveBuffers = new ReceiveBuffers();
#pragma warning restore CA1805
private int _receivedNeedsEnable;

private readonly ResettableValueTaskSource _sendTcs = new ResettableValueTaskSource()
Expand All @@ -92,10 +89,7 @@ public sealed partial class QuicStream
}
}
};
// [ActiveIssue("https://github.com/dotnet/roslyn-analyzers/issues/5750")] Structs can have parameterless ctor now and thus the behavior differs from just defaulting the struct to zeros.
#pragma warning disable CA1805
private MsQuicBuffers _sendBuffers = new MsQuicBuffers();
#pragma warning restore CA1805

private readonly long _defaultErrorCode;

Expand Down