Skip to content

Commit

Permalink
Use generic Interlocked.{Compare}Exchange in more places
Browse files Browse the repository at this point in the history
  • Loading branch information
stephentoub committed Jul 8, 2024
1 parent b0f578c commit 0ad780b
Show file tree
Hide file tree
Showing 48 changed files with 415 additions and 422 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private static unsafe void ClrReportEvent(string eventSource, short type, ushort
Interop.Advapi32.DeregisterEventSource(handle);
}

private static byte s_once;
private static bool s_once;

public static bool ShouldLogInEventLog
{
Expand All @@ -180,7 +180,7 @@ public static bool ShouldLogInEventLog
if (Interop.Kernel32.IsDebuggerPresent())
return false;

if (s_once == 1 || Interlocked.Exchange(ref s_once, 1) == 1)
if (s_once || Interlocked.Exchange(ref s_once, true))
return false;

return true;
Expand Down
12 changes: 6 additions & 6 deletions src/libraries/Common/src/System/Net/StreamBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,15 @@ private sealed class ResettableValueTaskSource : IValueTaskSource

private ManualResetValueTaskSourceCore<bool> _waitSource; // mutable struct, do not make this readonly
private CancellationTokenRegistration _waitSourceCancellation;
private int _hasWaiter;
private bool _hasWaiter;

ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _waitSource.GetStatus(token);

void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _waitSource.OnCompleted(continuation, state, token, flags);

void IValueTaskSource.GetResult(short token)
{
Debug.Assert(_hasWaiter == 0);
Debug.Assert(!_hasWaiter);

// Clean up the registration. This will wait for any in-flight cancellation to complete.
_waitSourceCancellation.Dispose();
Expand All @@ -312,7 +312,7 @@ void IValueTaskSource.GetResult(short token)

public void SignalWaiter()
{
if (Interlocked.Exchange(ref _hasWaiter, 0) == 1)
if (Interlocked.Exchange(ref _hasWaiter, false))
{
_waitSource.SetResult(true);
}
Expand All @@ -322,21 +322,21 @@ private void CancelWaiter(CancellationToken cancellationToken)
{
Debug.Assert(cancellationToken.IsCancellationRequested);

if (Interlocked.Exchange(ref _hasWaiter, 0) == 1)
if (Interlocked.Exchange(ref _hasWaiter, false))
{
_waitSource.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException(cancellationToken)));
}
}

public void Reset()
{
if (_hasWaiter != 0)
if (_hasWaiter)
{
throw new InvalidOperationException("Concurrent use is not supported");
}

_waitSource.Reset();
Volatile.Write(ref _hasWaiter, 1);
Volatile.Write(ref _hasWaiter, true);
}

public void Wait()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class WebSocketStream : Stream
// Used by the class to indicate that the stream is writable.
private bool _writeable;

// Whether Dispose has been called. 0 == false, 1 == true
private int _disposed;
// Whether Dispose has been called.
private bool _disposed;

public WebSocketStream(WebSocket socket)
: this(socket, FileAccess.ReadWrite, ownsSocket: false)
Expand Down Expand Up @@ -140,7 +140,7 @@ public void Close(int timeout)

protected override void Dispose(bool disposing)
{
if (Interlocked.Exchange(ref _disposed, 1) != 0)
if (Interlocked.Exchange(ref _disposed, true))
{
return;
}
Expand Down Expand Up @@ -269,7 +269,7 @@ public override void SetLength(long value)

private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed != 0, this);
ObjectDisposedException.ThrowIf(_disposed, this);
}

