From 636cc62615c79d33a67a301d551e093498d1f97b Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Mon, 6 Apr 2020 18:51:42 +0200 Subject: [PATCH 1/4] add possibility to change the number of handles required to allocate a new epool thread --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 9d5f48b7045e3..1188e0bb37fe2 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -105,7 +105,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error) // private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue; #endif - private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)32; + private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)int.Parse(Environment.GetEnvironmentVariable("MinHandles")!); // // Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop From dcb7385c513b63e894b2935ebb326be221722131 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Fri, 17 Apr 2020 11:50:32 +0200 Subject: [PATCH 2/4] try to enqueue many work items with a single call --- .../Net/Sockets/SocketAsyncContext.Unix.cs | 18 ++++++++--- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 11 ++++++- .../src/System/Threading/ThreadPool.cs | 32 +++++++++++++++++++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index beb4cc088839d..fb1b363cc75d6 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -826,7 +826,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation } // Called on the epoll thread whenever we receive an epoll notification. - public void HandleEvent(SocketAsyncContext context) + public void HandleEvent(SocketAsyncContext context, List toEnqueue) { AsyncOperation op; using (Lock()) @@ -866,7 +866,15 @@ public void HandleEvent(SocketAsyncContext context) } // Dispatch the op so we can try to process it. - op.Dispatch(); + var e = op.Event; + if (e is null) + { + toEnqueue.Add(op); + } + else + { + e.Set(); + } } internal void ProcessAsyncOperation(TOperation op) @@ -1946,7 +1954,7 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co return SocketError.IOPending; } - public unsafe void HandleEvents(Interop.Sys.SocketEvents events) + public unsafe void HandleEvents(Interop.Sys.SocketEvents events, List toEnqueue) { if ((events & Interop.Sys.SocketEvents.Error) != 0) { @@ -1957,12 +1965,12 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) if ((events & Interop.Sys.SocketEvents.Read) != 0) { - _receiveQueue.HandleEvent(this); + _receiveQueue.HandleEvent(this, toEnqueue); } if ((events & Interop.Sys.SocketEvents.Write) != 0) { - _sendQueue.HandleEvent(this); + _sendQueue.HandleEvent(this, toEnqueue); } } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 1188e0bb37fe2..94bab0712273d 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading; @@ -308,6 +309,7 @@ private void EventLoop() try { bool shutdown = false; + List toEnqueue = new List(EventBufferCount); while (!shutdown) { int numEvents = EventBufferCount; @@ -333,11 +335,18 @@ private void EventLoop() _handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context); if (context != null) { - context.HandleEvents(_buffer[i].Events); + context.HandleEvents(_buffer[i].Events, toEnqueue); context = null; } } } + + if (toEnqueue.Count > 0) + { + ThreadPool.UnsafeQueueUserWorkItem(toEnqueue, preferLocal: false); + + toEnqueue.Clear(); + } } FreeNativeResources(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index fe1021272a282..daeace02f3e2e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -489,6 +489,30 @@ public void Enqueue(object callback, bool forceGlobal) EnsureThreadRequested(); } + public void Enqueue(List callbacks, bool forceGlobal) + { + ThreadPoolWorkQueueThreadLocals? tl = null; + if (!forceGlobal) + tl = ThreadPoolWorkQueueThreadLocals.threadLocals; + + if (null != tl) + { + foreach (var callback in callbacks) + { + tl.workStealingQueue.LocalPush(callback); + } + } + else + { + foreach (var callback in callbacks) + { + workItems.Enqueue(callback); + } + } + + EnsureThreadRequested(); + } + internal bool LocalFindAndPop(object callback) { ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals; @@ -1179,6 +1203,14 @@ public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool pr return true; } + public static bool UnsafeQueueUserWorkItem(List callBacks, bool preferLocal) + { + EnsureInitialized(); + + ThreadPoolGlobals.workQueue.Enqueue(callBacks, forceGlobal: !preferLocal); + return true; + } + internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal) { Debug.Assert((callBack is IThreadPoolWorkItem) ^ (callBack is Task)); From a0b9d6612dc9fd8bcd153fb150b50fcc59798837 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Fri, 17 Apr 2020 13:23:03 +0200 Subject: [PATCH 3/4] add refs --- .../ref/System.Threading.ThreadPool.cs | 1 + .../ref/System.Threading.ThreadPool.csproj | 1 + 2 files changed, 2 insertions(+) diff --git a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs index e9188f9ee2739..8b6ad2bd55521 100644 --- a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs +++ b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.cs @@ -40,6 +40,7 @@ public static partial class ThreadPool [System.CLSCompliantAttribute(false)] public static unsafe bool UnsafeQueueNativeOverlapped(System.Threading.NativeOverlapped* overlapped) { throw null; } public static bool UnsafeQueueUserWorkItem(System.Threading.IThreadPoolWorkItem callBack, bool preferLocal) { throw null; } + public static bool UnsafeQueueUserWorkItem(System.Collections.Generic.List callBacks, bool preferLocal) { throw null; } public static bool UnsafeQueueUserWorkItem(System.Threading.WaitCallback callBack, object? state) { throw null; } public static bool UnsafeQueueUserWorkItem(System.Action callBack, TState state, bool preferLocal) { throw null; } public static System.Threading.RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(System.Threading.WaitHandle waitObject, System.Threading.WaitOrTimerCallback callBack, object? state, int millisecondsTimeOutInterval, bool executeOnlyOnce) { throw null; } diff --git a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj index a86046bb60d44..a380d18ddd033 100644 --- a/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj +++ b/src/libraries/System.Threading.ThreadPool/ref/System.Threading.ThreadPool.csproj @@ -10,5 +10,6 @@ + From ac1f6981f72bbd67d36704c52f5f5a69312930e5 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Fri, 17 Apr 2020 13:38:41 +0200 Subject: [PATCH 4/4] return true when we had to request a thread --- .../src/System/Threading/ThreadPool.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index daeace02f3e2e..9e3d984a7110e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -424,7 +424,7 @@ private ThreadPoolWorkQueueThreadLocals CreateThreadLocals() return ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this); } - internal void EnsureThreadRequested() + internal bool EnsureThreadRequested() { // // If we have not yet requested #procs threads, then request a new thread. @@ -439,10 +439,12 @@ internal void EnsureThreadRequested() if (prev == count) { ThreadPool.RequestWorkerThread(); - break; + return true; } count = prev; } + + return false; } internal void MarkThreadRequestSatisfied() @@ -489,7 +491,7 @@ public void Enqueue(object callback, bool forceGlobal) EnsureThreadRequested(); } - public void Enqueue(List callbacks, bool forceGlobal) + public bool Enqueue(List callbacks, bool forceGlobal) { ThreadPoolWorkQueueThreadLocals? tl = null; if (!forceGlobal) @@ -510,7 +512,7 @@ public void Enqueue(List callbacks, bool forceGlobal) } } - EnsureThreadRequested(); + return EnsureThreadRequested(); } internal bool LocalFindAndPop(object callback) @@ -1207,8 +1209,7 @@ public static bool UnsafeQueueUserWorkItem(List callBacks, { EnsureInitialized(); - ThreadPoolGlobals.workQueue.Enqueue(callBacks, forceGlobal: !preferLocal); - return true; + return ThreadPoolGlobals.workQueue.Enqueue(callBacks, forceGlobal: !preferLocal); } internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)