Skip to content

Commit

Permalink
remove waiting on start event (#55442)
Browse files Browse the repository at this point in the history
Co-authored-by: Geoffrey Kizer <geoffrek@windows.microsoft.com>
  • Loading branch information
geoffkizer and Geoffrey Kizer authored Jul 12, 2021
1 parent d766727 commit 98b7ed1
Showing 1 changed file with 21 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ internal sealed class MsQuicStream : QuicStreamProvider
// Backing for StreamId
private long _streamId = -1;

// Used to check if StartAsync has been called.
private bool _started;

private int _disposed;

private sealed class State
Expand All @@ -53,7 +50,7 @@ private sealed class State
public int SendBufferMaxCount;
public int SendBufferCount;

// Resettable completions to be used for multiple calls to send, start, and shutdown.
// Resettable completions to be used for multiple calls to send.
public readonly ResettableCompletionSource<uint> SendResettableCompletionSource = new ResettableCompletionSource<uint>();

public ShutdownWriteState ShutdownWriteState;
Expand Down Expand Up @@ -94,7 +91,6 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
_state.Handle = streamHandle;
_canRead = true;
_canWrite = !flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
_started = true;
if (!_canWrite)
{
_state.SendState = SendState.Closed;
Expand Down Expand Up @@ -217,7 +213,7 @@ internal override async ValueTask WriteAsync(ReadOnlySequence<byte> buffers, boo
{
ThrowIfDisposed();

using CancellationTokenRegistration registration = await HandleWriteStartState(cancellationToken).ConfigureAwait(false);
using CancellationTokenRegistration registration = HandleWriteStartState(cancellationToken);

await SendReadOnlySequenceAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false);

Expand All @@ -233,7 +229,7 @@ internal override async ValueTask WriteAsync(ReadOnlyMemory<ReadOnlyMemory<byte>
{
ThrowIfDisposed();

using CancellationTokenRegistration registration = await HandleWriteStartState(cancellationToken).ConfigureAwait(false);
using CancellationTokenRegistration registration = HandleWriteStartState(cancellationToken);

await SendReadOnlyMemoryListAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false);

Expand All @@ -244,14 +240,14 @@ internal override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool e
{
ThrowIfDisposed();

using CancellationTokenRegistration registration = await HandleWriteStartState(cancellationToken).ConfigureAwait(false);
using CancellationTokenRegistration registration = HandleWriteStartState(cancellationToken);

await SendReadOnlyMemoryAsync(buffer, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false);

HandleWriteCompletedState();
}

private async ValueTask<CancellationTokenRegistration> HandleWriteStartState(CancellationToken cancellationToken)
private CancellationTokenRegistration HandleWriteStartState(CancellationToken cancellationToken)
{
if (_state.SendState == SendState.Closed)
{
Expand All @@ -277,14 +273,7 @@ private async ValueTask<CancellationTokenRegistration> HandleWriteStartState(Can
}
}

throw new System.OperationCanceledException(cancellationToken);
}

// Make sure start has completed
if (!_started)
{
await _state.SendResettableCompletionSource.GetTypelessValueTask().ConfigureAwait(false);
_started = true;
throw new OperationCanceledException(cancellationToken);
}

// if token was already cancelled, this would execute synchronously
Expand Down Expand Up @@ -372,7 +361,7 @@ internal override async ValueTask<int> ReadAsync(Memory<byte> destination, Cance
}
}

throw new System.OperationCanceledException(cancellationToken);
throw new OperationCanceledException(cancellationToken);
}

if (NetEventSource.Log.IsEnabled())
Expand Down Expand Up @@ -740,12 +729,12 @@ private static uint HandleEvent(State state, ref StreamEvent evt)

try
{
switch ((QUIC_STREAM_EVENT_TYPE)evt.Type)
switch (evt.Type)
{
// Stream has started.
// Will only be done for outbound streams (inbound streams have already started)
case QUIC_STREAM_EVENT_TYPE.START_COMPLETE:
return HandleEventStartComplete(state);
return HandleEventStartComplete(state, ref evt);
// Received data on the stream
case QUIC_STREAM_EVENT_TYPE.RECEIVE:
return HandleEventRecv(state, ref evt);
Expand Down Expand Up @@ -778,12 +767,10 @@ private static uint HandleEvent(State state, ref StreamEvent evt)
{
if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Error(state, $"[Stream#{state.GetHashCode()}] Exception occurred during handling {(QUIC_STREAM_EVENT_TYPE)evt.Type} event: {ex}");
NetEventSource.Error(state, $"[Stream#{state.GetHashCode()}] Exception occurred during handling {evt.Type} event: {ex}");
}

// This is getting hit currently
// See https://github.com/dotnet/runtime/issues/55302
//Debug.Fail($"[Stream#{state.GetHashCode()}] Exception occurred during handling {(QUIC_STREAM_EVENT_TYPE)evt.Type} event: {ex}");
Debug.Fail($"[Stream#{state.GetHashCode()}] Exception occurred during handling {evt.Type} event: {ex}");

return MsQuicStatusCodes.InternalError;
}
Expand Down Expand Up @@ -840,22 +827,10 @@ private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt)
return MsQuicStatusCodes.Success;
}

private static uint HandleEventStartComplete(State state)
private static uint HandleEventStartComplete(State state, ref StreamEvent evt)
{
bool shouldComplete = false;
lock (state)
{
// Check send state before completing as send cancellation is shared between start and send.
if (state.SendState == SendState.None)
{
shouldComplete = true;
}
}

if (shouldComplete)
{
state.SendResettableCompletionSource.Complete(MsQuicStatusCodes.Success);
}
// TODO: We should probably check for a failure as indicated by the event data (or at least assert no failure if we aren't expecting it).
// However, since there is no definition for START_COMPLETE event data currently, we can't do this right now.

return MsQuicStatusCodes.Success;
}
Expand Down Expand Up @@ -1336,7 +1311,7 @@ private enum ReadState
/// <summary>
/// The stream is open, but there is no data available.
/// </summary>
None,
None = 0,

/// <summary>
/// Data is available in <see cref="State.ReceiveQuicBuffers"/>.
Expand Down Expand Up @@ -1366,15 +1341,15 @@ private enum ReadState

private enum ShutdownWriteState
{
None,
None = 0,
Canceled,
Finished,
ConnectionClosed
}

private enum ShutdownState
{
None,
None = 0,
Canceled,
Pending,
Finished,
Expand All @@ -1383,10 +1358,12 @@ private enum ShutdownState

private enum SendState
{
None,
None = 0,
Pending,
Aborted,
Finished,

// Terminal states
Aborted,
ConnectionClosed,
Closed
}
Expand Down

0 comments on commit 98b7ed1

Please sign in to comment.