diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 0b51d5e07a6ec8..c96ad22b47b531 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -6,6 +6,8 @@ using System.Diagnostics.Tracing; using System.IO; using System.Linq; +using System.Net.Sockets; +using System.Net; using System.Reflection; using System.Threading.Tasks; using System.Threading.Tests; @@ -1160,6 +1162,95 @@ public void ThreadPoolMinMaxThreadsEventTest() }).Dispose(); } + private sealed class RuntimeEventListener : EventListener + { + private const string ClrProviderName = "Microsoft-Windows-DotNETRuntime"; + private const EventKeywords ThreadingKeyword = (EventKeywords)0x10000; + + public volatile int tpIOEnqueue = 0; + public volatile int tpIODequeue = 0; + public ManualResetEvent tpWaitIOEnqueueEvent = new ManualResetEvent(false); + public ManualResetEvent tpWaitIODequeueEvent = new ManualResetEvent(false); + + protected override void OnEventSourceCreated(EventSource eventSource) + { + if (eventSource.Name.Equals(ClrProviderName)) + { + EnableEvents(eventSource, EventLevel.Verbose, ThreadingKeyword); + } + + base.OnEventSourceCreated(eventSource); + } + + protected override void OnEventWritten(EventWrittenEventArgs eventData) + { + if (eventData.EventName.Equals("ThreadPoolIOEnqueue")) + { + Interlocked.Increment(ref tpIOEnqueue); + tpWaitIOEnqueueEvent.Set(); + } + else if (eventData.EventName.Equals("ThreadPoolIODequeue")) + { + Interlocked.Increment(ref tpIODequeue); + tpWaitIODequeueEvent.Set(); + } + } + } + + [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported), nameof(UseWindowsThreadPool))] + public void ReadWriteAsyncTest() + { + RemoteExecutor.Invoke(async () => + { + using (RuntimeEventListener eventListener = new RuntimeEventListener()) + { + TaskCompletionSource<int> portTcs = new TaskCompletionSource<int>(); + TaskCompletionSource<bool> readAsyncReadyTcs = new TaskCompletionSource<bool>(); + + async Task StartListenerAsync() + { + using TcpListener listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + int port = ((IPEndPoint)listener.LocalEndpoint).Port; + portTcs.SetResult(port); + using TcpClient client = await listener.AcceptTcpClientAsync(); + using (NetworkStream stream = client.GetStream()) + { + byte[] buffer = new byte[1]; + Task readAsyncTask = stream.ReadAsync(buffer, 0, buffer.Length); + readAsyncReadyTcs.SetResult(true); + await readAsyncTask; + } + listener.Stop(); + } + + async Task StartClientAsync() + { + int port = await portTcs.Task; + using (TcpClient client = new TcpClient(new IPEndPoint(IPAddress.Loopback, 0))) + { + await client.ConnectAsync(IPAddress.Loopback, port); + using (NetworkStream stream = client.GetStream()) + { + bool readAsyncReady = await readAsyncReadyTcs.Task; + byte[] data = new byte[1]; + await stream.WriteAsync(data, 0, data.Length); + } + } + } + + Task listenerTask = StartListenerAsync(); + Task clientTask = StartClientAsync(); + await Task.WhenAll(listenerTask, clientTask); + ManualResetEvent[] waitEvents = [eventListener.tpWaitIOEnqueueEvent, eventListener.tpWaitIODequeueEvent]; + + Assert.True(WaitHandle.WaitAll(waitEvents, TimeSpan.FromSeconds(15))); // Assert that there wasn't a timeout + Assert.True(eventListener.tpIOEnqueue > 0); + Assert.True(eventListener.tpIODequeue > 0); + } + }).Dispose(); + } + public static bool IsThreadingAndRemoteExecutorSupported => PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; @@ -1169,6 +1260,7 @@ private static bool GetUseWindowsThreadPool() return useWindowsThreadPool; } - private static bool UsePortableThreadPool { get; } = !GetUseWindowsThreadPool(); + private static bool UseWindowsThreadPool { get; } = GetUseWindowsThreadPool(); + private static bool UsePortableThreadPool { get; } = !UseWindowsThreadPool; } }