diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 65d3cd0b4d6b45..df6f484ea58cdb 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -175,9 +175,11 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt /// private IPEndPoint _localEndPoint = null!; /// + /// Represents how many bidirectional streams can be accepted by the peer. Is only manipulated from MsQuic thread. /// private int _availableBidirectionalStreamsCount; /// + /// Represents how many unidirectional streams can be accepted by the peer. Is only manipulated from MsQuic thread. /// private int _availableUnidirectionalStreamsCount; /// @@ -434,6 +436,25 @@ internal ValueTask FinishHandshakeAsync(QuicServerConnectionOptions options, str return valueTask; } + /// + /// In order to provide meaningful increments in , available streams count can be only manipulated from MsQuic thread. + /// For that purpose we pass this function to so that it can call it from START_COMPLETE event handler. + /// + /// Note that MsQuic itself manipulates stream counts right before indicating START_COMPLETE event. + /// + /// Type of the stream to decrement appropriate field. + private void DecrementAvailableStreamCount(QuicStreamType streamType) + { + if (streamType == QuicStreamType.Unidirectional) + { + --_availableUnidirectionalStreamsCount; + } + else + { + --_availableBidirectionalStreamsCount; + } + } + /// /// Create an outbound uni/bidirectional . /// In case the connection doesn't have any available stream capacity, i.e.: the peer limits the concurrent stream count, @@ -456,15 +477,7 @@ public async ValueTask OpenOutboundStreamAsync(QuicStreamType type, NetEventSource.Info(this, $"{this} New outbound {type} stream {stream}."); } - await stream.StartAsync(cancellationToken).ConfigureAwait(false); - if (type == QuicStreamType.Unidirectional) - { - Interlocked.Decrement(ref _availableUnidirectionalStreamsCount); - } - else - { - Interlocked.Decrement(ref _availableBidirectionalStreamsCount); - } + await stream.StartAsync(DecrementAvailableStreamCount, cancellationToken).ConfigureAwait(false); } catch { @@ -637,8 +650,8 @@ private unsafe int HandleEventStreamsAvailable(ref STREAMS_AVAILABLE_DATA data) { int bidirectionalStreamsCountIncrement = data.BidirectionalCount - _availableBidirectionalStreamsCount; int unidirectionalStreamsCountIncrement = data.UnidirectionalCount - _availableUnidirectionalStreamsCount; - Volatile.Write(ref _availableBidirectionalStreamsCount, data.BidirectionalCount); - Volatile.Write(ref _availableUnidirectionalStreamsCount, data.UnidirectionalCount); + _availableBidirectionalStreamsCount = data.BidirectionalCount; + _availableUnidirectionalStreamsCount = data.UnidirectionalCount; OnStreamsAvailable(bidirectionalStreamsCountIncrement, unidirectionalStreamsCountIncrement); return QUIC_STATUS_SUCCESS; } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 6af0f5c5c099f3..f6311f5e5c65ff 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -120,6 +120,12 @@ public sealed partial class QuicStream private long _id = -1; private readonly QuicStreamType _type; + /// + /// Provided via from so that can decrement its available stream count field. + /// When START_COMPLETE arrives it gets invoked and unset back to null to not to hold any unintended reference to . + /// + private Action? _decrementAvailableStreamCount; + /// /// Stream id, see . /// @@ -234,9 +240,10 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE /// If no more concurrent streams can be opened at the moment, the operation will wait until it can, /// either by closing some existing streams or receiving more available stream ids from the peer. /// + /// /// A cancellation token that can be used to cancel the asynchronous operation. /// An asynchronous task that completes with the opened . - internal ValueTask StartAsync(CancellationToken cancellationToken = default) + internal ValueTask StartAsync(Action decrementAvailableStreamCount, CancellationToken cancellationToken = default) { _startedTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken); { @@ -252,6 +259,7 @@ internal ValueTask StartAsync(CancellationToken cancellationToken = default) } } + _decrementAvailableStreamCount = decrementAvailableStreamCount; return valueTask; } @@ -518,9 +526,13 @@ public void CompleteWrites() private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data) { + Debug.Assert(_decrementAvailableStreamCount is not null); + _id = unchecked((long)data.ID); if (StatusSucceeded(data.Status)) { + _decrementAvailableStreamCount(Type); + if (data.PeerAccepted != 0) { _startedTcs.TrySetResult(); @@ -535,6 +547,7 @@ private unsafe int HandleEventStartComplete(ref START_COMPLETE_DATA data) } } + _decrementAvailableStreamCount = null; return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventReceive(ref RECEIVE_DATA data)