private static IOException WrapException(string resourceFormatString, Exception innerException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,11 +628,11 @@ private void FreezeBag(ref bool lockTaken)
// Finally, wait for all unsynchronized operations on each queue to be done.
for (WorkStealingQueue? queue = head; queue != null; queue = queue._nextQueue)
{
if (queue._currentOp != (int)Operation.None)
if (queue._currentOp != Operation.None)
{
SpinWait spinner = default;
do { spinner.SpinOnce(); }
while (queue._currentOp != (int)Operation.None);
while (queue._currentOp != Operation.None);
}
}
}
Expand Down Expand Up @@ -684,7 +684,7 @@ private sealed class WorkStealingQueue
/// <summary>Number of steals; needs to be combined with <see cref="_addTakeCount"/> to get an actual Count.</summary>
private int _stealCount;
/// <summary>The current queue operation. Used to quiesce before performing operations from one thread onto another.</summary>
internal volatile int _currentOp;
internal volatile Operation _currentOp;
/// <summary>true if this queue's lock is held as part of a global freeze.</summary>
internal bool _frozen;
/// <summary>Next queue in the <see cref="ConcurrentBag{T}"/>'s set of thread-local queues.</summary>
Expand Down Expand Up @@ -726,14 +726,14 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
try
{
// Full fence to ensure subsequent reads don't get reordered before this
Interlocked.Exchange(ref _currentOp, (int)Operation.Add);
Interlocked.Exchange(ref _currentOp, Operation.Add);
int tail = _tailIndex;

// Rare corner case (at most once every 2 billion pushes on this thread):
// We're going to increment the tail; if we'll overflow, then we need to reset our counts
if (tail == int.MaxValue)
{
_currentOp = (int)Operation.None; // set back to None temporarily to avoid a deadlock
_currentOp = Operation.None; // set back to None temporarily to avoid a deadlock
lock (this)
{
Debug.Assert(_tailIndex == tail, "No other thread should be changing _tailIndex");
Expand All @@ -749,7 +749,7 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
_tailIndex = tail &= _mask;
Debug.Assert(_headIndex - _tailIndex <= 0);

Interlocked.Exchange(ref _currentOp, (int)Operation.Add); // ensure subsequent reads aren't reordered before this
Interlocked.Exchange(ref _currentOp, Operation.Add); // ensure subsequent reads aren't reordered before this
}
}

Expand Down Expand Up @@ -778,7 +778,7 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
else
{
// We need to contend with foreign operations (e.g. steals, enumeration, etc.), so we lock.
_currentOp = (int)Operation.None; // set back to None to avoid a deadlock
_currentOp = Operation.None; // set back to None to avoid a deadlock
Monitor.Enter(this, ref lockTaken);

head = _headIndex;
Expand Down Expand Up @@ -830,7 +830,7 @@ internal void LocalPush(T item, ref long emptyToNonEmptyListTransitionCount)
}
finally
{
_currentOp = (int)Operation.None;
_currentOp = Operation.None;
if (lockTaken)
{
Monitor.Exit(this);
Expand Down Expand Up @@ -874,7 +874,7 @@ internal bool TryLocalPop([MaybeNullWhen(false)] out T result)
// If the read of _headIndex moved before this write to _tailIndex, we could erroneously end up
// popping an element that's concurrently being stolen, leading to the same element being
// dequeued from the bag twice.
_currentOp = (int)Operation.Take;
_currentOp = Operation.Take;
Interlocked.Exchange(ref _tailIndex, --tail);

// If there is no interaction with a steal, we can head down the fast path.
Expand All @@ -895,7 +895,7 @@ internal bool TryLocalPop([MaybeNullWhen(false)] out T result)
else
{
// Interaction with steals: 0 or 1 elements left.
_currentOp = (int)Operation.None; // set back to None to avoid a deadlock
_currentOp = Operation.None; // set back to None to avoid a deadlock
Monitor.Enter(this, ref lockTaken);
if (_headIndex - tail <= 0)
{
Expand All @@ -920,7 +920,7 @@ internal bool TryLocalPop([MaybeNullWhen(false)] out T result)
}
finally
{
_currentOp = (int)Operation.None;
_currentOp = Operation.None;
if (lockTaken)
{
Monitor.Exit(this);
Expand Down Expand Up @@ -975,14 +975,14 @@ internal bool TrySteal([MaybeNullWhen(false)] out T result, bool take)
// is in progress, as add operations need to accurately count transitions
// from empty to non-empty, and they can only do that if there are no concurrent
// steal operations happening at the time.
if ((head - (_tailIndex - 2) >= 0) && _currentOp == (int)Operation.Add)
if ((head - (_tailIndex - 2) >= 0) && _currentOp == Operation.Add)
{
SpinWait spinner = default;
do
{
spinner.SpinOnce();
}
while (_currentOp == (int)Operation.Add);
while (_currentOp == Operation.Add);
}

// Increment head to tentatively take an element: a full fence is used to ensure the read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1276,11 +1276,11 @@ class TwiceDerivedCultureInfo : DerivedCultureInfo
{
}

private long _concurrentError = 0;
private volatile bool _concurrentError;
private bool ConcurrentError
{
get => Interlocked.Read(ref _concurrentError) == 1;
set => Interlocked.Exchange(ref _concurrentError, value ? 1 : 0);
get => _concurrentError;
set => Interlocked.Exchange(ref _concurrentError, value);
}

private void ConcurrentTest(TypeWithProperty instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void ReleaseStateForDispose()
if (buffer != null)
{
_buffer = null!;
if (!AsyncOperationIsActive)
if (!_activeAsyncOperation)
{
ArrayPool<byte>.Shared.Return(buffer);
}
Expand Down Expand Up @@ -170,27 +170,28 @@ public override long Position
/// <param name="value">The length of the stream.</param>
public override void SetLength(long value) => throw new NotSupportedException();

private int _activeAsyncOperation; // 1 == true, 0 == false
private bool AsyncOperationIsActive => _activeAsyncOperation != 0;
private volatile bool _activeAsyncOperation;

private void EnsureNoActiveAsyncOperation()
{
if (AsyncOperationIsActive)
if (_activeAsyncOperation)
{
ThrowInvalidBeginCall();
}
}

private void AsyncOperationStarting()
{
if (Interlocked.Exchange(ref _activeAsyncOperation, 1) != 0)
if (Interlocked.Exchange(ref _activeAsyncOperation, true))
{
ThrowInvalidBeginCall();
}
}

private void AsyncOperationCompleting()
{
Debug.Assert(_activeAsyncOperation == 1);
Volatile.Write(ref _activeAsyncOperation, 0);
Debug.Assert(_activeAsyncOperation);
_activeAsyncOperation = false;
}

private static void ThrowInvalidBeginCall() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public partial class DeflateStream : Stream
private Inflater? _inflater;
private Deflater? _deflater;
private byte[]? _buffer;
private int _activeAsyncOperation; // 1 == true, 0 == false
private volatile bool _activeAsyncOperation;
private bool _wroteBytes;

internal DeflateStream(Stream stream, CompressionMode mode, long uncompressedSize) : this(stream, mode, leaveOpen: false, ZLibNative.Deflate_DefaultWindowBits, uncompressedSize)
Expand Down Expand Up @@ -698,7 +698,7 @@ protected override void Dispose(bool disposing)
if (buffer != null)
{
_buffer = null;
if (!AsyncOperationIsActive)
if (!_activeAsyncOperation)
{
ArrayPool<byte>.Shared.Return(buffer);
}
Expand Down Expand Up @@ -751,7 +751,7 @@ async ValueTask Core()
if (buffer != null)
{
_buffer = null;
if (!AsyncOperationIsActive)
if (!_activeAsyncOperation)
{
ArrayPool<byte>.Shared.Return(buffer);
}
Expand Down Expand Up @@ -1059,24 +1059,27 @@ public override void Flush() { }
public override void SetLength(long value) { throw new NotSupportedException(); }
}

private bool AsyncOperationIsActive => _activeAsyncOperation != 0;

private void EnsureNoActiveAsyncOperation()
{
if (AsyncOperationIsActive)
if (_activeAsyncOperation)
{
ThrowInvalidBeginCall();
}
}

private void AsyncOperationStarting()
{
if (Interlocked.Exchange(ref _activeAsyncOperation, 1) != 0)
if (Interlocked.Exchange(ref _activeAsyncOperation, true))
{
ThrowInvalidBeginCall();
}
}

private void AsyncOperationCompleting() =>
Volatile.Write(ref _activeAsyncOperation, 0);
private void AsyncOperationCompleting()
{
Debug.Assert(_activeAsyncOperation);
_activeAsyncOperation = false;
}

private static void ThrowInvalidBeginCall() =>
throw new InvalidOperationException(SR.InvalidBeginCall);
Expand Down
Loading

0 comments on commit 0ad780b

Please sign in to comment.