Skip to content

Commit

Permalink
Fix a race condition in the thread pool (dotnet#68171)
Browse files Browse the repository at this point in the history
* Fix a race condition in the thread pool

There is a case where on a work-stealing queue, both `LocalPop()` and `TrySteal()` may fail when running concurrently, and lead to a case where there is a work item but no threads are released to process it. Fixed to always ensure that there's a thread request when there was a missed steal. Also when `LocalPop()` fails, the thread does not attempt to pop anymore and that can be an issue if that thread is the last thread to look for work items. Fixed to always check the local queue.

Fixes dotnet#67545
  • Loading branch information
kouvel authored and directhex committed Apr 21, 2022
1 parent 4576be0 commit 8d4f53d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,6 @@ public void Enqueue(object callback, bool forceGlobal)
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
{
tl.workStealingQueue.LocalPush(callback);
tl.workState |= ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems;
}
else
{
Expand Down Expand Up @@ -510,30 +509,21 @@ internal static bool LocalFindAndPop(object callback)
public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
// Check for local work items
object? workItem;
ThreadPoolWorkQueueThreadLocals.WorkState tlWorkState = tl.workState;
if ((tlWorkState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) != 0)
object? workItem = tl.workStealingQueue.LocalPop();
if (workItem != null)
{
workItem = tl.workStealingQueue.LocalPop();
if (workItem != null)
{
return workItem;
}

Debug.Assert(tlWorkState == tl.workState);
tl.workState = tlWorkState &= ~ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems;
return workItem;
}

// Check for high-priority work items
if ((tlWorkState & ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems) != 0)
if (tl.isProcessingHighPriorityWorkItems)
{
if (highPriorityWorkItems.TryDequeue(out workItem))
{
return workItem;
}

Debug.Assert(tlWorkState == tl.workState);
tl.workState = tlWorkState &= ~ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems;
tl.isProcessingHighPriorityWorkItems = false;
}
else if (
_mayHaveHighPriorityWorkItems != 0 &&
Expand Down Expand Up @@ -579,14 +569,14 @@ private bool TryStartProcessingHighPriorityWorkItemsAndDequeue(
ThreadPoolWorkQueueThreadLocals tl,
[MaybeNullWhen(false)] out object workItem)
{
Debug.Assert((tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems) == 0);
Debug.Assert(!tl.isProcessingHighPriorityWorkItems);

if (!highPriorityWorkItems.TryDequeue(out workItem))
{
return false;
}

tl.workState |= ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems;
tl.isProcessingHighPriorityWorkItems = true;
_mayHaveHighPriorityWorkItems = 1;
return true;
}
Expand Down Expand Up @@ -632,8 +622,7 @@ internal static bool Dispatch()
// take over the thread, sustaining starvation. For example, when worker threads are continually starved,
// high-priority work items may always be queued and normal-priority work items may not get a chance to run.
bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst;
if (dispatchNormalPriorityWorkFirst &&
(tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) == 0)
if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal)
{
workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst;
workQueue.workItems.TryDequeue(out workItem);
Expand Down Expand Up @@ -670,7 +659,7 @@ internal static bool Dispatch()
// reason that may have a dependency on other queued work items.
workQueue.EnsureThreadRequested();

// After this point, this method is no longer responsible for ensuring thread requests
// After this point, this method is no longer responsible for ensuring thread requests except for missed steals
}

// Has the desire for logging changed since the last time we entered?
Expand Down Expand Up @@ -700,8 +689,18 @@ internal static bool Dispatch()

if (workItem == null)
{
// May have missed a steal, but this method is not responsible for ensuring thread requests anymore. See
// the dequeue before the loop.
//
// No work.
// If we missed a steal, though, there may be more work in the queue.
// Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
// that owns the contended work-stealing queue will pick up its own workitems in the meantime,
// which will be more efficient than this thread doing it anyway.
//
if (missedSteal)
{
workQueue.EnsureThreadRequested();
}

return true;
}
}
Expand Down Expand Up @@ -753,7 +752,7 @@ internal static bool Dispatch()
// to ensure that they would not be heavily delayed. Tell the caller that this thread was requested to stop
// processing work items.
tl.TransferLocalWork();
tl.ResetWorkItemProcessingState();
tl.isProcessingHighPriorityWorkItems = false;
return false;
}

