Skip to content

Commit

Permalink
Use RVTS
Browse files Browse the repository at this point in the history
  • Loading branch information
ManickaP committed Jan 26, 2024
1 parent c04ec4b commit 995c5e8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ private enum State : byte
private State _state;
private ManualResetValueTaskSourceCore<bool> _valueTaskSource;
private CancellationTokenRegistration _cancellationRegistration;
private Exception? _exception;
private GCHandle _keepAlive;

public ValueTaskSource()
{
_state = State.None;
_valueTaskSource = new ManualResetValueTaskSourceCore<bool>() { RunContinuationsAsynchronously = true };
_cancellationRegistration = default;
_exception = default;
_keepAlive = default;
}

Expand Down Expand Up @@ -77,7 +75,7 @@ public bool TryInitialize(out ValueTask valueTask, object? keepAlive = null, Can
State state = _state;

// If we're the first here, we will return true.
if (state == State.None && _valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending)
if (state == State.None)
{
// Keep alive the caller object until the result is read from the task.
// Used for keeping caller alive during async interop calls.
Expand Down Expand Up @@ -106,42 +104,30 @@ private bool TryComplete(Exception? exception)
{
State state = _state;

// Completed: nothing to do.
if (state == State.Completed)
if (state != State.Completed)
{
return false;
}

// With cancellation, keep the state as-is so it can be restored after the OCE is consumed.
_state = exception is OperationCanceledException ? state : State.Completed;
_state = State.Completed;

// Swap the cancellation registration so the one that's been registered gets eventually Disposed.
// Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock.
cancellationRegistration = _cancellationRegistration;
_cancellationRegistration = default;
// Swap the cancellation registration so the one that's been registered gets eventually Disposed.
// Ideally, we would dispose it here, but if the callbacks kicks in, it tries to take the lock held by this thread leading to deadlock.
cancellationRegistration = _cancellationRegistration;
_cancellationRegistration = default;

if (exception is not null)
{
// Set up the exception stack trace for the caller.
exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception;
if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending)
if (exception is not null)
{
// Set up the exception stack trace for the caller.
exception = exception.StackTrace is null ? ExceptionDispatchInfo.SetCurrentStackTrace(exception) : exception;
_valueTaskSource.SetException(exception);
}
else if (exception is not OperationCanceledException)
{
_exception = exception;
}
}
else
{
if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Pending)
else
{
_valueTaskSource.SetResult(true);
}

return true;
}

return true;
return false;
}
finally
{
Expand Down Expand Up @@ -187,34 +173,5 @@ void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, s
=> _valueTaskSource.OnCompleted(continuation, state, token, flags);

void IValueTaskSource.GetResult(short token)
{
try
{
_valueTaskSource.GetResult(token);
}
finally
{
lock (this)
{
State state = _state;

// In case of a cancellation, reset the task and set the stored results if necessary.
if (_valueTaskSource.GetStatus(_valueTaskSource.Version) == ValueTaskSourceStatus.Canceled)
{
_valueTaskSource.Reset();
if (state == State.Completed)
{
if (_exception is not null)
{
_valueTaskSource.SetException(_exception);
}
else
{
_valueTaskSource.SetResult(true);
}
}
}
}
}
}
=> _valueTaskSource.GetResult(token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,26 @@ static async ValueTask<QuicConnection> StartConnectAsync(QuicClientConnectionOpt
private int _disposed;

private readonly ValueTaskSource _connectedTcs = new ValueTaskSource();
private readonly ValueTaskSource _shutdownTcs = new ValueTaskSource();
private readonly ResettableValueTaskSource _shutdownTcs = new ResettableValueTaskSource()
{
CancellationAction = target =>
{
try
{
if (target is QuicConnection connection)
{
// The OCE will be propagated through stored CancellationToken in ResettableValueTaskSource.
connection._shutdownTcs.TrySetResult();
}
}
catch (ObjectDisposedException)
{
// We collided with a Dispose in another thread. This can happen
// when using CancellationTokenSource.CancelAfter.
// Ignore the exception
}
}
};

private readonly CancellationTokenSource _shutdownTokenSource = new CancellationTokenSource();

Expand Down Expand Up @@ -467,7 +486,7 @@ public ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken
{
ObjectDisposedException.ThrowIf(_disposed == 1, this);

if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this, cancellationToken))
if (_shutdownTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken))
{
unsafe
{
Expand Down Expand Up @@ -626,7 +645,7 @@ public async ValueTask DisposeAsync()
}

// Check if the connection has been shut down and if not, shut it down.
if (_shutdownTcs.TryInitialize(out ValueTask valueTask, this))
if (_shutdownTcs.TryGetValueTask(out ValueTask valueTask, this))
{
unsafe
{
Expand All @@ -648,7 +667,7 @@ public async ValueTask DisposeAsync()
}

// Wait for SHUTDOWN_COMPLETE, the last event, so that all resources can be safely released.
await valueTask.ConfigureAwait(false);
await _shutdownTcs.GetFinalTask(this).ConfigureAwait(false);
Debug.Assert(_connectedTcs.IsCompleted);
_handle.Dispose();
_shutdownTokenSource.Dispose();
Expand Down

0 comments on commit 995c5e8

Please sign in to comment.