Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Experiment] Enqueue many work items with single call, make MinHandles configurable via env var #2

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
}

// Called on the epoll thread whenever we receive an epoll notification.
public void HandleEvent(SocketAsyncContext context)
public void HandleEvent(SocketAsyncContext context, List<IThreadPoolWorkItem> toEnqueue)
{
AsyncOperation op;
using (Lock())
Expand Down Expand Up @@ -866,7 +866,15 @@ public void HandleEvent(SocketAsyncContext context)
}

// Dispatch the op so we can try to process it.
op.Dispatch();
var e = op.Event;
if (e is null)
{
toEnqueue.Add(op);
}
else
{
e.Set();
}
}

internal void ProcessAsyncOperation(TOperation op)
Expand Down Expand Up @@ -1946,7 +1954,7 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co
return SocketError.IOPending;
}

public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
public unsafe void HandleEvents(Interop.Sys.SocketEvents events, List<IThreadPoolWorkItem> toEnqueue)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
{
Expand All @@ -1957,12 +1965,12 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events)

if ((events & Interop.Sys.SocketEvents.Read) != 0)
{
_receiveQueue.HandleEvent(this);
_receiveQueue.HandleEvent(this, toEnqueue);
}

if ((events & Interop.Sys.SocketEvents.Write) != 0)
{
_sendQueue.HandleEvent(this);
_sendQueue.HandleEvent(this, toEnqueue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -105,7 +106,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
//
private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue;
#endif
private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)32;
private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)int.Parse(Environment.GetEnvironmentVariable("MinHandles")!);

//
// Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop
Expand Down Expand Up @@ -308,6 +309,7 @@ private void EventLoop()
try
{
bool shutdown = false;
List<IThreadPoolWorkItem> toEnqueue = new List<IThreadPoolWorkItem>(EventBufferCount);
while (!shutdown)
{
int numEvents = EventBufferCount;
Expand All @@ -333,11 +335,18 @@ private void EventLoop()
_handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context);
if (context != null)
{
context.HandleEvents(_buffer[i].Events);
context.HandleEvents(_buffer[i].Events, toEnqueue);
context = null;
}
}
}

if (toEnqueue.Count > 0)
{
ThreadPool.UnsafeQueueUserWorkItem(toEnqueue, preferLocal: false);

toEnqueue.Clear();
}
}

FreeNativeResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private ThreadPoolWorkQueueThreadLocals CreateThreadLocals()
return ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this);
}

internal void EnsureThreadRequested()
internal bool EnsureThreadRequested()
{
//
// If we have not yet requested #procs threads, then request a new thread.
Expand All @@ -439,10 +439,12 @@ internal void EnsureThreadRequested()
if (prev == count)
{
ThreadPool.RequestWorkerThread();
break;
return true;
}
count = prev;
}

return false;
}

internal void MarkThreadRequestSatisfied()
Expand Down Expand Up @@ -489,6 +491,30 @@ public void Enqueue(object callback, bool forceGlobal)
EnsureThreadRequested();
}

public bool Enqueue(List<IThreadPoolWorkItem> callbacks, bool forceGlobal)
{
ThreadPoolWorkQueueThreadLocals? tl = null;
if (!forceGlobal)
tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

if (null != tl)
{
foreach (var callback in callbacks)
{
tl.workStealingQueue.LocalPush(callback);
}
}
else
{
foreach (var callback in callbacks)
{
workItems.Enqueue(callback);
}
}

return EnsureThreadRequested();
}

internal bool LocalFindAndPop(object callback)
{
ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
Expand Down Expand Up @@ -1179,6 +1205,13 @@ public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool pr
return true;
}

public static bool UnsafeQueueUserWorkItem(List<IThreadPoolWorkItem> callBacks, bool preferLocal)
{
EnsureInitialized();

return ThreadPoolGlobals.workQueue.Enqueue(callBacks, forceGlobal: !preferLocal);
}

internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)
{
Debug.Assert((callBack is IThreadPoolWorkItem) ^ (callBack is Task));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static partial class ThreadPool
[System.CLSCompliantAttribute(false)]
public static unsafe bool UnsafeQueueNativeOverlapped(System.Threading.NativeOverlapped* overlapped) { throw null; }
public static bool UnsafeQueueUserWorkItem(System.Threading.IThreadPoolWorkItem callBack, bool preferLocal) { throw null; }
public static bool UnsafeQueueUserWorkItem(System.Collections.Generic.List<System.Threading.IThreadPoolWorkItem> callBacks, bool preferLocal) { throw null; }
public static bool UnsafeQueueUserWorkItem(System.Threading.WaitCallback callBack, object? state) { throw null; }
public static bool UnsafeQueueUserWorkItem<TState>(System.Action<TState> callBack, TState state, bool preferLocal) { throw null; }
public static System.Threading.RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(System.Threading.WaitHandle waitObject, System.Threading.WaitOrTimerCallback callBack, object? state, int millisecondsTimeOutInterval, bool executeOnlyOnce) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
<ItemGroup>
<ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
<ProjectReference Include="..\..\System.Threading.Overlapped\ref\System.Threading.Overlapped.csproj" />
<ProjectReference Include="..\..\System.Collections\ref\System.Collections.csproj" />
</ItemGroup>
</Project>