From 07c5ac8fd8cc553d7e37c0940c73e746b8890d04 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Tue, 9 Jan 2024 14:19:11 +0100 Subject: [PATCH 1/8] Cancelletion is temporary --- .../Internal/ResettableValueTaskSource.cs | 13 +--- .../Net/Quic/Internal/ValueTaskSource.cs | 73 +++++++++++++++---- .../src/System/Net/Quic/QuicConnection.cs | 10 +++ .../src/System/Net/Quic/QuicListener.cs | 10 +-- .../src/System/Net/Quic/QuicStream.cs | 1 + .../FunctionalTests/QuicConnectionTests.cs | 21 ++++++ 6 files changed, 99 insertions(+), 29 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs index c3135042b032b..5548fee130bc8 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ResettableValueTaskSource.cs @@ -105,9 +105,7 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C _state = State.Awaiting; } // None, Ready, Completed: return the current task. - if (state == State.None || - state == State.Ready || - state == State.Completed) + if (state is State.None or State.Ready or State.Completed) { // Remember that the value task with the current version is being given out. _hasWaiter = true; @@ -167,8 +165,7 @@ private bool TryComplete(Exception? exception, bool final) // If the _valueTaskSource has already been set, we don't want to lose the result by overwriting it. // So keep it as is and store the result in _finalTaskSource. - if (state == State.None || - state == State.Awaiting) + if (state is State.None or State.Awaiting) { _state = final ? State.Completed : State.Ready; } @@ -178,16 +175,14 @@ private bool TryComplete(Exception? exception, bool final) { // Set up the exception stack trace for the caller. exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; - if (state == State.None || - state == State.Awaiting) + if (state is State.None or State.Awaiting) { _valueTaskSource.SetException(exception); } } else { - if (state == State.None || - state == State.Awaiting) + if (state is State.None or State.Awaiting) { _valueTaskSource.SetResult(final); } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs index 2acd2138a1237..0005629c7ff5f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs @@ -25,6 +25,7 @@ private enum State : byte private State _state; private ManualResetValueTaskSourceCore _valueTaskSource; private CancellationTokenRegistration _cancellationRegistration; + private Exception? _exception; private GCHandle _keepAlive; public ValueTaskSource() @@ -32,6 +33,7 @@ public ValueTaskSource() _state = State.None; _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; + _exception = default; _keepAlive = default; } @@ -75,7 +77,7 @@ public bool TryInitialize(out ValueTask valueTask, object? keepAlive = null, Can State state = _state; // If we're the first here, we will return true. - if (state == State.None) + if (state == State.None && _valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) { // Keep alive the caller object until the result is read from the task. // Used for keeping caller alive during async interop calls. @@ -104,30 +106,42 @@ private bool TryComplete(Exception? exception) { State state = _state; - if (state != State.Completed) + // Completed: nothing to do. + if (state == State.Completed) { - _state = State.Completed; + return false; + } + + // With cancellation, keep the state as-is so it can be restored after the OCE is consumed. + _state = exception is OperationCanceledException ? state : State.Completed; - // Swap the cancellation registration so the one that's been registered gets eventually Disposed. - // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. - cancellationRegistration = _cancellationRegistration; - _cancellationRegistration = default; + // Swap the cancellation registration so the one that's been registered gets eventually Disposed. + // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. + cancellationRegistration = _cancellationRegistration; + _cancellationRegistration = default; - if (exception is not null) + if (exception is not null) + { + // Set up the exception stack trace for the caller. + exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; + if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) { - // Set up the exception stack trace for the caller. - exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; _valueTaskSource.SetException(exception); } - else + else if (exception is not OperationCanceledException) + { + _exception = exception; + } + } + else + { + if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) { _valueTaskSource.SetResult(true); } - - return true; } - return false; + return true; } finally { @@ -173,5 +187,34 @@ void IValueTaskSource.OnCompleted(Action continuation, object? state, s => _valueTaskSource.OnCompleted(continuation, state, token, flags); void IValueTaskSource.GetResult(short token) - => _valueTaskSource.GetResult(token); + { + try + { + _valueTaskSource.GetResult(token); + } + finally + { + lock (this) + { + State state = _state; + + // In case of a cancellation, reset the task and set the stored results if necessary. + if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Canceled) + { + _valueTaskSource.Reset(); + if (state == State.Completed) + { + if (_exception is not null) + { + _valueTaskSource.SetException(_exception); + } + else + { + _valueTaskSource.SetResult(true); + } + } + } + } + } + } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 049ab54063800..8b8ea7617e5f7 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -636,6 +636,16 @@ public async ValueTask DisposeAsync() (ulong)_defaultCloseErrorCode); } } + else if (!valueTask.IsCompletedSuccessfully) + { + unsafe + { + MsQuicApi.Api.ConnectionShutdown( + _handle, + QUIC_CONNECTION_SHUTDOWN_FLAGS.SILENT, + (ulong)_defaultCloseErrorCode); + } + } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. await valueTask.ConfigureAwait(false); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs index e39d718718d11..6f0a0d8bb5b75 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicListener.cs @@ -220,7 +220,7 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient { using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, connection.ConnectionShutdownToken); cancellationToken = linkedCts.Token; - // initial timeout for retrieving connection options + // Initial timeout for retrieving connection options. linkedCts.CancelAfter(handshakeTimeout); wrapException = true; @@ -229,7 +229,7 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient options.Validate(nameof(options)); - // update handshake timetout based on the returned value + // Update handshake timeout based on the returned value. handshakeTimeout = options.HandshakeTimeout; linkedCts.CancelAfter(handshakeTimeout); @@ -248,12 +248,12 @@ private async void StartConnectionHandshake(QuicConnection connection, SslClient NetEventSource.Info(connection, $"{connection} Connection closed by remote peer"); } - // retrieve the exception which failed the handshake, the parameters are not going to be - // validated because the inner _connectedTcs is already transitioned to faulted state + // Retrieve the exception which failed the handshake, the parameters are not going to be + // validated because the inner _connectedTcs is already transitioned to faulted state. ValueTask task = connection.FinishHandshakeAsync(null!, null!, default); Debug.Assert(task.IsFaulted); - // unwrap AggregateException and propagate it to the accept queue + // Unwrap AggregateException and propagate it to the accept queue. Exception ex = task.AsTask().Exception!.InnerException!; await connection.DisposeAsync().ConfigureAwait(false); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 6dcd92395b50c..2e8f6a50e7e64 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -613,6 +613,7 @@ private unsafe int HandleEventShutdownComplete(ref SHUTDOWN_COMPLETE_DATA data) _receiveTcs.TrySetException(exception, final: true); _sendTcs.TrySetException(exception, final: true); } + _startedTcs.TrySetResult(); _shutdownTcs.TrySetResult(); return QUIC_STATUS_SUCCESS; } diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index 9e58cab98c095..b423b096407cb 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -124,6 +124,27 @@ await RunClientServer( }); } + [Fact] + public async Task DisposeAfterCloseCanceled() + { + using var sync = new SemaphoreSlim(0); + + await RunClientServer( + async clientConnection => + { + var cts = new CancellationTokenSource(); + cts.Cancel(); + await Assert.ThrowsAsync(async () => await clientConnection.CloseAsync(ExpectedErrorCode, cts.Token)); + await clientConnection.DisposeAsync(); + sync.Release(); + }, + async serverConnection => + { + await sync.WaitAsync(); + await serverConnection.DisposeAsync(); + }); + } + [Fact] public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException() { From a9bf49670864389d5bffacbce2335807f27486da Mon Sep 17 00:00:00 2001 From: ManickaP Date: Thu, 18 Jan 2024 11:06:20 +0100 Subject: [PATCH 2/8] Use ordinary TCS and revert changes in ValueTaskSource --- .../Net/Quic/Internal/ValueTaskSource.cs | 73 ++++--------------- .../src/System/Net/Quic/QuicConnection.cs | 13 ++-- .../FunctionalTests/QuicConnectionTests.cs | 22 +++++- 3 files changed, 43 insertions(+), 65 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs index 0005629c7ff5f..2acd2138a1237 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs @@ -25,7 +25,6 @@ private enum State : byte private State _state; private ManualResetValueTaskSourceCore _valueTaskSource; private CancellationTokenRegistration _cancellationRegistration; - private Exception? _exception; private GCHandle _keepAlive; public ValueTaskSource() @@ -33,7 +32,6 @@ public ValueTaskSource() _state = State.None; _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; - _exception = default; _keepAlive = default; } @@ -77,7 +75,7 @@ public bool TryInitialize(out ValueTask valueTask, object? keepAlive = null, Can State state = _state; // If we're the first here, we will return true. - if (state == State.None && _valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) + if (state == State.None) { // Keep alive the caller object until the result is read from the task. // Used for keeping caller alive during async interop calls. @@ -106,42 +104,30 @@ private bool TryComplete(Exception? exception) { State state = _state; - // Completed: nothing to do. - if (state == State.Completed) + if (state != State.Completed) { - return false; - } - - // With cancellation, keep the state as-is so it can be restored after the OCE is consumed. - _state = exception is OperationCanceledException ? state : State.Completed; + _state = State.Completed; - // Swap the cancellation registration so the one that's been registered gets eventually Disposed. - // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. - cancellationRegistration = _cancellationRegistration; - _cancellationRegistration = default; + // Swap the cancellation registration so the one that's been registered gets eventually Disposed. + // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. + cancellationRegistration = _cancellationRegistration; + _cancellationRegistration = default; - if (exception is not null) - { - // Set up the exception stack trace for the caller. - exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; - if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) + if (exception is not null) { + // Set up the exception stack trace for the caller. + exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; _valueTaskSource.SetException(exception); } - else if (exception is not OperationCanceledException) - { - _exception = exception; - } - } - else - { - if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) + else { _valueTaskSource.SetResult(true); } + + return true; } - return true; + return false; } finally { @@ -187,34 +173,5 @@ void IValueTaskSource.OnCompleted(Action continuation, object? state, s => _valueTaskSource.OnCompleted(continuation, state, token, flags); void IValueTaskSource.GetResult(short token) - { - try - { - _valueTaskSource.GetResult(token); - } - finally - { - lock (this) - { - State state = _state; - - // In case of a cancellation, reset the task and set the stored results if necessary. - if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Canceled) - { - _valueTaskSource.Reset(); - if (state == State.Completed) - { - if (_exception is not null) - { - _valueTaskSource.SetException(_exception); - } - else - { - _valueTaskSource.SetResult(true); - } - } - } - } - } - } + => _valueTaskSource.GetResult(token); } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 8b8ea7617e5f7..dced079c2d1db 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -109,7 +109,7 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt private int _disposed; private readonly ValueTaskSource _connectedTcs = new ValueTaskSource(); - private readonly ValueTaskSource _shutdownTcs = new ValueTaskSource(); + private TaskCompletionSource? _shutdownTcs; private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource(); @@ -467,7 +467,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken { ObjectDisposedException.ThrowIf(_disposed == 1, this); - if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken)) + if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) { unsafe { @@ -478,7 +478,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken } } - return valueTask; + return new ValueTask(_shutdownTcs.Task.WaitAsync(cancellationToken)); } private unsafe int HandleEventConnected(ref CONNECTED_DATA data) @@ -520,6 +520,7 @@ private unsafe int HandleEventShutdownComplete() _acceptQueue.Writer.TryComplete(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); + Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); _shutdownTcs.TrySetResult(); return QUIC_STATUS_SUCCESS; } @@ -626,7 +627,7 @@ public async ValueTask DisposeAsync() } // Check if the connection has been shut down and if not, shut it down. - if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this)) + if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) { unsafe { @@ -636,7 +637,7 @@ public async ValueTask DisposeAsync() (ulong)_defaultCloseErrorCode); } } - else if (!valueTask.IsCompletedSuccessfully) + else if (!_shutdownTcs.Task.IsCompletedSuccessfully) { unsafe { @@ -648,7 +649,7 @@ public async ValueTask DisposeAsync() } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. - await valueTask.ConfigureAwait(false); + await _shutdownTcs.Task.ConfigureAwait(false); Debug.Assert(_connectedTcs.IsCompleted); _handle.Dispose(); _shutdownTokenSource.Dispose(); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index b423b096407cb..ed14a6a993ccb 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -134,7 +134,27 @@ await RunClientServer( { var cts = new CancellationTokenSource(); cts.Cancel(); - await Assert.ThrowsAsync(async () => await clientConnection.CloseAsync(ExpectedErrorCode, cts.Token)); + await Assert.ThrowsAnyAsync(async () => await clientConnection.CloseAsync(ExpectedErrorCode, cts.Token)); + await clientConnection.DisposeAsync(); + sync.Release(); + }, + async serverConnection => + { + await sync.WaitAsync(); + await serverConnection.DisposeAsync(); + }); + } + + [Fact] + public async Task DisposeAfterCloseTaskStored() + { + using var sync = new SemaphoreSlim(0); + + await RunClientServer( + async clientConnection => + { + var cts = new CancellationTokenSource(); + var task = clientConnection.CloseAsync(0).AsTask(); await clientConnection.DisposeAsync(); sync.Release(); }, From 23e7b94ab4c4401aff6acaf2d1f483e0d4ea84e3 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Thu, 18 Jan 2024 15:32:11 +0100 Subject: [PATCH 3/8] Lower probability of creating extra TCS instance. --- .../src/System/Net/Quic/QuicConnection.cs | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index dced079c2d1db..0f5758d9f203e 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -109,8 +109,22 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt private int _disposed; private readonly ValueTaskSource _connectedTcs = new ValueTaskSource(); + private TaskCompletionSource? _shutdownTcs; + private bool InitializeShutdownTcs(out TaskCompletionSource shutdownTcs) + { + if (_shutdownTcs is not null) + { + shutdownTcs = _shutdownTcs; + return false; + } + + TaskCompletionSource? original = Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); + shutdownTcs = _shutdownTcs; + return original is null; + } + private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource(); // Token that fires when the connection is closed. @@ -467,7 +481,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken { ObjectDisposedException.ThrowIf(_disposed == 1, this); - if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) + if (InitializeShutdownTcs(out TaskCompletionSource shutdownTcs)) { unsafe { @@ -478,7 +492,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken } } - return new ValueTask(_shutdownTcs.Task.WaitAsync(cancellationToken)); + return new ValueTask(shutdownTcs.Task.WaitAsync(cancellationToken)); } private unsafe int HandleEventConnected(ref CONNECTED_DATA data) @@ -520,8 +534,8 @@ private unsafe int HandleEventShutdownComplete() _acceptQueue.Writer.TryComplete(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); - Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); - _shutdownTcs.TrySetResult(); + InitializeShutdownTcs(out TaskCompletionSource shutdownTcs); + shutdownTcs.TrySetResult(); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventLocalAddressChanged(ref LOCAL_ADDRESS_CHANGED_DATA data) @@ -627,7 +641,7 @@ public async ValueTask DisposeAsync() } // Check if the connection has been shut down and if not, shut it down. - if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) + if (InitializeShutdownTcs(out TaskCompletionSource shutdownTcs)) { unsafe { @@ -637,7 +651,7 @@ public async ValueTask DisposeAsync() (ulong)_defaultCloseErrorCode); } } - else if (!_shutdownTcs.Task.IsCompletedSuccessfully) + else if (!shutdownTcs.Task.IsCompletedSuccessfully) { unsafe { @@ -649,7 +663,7 @@ public async ValueTask DisposeAsync() } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. - await _shutdownTcs.Task.ConfigureAwait(false); + await shutdownTcs.Task.ConfigureAwait(false); Debug.Assert(_connectedTcs.IsCompleted); _handle.Dispose(); _shutdownTokenSource.Dispose(); From 3a4e39336592329eca3b79d6da7ee0700ed93ebf Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 19 Jan 2024 12:17:39 +0100 Subject: [PATCH 4/8] Revert "Lower probability of creating extra TCS instance." This reverts commit 636037cf9e85bc1f275d6cb7346866d4b86c5938. --- .../src/System/Net/Quic/QuicConnection.cs | 28 +++++-------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 0f5758d9f203e..dced079c2d1db 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -109,22 +109,8 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt private int _disposed; private readonly ValueTaskSource _connectedTcs = new ValueTaskSource(); - private TaskCompletionSource? _shutdownTcs; - private bool InitializeShutdownTcs(out TaskCompletionSource shutdownTcs) - { - if (_shutdownTcs is not null) - { - shutdownTcs = _shutdownTcs; - return false; - } - - TaskCompletionSource? original = Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); - shutdownTcs = _shutdownTcs; - return original is null; - } - private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource(); // Token that fires when the connection is closed. @@ -481,7 +467,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken { ObjectDisposedException.ThrowIf(_disposed == 1, this); - if (InitializeShutdownTcs(out TaskCompletionSource shutdownTcs)) + if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) { unsafe { @@ -492,7 +478,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken } } - return new ValueTask(shutdownTcs.Task.WaitAsync(cancellationToken)); + return new ValueTask(_shutdownTcs.Task.WaitAsync(cancellationToken)); } private unsafe int HandleEventConnected(ref CONNECTED_DATA data) @@ -534,8 +520,8 @@ private unsafe int HandleEventShutdownComplete() _acceptQueue.Writer.TryComplete(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); - InitializeShutdownTcs(out TaskCompletionSource shutdownTcs); - shutdownTcs.TrySetResult(); + Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); + _shutdownTcs.TrySetResult(); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventLocalAddressChanged(ref LOCAL_ADDRESS_CHANGED_DATA data) @@ -641,7 +627,7 @@ public async ValueTask DisposeAsync() } // Check if the connection has been shut down and if not, shut it down. - if (InitializeShutdownTcs(out TaskCompletionSource shutdownTcs)) + if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) { unsafe { @@ -651,7 +637,7 @@ public async ValueTask DisposeAsync() (ulong)_defaultCloseErrorCode); } } - else if (!shutdownTcs.Task.IsCompletedSuccessfully) + else if (!_shutdownTcs.Task.IsCompletedSuccessfully) { unsafe { @@ -663,7 +649,7 @@ public async ValueTask DisposeAsync() } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. - await shutdownTcs.Task.ConfigureAwait(false); + await _shutdownTcs.Task.ConfigureAwait(false); Debug.Assert(_connectedTcs.IsCompleted); _handle.Dispose(); _shutdownTokenSource.Dispose(); From 630681659bff1995f4447d3b1c6b25dc5c7afc20 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 19 Jan 2024 12:18:04 +0100 Subject: [PATCH 5/8] Revert "Use ordinary TCS and revert changes in ValueTaskSource" This reverts commit f393313f886bf2073ddaea9b02f9bb30b80f047b. --- .../Net/Quic/Internal/ValueTaskSource.cs | 73 +++++++++++++++---- .../src/System/Net/Quic/QuicConnection.cs | 13 ++-- .../FunctionalTests/QuicConnectionTests.cs | 22 +----- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs index 2acd2138a1237..0005629c7ff5f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs @@ -25,6 +25,7 @@ private enum State : byte private State _state; private ManualResetValueTaskSourceCore _valueTaskSource; private CancellationTokenRegistration _cancellationRegistration; + private Exception? _exception; private GCHandle _keepAlive; public ValueTaskSource() @@ -32,6 +33,7 @@ public ValueTaskSource() _state = State.None; _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; + _exception = default; _keepAlive = default; } @@ -75,7 +77,7 @@ public bool TryInitialize(out ValueTask valueTask, object? keepAlive = null, Can State state = _state; // If we're the first here, we will return true. - if (state == State.None) + if (state == State.None && _valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) { // Keep alive the caller object until the result is read from the task. // Used for keeping caller alive during async interop calls. @@ -104,30 +106,42 @@ private bool TryComplete(Exception? exception) { State state = _state; - if (state != State.Completed) + // Completed: nothing to do. + if (state == State.Completed) { - _state = State.Completed; + return false; + } + + // With cancellation, keep the state as-is so it can be restored after the OCE is consumed. + _state = exception is OperationCanceledException ? state : State.Completed; - // Swap the cancellation registration so the one that's been registered gets eventually Disposed. - // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. - cancellationRegistration = _cancellationRegistration; - _cancellationRegistration = default; + // Swap the cancellation registration so the one that's been registered gets eventually Disposed. + // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. + cancellationRegistration = _cancellationRegistration; + _cancellationRegistration = default; - if (exception is not null) + if (exception is not null) + { + // Set up the exception stack trace for the caller. + exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; + if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) { - // Set up the exception stack trace for the caller. - exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; _valueTaskSource.SetException(exception); } - else + else if (exception is not OperationCanceledException) + { + _exception = exception; + } + } + else + { + if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) { _valueTaskSource.SetResult(true); } - - return true; } - return false; + return true; } finally { @@ -173,5 +187,34 @@ void IValueTaskSource.OnCompleted(Action continuation, object? state, s => _valueTaskSource.OnCompleted(continuation, state, token, flags); void IValueTaskSource.GetResult(short token) - => _valueTaskSource.GetResult(token); + { + try + { + _valueTaskSource.GetResult(token); + } + finally + { + lock (this) + { + State state = _state; + + // In case of a cancellation, reset the task and set the stored results if necessary. + if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Canceled) + { + _valueTaskSource.Reset(); + if (state == State.Completed) + { + if (_exception is not null) + { + _valueTaskSource.SetException(_exception); + } + else + { + _valueTaskSource.SetResult(true); + } + } + } + } + } + } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index dced079c2d1db..8b8ea7617e5f7 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -109,7 +109,7 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt private int _disposed; private readonly ValueTaskSource _connectedTcs = new ValueTaskSource(); - private TaskCompletionSource? _shutdownTcs; + private readonly ValueTaskSource _shutdownTcs = new ValueTaskSource(); private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource(); @@ -467,7 +467,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken { ObjectDisposedException.ThrowIf(_disposed == 1, this); - if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) + if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken)) { unsafe { @@ -478,7 +478,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken } } - return new ValueTask(_shutdownTcs.Task.WaitAsync(cancellationToken)); + return valueTask; } private unsafe int HandleEventConnected(ref CONNECTED_DATA data) @@ -520,7 +520,6 @@ private unsafe int HandleEventShutdownComplete() _acceptQueue.Writer.TryComplete(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); - Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); _shutdownTcs.TrySetResult(); return QUIC_STATUS_SUCCESS; } @@ -627,7 +626,7 @@ public async ValueTask DisposeAsync() } // Check if the connection has been shut down and if not, shut it down. - if (Interlocked.CompareExchange(ref _shutdownTcs, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null) == null) + if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this)) { unsafe { @@ -637,7 +636,7 @@ public async ValueTask DisposeAsync() (ulong)_defaultCloseErrorCode); } } - else if (!_shutdownTcs.Task.IsCompletedSuccessfully) + else if (!valueTask.IsCompletedSuccessfully) { unsafe { @@ -649,7 +648,7 @@ public async ValueTask DisposeAsync() } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. - await _shutdownTcs.Task.ConfigureAwait(false); + await valueTask.ConfigureAwait(false); Debug.Assert(_connectedTcs.IsCompleted); _handle.Dispose(); _shutdownTokenSource.Dispose(); diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index ed14a6a993ccb..b423b096407cb 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -134,27 +134,7 @@ await RunClientServer( { var cts = new CancellationTokenSource(); cts.Cancel(); - await Assert.ThrowsAnyAsync(async () => await clientConnection.CloseAsync(ExpectedErrorCode, cts.Token)); - await clientConnection.DisposeAsync(); - sync.Release(); - }, - async serverConnection => - { - await sync.WaitAsync(); - await serverConnection.DisposeAsync(); - }); - } - - [Fact] - public async Task DisposeAfterCloseTaskStored() - { - using var sync = new SemaphoreSlim(0); - - await RunClientServer( - async clientConnection => - { - var cts = new CancellationTokenSource(); - var task = clientConnection.CloseAsync(0).AsTask(); + await Assert.ThrowsAsync(async () => await clientConnection.CloseAsync(ExpectedErrorCode, cts.Token)); await clientConnection.DisposeAsync(); sync.Release(); }, From c04ec4b1eb42c390f34067b812690a768ff2e92a Mon Sep 17 00:00:00 2001 From: ManickaP Date: Fri, 19 Jan 2024 12:27:36 +0100 Subject: [PATCH 6/8] Added Radek's test --- .../FunctionalTests/QuicConnectionTests.cs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs index b423b096407cb..98d72124f0048 100644 --- a/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs +++ b/src/libraries/System.Net.Quic/tests/FunctionalTests/QuicConnectionTests.cs @@ -145,6 +145,26 @@ await RunClientServer( }); } + [Fact] + public async Task DisposeAfterCloseTaskStored() + { + using var sync = new SemaphoreSlim(0); + + await RunClientServer( + async clientConnection => + { + var cts = new CancellationTokenSource(); + var task = clientConnection.CloseAsync(0).AsTask(); + await clientConnection.DisposeAsync(); + sync.Release(); + }, + async serverConnection => + { + await sync.WaitAsync(); + await serverConnection.DisposeAsync(); + }); + } + [Fact] public async Task ConnectionClosedByPeer_WithPendingAcceptAndConnect_PendingAndSubsequentThrowConnectionAbortedException() { From 995c5e80ce1bdc1c41948debd05bc54bb7595c67 Mon Sep 17 00:00:00 2001 From: ManickaP Date: Wed, 24 Jan 2024 11:34:50 +0100 Subject: [PATCH 7/8] Use RVTS --- .../Net/Quic/Internal/ValueTaskSource.cs | 73 ++++--------------- .../src/System/Net/Quic/QuicConnection.cs | 27 ++++++- 2 files changed, 38 insertions(+), 62 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs index 0005629c7ff5f..2acd2138a1237 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ValueTaskSource.cs @@ -25,7 +25,6 @@ private enum State : byte private State _state; private ManualResetValueTaskSourceCore _valueTaskSource; private CancellationTokenRegistration _cancellationRegistration; - private Exception? _exception; private GCHandle _keepAlive; public ValueTaskSource() @@ -33,7 +32,6 @@ public ValueTaskSource() _state = State.None; _valueTaskSource = new ManualResetValueTaskSourceCore() { RunContinuationsAsynchronously = true }; _cancellationRegistration = default; - _exception = default; _keepAlive = default; } @@ -77,7 +75,7 @@ public bool TryInitialize(out ValueTask valueTask, object? keepAlive = null, Can State state = _state; // If we're the first here, we will return true. - if (state == State.None && _valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) + if (state == State.None) { // Keep alive the caller object until the result is read from the task. // Used for keeping caller alive during async interop calls. @@ -106,42 +104,30 @@ private bool TryComplete(Exception? exception) { State state = _state; - // Completed: nothing to do. - if (state == State.Completed) + if (state != State.Completed) { - return false; - } - - // With cancellation, keep the state as-is so it can be restored after the OCE is consumed. - _state = exception is OperationCanceledException ? state : State.Completed; + _state = State.Completed; - // Swap the cancellation registration so the one that's been registered gets eventually Disposed. - // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. - cancellationRegistration = _cancellationRegistration; - _cancellationRegistration = default; + // Swap the cancellation registration so the one that's been registered gets eventually Disposed. + // Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock. + cancellationRegistration = _cancellationRegistration; + _cancellationRegistration = default; - if (exception is not null) - { - // Set up the exception stack trace for the caller. - exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; - if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) + if (exception is not null) { + // Set up the exception stack trace for the caller. + exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception; _valueTaskSource.SetException(exception); } - else if (exception is not OperationCanceledException) - { - _exception = exception; - } - } - else - { - if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending) + else { _valueTaskSource.SetResult(true); } + + return true; } - return true; + return false; } finally { @@ -187,34 +173,5 @@ void IValueTaskSource.OnCompleted(Action continuation, object? state, s => _valueTaskSource.OnCompleted(continuation, state, token, flags); void IValueTaskSource.GetResult(short token) - { - try - { - _valueTaskSource.GetResult(token); - } - finally - { - lock (this) - { - State state = _state; - - // In case of a cancellation, reset the task and set the stored results if necessary. - if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Canceled) - { - _valueTaskSource.Reset(); - if (state == State.Completed) - { - if (_exception is not null) - { - _valueTaskSource.SetException(_exception); - } - else - { - _valueTaskSource.SetResult(true); - } - } - } - } - } - } + => _valueTaskSource.GetResult(token); } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 8b8ea7617e5f7..7ec4e3f01c821 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -109,7 +109,26 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt private int _disposed; private readonly ValueTaskSource _connectedTcs = new ValueTaskSource(); - private readonly ValueTaskSource _shutdownTcs = new ValueTaskSource(); + private readonly ResettableValueTaskSource _shutdownTcs = new ResettableValueTaskSource() + { + CancellationAction = target => + { + try + { + if (target is QuicConnection connection) + { + // The OCE will be propagated through stored CancellationToken in ResettableValueTaskSource. + connection._shutdownTcs.TrySetResult(); + } + } + catch (ObjectDisposedException) + { + // We collided with a Dispose in another thread. This can happen + // when using CancellationTokenSource.CancelAfter. + // Ignore the exception + } + } + }; private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource(); @@ -467,7 +486,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken { ObjectDisposedException.ThrowIf(_disposed == 1, this); - if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken)) + if (_shutdownTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken)) { unsafe { @@ -626,7 +645,7 @@ public async ValueTask DisposeAsync() } // Check if the connection has been shut down and if not, shut it down. - if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this)) + if (_shutdownTcs.TryGetValueTask(out ValueTask valueTask, this)) { unsafe { @@ -648,7 +667,7 @@ public async ValueTask DisposeAsync() } // Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released. - await valueTask.ConfigureAwait(false); + await _shutdownTcs.GetFinalTask(this).ConfigureAwait(false); Debug.Assert(_connectedTcs.IsCompleted); _handle.Dispose(); _shutdownTokenSource.Dispose(); From 85cd4eb8980c4875cfa2dbf4b460ee6a48d59cde Mon Sep 17 00:00:00 2001 From: ManickaP Date: Wed, 24 Jan 2024 15:43:46 +0100 Subject: [PATCH 8/8] Fix the fix :D --- .../System.Net.Quic/src/System/Net/Quic/QuicConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 7ec4e3f01c821..db3adf776d542 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -539,7 +539,7 @@ private unsafe int HandleEventShutdownComplete() _acceptQueue.Writer.TryComplete(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); - _shutdownTcs.TrySetResult(); + _shutdownTcs.TrySetResult(final: true); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventLocalAddressChanged(ref LOCAL_ADDRESS_CHANGED_DATA data)