Expand All @@ -769,7 +768,7 @@ internal static bool Dispatch()
{
// The runtime-specific thread pool implementation requires the Dispatch loop to return to the VM
// periodically to let it perform its own work
tl.ResetWorkItemProcessingState();
tl.isProcessingHighPriorityWorkItems = false;
return true;
}

Expand Down Expand Up @@ -823,7 +822,7 @@ internal sealed class ThreadPoolWorkQueueThreadLocals
[ThreadStatic]
public static ThreadPoolWorkQueueThreadLocals? threadLocals;

public WorkState workState;
public bool isProcessingHighPriorityWorkItems;
public readonly ThreadPoolWorkQueue workQueue;
public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
public readonly Thread currentThread;
Expand All @@ -839,16 +838,12 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
threadLocalCompletionCountObject = ThreadPool.GetOrCreateThreadLocalCompletionCountObject();
}

public void ResetWorkItemProcessingState() => workState &= ~WorkState.IsProcessingHighPriorityWorkItems;

public void TransferLocalWork()
{
while (workStealingQueue.LocalPop() is object cb)
{
workQueue.Enqueue(cb, forceGlobal: true);
}

workState &= ~WorkState.MayHaveLocalWorkItems;
}

~ThreadPoolWorkQueueThreadLocals()
Expand All @@ -860,13 +855,6 @@ public void TransferLocalWork()
ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
}
}

[Flags]
public enum WorkState
{
MayHaveLocalWorkItems = 1 << 0,
IsProcessingHighPriorityWorkItems = 1 << 1
}
}

// A strongly typed callback for ThreadPoolTypedWorkItemQueue<T, TCallback>.
Expand Down Expand Up @@ -948,7 +936,7 @@ void IThreadPoolWorkItem.Execute()
// yield to the thread pool after some time. The threshold used is half of the thread pool's dispatch quantum,
// which the thread pool uses for doing periodic work.
if (++completedCount == uint.MaxValue ||
(tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) != 0 ||
tl.workStealingQueue.CanSteal ||
(uint)(Environment.TickCount - startTimeMs) >= ThreadPoolWorkQueue.DispatchQuantumMs / 2 ||
!_workItems.TryDequeue(out workItem))
{
Expand Down
60 changes: 60 additions & 0 deletions src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
Expand Down Expand Up @@ -1020,6 +1021,65 @@ public static void CooperativeBlockingWithProcessingThreadsAndGoalThreadsAndAddW
}).Dispose();
}

[ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
public void FileStreamFlushAsyncThreadPoolDeadlockTest()
{
// This test was occasionally causing the deadlock described in https://github.com/dotnet/runtime/pull/68171. Run it
// in a remote process to test it with a dedicated thread pool.
RemoteExecutor.Invoke(async () =>
{
const int OneKibibyte = 1 << 10;
const int FourKibibytes = OneKibibyte << 2;
const int FileSize = 1024;
string destinationFilePath = null;
try
{
destinationFilePath = CreateFileWithRandomContent(FileSize);
static string CreateFileWithRandomContent(int fileSize)
{
string filePath = Path.GetTempFileName();
File.WriteAllBytes(filePath, CreateArray(fileSize));
return filePath;
}
static byte[] CreateArray(int count)
{
var result = new byte[count];
const int Seed = 12345;
var random = new Random(Seed);
random.NextBytes(result);
return result;
}
for (int j = 0; j < 100; j++)
{
using var fileStream =
new FileStream(
destinationFilePath,
FileMode.Create,
FileAccess.Write,
FileShare.Read,
FourKibibytes,
FileOptions.None);
for (int i = 0; i < FileSize; i++)
{
fileStream.WriteByte(default);
await fileStream.FlushAsync();
}
}
}
finally
{
if (!string.IsNullOrEmpty(destinationFilePath) && File.Exists(destinationFilePath))
{
File.Delete(destinationFilePath);
}
}
}).Dispose();
}

public static bool IsThreadingAndRemoteExecutorSupported =>
PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
}
Expand Down

0 comments on commit 8d4f53d

Please sign in to comment.