diff --git a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs index 78c49a1aaee13..347bb985df416 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs @@ -59,9 +59,9 @@ public override void Dispose() stream.Dispose(); } -// We don't dispose the connection currently, because this causes races when the server connection is closed before -// the client has received and handled all response data. -// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832 + // We don't dispose the connection currently, because this causes races when the server connection is closed before + // the client has received and handled all response data. + // See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832 #if false // Dispose the connection // If we already waited for graceful shutdown from the client, then the connection is already closed and this will simply release the handle. @@ -79,14 +79,14 @@ public async Task CloseAsync(long errorCode) await _connection.CloseAsync(errorCode).ConfigureAwait(false); } - public Http3LoopbackStream OpenUnidirectionalStream() + public async ValueTask OpenUnidirectionalStreamAsync() { - return new Http3LoopbackStream(_connection.OpenUnidirectionalStream()); + return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync()); } - public Http3LoopbackStream OpenBidirectionalStream() + public async ValueTask OpenBidirectionalStreamAsync() { - return new Http3LoopbackStream(_connection.OpenBidirectionalStream()); + return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync()); } public static int GetRequestId(QuicStream stream) @@ -185,10 +185,10 @@ public async Task AcceptRequestStreamAsync() public async Task EstablishControlStreamAsync() { - _outboundControlStream = OpenUnidirectionalStream(); + _outboundControlStream = await OpenUnidirectionalStreamAsync(); await _outboundControlStream.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream); await _outboundControlStream.SendSettingsFrameAsync(); - } + } public override async Task ReadRequestBodyAsync() { diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 39e0d80107b36..f64fc6bc6200a 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -174,42 +174,32 @@ public async Task SendAsync(HttpRequestMessage request, lon // Allocate an active request QuicStream? quicStream = null; Http3RequestStream? requestStream = null; - ValueTask waitTask = default; try { try { - while (true) + QuicConnection? conn = _connection; + if (conn != null) { - lock (SyncObj) + if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0) { - if (_connection == null) - { - break; - } - - if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0) - { - quicStream = _connection.OpenBidirectionalStream(); - requestStream = new Http3RequestStream(request, this, quicStream); - _activeRequests.Add(quicStream, requestStream); - break; - } - - waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken); + queueStartingTimestamp = Stopwatch.GetTimestamp(); } - if (HttpTelemetry.Log.IsEnabled() && !waitTask.IsCompleted && queueStartingTimestamp == 0) + quicStream = await conn.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false); + + requestStream = new Http3RequestStream(request, this, quicStream); + lock (SyncObj) { - // We avoid logging RequestLeftQueue if a stream was available immediately (synchronously) - queueStartingTimestamp = Stopwatch.GetTimestamp(); + _activeRequests.Add(quicStream, requestStream); } - - // Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet. - await waitTask.ConfigureAwait(false); } } + // Swallow any exceptions caused by the connection being closed locally or even disposed due to a race. + // Since quicStream will stay `null`, the code below will throw appropriate exception to retry the request. + catch (ObjectDisposedException) { } + catch (QuicException e) when (!(e is QuicConnectionAbortedException)) { } finally { if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp != 0) @@ -377,7 +367,7 @@ private async Task SendSettingsAsync() { try { - _clientControl = _connection!.OpenUnidirectionalStream(); + _clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false); await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false); } catch (Exception ex) diff --git a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs index 0d4e4997cc489..e42d4f40c9dd1 100644 --- a/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs +++ b/src/libraries/System.Net.Quic/ref/System.Net.Quic.cs @@ -22,6 +22,7 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { } public bool Connected { get { throw null; } } public System.Net.IPEndPoint? LocalEndPoint { get { throw null; } } public System.Net.Security.SslApplicationProtocol NegotiatedApplicationProtocol { get { throw null; } } + public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } public System.Net.EndPoint RemoteEndPoint { get { throw null; } } public System.Threading.Tasks.ValueTask AcceptStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask CloseAsync(long errorCode, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } @@ -29,11 +30,8 @@ public QuicConnection(System.Net.Quic.QuicClientConnectionOptions options) { } public void Dispose() { } public int GetRemoteAvailableBidirectionalStreamCount() { throw null; } public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; } - public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; } - public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; } - public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } } - public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Threading.Tasks.ValueTask OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Threading.Tasks.ValueTask OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } } public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException { @@ -85,12 +83,14 @@ public sealed partial class QuicStream : System.IO.Stream internal QuicStream() { } public override bool CanRead { get { throw null; } } public override bool CanSeek { get { throw null; } } - public override bool CanWrite { get { throw null; } } public override bool CanTimeout { get { throw null; } } + public override bool CanWrite { get { throw null; } } public override long Length { get { throw null; } } public override long Position { get { throw null; } set { } } public bool ReadsCompleted { get { throw null; } } + public override int ReadTimeout { get { throw null; } set { } } public long StreamId { get { throw null; } } + public override int WriteTimeout { get { throw null; } set { } } public void AbortRead(long errorCode) { } public void AbortWrite(long errorCode) { } public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; } @@ -102,10 +102,9 @@ public override void Flush() { } public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; } public override int Read(byte[] buffer, int offset, int count) { throw null; } public override int Read(System.Span buffer) { throw null; } - public override int ReadByte() { throw null; } public override System.Threading.Tasks.Task ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; } public override System.Threading.Tasks.ValueTask ReadAsync(System.Memory buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public override int ReadTimeout { get { throw null; } set { } } + public override int ReadByte() { throw null; } public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; } public override void SetLength(long value) { } public void Shutdown() { } @@ -113,7 +112,6 @@ public void Shutdown() { } public System.Threading.Tasks.ValueTask WaitForWriteCompletionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public override void Write(byte[] buffer, int offset, int count) { } public override void Write(System.ReadOnlySpan buffer) { } - public override void WriteByte(byte value) { } public System.Threading.Tasks.ValueTask WriteAsync(System.Buffers.ReadOnlySequence buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WriteAsync(System.Buffers.ReadOnlySequence buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public override System.Threading.Tasks.Task WriteAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; } @@ -121,7 +119,7 @@ public override void WriteByte(byte value) { } public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public override int WriteTimeout { get { throw null; } set { } } + public override void WriteByte(byte value) { } } public partial class QuicStreamAbortedException : System.Net.Quic.QuicException { diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs index 5ccb653f7b993..0a0b6e1edca60 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockConnection.cs @@ -162,7 +162,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d return ValueTask.CompletedTask; } - internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default) + internal async override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken) { PeerStreamLimit? streamLimit = RemoteStreamLimit; if (streamLimit is null) @@ -170,31 +170,9 @@ internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(Cancellat throw new InvalidOperationException("Not connected"); } - return streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken); - } - - internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - PeerStreamLimit? streamLimit = RemoteStreamLimit; - if (streamLimit is null) - { - throw new InvalidOperationException("Not connected"); - } - - return streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken); - } - - internal override QuicStreamProvider OpenUnidirectionalStream() - { - PeerStreamLimit? streamLimit = RemoteStreamLimit; - if (streamLimit is null) - { - throw new InvalidOperationException("Not connected"); - } - - if (!streamLimit.Unidirectional.TryIncrement()) + while (!streamLimit.Unidirectional.TryIncrement()) { - throw new QuicException("No available unidirectional stream"); + await streamLimit.Unidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false); } long streamId; @@ -207,7 +185,7 @@ internal override QuicStreamProvider OpenUnidirectionalStream() return OpenStream(streamId, false); } - internal override QuicStreamProvider OpenBidirectionalStream() + internal async override ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken) { PeerStreamLimit? streamLimit = RemoteStreamLimit; if (streamLimit is null) @@ -215,9 +193,9 @@ internal override QuicStreamProvider OpenBidirectionalStream() throw new InvalidOperationException("Not connected"); } - if (!streamLimit.Bidirectional.TryIncrement()) + while (!streamLimit.Bidirectional.TryIncrement()) { - throw new QuicException("No available bidirectional stream"); + await streamLimit.Bidirectional.WaitForAvailableStreams(cancellationToken).ConfigureAwait(false); } long streamId; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs index 956c425828c46..8745dd5f57b9a 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Interop/MsQuicEnums.cs @@ -213,6 +213,7 @@ internal enum QUIC_STREAM_EVENT_TYPE : uint SEND_SHUTDOWN_COMPLETE = 6, SHUTDOWN_COMPLETE = 7, IDEAL_SEND_BUFFER_SIZE = 8, + PEER_ACCEPTED = 9 } #if SOCKADDR_HAS_LENGTH diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs index 99dc15398e9f0..f5d3aba6ee878 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicConnection.cs @@ -52,12 +52,6 @@ internal sealed class State // TODO: only allocate these when there is an outstanding shutdown. public readonly TaskCompletionSource ShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - // Note that there's no such thing as resetable TCS, so we cannot reuse the same instance after we've set the result. - // We also cannot use solutions like ManualResetValueTaskSourceCore, since we can have multiple waiters on the same TCS. - // As a result, we allocate a new TCS when needed, which is when someone explicitely asks for them in WaitForAvailableStreamsAsync. - public TaskCompletionSource? NewUnidirectionalStreamsAvailable; - public TaskCompletionSource? NewBidirectionalStreamsAvailable; - public bool Connected; public long AbortErrorCode = -1; public int StreamCount; @@ -320,26 +314,6 @@ private static uint HandleEventShutdownComplete(State state, ref ConnectionEvent // Stop accepting new streams. state.AcceptQueue.Writer.TryComplete(); - // Stop notifying about available streams. - TaskCompletionSource? unidirectionalTcs = null; - TaskCompletionSource? bidirectionalTcs = null; - lock (state) - { - unidirectionalTcs = state.NewUnidirectionalStreamsAvailable; - bidirectionalTcs = state.NewBidirectionalStreamsAvailable; - state.NewUnidirectionalStreamsAvailable = null; - state.NewBidirectionalStreamsAvailable = null; - } - - if (unidirectionalTcs is not null) - { - unidirectionalTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException())); - } - if (bidirectionalTcs is not null) - { - bidirectionalTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException())); - } - return MsQuicStatusCodes.Success; } @@ -358,32 +332,6 @@ private static uint HandleEventNewStream(State state, ref ConnectionEvent connec private static uint HandleEventStreamsAvailable(State state, ref ConnectionEvent connectionEvent) { - TaskCompletionSource? unidirectionalTcs = null; - TaskCompletionSource? bidirectionalTcs = null; - lock (state) - { - if (connectionEvent.Data.StreamsAvailable.UniDirectionalCount > 0) - { - unidirectionalTcs = state.NewUnidirectionalStreamsAvailable; - state.NewUnidirectionalStreamsAvailable = null; - } - - if (connectionEvent.Data.StreamsAvailable.BiDirectionalCount > 0) - { - bidirectionalTcs = state.NewBidirectionalStreamsAvailable; - state.NewBidirectionalStreamsAvailable = null; - } - } - - if (unidirectionalTcs is not null) - { - unidirectionalTcs.SetResult(); - } - if (bidirectionalTcs is not null) - { - bidirectionalTcs.SetResult(); - } - return MsQuicStatusCodes.Success; } @@ -517,72 +465,7 @@ internal override async ValueTask AcceptStreamAsync(Cancella return stream; } - internal override ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - TaskCompletionSource? tcs = _state.NewUnidirectionalStreamsAvailable; - if (tcs is null) - { - // We need to avoid calling MsQuic under lock. - // This is not atomic but it won't be anyway as counts can change between when task is completed - // and before somebody may try to allocate new stream. - int count = GetRemoteAvailableUnidirectionalStreamCount(); - lock (_state) - { - if (_state.NewUnidirectionalStreamsAvailable is null) - { - if (_state.ShutdownTcs.Task.IsCompleted) - { - throw new QuicOperationAbortedException(); - } - - if (count > 0) - { - return ValueTask.CompletedTask; - } - - _state.NewUnidirectionalStreamsAvailable = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - - tcs = _state.NewUnidirectionalStreamsAvailable; - } - } - - return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); - } - - internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) - { - TaskCompletionSource? tcs = _state.NewBidirectionalStreamsAvailable; - if (tcs is null) - { - // We need to avoid calling MsQuic under lock. - // This is not atomic but it won't be anyway as counts can change between when task is completed - // and before somebody may try to allocate new stream. - int count = GetRemoteAvailableBidirectionalStreamCount(); - lock (_state) - { - if (_state.NewBidirectionalStreamsAvailable is null) - { - if (_state.ShutdownTcs.Task.IsCompleted) - { - throw new QuicOperationAbortedException(); - } - - if (count > 0) - { - return ValueTask.CompletedTask; - } - - _state.NewBidirectionalStreamsAvailable = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - tcs = _state.NewBidirectionalStreamsAvailable; - } - } - - return new ValueTask(tcs.Task.WaitAsync(cancellationToken)); - } - - internal override QuicStreamProvider OpenUnidirectionalStream() + private async ValueTask OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken) { ThrowIfDisposed(); if (!Connected) @@ -590,20 +473,24 @@ internal override QuicStreamProvider OpenUnidirectionalStream() throw new InvalidOperationException(SR.net_quic_not_connected); } - return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL); - } + var stream = new MsQuicStream(_state, flags); - internal override QuicStreamProvider OpenBidirectionalStream() - { - ThrowIfDisposed(); - if (!Connected) + try { - throw new InvalidOperationException(SR.net_quic_not_connected); + await stream.StartAsync(cancellationToken).ConfigureAwait(false); + } + catch + { + stream.Dispose(); + throw; } - return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE); + return stream; } + internal override ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken); + internal override ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.NONE, cancellationToken); + internal override int GetRemoteAvailableUnidirectionalStreamCount() { Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index c9c0e23bef959..5aa6313789109 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -40,8 +40,6 @@ private sealed class State public MsQuicConnection.State ConnectionState = null!; // set in ctor. public string TraceId = null!; // set in ctor. - public uint StartStatus = MsQuicStatusCodes.Success; - public ReadState ReadState; // set when ReadState.Aborted: @@ -76,6 +74,9 @@ private sealed class State // Set once writes have been shutdown. public readonly TaskCompletionSource ShutdownWriteCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // Set once stream has been started and within peer's advertised stream limits + public readonly TaskCompletionSource StartCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + public ShutdownState ShutdownState; // The value makes sure that we release the handles only once. @@ -113,6 +114,9 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa // but after TryAddStream to prevent unnecessary RemoveStream in finalizer _state.ConnectionState = connectionState; + // Inbound streams are already started + _state.StartCompletionSource.SetResult(); + _state.Handle = streamHandle; _canRead = true; _canWrite = !flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL); @@ -133,6 +137,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa catch { _state.StateGCHandle.Free(); + // don't free the streamHandle, it will be freed by the caller throw; } @@ -185,10 +190,6 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F } QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer."); - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL); - QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream."); } catch { @@ -943,6 +944,9 @@ private static unsafe uint NativeCallbackHandler( // Shutdown for both sending and receiving is completed. case QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE: return HandleEventShutdownComplete(state, ref *streamEvent); + // Asynchronous open finished, the stream is now within advertised stream limits. + case QUIC_STREAM_EVENT_TYPE.PEER_ACCEPTED: + return HandleEventPeerAccepted(state); default: return MsQuicStatusCodes.Success; } @@ -1108,8 +1112,41 @@ private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt) private static uint HandleEventStartComplete(State state, ref StreamEvent evt) { - // Store the start status code and check it when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL in StreamStart. - state.StartStatus = evt.Data.StartComplete.Status; + uint status = evt.Data.StartComplete.Status; + + // The way we expose Open(Uni|Bi)directionalStreamAsync operations is that the stream + // is also accepted by the peer (i.e. it is within advertised stream limits). However, + // We may receive START_COMPLETE notification before the stream is accepted, so we defer + // completing the StartcompletionSource until we get PeerAccepted notification. + + if (status != MsQuicStatusCodes.Success) + { + // Start irrecoverably failed. The possible status codes are: + // - Aborted - connection aborted by peer + // - InvalidState - stream already started before, or connection aborted locally + // - StreamLimitReached - only if QUIC_STREAM_START_FLAG_FAIL_BLOCKED was specified (not in our case). + // + if (status == MsQuicStatusCodes.Aborted) + { + state.StartCompletionSource.TrySetException( + ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); + } + else + { + // TODO: Should we throw QuicOperationAbortedException when status is InvalidState? + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + state.StartCompletionSource.TrySetException( + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"StreamStart finished with status {MsQuicStatusCodes.GetError(status)}"))); + } + } + else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0) + { + // Start succeeded and we were within stream limits, stream already usable. + state.StartCompletionSource.TrySetResult(); + } + // if PeerAccepted == 0, we will later receive PEER_ACCEPTED event, which will + // complete the StartCompletionSource + return MsQuicStatusCodes.Success; } @@ -1183,27 +1220,27 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt if (shouldReadComplete) { - if (state.StartStatus == MsQuicStatusCodes.Success) + if (state.StartCompletionSource.Task.IsCompletedSuccessfully) { state.ReceiveResettableCompletionSource.Complete(0); } else { state.ReceiveResettableCompletionSource.CompleteException( - ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}"))); + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed"))); } } if (shouldShutdownWriteComplete) { - if (state.StartStatus == MsQuicStatusCodes.Success) + if (state.StartCompletionSource.Task.IsCompletedSuccessfully) { state.ShutdownWriteCompletionSource.SetResult(); } else { state.ShutdownWriteCompletionSource.SetException( - ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}"))); + ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException($"Stream start failed"))); } } @@ -1212,6 +1249,10 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt state.ShutdownCompletionSource.SetResult(); } + // If we are receiving stream shutdown notification, the start comletion source must have been already completed + // eihter by StreamOpen or PeerAccepted event, Connection closing, or it was cancelled by user. + Debug.Assert(state.StartCompletionSource.Task.IsCompleted); + // Dispose was called before complete event. bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, State.ShutdownDone_NotificationReceived) == State.ShutdownDone_Disposed; if (releaseHandles) @@ -1222,6 +1263,12 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt return MsQuicStatusCodes.Success; } + private static uint HandleEventPeerAccepted(State state) + { + state.StartCompletionSource.TrySetResult(); + return MsQuicStatusCodes.Success; + } + private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt) { bool shouldComplete = false; @@ -1586,8 +1633,14 @@ private static uint HandleEventConnectionClose(State state) ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); } + if (!state.StartCompletionSource.Task.IsCompleted) + { + state.StartCompletionSource.TrySetException( + ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state))); + } + // Dispose was called before complete event. - bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, 2) == 1; + bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, State.ShutdownDone_NotificationReceived) == State.ShutdownDone_Disposed; if (releaseHandles) { state.Cleanup(); @@ -1619,6 +1672,26 @@ private static bool CleanupReadStateAndCheckPending(State state, ReadState final return shouldComplete; } + internal async ValueTask StartAsync(CancellationToken cancellationToken) + { + Debug.Assert(!Monitor.IsEntered(_state)); + + using var registration = cancellationToken.UnsafeRegister((state, token) => + { + ((State)state!).StartCompletionSource.TrySetCanceled(token); + }, _state); + + uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); + + if (!MsQuicStatusHelper.SuccessfulStatusCode(status)) + { + _state.StartCompletionSource.TrySetException(QuicExceptionHelpers.CreateExceptionForHResult(status, "Could not start stream.")); + throw QuicExceptionHelpers.CreateExceptionForHResult(status, "Could not start stream."); + } + + await _state.StartCompletionSource.Task.ConfigureAwait(false); + } + // Read state transitions: // // None --(data arrives in event RECV)-> IndividualReadComplete diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs index 38c0e27a2ab41..182c68abe2b8b 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicConnectionProvider.cs @@ -16,13 +16,9 @@ internal abstract class QuicConnectionProvider : IDisposable internal abstract ValueTask ConnectAsync(CancellationToken cancellationToken = default); - internal abstract ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default); + internal abstract ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default); - internal abstract ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default); - - internal abstract QuicStreamProvider OpenUnidirectionalStream(); - - internal abstract QuicStreamProvider OpenBidirectionalStream(); + internal abstract ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default); internal abstract int GetRemoteAvailableUnidirectionalStreamCount(); @@ -32,7 +28,7 @@ internal abstract class QuicConnectionProvider : IDisposable internal abstract System.Net.Security.SslApplicationProtocol NegotiatedApplicationProtocol { get; } - internal abstract System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get ; } + internal abstract System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get; } internal abstract ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index f94ee81bfacd4..74fa392c340b2 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -70,29 +70,18 @@ internal QuicConnection(QuicConnectionProvider provider) /// public ValueTask ConnectAsync(CancellationToken cancellationToken = default) => _provider.ConnectAsync(cancellationToken); - /// - /// Waits for available unidirectional stream capacity to be announced by the peer. If any capacity is available, returns immediately. - /// - /// - public ValueTask WaitForAvailableUnidirectionalStreamsAsync(CancellationToken cancellationToken = default) => _provider.WaitForAvailableUnidirectionalStreamsAsync(cancellationToken); - - /// - /// Waits for available bidirectional stream capacity to be announced by the peer. If any capacity is available, returns immediately. - /// - /// - public ValueTask WaitForAvailableBidirectionalStreamsAsync(CancellationToken cancellationToken = default) => _provider.WaitForAvailableBidirectionalStreamsAsync(cancellationToken); - /// /// Create an outbound unidirectional stream. /// /// - public QuicStream OpenUnidirectionalStream() => new QuicStream(_provider.OpenUnidirectionalStream()); + public async ValueTask OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => new QuicStream(await _provider.OpenUnidirectionalStreamAsync(cancellationToken).ConfigureAwait(false)); /// /// Create an outbound bidirectional stream. /// /// - public QuicStream OpenBidirectionalStream() => new QuicStream(_provider.OpenBidirectionalStream()); + public async ValueTask OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => new QuicStream(await _provider.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false)); + /// /// Accept an incoming stream. diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs index 9cd0dad78c32c..0baa8dc3c9e70 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs @@ -388,58 +388,137 @@ public async Task ConnectWithClientCertificate(bool sendCerttificate) serverConnection.Dispose(); } - [Fact] - [ActiveIssue("https://github.com/dotnet/runtime/issues/67302")] - public async Task WaitForAvailableUnidirectionStreamsAsyncWorks() + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task OpenStreamAsync_BlocksUntilAvailable(bool unidirectional) { + ValueTask OpenStreamAsync(QuicConnection connection) => unidirectional + ? connection.OpenUnidirectionalStreamAsync() + : connection.OpenBidirectionalStreamAsync(); + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); listenerOptions.MaxUnidirectionalStreams = 1; + listenerOptions.MaxBidirectionalStreams = 1; (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); - // No stream opened yet, should return immediately. - Assert.True(clientConnection.WaitForAvailableUnidirectionalStreamsAsync().IsCompletedSuccessfully); - - // Open one stream, should wait till it closes. - QuicStream stream = clientConnection.OpenUnidirectionalStream(); - ValueTask waitTask = clientConnection.WaitForAvailableUnidirectionalStreamsAsync(); + // Open one stream, second call should block + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = OpenStreamAsync(clientConnection); Assert.False(waitTask.IsCompleted); - Assert.Throws(() => clientConnection.OpenUnidirectionalStream()); + // Close the streams, the waitTask should finish as a result. stream.Dispose(); QuicStream newStream = await serverConnection.AcceptStreamAsync(); newStream.Dispose(); - await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + newStream = await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + newStream.Dispose(); + clientConnection.Dispose(); serverConnection.Dispose(); } - [Fact] - [ActiveIssue("https://github.com/dotnet/runtime/issues/67302")] - public async Task WaitForAvailableBidirectionStreamsAsyncWorks() + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task OpenStreamAsync_Canceled_Throws_OperationCanceledException(bool unidirectional) { + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); + listenerOptions.MaxUnidirectionalStreams = 1; listenerOptions.MaxBidirectionalStreams = 1; (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); - // No stream opened yet, should return immediately. - Assert.True(clientConnection.WaitForAvailableBidirectionalStreamsAsync().IsCompletedSuccessfully); + CancellationTokenSource cts = new CancellationTokenSource(); - // Open one stream, should wait till it closes. - QuicStream stream = clientConnection.OpenBidirectionalStream(); - ValueTask waitTask = clientConnection.WaitForAvailableBidirectionalStreamsAsync(); + // Open one stream, second call should block + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = OpenStreamAsync(clientConnection, cts.Token); Assert.False(waitTask.IsCompleted); - Assert.Throws(() => clientConnection.OpenBidirectionalStream()); + + cts.Cancel(); + + // awaiting the task should throw + var ex = await Assert.ThrowsAnyAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + Assert.Equal(cts.Token, ex.CancellationToken); // Close the streams, the waitTask should finish as a result. stream.Dispose(); QuicStream newStream = await serverConnection.AcceptStreamAsync(); newStream.Dispose(); - await waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + + // next call should work as intended + newStream = await OpenStreamAsync(clientConnection).AsTask().WaitAsync(TimeSpan.FromSeconds(10)); + newStream.Dispose(); + + clientConnection.Dispose(); + serverConnection.Dispose(); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task OpenStreamAsync_PreCanceled_Throws_OperationCanceledException(bool unidirectional) + { + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); + + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, CreateQuicListenerOptions()); + + CancellationTokenSource cts = new CancellationTokenSource(); + cts.Cancel(); + + var ex = await Assert.ThrowsAnyAsync(() => OpenStreamAsync(clientConnection, cts.Token).AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + Assert.Equal(cts.Token, ex.CancellationToken); + clientConnection.Dispose(); serverConnection.Dispose(); } + [Theory] + [InlineData(false, false)] + [InlineData(true, true)] // the code path for uni/bidirectional streams differs only in a flag passed to MsQuic, so there is no need to test all possible combinations. + public async Task OpenStreamAsync_ConnectionAbort_Throws(bool unidirectional, bool localAbort) + { + ValueTask OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional + ? connection.OpenUnidirectionalStreamAsync(token) + : connection.OpenBidirectionalStreamAsync(token); + + QuicListenerOptions listenerOptions = CreateQuicListenerOptions(); + listenerOptions.MaxUnidirectionalStreams = 1; + listenerOptions.MaxBidirectionalStreams = 1; + (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, listenerOptions); + + // Open one stream, second call should block + QuicStream stream = await OpenStreamAsync(clientConnection); + ValueTask waitTask = OpenStreamAsync(clientConnection); + Assert.False(waitTask.IsCompleted); + + if (localAbort) + { + await clientConnection.CloseAsync(0); + // TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads + // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) + // To be revisited once we standartize on exceptions. + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + await Assert.ThrowsAnyAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + } + else + { + await serverConnection.CloseAsync(0); + await Assert.ThrowsAsync(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3))); + } + + clientConnection.Dispose(); + serverConnection.Dispose(); + } + + [Fact] [OuterLoop("May take several seconds")] public async Task SetListenerTimeoutWorksWithSmallTimeout() @@ -462,7 +541,7 @@ public async Task WriteTests(int[][] writes, WriteType writeType) await RunClientServer( async clientConnection => { - await using QuicStream stream = clientConnection.OpenUnidirectionalStream(); + await using QuicStream stream = await clientConnection.OpenUnidirectionalStreamAsync(); foreach (int[] bufferLengths in writes) { @@ -555,7 +634,7 @@ public async Task CallDifferentWriteMethodsWorks() ReadOnlySequence ros = CreateReadOnlySequenceFromBytes(helloWorld.ToArray()); Assert.False(ros.IsSingleSegment); - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); ValueTask writeTask = clientStream.WriteAsync(ros); using QuicStream serverStream = await serverConnection.AcceptStreamAsync(); @@ -701,7 +780,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); for (int pos = 0; pos < data.Length; pos += writeSize) { @@ -729,7 +808,7 @@ async Task GetStreamIdWithoutStartWorks() { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Assert.Equal(0, clientStream.StreamId); // TODO: stream that is opened by client but left unaccepted by server may cause AccessViolationException in its Finalizer @@ -749,7 +828,7 @@ async Task GetStreamIdWithoutStartWorks() { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Assert.Equal(0, clientStream.StreamId); // Dispose all connections before the streams; @@ -771,7 +850,7 @@ await Task.Run(async () => { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - await using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + await using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await clientStream.WriteAsync(new byte[1]); await using QuicStream serverStream = await serverConnection.AcceptStreamAsync(); @@ -792,7 +871,7 @@ await Task.Run(async () => { (QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(); - await using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + await using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await clientStream.WriteAsync(new byte[1]); await using QuicStream serverStream = await serverConnection.AcceptStreamAsync(); @@ -817,7 +896,7 @@ public async Task BigWrite_SmallRead_Success(bool closeWithData) { byte[] buffer = new byte[1] { 42 }; - QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Task t = serverConnection.AcceptStreamAsync().AsTask(); await TaskTimeoutExtensions.WhenAllOrAnyFailed(clientStream.WriteAsync(buffer).AsTask(), t, PassingTestTimeoutMilliseconds); QuicStream serverStream = t.Result; @@ -881,7 +960,7 @@ await RunClientServer( }, clientFunction: async connection => { - using QuicStream stream = connection.OpenBidirectionalStream(); + using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); Assert.False(stream.ReadsCompleted); await stream.WriteAsync(s_data, endStream: true); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index d4d1a89f11fa4..644245d340d97 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -42,7 +42,7 @@ public async Task TestConnect() private static async Task OpenAndUseStreamAsync(QuicConnection c) { - QuicStream s = c.OpenBidirectionalStream(); + QuicStream s = await c.OpenBidirectionalStreamAsync(); // This will pend await s.ReadAsync(new byte[1]); @@ -74,7 +74,11 @@ await RunClientServer( // Pending ops should fail await Assert.ThrowsAsync(() => acceptTask); - await Assert.ThrowsAsync(() => connectTask); + // TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads + // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) + // To be revisited once we standartize on exceptions. + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + await Assert.ThrowsAnyAsync(() => connectTask); // Subsequent attempts should fail // TODO: Which exception is correct? @@ -115,7 +119,12 @@ await RunClientServer( // Pending ops should fail await Assert.ThrowsAsync(() => acceptTask); - await Assert.ThrowsAsync(() => connectTask); + + // TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads + // (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue) + // To be revisited once we standartize on exceptions. + // [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")] + await Assert.ThrowsAnyAsync(() => connectTask); // Subsequent attempts should fail // TODO: Should these be QuicOperationAbortedException, to match above? Or vice-versa? @@ -199,7 +208,7 @@ public async Task CloseAsync_WithOpenStream_LocalAndPeerStreamsFailWithQuicOpera await RunClientServer( async clientConnection => { - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await DoWrites(clientStream, writesBeforeClose); // Wait for peer to receive data @@ -246,7 +255,7 @@ public async Task Dispose_WithOpenLocalStream_LocalStreamFailsWithQuicOperationA await RunClientServer( async clientConnection => { - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); await DoWrites(clientStream, writesBeforeClose); // Wait for peer to receive data diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs index 5df77a4ced54a..5b58ee229496e 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs @@ -95,7 +95,7 @@ await WhenAllOrAnyFailed( listener.ListenEndPoint, GetSslClientAuthenticationOptions()); await connection2.ConnectAsync(); - stream2 = connection2.OpenBidirectionalStream(); + stream2 = await connection2.OpenBidirectionalStreamAsync(); // OpenBidirectionalStream only allocates ID. We will force stream opening // by Writing there and receiving data on the other side. await stream2.WriteAsync(buffer); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs index d20027f2d09ba..73a307f92ed51 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamTests.cs @@ -38,7 +38,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); await stream.WriteAsync(s_data, endStream: true); @@ -89,7 +89,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); for (int i = 0; i < sendCount; i++) { @@ -135,8 +135,8 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); - await using QuicStream stream2 = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); + await using QuicStream stream2 = await connection.OpenBidirectionalStreamAsync(); await stream.WriteAsync(s_data, endStream: true); await stream2.WriteAsync(s_data, endStream: true); @@ -178,7 +178,7 @@ public async Task MultipleConcurrentStreamsOnSingleConnection() static async Task MakeStreams(QuicConnection clientConnection, QuicConnection serverConnection) { byte[] buffer = new byte[64]; - QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); ValueTask writeTask = clientStream.WriteAsync(Encoding.UTF8.GetBytes("PING"), endStream: true); ValueTask acceptTask = serverConnection.AcceptStreamAsync(); await new Task[] { writeTask.AsTask(), acceptTask.AsTask() }.WhenAllOrAnyFailed(PassingTestTimeoutMilliseconds); @@ -195,7 +195,7 @@ public async Task GetStreamIdWithoutStartWorks() using (clientConnection) using (serverConnection) { - using QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + using QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Assert.Equal(0, clientStream.StreamId); // TODO: stream that is opened by client but left unaccepted by server may cause AccessViolationException in its Finalizer @@ -233,7 +233,7 @@ await RunClientServer( }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); for (int pos = 0; pos < data.Length; pos += writeSize) { @@ -275,7 +275,7 @@ public async Task TestStreams() private static async Task CreateAndTestBidirectionalStream(QuicConnection c1, QuicConnection c2) { - using QuicStream s1 = c1.OpenBidirectionalStream(); + using QuicStream s1 = await c1.OpenBidirectionalStreamAsync(); Assert.True(s1.CanRead); Assert.True(s1.CanWrite); @@ -289,7 +289,7 @@ private static async Task CreateAndTestBidirectionalStream(QuicConnection c1, Qu private static async Task CreateAndTestUnidirectionalStream(QuicConnection c1, QuicConnection c2) { - using QuicStream s1 = c1.OpenUnidirectionalStream(); + using QuicStream s1 = await c1.OpenUnidirectionalStreamAsync(); Assert.False(s1.CanRead); Assert.True(s1.CanWrite); @@ -383,7 +383,7 @@ public async Task ReadWrite_Random_Success(int readSize, int writeSize) await RunClientServer( async clientConnection => { - await using QuicStream clientStream = clientConnection.OpenUnidirectionalStream(); + await using QuicStream clientStream = await clientConnection.OpenUnidirectionalStreamAsync(); ReadOnlyMemory sendBuffer = testBuffer; while (sendBuffer.Length != 0) @@ -511,7 +511,7 @@ public async Task ReadOutstanding_ReadAborted_Throws() byte[] buffer = new byte[1] { 42 }; const int ExpectedErrorCode = 0xfffffff; - QuicStream clientStream = clientConnection.OpenBidirectionalStream(); + QuicStream clientStream = await clientConnection.OpenBidirectionalStreamAsync(); Task t = serverConnection.AcceptStreamAsync().AsTask(); await TaskTimeoutExtensions.WhenAllOrAnyFailed(clientStream.WriteAsync(buffer).AsTask(), t, PassingTestTimeoutMilliseconds); QuicStream serverStream = t.Result; @@ -563,7 +563,7 @@ public async Task WriteAbortedWithoutWriting_ReadThrows() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenUnidirectionalStream(); + await using QuicStream stream = await connection.OpenUnidirectionalStreamAsync(); stream.AbortWrite(expectedErrorCode); }, serverFunction: async connection => @@ -589,7 +589,7 @@ public async Task ReadAbortedWithoutReading_WriteThrows() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); stream.AbortRead(expectedErrorCode); }, serverFunction: async connection => @@ -613,7 +613,7 @@ public async Task WritePreCanceled_Throws() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenUnidirectionalStream(); + await using QuicStream stream = await connection.OpenUnidirectionalStreamAsync(); CancellationTokenSource cts = new CancellationTokenSource(); cts.Cancel(); @@ -655,7 +655,7 @@ public async Task WriteCanceled_NextWriteThrows() await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = connection.OpenUnidirectionalStream(); + await using QuicStream stream = await connection.OpenUnidirectionalStreamAsync(); CancellationTokenSource cts = new CancellationTokenSource(500); @@ -712,7 +712,7 @@ public async Task AbortAfterDispose_ProperlyOpenedStream_Success() await RunClientServer( clientFunction: async connection => { - QuicStream stream = connection.OpenBidirectionalStream(); + QuicStream stream = await connection.OpenBidirectionalStreamAsync(); // Force stream to open on the wire await stream.WriteAsync(buffer); await sem.WaitAsync(); @@ -739,9 +739,9 @@ await RunClientServer( public async Task AbortAfterDispose_StreamCreationFlushedByDispose_Success() { await RunClientServer( - clientFunction: connection => + clientFunction: async connection => { - QuicStream stream = connection.OpenBidirectionalStream(); + QuicStream stream = await connection.OpenBidirectionalStreamAsync(); // dispose will flush stream creation on the wire stream.Dispose(); @@ -749,8 +749,6 @@ await RunClientServer( // should not throw ODE on aborting stream.AbortRead(1234); stream.AbortWrite(5675); - - return Task.CompletedTask; }, serverFunction: async connection => { @@ -1005,7 +1003,7 @@ async ValueTask ReleaseOnWriteCompletionAsync() }, clientFunction: async connection => { - await using QuicStream stream = connection.OpenBidirectionalStream(); + await using QuicStream stream = await connection.OpenBidirectionalStreamAsync(); await stream.WriteAsync(new byte[1], endStream: true); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs index c0b8b4a9caf36..8566d0cd40f52 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicTestBase.cs @@ -187,7 +187,7 @@ internal QuicListener CreateQuicListener(IPEndPoint endpoint) internal async Task PingPong(QuicConnection client, QuicConnection server) { - using QuicStream clientStream = client.OpenBidirectionalStream(); + using QuicStream clientStream = await client.OpenBidirectionalStreamAsync(); ValueTask t = clientStream.WriteAsync(s_ping); using QuicStream serverStream = await server.AcceptStreamAsync(); @@ -259,7 +259,7 @@ internal async Task RunStreamClientServer(Func clientFunction, await RunClientServer( clientFunction: async connection => { - await using QuicStream stream = bidi ? connection.OpenBidirectionalStream() : connection.OpenUnidirectionalStream(); + await using QuicStream stream = bidi ? await connection.OpenBidirectionalStreamAsync() : await connection.OpenUnidirectionalStreamAsync(); // Open(Bi|Uni)directionalStream only allocates ID. We will force stream opening // by Writing there and receiving data on the other side. await stream.WriteAsync(buffer);