Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix pinning in quic #52368

Merged
merged 5 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ private sealed class State

// Buffers to hold during a call to send.
public MemoryHandle[] BufferArrays = new MemoryHandle[1];
public QuicBuffer[] SendQuicBuffers = new QuicBuffer[1];

// Handle to pinned SendQuicBuffers.
public GCHandle SendHandle;
public IntPtr SendQuicBuffers;
public int SendBufferMaxCount;
public int SendBufferCount;

// Resettable completions to be used for multiple calls to send, start, and shutdown.
public readonly ResettableCompletionSource<uint> SendResettableCompletionSource = new ResettableCompletionSource<uint>();
Expand Down Expand Up @@ -176,14 +175,12 @@ internal override async ValueTask WriteAsync(ReadOnlyMemory<ReadOnlyMemory<byte>

using CancellationTokenRegistration registration = await HandleWriteStartState(cancellationToken).ConfigureAwait(false);
await SendReadOnlyMemoryListAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false);

HandleWriteCompletedState();
}

internal override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool endStream, CancellationToken cancellationToken = default)
{
ThrowIfDisposed();

using CancellationTokenRegistration registration = await HandleWriteStartState(cancellationToken).ConfigureAwait(false);

await SendReadOnlyMemoryAsync(buffer, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false);
Expand Down Expand Up @@ -212,7 +209,7 @@ private async ValueTask<CancellationTokenRegistration> HandleWriteStartState(Can
bool shouldComplete = false;
lock (state)
{
if (state.SendState == SendState.None)
if (state.SendState == SendState.None || state.SendState == SendState.Pending)
{
state.SendState = SendState.Aborted;
shouldComplete = true;
Expand Down Expand Up @@ -240,7 +237,7 @@ private void HandleWriteCompletedState()
{
lock (_state)
{
if (_state.SendState == SendState.Finished)
if (_state.SendState == SendState.Finished || _state.SendState == SendState.Aborted)
{
_state.SendState = SendState.None;
}
Expand Down Expand Up @@ -501,11 +498,11 @@ private void Dispose(bool disposing)
return;
}

_disposed = true;
_state.Handle.Dispose();
Marshal.FreeHGlobal(_state.SendQuicBuffers);
if (_stateHandle.IsAllocated) _stateHandle.Free();
CleanupSendState(_state);

_disposed = true;
}

private void EnableReceive()
Expand Down Expand Up @@ -602,7 +599,7 @@ private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt)
bool shouldComplete = false;
lock (state)
{
if (state.SendState == SendState.None)
if (state.SendState == SendState.None || state.SendState == SendState.Pending)
{
shouldComplete = true;
}
Expand Down Expand Up @@ -761,7 +758,7 @@ private static uint HandleEventSendComplete(State state, ref StreamEvent evt)

lock (state)
{
if (state.SendState == SendState.None)
if (state.SendState == SendState.Pending)
{
state.SendState = SendState.Finished;
complete = true;
Expand All @@ -771,7 +768,6 @@ private static uint HandleEventSendComplete(State state, ref StreamEvent evt)
if (complete)
{
CleanupSendState(state);

// TODO throw if a write was canceled.
state.SendResettableCompletionSource.Complete(MsQuicStatusCodes.Success);
}
Expand All @@ -781,15 +777,15 @@ private static uint HandleEventSendComplete(State state, ref StreamEvent evt)

private static void CleanupSendState(State state)
{
if (state.SendHandle.IsAllocated)
lock (state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the lock here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lock the state in other place when we transition. So I think we should be either consists or avoid the lock completely via some other mechanism (like preventing duplicate operations)

{
state.SendHandle.Free();
}
Debug.Assert(state.SendState != SendState.Pending);
Debug.Assert(state.SendBufferCount <= state.BufferArrays.Length);

// Callings dispose twice on a memory handle should be okay
foreach (MemoryHandle buffer in state.BufferArrays)
{
buffer.Dispose();
for (int i = 0; i < state.SendBufferCount; i++)
{
state.BufferArrays[i].Dispose();
}
}
}

Expand All @@ -798,6 +794,12 @@ private unsafe ValueTask SendReadOnlyMemoryAsync(
ReadOnlyMemory<byte> buffer,
QUIC_SEND_FLAGS flags)
{
lock (_state)
{
Debug.Assert(_state.SendState != SendState.Pending);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we want to prevent overlapping Sends. Shouldn't this then rather be a condition with throw QuicException...?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not meant as guard agains overlaying Sends. (I think that should be done much earlier)
I added this to make sure our internal state logic (and msquic) always moves to some other state.

_state.SendState = buffer.IsEmpty ? SendState.Finished : SendState.Pending;
}

if (buffer.IsEmpty)
{
if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN)
Expand All @@ -809,18 +811,22 @@ private unsafe ValueTask SendReadOnlyMemoryAsync(
}

MemoryHandle handle = buffer.Pin();
_state.SendQuicBuffers[0].Length = (uint)buffer.Length;
_state.SendQuicBuffers[0].Buffer = (byte*)handle.Pointer;

_state.BufferArrays[0] = handle;
if (_state.SendQuicBuffers == IntPtr.Zero)
{
_state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QuicBuffer));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the major difference here is that we allocate SendQuicBuffers aka QuicBuffer* directly in unmanaged memory. While originally we were using pinned managed memory GCHandle.Alloc(_state.SendQuicBuffers, GCHandleType.Pinned);.

What was wrong with the original approach? Was it Marshal.UnsafeAddrOfPinnedArrayElement(_state.SendQuicBuffers, 0);? Should that be _state.SendHandle.AddrOfPinnedObject();?

This comment is just me trying to properly understand what's going on here. It has absolutely no influence on mergeability of this PR 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we could keep pinning. But then we do more operations on each send and we need to maintain the handle. So I felt this would be simpler as the SendQuicBuffers are really only consumed by native code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general point here is that since we only use this array as unmanaged memory, there's not really any benefit to allocating it as managed memory, and some nontrivial cost (overhead of pin/unpin, additional GC overhead, etc).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how does this solve the problem with byte mixing and AVE? Do we know the exact root cause of that?

Copy link
Member

@jkotas jkotas May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have doubts that this change is actually fixing the root cause of the crash. It is probably just moving it around.

This type has number of subtle issues like #52048 that will lead to use-after-free and similar crashes. Somebody will need to do a focused pass over it to fix them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how does this solve the problem with byte mixing and AVE? Do we know the exact root cause of that?

Yeah, you are right. Seems like this probably doesn't fix the root cause. It might make it less likely to happen, because the memory will never move; but I don't think we actually understand the root cause yet.

(I had assumed we were previously allocating the array on the stack, per discussion above -- but we weren't, so that wasn't the root problem.)

It would probably help to clear out the BufferArray entries when we dispose them, here: https://github.com/dotnet/runtime/pull/52368/files#diff-55ed6a1c110b1a90d4900bce3075ab49f1d6212c223e69b48f711f4084d264acR787

If we have a use-after-free issue, that would help find it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems suspicious that we are calling CleanupSendState in Dispose -- how do we know that msquic isn't still holding our buffer array?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jkotas

This type has number of subtle issues like #52048 that will lead to use-after-free and similar crashes. Somebody will need to do a focused pass over it to fix them.

Yeah, I agree we need a pass here. Do you have specific concerns? Dispose in particular looks questionable to me, anything else?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anything else?

Handling of different failure modes. For example:

https://github.com/dotnet/runtime/pull/52368/files#diff-55ed6a1c110b1a90d4900bce3075ab49f1d6212c223e69b48f711f4084d264acR943-R945

If AllocHGlobal fails, we will think that we have space for old SendBufferMaxCount, but SendQuicBuffers is actually going to be null. This will lead to NullReferenceException on retry.
If new MemoryHandle[array.Length] fails, we will think that we have space for new SendBufferMaxCount, but BufferArrays is actually going to have only have space for old SendBufferMaxCount. This will lead to IndexOutOfBoundsException on retry.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to write new tests and cleanup the shutdown more. I would generally expect that if allocation fails, the stream (or whole system) will not be in usable state. I can certainly split the allocations and assignment to make sure it is consistent.

_state.SendBufferMaxCount = 1;
}

_state.SendHandle = GCHandle.Alloc(_state.SendQuicBuffers, GCHandleType.Pinned);
QuicBuffer* quicBuffers = (QuicBuffer*)_state.SendQuicBuffers;
quicBuffers->Length = (uint)buffer.Length;
quicBuffers->Buffer = (byte*)handle.Pointer;

var quicBufferPointer = (QuicBuffer*)Marshal.UnsafeAddrOfPinnedArrayElement(_state.SendQuicBuffers, 0);
_state.BufferArrays[0] = handle;
_state.SendBufferCount = 1;

uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBufferPointer,
quicBuffers,
bufferCount: 1,
flags,
IntPtr.Zero);
Expand All @@ -841,6 +847,13 @@ private unsafe ValueTask SendReadOnlySequenceAsync(
ReadOnlySequence<byte> buffers,
QUIC_SEND_FLAGS flags)
{

lock (_state)
{
Debug.Assert(_state.SendState != SendState.Pending);
_state.SendState = buffers.IsEmpty ? SendState.Finished : SendState.Pending;
}

if (buffers.IsEmpty)
{
if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN)
Expand All @@ -851,38 +864,39 @@ private unsafe ValueTask SendReadOnlySequenceAsync(
return default;
}

uint count = 0;
int count = 0;

foreach (ReadOnlyMemory<byte> buffer in buffers)
{
++count;
}

if (_state.SendQuicBuffers.Length < count)
if (_state.SendBufferMaxCount < count)
{
_state.SendQuicBuffers = new QuicBuffer[count];
Marshal.FreeHGlobal(_state.SendQuicBuffers);
jkotas marked this conversation as resolved.
Show resolved Hide resolved
_state.SendQuicBuffers = IntPtr.Zero;
_state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QuicBuffer) * count);
_state.SendBufferMaxCount = count;
_state.BufferArrays = new MemoryHandle[count];
}

_state.SendBufferCount = count;
count = 0;

QuicBuffer* quicBuffers = (QuicBuffer*)_state.SendQuicBuffers;
foreach (ReadOnlyMemory<byte> buffer in buffers)
{
MemoryHandle handle = buffer.Pin();
_state.SendQuicBuffers[count].Length = (uint)buffer.Length;
_state.SendQuicBuffers[count].Buffer = (byte*)handle.Pointer;
quicBuffers[count].Length = (uint)buffer.Length;
quicBuffers[count].Buffer = (byte*)handle.Pointer;
_state.BufferArrays[count] = handle;
++count;
}

_state.SendHandle = GCHandle.Alloc(_state.SendQuicBuffers, GCHandleType.Pinned);

var quicBufferPointer = (QuicBuffer*)Marshal.UnsafeAddrOfPinnedArrayElement(_state.SendQuicBuffers, 0);

uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBufferPointer,
count,
quicBuffers,
(uint)count,
flags,
IntPtr.Zero);

Expand All @@ -902,6 +916,12 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(
ReadOnlyMemory<ReadOnlyMemory<byte>> buffers,
QUIC_SEND_FLAGS flags)
{
lock (_state)
{
Debug.Assert(_state.SendState != SendState.Pending);
_state.SendState = buffers.IsEmpty ? SendState.Finished : SendState.Pending;
}

if (buffers.IsEmpty)
{
if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN)
Expand All @@ -916,28 +936,31 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(

uint length = (uint)array.Length;

if (_state.SendQuicBuffers.Length < length)
if (_state.SendBufferMaxCount < array.Length)
{
_state.SendQuicBuffers = new QuicBuffer[length];
_state.BufferArrays = new MemoryHandle[length];
Marshal.FreeHGlobal(_state.SendQuicBuffers);
jkotas marked this conversation as resolved.
Show resolved Hide resolved
_state.SendQuicBuffers = IntPtr.Zero;
_state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QuicBuffer) * array.Length);
_state.SendBufferMaxCount = array.Length;
_state.BufferArrays = new MemoryHandle[array.Length];
}

_state.SendBufferCount = array.Length;
QuicBuffer* quicBuffers = (QuicBuffer*)_state.SendQuicBuffers;
for (int i = 0; i < length; i++)
{
ReadOnlyMemory<byte> buffer = array[i];
MemoryHandle handle = buffer.Pin();
_state.SendQuicBuffers[i].Length = (uint)buffer.Length;
_state.SendQuicBuffers[i].Buffer = (byte*)handle.Pointer;
_state.BufferArrays[i] = handle;
}

_state.SendHandle = GCHandle.Alloc(_state.SendQuicBuffers, GCHandleType.Pinned);
quicBuffers[i].Length = (uint)buffer.Length;
quicBuffers[i].Buffer = (byte*)handle.Pointer;

var quicBufferPointer = (QuicBuffer*)Marshal.UnsafeAddrOfPinnedArrayElement(_state.SendQuicBuffers, 0);
_state.BufferArrays[i] = handle;
}

uint status = MsQuicApi.Api.StreamSendDelegate(
_state.Handle,
quicBufferPointer,
quicBuffers,
length,
flags,
IntPtr.Zero);
Expand Down Expand Up @@ -1014,6 +1037,7 @@ private enum ShutdownState
private enum SendState
{
None,
Pending,
Aborted,
Finished
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ public BufferSegment Append(ReadOnlyMemory<byte> memory)
}
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/52047")]
[Fact]
public async Task ByteMixingOrNativeAVE_MinimalFailingTest()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ public async Task GetStreamIdWithoutStartWorks()
await clientConnection.CloseAsync(0);
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/52047")]
[Fact]
public async Task LargeDataSentAndReceived()
{
Expand Down Expand Up @@ -348,7 +347,6 @@ private static async Task SendAndReceiveEOFAsync(QuicStream s1, QuicStream s2)
Assert.Equal(0, bytesRead);
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/52047")]
[Theory]
[MemberData(nameof(ReadWrite_Random_Success_Data))]
public async Task ReadWrite_Random_Success(int readSize, int writeSize)
Expand Down Expand Up @@ -434,7 +432,7 @@ await Task.Run(async () =>
byte[] buffer = new byte[100];
QuicStreamAbortedException ex = await Assert.ThrowsAsync<QuicStreamAbortedException>(() => serverStream.ReadAsync(buffer).AsTask());
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
}).WaitAsync(TimeSpan.FromSeconds(5));
}).WaitAsync(TimeSpan.FromSeconds(15));
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/32050")]
Expand Down