From bbae02d9c436a35c7d9faedb7fbabdc8ab896b43 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Tue, 9 Jan 2024 18:37:10 +0100 Subject: [PATCH 1/8] - `CancellationToken` for `JSWebWorker` & `JSSynchronizationContext` - fix `UninstallWebWorkerInterop` for `JSWebWorker` when exception - `ReleasePromiseHolder` operations order - tests: `Executor_Cancellation`, `Executor_Propagates` --- .../JavaScript/Interop/JavaScriptExports.cs | 2 +- .../JavaScript/JSHostImplementation.cs | 6 +- .../JavaScript/JSProxyContext.cs | 3 +- .../JavaScript/JSSynchronizationContext.cs | 21 +++- .../InteropServices/JavaScript/JSWebWorker.cs | 45 +++++-- ...me.InteropServices.JavaScript.Tests.csproj | 5 + .../JavaScript/WebWorkerTest.cs | 113 +++++++++++++----- .../JavaScript/WebWorkerTestHelper.cs | 87 ++++++++------ 8 files changed, 203 insertions(+), 79 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs index 87676e9699cd9..3e371b5c99f12 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs @@ -279,7 +279,7 @@ public static void GetManagedStackTrace(JSMarshalerArgument* arguments_buffer) // void InstallMainSynchronizationContext() public static void InstallMainSynchronizationContext() { - InstallWebWorkerInterop(true); + InstallWebWorkerInterop(true, CancellationToken.None); } #endif diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs index 4bf4aeb36353e..51f4409e5a0f1 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs @@ -209,9 +209,9 @@ public static void LoadSatelliteAssembly(byte[] dllBytes) } #if FEATURE_WASM_THREADS - public static void InstallWebWorkerInterop(bool isMainThread) + public static JSSynchronizationContext InstallWebWorkerInterop(bool isMainThread, CancellationToken cancellationToken) { - var ctx = new JSSynchronizationContext(isMainThread); + var ctx = new JSSynchronizationContext(isMainThread, cancellationToken); ctx.previousSynchronizationContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(ctx); @@ -226,6 +226,8 @@ public static void InstallWebWorkerInterop(bool isMainThread) ctx.AwaitNewData(); Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle); + + return ctx; } public static void UninstallWebWorkerInterop() diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSProxyContext.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSProxyContext.cs index d81f1cc1bcd56..dcb99ed1d3d4e 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSProxyContext.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSProxyContext.cs @@ -354,6 +354,7 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle) { throw new InvalidOperationException("ReleasePromiseHolder expected PromiseHolder " + holderGCHandle); } + holder.IsDisposed = true; } else { @@ -367,9 +368,9 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle) { throw new InvalidOperationException("ReleasePromiseHolder expected PromiseHolder" + holderGCHandle); } + holder.IsDisposed = true; handle.Free(); } - holder.IsDisposed = true; } } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs index ae1c8d4f1b98a..e76d54280abf8 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs @@ -6,7 +6,6 @@ using System.Threading; using System.Threading.Channels; using System.Runtime.CompilerServices; -using System.Collections.Generic; using WorkItemQueueType = System.Threading.Channels.Channel; namespace System.Runtime.InteropServices.JavaScript @@ -26,6 +25,8 @@ internal sealed class JSSynchronizationContext : SynchronizationContext internal SynchronizationContext? previousSynchronizationContext; internal bool _isDisposed; + internal bool _isCancellationRequested; + private CancellationTokenRegistration _cancellationTokenRegistration; internal readonly struct WorkItem { @@ -41,11 +42,16 @@ public WorkItem(SendOrPostCallback callback, object? data, ManualResetEventSlim? } } - public JSSynchronizationContext(bool isMainThread) + public JSSynchronizationContext(bool isMainThread, CancellationToken cancellationToken) { ProxyContext = new JSProxyContext(isMainThread, this); Queue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true }); _DataIsAvailable = DataIsAvailable; + _cancellationTokenRegistration = cancellationToken.Register(() => + { + _isCancellationRequested=true; + Queue.Writer.TryComplete(); + }); } internal JSSynchronizationContext(JSProxyContext proxyContext, WorkItemQueueType queue, Action dataIsAvailable) @@ -62,7 +68,7 @@ public override SynchronizationContext CreateCopy() internal void AwaitNewData() { - if (_isDisposed) + if (_isDisposed || _isCancellationRequested) { // FIXME: there could be abandoned work, but here we have no way how to propagate the failure // ObjectDisposedException.ThrowIf(_isDisposed, this); @@ -70,6 +76,11 @@ internal void AwaitNewData() } var vt = Queue.Reader.WaitToReadAsync(); + if (_isCancellationRequested) + { + return; + } + if (vt.IsCompleted) { DataIsAvailable(); @@ -138,7 +149,7 @@ private static void BackgroundJobHandler() private void Pump() { - if (_isDisposed) + if (_isDisposed || _isCancellationRequested) { // FIXME: there could be abandoned work, but here we have no way how to propagate the failure // ObjectDisposedException.ThrowIf(_isDisposed, this); @@ -168,7 +179,7 @@ private void Pump() finally { // If an item throws, we want to ensure that the next pump gets scheduled appropriately regardless. - if (!_isDisposed) AwaitNewData(); + if (!_isDisposed && !_isCancellationRequested) AwaitNewData(); } } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs index 4e1108b4afcf9..04fd400ff80a1 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs @@ -54,8 +54,9 @@ private static Task RunAsyncImpl(Func> body, CancellationToken can // continuation should not be running synchronously in the JSWebWorker thread because we are about to kill it after we resolve/reject the Task. var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var capturedContext = SynchronizationContext.Current; - var t = new Thread(() => + var thread = new Thread(() => { + CancellationTokenRegistration? reg = null; try { if (cancellationToken.IsCancellationRequested) @@ -64,7 +65,17 @@ private static Task RunAsyncImpl(Func> body, CancellationToken can return; } - JSHostImplementation.InstallWebWorkerInterop(false); + var synchronizationContext = JSHostImplementation.InstallWebWorkerInterop(false, cancellationToken); + + reg = cancellationToken.Register(() => + { + synchronizationContext.Send(static o => + { + JSHostImplementation.UninstallWebWorkerInterop(); + }, synchronizationContext); + SendWhenException(parentContext, tcs, new OperationCanceledException(cancellationToken)); + }); + var childScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task res = body(); // This code is exiting thread main() before all promises are resolved. @@ -73,16 +84,18 @@ private static Task RunAsyncImpl(Func> body, CancellationToken can { SendWhenDone(parentContext, tcs, res); JSHostImplementation.UninstallWebWorkerInterop(); + reg?.Dispose(); }, childScheduler); } catch (Exception ex) { SendWhenException(parentContext, tcs, ex); + JSHostImplementation.UninstallWebWorkerInterop(); + reg?.Dispose(); } - }); - JSHostImplementation.SetHasExternalEventLoop(t); - t.Start(); + JSHostImplementation.SetHasExternalEventLoop(thread); + thread.Start(); return tcs.Task; } @@ -92,8 +105,9 @@ private static Task RunAsyncImpl(Func body, CancellationToken cancellation // continuation should not be running synchronously in the JSWebWorker thread because we are about to kill it after we resolve/reject the Task. var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var capturedContext = SynchronizationContext.Current; - var t = new Thread(() => + var thread = new Thread(() => { + CancellationTokenRegistration? reg = null; try { if (cancellationToken.IsCancellationRequested) @@ -102,7 +116,17 @@ private static Task RunAsyncImpl(Func body, CancellationToken cancellation return; } - JSHostImplementation.InstallWebWorkerInterop(false); + var synchronizationContext = JSHostImplementation.InstallWebWorkerInterop(false, cancellationToken); + + reg = cancellationToken.Register(() => + { + synchronizationContext.Send(static o => + { + JSHostImplementation.UninstallWebWorkerInterop(); + }, synchronizationContext); + SendWhenException(parentContext, tcs, new OperationCanceledException(cancellationToken)); + }); + var childScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task res = body(); // This code is exiting thread main() before all promises are resolved. @@ -111,16 +135,19 @@ private static Task RunAsyncImpl(Func body, CancellationToken cancellation { SendWhenDone(parentContext, tcs, res); JSHostImplementation.UninstallWebWorkerInterop(); + reg?.Dispose(); }, childScheduler); } catch (Exception ex) { SendWhenException(parentContext, tcs, ex); + reg?.Dispose(); + JSHostImplementation.UninstallWebWorkerInterop(); } }); - JSHostImplementation.SetHasExternalEventLoop(t); - t.Start(); + JSHostImplementation.SetHasExternalEventLoop(thread); + thread.Start(); return tcs.Task; } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj index 3785901bd97f7..004b8b11e1e13 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj @@ -14,6 +14,11 @@ true + + + false + $(WasmXHarnessMonoArgs) --setenv=XHARNESS_LOG_TEST_START=true + diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs index 00e3ee669b662..ed98b2104f6a6 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs @@ -15,28 +15,26 @@ namespace System.Runtime.InteropServices.JavaScript.Tests { // TODO test: - // JSWebWorker.RunAsync with CancellationToken // JSExport 2x // JSExport async - // timer // GC + finalizer + dispose // lock // thread allocation, many threads - // TLS // ProxyContext flow, child thread, child task // use JSObject after JSWebWorker finished, especially HTTP // JSWebWorker JS setTimeout till after close // WS on JSWebWorker // Yield will hit event loop 3x // HTTP continue on TP - // WS continue on TP // event pipe // FS // unit test for problem **7)** public class WebWorkerTest { - #region executor threads + const int TimeoutMilliseconds = 300; + + #region Executors public static IEnumerable GetTargetThreads() { @@ -56,36 +54,83 @@ public static IEnumerable GetTargetThreads2x() type2 => new object[] { new Executor(type1), new Executor(type2) })); } + [Theory, MemberData(nameof(GetTargetThreads))] + public async Task Executor_Cancellation(Executor executor) + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + await Task.Delay(10); + + TaskCompletionSource ready = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var canceledTask = executor.Execute(() => + { + TaskCompletionSource never = new TaskCompletionSource(); + ready.SetResult(); + return never.Task; + }, cts.Token); + + cts.Cancel(); + + switch (executor.Type) + { + case ExecutorType.Main: + case ExecutorType.NewThread: + await Assert.ThrowsAsync(() => canceledTask); + break; + case ExecutorType.ThreadPool: + case ExecutorType.JSWebWorker: + await Assert.ThrowsAsync(() => canceledTask); + break; + + } + } + + [Theory, MemberData(nameof(GetTargetThreads))] + public async Task Executor_Propagates(Executor executor) + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + + var canceledTask = executor.Execute(() => + { + throw new InvalidOperationException("Test"); + }, cts.Token); + + var ex = await Assert.ThrowsAsync(() => canceledTask); + Assert.Equal("Test", ex.Message); + } + #endregion - #region Console, Yield, Delay + #region Console, Yield, Delay, Timer [Theory, MemberData(nameof(GetTargetThreads))] public async Task ManagedConsole(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(() => { Console.WriteLine("C# Hello from ManagedThreadId: " + Environment.CurrentManagedThreadId); return Task.CompletedTask; - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] public async Task JSConsole(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(() => { WebWorkerTestHelper.Log("JS Hello from ManagedThreadId: " + Environment.CurrentManagedThreadId + " NativeThreadId: " + WebWorkerTestHelper.NativeThreadId); return Task.CompletedTask; - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] public async Task NativeThreadId(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { - await executor.StickyAwait(WebWorkerTestHelper.InitializeAsync()); + await executor.StickyAwait(WebWorkerTestHelper.InitializeAsync(), cts.Token); var jsTid = WebWorkerTestHelper.GetTid(); var csTid = WebWorkerTestHelper.NativeThreadId; @@ -99,12 +144,13 @@ await executor.Execute(async () => } await WebWorkerTestHelper.DisposeAsync(); - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] public async Task ThreadingTimer(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { TaskCompletionSource tcs = new TaskCompletionSource(); @@ -118,41 +164,43 @@ await executor.Execute(async () => }, null, 100, Timeout.Infinite); await tcs.Task; - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] public async Task JSDelay_ContinueWith(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { - await executor.StickyAwait(WebWorkerTestHelper.CreateDelay()); + await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); await WebWorkerTestHelper.Delay(1).ContinueWith(_ => { // continue on the context of the target JS interop executor.AssertInteropThread(); }, TaskContinuationOptions.ExecuteSynchronously); - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] public async Task JSDelay_ConfigureAwait_True(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { - await executor.StickyAwait(WebWorkerTestHelper.CreateDelay()); + await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); await WebWorkerTestHelper.Delay(1).ConfigureAwait(true); executor.AssertAwaitCapturedContext(); - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] - [ActiveIssue("https://github.com/dotnet/runtime/issues/96493")] public async Task ManagedDelay_ContinueWith(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { executor.AssertTargetThread(); @@ -161,14 +209,14 @@ await Task.Delay(10).ContinueWith(_ => // continue on the context of the Timer's thread pool thread Assert.True(Thread.CurrentThread.IsThreadPoolThread); }, TaskContinuationOptions.ExecuteSynchronously); - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] - [ActiveIssue("https://github.com/dotnet/runtime/issues/96628")] public async Task ManagedDelay_ConfigureAwait_True(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { executor.AssertTargetThread(); @@ -176,12 +224,13 @@ await executor.Execute(async () => await Task.Delay(1).ConfigureAwait(true); executor.AssertAwaitCapturedContext(); - }); + }, cts.Token); } [Theory, MemberData(nameof(GetTargetThreads))] public async Task ManagedYield(Executor executor) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { executor.AssertTargetThread(); @@ -189,14 +238,14 @@ await executor.Execute(async () => await Task.Yield(); executor.AssertAwaitCapturedContext(); - }); + }, cts.Token); } #endregion - #region affinity + #region Thread Affinity - private async Task ActionsInDifferentThreads(Executor executor1, Executor executor2, Func, Task> e1Job, Func e2Job) + private async Task ActionsInDifferentThreads(Executor executor1, Executor executor2, Func, Task> e1Job, Func e2Job, CancellationTokenSource cts) { TaskCompletionSource readyTCS = new TaskCompletionSource(); TaskCompletionSource doneTCS = new TaskCompletionSource(); @@ -209,7 +258,7 @@ private async Task ActionsInDifferentThreads(Executor executor1, Executor exe readyTCS.SetResult(default); } await doneTCS.Task; - }); + }, cts.Token); var r1 = await readyTCS.Task.ConfigureAwait(true); @@ -221,15 +270,25 @@ private async Task ActionsInDifferentThreads(Executor executor1, Executor exe await e2Job(r1); doneTCS.SetResult(); - }); + }, cts.Token); - await e2; - await e1; + try + { + await e2; + await e1; + } + catch (Exception) + { + cts.Cancel(); + throw; + } } [Theory, MemberData(nameof(GetTargetThreads2x))] public async Task JSObject_CapturesAffinity(Executor executor1, Executor executor2) { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + var e1Job = async (Task e2done, TaskCompletionSource e1State) => { await WebWorkerTestHelper.InitializeAsync(); @@ -253,7 +312,7 @@ public async Task JSObject_CapturesAffinity(Executor executor1, Executor executo Assert.True(valid); }; - await ActionsInDifferentThreads(executor1, executor2, e1Job, e2Job); + await ActionsInDifferentThreads(executor1, executor2, e1Job, e2Job, cts); } #endregion diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs index 63cf9fff69e7f..91ff833a7c5ef 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs @@ -52,30 +52,6 @@ public static Task ImportModuleFromString(string jsModule) return JSHost.ImportAsync("InlineTestHelper", es6DataUrl); } - #region Execute - - public static Task RunOnNewThread(Func job) - { - TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var t = new Thread(() => - { - try - { - var task = job(); - task.Wait(); - tcs.SetResult(); - } - catch (Exception ex) - { - tcs.SetException(ex); - } - }); - t.Start(); - return tcs.Task; - } - - #endregion - #region Setup [ThreadStatic] @@ -175,7 +151,7 @@ public Executor(ExecutorType type) Type = type; } - public Task Execute(Func job) + public Task Execute(Func job, CancellationToken cancellationToken) { Task wrapExecute() { @@ -187,13 +163,13 @@ Task wrapExecute() switch (Type) { case ExecutorType.Main: - return RunOnTargetAsync(MainSynchronizationContext, wrapExecute); + return RunOnTargetAsync(MainSynchronizationContext, wrapExecute, cancellationToken); case ExecutorType.ThreadPool: - return Task.Run(wrapExecute); + return Task.Run(wrapExecute, cancellationToken); case ExecutorType.NewThread: - return WebWorkerTestHelper.RunOnNewThread(wrapExecute); + return RunOnNewThread(wrapExecute, cancellationToken); case ExecutorType.JSWebWorker: - return JSWebWorker.RunAsync(wrapExecute); + return JSWebWorker.RunAsync(wrapExecute, cancellationToken); default: throw new InvalidOperationException(); } @@ -279,11 +255,11 @@ public void AssertInteropThread() public override string ToString() => Type.ToString(); // make sure we stay on the executor - public async Task StickyAwait(Task task) + public async Task StickyAwait(Task task, CancellationToken cancellationToken) { if (Type == ExecutorType.NewThread) { - task.Wait(); + task.Wait(cancellationToken); } else { @@ -292,19 +268,62 @@ public async Task StickyAwait(Task task) AssertTargetThread(); } - public static Task RunOnTargetAsync(SynchronizationContext ctx, Func job) + public static Task RunOnNewThread(Func job, CancellationToken cancellationToken) + { + TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var thread = new Thread(() => + { + CancellationTokenRegistration? reg = null; + try + { + if (cancellationToken.IsCancellationRequested) + { + tcs.TrySetException(new OperationCanceledException(cancellationToken)); + return; + } + reg = cancellationToken.Register(() => + { + tcs.TrySetException(new OperationCanceledException(cancellationToken)); + }); + var task = job(); + task.Wait(cancellationToken); + tcs.TrySetResult(); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + finally + { + reg?.Dispose(); + } + }); + thread.Start(); + return tcs.Task; + } + + public static Task RunOnTargetAsync(SynchronizationContext ctx, Func job, CancellationToken cancellationToken) { TaskCompletionSource tcs = new TaskCompletionSource(); ctx.Post(async _ => { + CancellationTokenRegistration? reg = null; try { + reg = cancellationToken.Register(() => + { + tcs.TrySetException(new OperationCanceledException(cancellationToken)); + }); await job().ConfigureAwait(true); - tcs.SetResult(); + tcs.TrySetResult(); } catch (Exception ex) { - tcs.SetException(ex); + tcs.TrySetException(ex); + } + finally + { + reg?.Dispose(); } }, null); return tcs.Task; From 0985d7b894e2953e67ff4160a6348db2c379d64f Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Tue, 9 Jan 2024 19:18:53 +0100 Subject: [PATCH 2/8] more --- .../JavaScript/WebWorkerTest.cs | 33 ++++++++----------- .../JavaScript/WebWorkerTestHelper.cs | 5 +-- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs index ed98b2104f6a6..602875d8c3321 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs @@ -150,21 +150,24 @@ await executor.Execute(async () => [Theory, MemberData(nameof(GetTargetThreads))] public async Task ThreadingTimer(Executor executor) { + var hit = false; var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { TaskCompletionSource tcs = new TaskCompletionSource(); - executor.AssertTargetThread(); - using var timer = new Threading.Timer(_ => + using var timer = new Timer(_ => { Assert.NotEqual(1, Environment.CurrentManagedThreadId); Assert.True(Thread.CurrentThread.IsThreadPoolThread); tcs.SetResult(); + hit = true; }, null, 100, Timeout.Infinite); await tcs.Task; }, cts.Token); + + Assert.True(hit); } [Theory, MemberData(nameof(GetTargetThreads))] @@ -175,7 +178,7 @@ await executor.Execute(async () => { await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); - await WebWorkerTestHelper.Delay(1).ContinueWith(_ => + await WebWorkerTestHelper.JSDelay(10).ContinueWith(_ => { // continue on the context of the target JS interop executor.AssertInteropThread(); @@ -191,7 +194,7 @@ await executor.Execute(async () => { await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); - await WebWorkerTestHelper.Delay(1).ConfigureAwait(true); + await WebWorkerTestHelper.JSDelay(1).ConfigureAwait(true); executor.AssertAwaitCapturedContext(); }, cts.Token); @@ -200,28 +203,25 @@ await executor.Execute(async () => [Theory, MemberData(nameof(GetTargetThreads))] public async Task ManagedDelay_ContinueWith(Executor executor) { + var hit = false; var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { - executor.AssertTargetThread(); - await Task.Delay(10).ContinueWith(_ => + await Task.Delay(10, cts.Token).ContinueWith(_ => { - // continue on the context of the Timer's thread pool thread - Assert.True(Thread.CurrentThread.IsThreadPoolThread); + hit = true; }, TaskContinuationOptions.ExecuteSynchronously); }, cts.Token); + Assert.True(hit); } - [Theory, MemberData(nameof(GetTargetThreads))] public async Task ManagedDelay_ConfigureAwait_True(Executor executor) { var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { - executor.AssertTargetThread(); - - await Task.Delay(1).ConfigureAwait(true); + await Task.Delay(10, cts.Token).ConfigureAwait(true); executor.AssertAwaitCapturedContext(); }, cts.Token); @@ -233,8 +233,6 @@ public async Task ManagedYield(Executor executor) var cts = new CancellationTokenSource(TimeoutMilliseconds); await executor.Execute(async () => { - executor.AssertTargetThread(); - await Task.Yield(); executor.AssertAwaitCapturedContext(); @@ -247,8 +245,8 @@ await executor.Execute(async () => private async Task ActionsInDifferentThreads(Executor executor1, Executor executor2, Func, Task> e1Job, Func e2Job, CancellationTokenSource cts) { - TaskCompletionSource readyTCS = new TaskCompletionSource(); - TaskCompletionSource doneTCS = new TaskCompletionSource(); + TaskCompletionSource readyTCS = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource doneTCS = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var e1 = executor1.Execute(async () => { @@ -264,9 +262,6 @@ private async Task ActionsInDifferentThreads(Executor executor1, Executor exe var e2 = executor2.Execute(async () => { - - executor2.AssertTargetThread(); - await e2Job(r1); doneTCS.SetResult(); diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs index 91ff833a7c5ef..3ffb2d4198b4a 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs @@ -21,7 +21,7 @@ public partial class WebWorkerTestHelper public static partial void Log(string message); [JSImport("delay", "InlineTestHelper")] - public static partial Task Delay(int ms); + public static partial Task JSDelay(int ms); [JSImport("getTid", "WebWorkerTestHelper")] public static partial int GetTid(); @@ -77,7 +77,7 @@ export function delay(ms) { } else { - await Delay(1).ConfigureAwait(false); + await JSDelay(1).ConfigureAwait(false); } } @@ -157,6 +157,7 @@ Task wrapExecute() { ExecutorTID = Environment.CurrentManagedThreadId; ExecutorSynchronizationContext = SynchronizationContext.Current ?? MainSynchronizationContext; + AssertTargetThread(); return job(); } From 6ce0ad51dd28af72d485d523cb1cc12de1890e83 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Tue, 9 Jan 2024 19:31:38 +0100 Subject: [PATCH 3/8] more --- .../JavaScript/WebWorkerTestHelper.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs index 3ffb2d4198b4a..39e1c5649dd98 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs @@ -211,10 +211,16 @@ public void AssertAwaitCapturedContext() Assert.False(Thread.CurrentThread.IsThreadPoolThread); break; case ExecutorType.NewThread: - // the actual new thread is now blocked in .Wait() and so this is running on TP Assert.NotEqual(1, Environment.CurrentManagedThreadId); - Assert.NotEqual(ExecutorTID, Environment.CurrentManagedThreadId); - Assert.True(Thread.CurrentThread.IsThreadPoolThread); + // sometimes this is TP and some times newThread, why ? + if (Thread.CurrentThread.IsThreadPoolThread) + { + Assert.NotEqual(ExecutorTID, Environment.CurrentManagedThreadId); + } + else + { + Assert.Equal(ExecutorTID, Environment.CurrentManagedThreadId); + } break; case ExecutorType.ThreadPool: // it could migrate to any TP thread From 0309f70f70d8d0edc8bf7e0c96b5a90fdc2c27f0 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 10 Jan 2024 20:26:05 +0100 Subject: [PATCH 4/8] more --- .../JavaScript/Interop/JavaScriptExports.cs | 2 +- .../JavaScript/JSHostImplementation.cs | 35 -- .../JavaScript/JSSynchronizationContext.cs | 142 ++++++-- .../InteropServices/JavaScript/JSWebWorker.cs | 329 ++++++++---------- ...me.InteropServices.JavaScript.Tests.csproj | 2 +- .../JavaScript/WebWorkerTest.cs | 174 ++++++++- .../JavaScript/WebWorkerTestHelper.cs | 34 +- 7 files changed, 458 insertions(+), 260 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs index 3e371b5c99f12..47e999d704786 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/Interop/JavaScriptExports.cs @@ -279,7 +279,7 @@ public static void GetManagedStackTrace(JSMarshalerArgument* arguments_buffer) // void InstallMainSynchronizationContext() public static void InstallMainSynchronizationContext() { - InstallWebWorkerInterop(true, CancellationToken.None); + JSSynchronizationContext.InstallWebWorkerInterop(true, CancellationToken.None); } #endif diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs index 51f4409e5a0f1..361997414e6e7 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHostImplementation.cs @@ -209,41 +209,6 @@ public static void LoadSatelliteAssembly(byte[] dllBytes) } #if FEATURE_WASM_THREADS - public static JSSynchronizationContext InstallWebWorkerInterop(bool isMainThread, CancellationToken cancellationToken) - { - var ctx = new JSSynchronizationContext(isMainThread, cancellationToken); - ctx.previousSynchronizationContext = SynchronizationContext.Current; - SynchronizationContext.SetSynchronizationContext(ctx); - - var proxyContext = ctx.ProxyContext; - JSProxyContext.CurrentThreadContext = proxyContext; - JSProxyContext.ExecutionContext = proxyContext; - if (isMainThread) - { - JSProxyContext.MainThreadContext = proxyContext; - } - - ctx.AwaitNewData(); - - Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle); - - return ctx; - } - - public static void UninstallWebWorkerInterop() - { - var ctx = JSProxyContext.CurrentThreadContext; - if (ctx == null) throw new InvalidOperationException(); - var syncContext = ctx.SynchronizationContext; - if (SynchronizationContext.Current == syncContext) - { - SynchronizationContext.SetSynchronizationContext(syncContext.previousSynchronizationContext); - } - JSProxyContext.CurrentThreadContext = null; - JSProxyContext.ExecutionContext = null; - ctx.Dispose(); - } - [UnsafeAccessor(UnsafeAccessorKind.Field, Name = "external_eventloop")] private static extern ref bool GetThreadExternalEventloop(Thread @this); diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs index e76d54280abf8..e78923d2839f4 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs @@ -20,15 +20,16 @@ namespace System.Runtime.InteropServices.JavaScript internal sealed class JSSynchronizationContext : SynchronizationContext { internal readonly JSProxyContext ProxyContext; - private readonly Action _DataIsAvailable;// don't allocate Action on each call to UnsafeOnCompleted + private readonly Action _ScheduleJSPump;// don't allocate Action on each call to UnsafeOnCompleted private readonly WorkItemQueueType Queue; internal SynchronizationContext? previousSynchronizationContext; internal bool _isDisposed; internal bool _isCancellationRequested; + internal bool _isRunning; private CancellationTokenRegistration _cancellationTokenRegistration; - internal readonly struct WorkItem + internal struct WorkItem { public readonly SendOrPostCallback Callback; public readonly object? Data; @@ -42,15 +43,77 @@ public WorkItem(SendOrPostCallback callback, object? data, ManualResetEventSlim? } } + // this need to be called from JSWebWorker or UI thread + public static JSSynchronizationContext InstallWebWorkerInterop(bool isMainThread, CancellationToken cancellationToken) + { + var ctx = new JSSynchronizationContext(isMainThread, cancellationToken); + ctx.previousSynchronizationContext = SynchronizationContext.Current; + SynchronizationContext.SetSynchronizationContext(ctx); + + var proxyContext = ctx.ProxyContext; + JSProxyContext.CurrentThreadContext = proxyContext; + JSProxyContext.ExecutionContext = proxyContext; + if (isMainThread) + { + JSProxyContext.MainThreadContext = proxyContext; + } + + ctx.AwaitNewData(); + + Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle); + + return ctx; + } + + // this need to be called from JSWebWorker thread + internal void UninstallWebWorkerInterop() + { + if (_isDisposed) + { + return; + } + if (!_isRunning) + { + return; + } + + var jsProxyContext = JSProxyContext.AssertIsInteropThread(); + if (jsProxyContext != ProxyContext) + { + Environment.FailFast($"UninstallWebWorkerInterop failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}"); + } + if (SynchronizationContext.Current == this) + { + SynchronizationContext.SetSynchronizationContext(this.previousSynchronizationContext); + } + + // this will runtimeKeepalivePop() + // and later maybeExit() -> __emscripten_thread_exit() + jsProxyContext.Dispose(); + + JSProxyContext.CurrentThreadContext = null; + JSProxyContext.ExecutionContext = null; + _isRunning = false; + Dispose(); + } + public JSSynchronizationContext(bool isMainThread, CancellationToken cancellationToken) { ProxyContext = new JSProxyContext(isMainThread, this); Queue = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true }); - _DataIsAvailable = DataIsAvailable; + _ScheduleJSPump = ScheduleJSPump; + + // receive callback (on any thread) that cancelation is requested _cancellationTokenRegistration = cancellationToken.Register(() => { - _isCancellationRequested=true; + + _isCancellationRequested = true; Queue.Writer.TryComplete(); + + while (Queue.Reader.TryRead(out var item)) + { + item.Signal?.Set(); + } }); } @@ -58,32 +121,38 @@ internal JSSynchronizationContext(JSProxyContext proxyContext, WorkItemQueueType { ProxyContext = proxyContext; Queue = queue; - _DataIsAvailable = dataIsAvailable; + _ScheduleJSPump = dataIsAvailable; } public override SynchronizationContext CreateCopy() { - return new JSSynchronizationContext(ProxyContext, Queue, _DataIsAvailable); + return new JSSynchronizationContext(ProxyContext, Queue, _ScheduleJSPump); } + // this must be called from the worker thread internal void AwaitNewData() { - if (_isDisposed || _isCancellationRequested) + if (_isDisposed) + { + return; + } + if (_isCancellationRequested) { - // FIXME: there could be abandoned work, but here we have no way how to propagate the failure - // ObjectDisposedException.ThrowIf(_isDisposed, this); + UninstallWebWorkerInterop(); return; } + _isRunning = true; var vt = Queue.Reader.WaitToReadAsync(); if (_isCancellationRequested) { + UninstallWebWorkerInterop(); return; } if (vt.IsCompleted) { - DataIsAvailable(); + ScheduleJSPump(); return; } @@ -92,10 +161,10 @@ internal void AwaitNewData() // fire a callback that will schedule a background job to pump the queue on the main thread. var awaiter = vt.AsTask().ConfigureAwait(false).GetAwaiter(); // UnsafeOnCompleted avoids spending time flowing the execution context (we don't need it.) - awaiter.UnsafeOnCompleted(_DataIsAvailable); + awaiter.UnsafeOnCompleted(_ScheduleJSPump); } - private unsafe void DataIsAvailable() + private unsafe void ScheduleJSPump() { // While we COULD pump here, we don't want to. We want the pump to happen on the next event loop turn. // Otherwise we could get a chain where a pump generates a new work item and that makes us pump again, forever. @@ -105,10 +174,23 @@ private unsafe void DataIsAvailable() public override void Post(SendOrPostCallback d, object? state) { ObjectDisposedException.ThrowIf(_isDisposed, this); + if (_isCancellationRequested) + { + // propagate the cancellation to the caller + throw new OperationCanceledException(_cancellationTokenRegistration.Token); + } var workItem = new WorkItem(d, state, null); if (!Queue.Writer.TryWrite(workItem)) + { + if (_isCancellationRequested) + { + // propagate the cancellation to the caller + throw new OperationCanceledException(_cancellationTokenRegistration.Token); + } + ObjectDisposedException.ThrowIf(_isDisposed, this); Environment.FailFast($"JSSynchronizationContext.Post failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}"); + } } // This path can only run when threading is enabled @@ -128,9 +210,24 @@ public override void Send(SendOrPostCallback d, object? state) { var workItem = new WorkItem(d, state, signal); if (!Queue.Writer.TryWrite(workItem)) + { + if (_isCancellationRequested) + { + // propagate the cancellation to the caller + throw new OperationCanceledException(_cancellationTokenRegistration.Token); + } + ObjectDisposedException.ThrowIf(_isDisposed, this); + Environment.FailFast($"JSSynchronizationContext.Send failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}"); + } signal.Wait(); + + if (_isCancellationRequested) + { + // propagate the cancellation to the caller + throw new OperationCanceledException(_cancellationTokenRegistration.Token); + } } } @@ -151,8 +248,7 @@ private void Pump() { if (_isDisposed || _isCancellationRequested) { - // FIXME: there could be abandoned work, but here we have no way how to propagate the failure - // ObjectDisposedException.ThrowIf(_isDisposed, this); + UninstallWebWorkerInterop(); return; } try @@ -170,17 +266,20 @@ private void Pump() { item.Signal?.Set(); } + if (_isDisposed || _isCancellationRequested) + { + UninstallWebWorkerInterop(); + return; + } } + // if anything throws unhandled exception, we will abort the program + // otherwise, we could schedule another round + AwaitNewData(); } catch (Exception e) { Environment.FailFast($"JSSynchronizationContext.BackgroundJobHandler failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {e.StackTrace}"); } - finally - { - // If an item throws, we want to ensure that the next pump gets scheduled appropriately regardless. - if (!_isDisposed && !_isCancellationRequested) AwaitNewData(); - } } private void Dispose(bool disposing) @@ -189,10 +288,11 @@ private void Dispose(bool disposing) { if (disposing) { - Queue.Writer.Complete(); + Queue.Writer.TryComplete(); } - previousSynchronizationContext = null; _isDisposed = true; + _cancellationTokenRegistration.Dispose(); + previousSynchronizationContext = null; } } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs index 04fd400ff80a1..114b7992dfa46 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs @@ -30,247 +30,204 @@ public static Task RunAsync(Func body) return RunAsync(body, CancellationToken.None); } - public static async Task RunAsync(Func> body, CancellationToken cancellationToken) + public static Task RunAsync(Func> body, CancellationToken cancellationToken) { - if (JSProxyContext.MainThreadContext.IsCurrentThread()) - { - await JavaScriptImports.ThreadAvailable().ConfigureAwait(false); - } - return await RunAsyncImpl(body, cancellationToken).ConfigureAwait(false); + var instance = new JSWebWorkerInstance(body, null, cancellationToken); + return instance.Start(); } - public static async Task RunAsync(Func body, CancellationToken cancellationToken) + public static Task RunAsync(Func body, CancellationToken cancellationToken) { - if (JSProxyContext.MainThreadContext.IsCurrentThread()) - { - await JavaScriptImports.ThreadAvailable().ConfigureAwait(false); - } - await RunAsyncImpl(body, cancellationToken).ConfigureAwait(false); + var instance = new JSWebWorkerInstance(null, body, cancellationToken); + return instance.Start(); } - private static Task RunAsyncImpl(Func> body, CancellationToken cancellationToken) + internal sealed class JSWebWorkerInstance : IDisposable { - var parentContext = SynchronizationContext.Current ?? new SynchronizationContext(); - // continuation should not be running synchronously in the JSWebWorker thread because we are about to kill it after we resolve/reject the Task. - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var capturedContext = SynchronizationContext.Current; - var thread = new Thread(() => - { - CancellationTokenRegistration? reg = null; - try + private JSSynchronizationContext? _jsSynchronizationContext; + private TaskCompletionSource _taskCompletionSource; + private Thread _thread; + private CancellationToken _cancellationToken; + private CancellationTokenRegistration? _cancellationRegistration; + private Func>? _bodyRes; + private Func? _bodyVoid; + private Task? _result; + private bool _isDisposed; + + public JSWebWorkerInstance(Func>? bodyRes, Func? bodyVoid, CancellationToken cancellationToken) + { + _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _thread = new Thread(ThreadMain); + _result = null; + _cancellationToken = cancellationToken; + _cancellationRegistration = null; + _bodyRes = bodyRes; + _bodyVoid = bodyVoid; + JSHostImplementation.SetHasExternalEventLoop(_thread); + } + + public Task Start() + { + if (JSProxyContext.MainThreadContext.IsCurrentThread()) { - if (cancellationToken.IsCancellationRequested) - { - PostWhenCancellation(parentContext, tcs); - return; - } - - var synchronizationContext = JSHostImplementation.InstallWebWorkerInterop(false, cancellationToken); - - reg = cancellationToken.Register(() => - { - synchronizationContext.Send(static o => - { - JSHostImplementation.UninstallWebWorkerInterop(); - }, synchronizationContext); - SendWhenException(parentContext, tcs, new OperationCanceledException(cancellationToken)); - }); - - var childScheduler = TaskScheduler.FromCurrentSynchronizationContext(); - Task res = body(); - // This code is exiting thread main() before all promises are resolved. - // the continuation is executed by setTimeout() callback of the thread. - res.ContinueWith(t => + // give browser chance to load more threads + JavaScriptImports.ThreadAvailable().ContinueWith(_ => { - SendWhenDone(parentContext, tcs, res); - JSHostImplementation.UninstallWebWorkerInterop(); - reg?.Dispose(); - }, childScheduler); + _thread.Start(); + }, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Current); } - catch (Exception ex) + else { - SendWhenException(parentContext, tcs, ex); - JSHostImplementation.UninstallWebWorkerInterop(); - reg?.Dispose(); + _thread.Start(); } - }); - JSHostImplementation.SetHasExternalEventLoop(thread); - thread.Start(); - return tcs.Task; - } + return _taskCompletionSource.Task; + } - private static Task RunAsyncImpl(Func body, CancellationToken cancellationToken) - { - var parentContext = SynchronizationContext.Current ?? new SynchronizationContext(); - // continuation should not be running synchronously in the JSWebWorker thread because we are about to kill it after we resolve/reject the Task. - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var capturedContext = SynchronizationContext.Current; - var thread = new Thread(() => + private void ThreadMain() { - CancellationTokenRegistration? reg = null; try { - if (cancellationToken.IsCancellationRequested) + if (_cancellationToken.IsCancellationRequested) { - PostWhenCancellation(parentContext, tcs); + PropagateCompletionAndDispose(Task.FromException(new OperationCanceledException(_cancellationToken))); return; } - var synchronizationContext = JSHostImplementation.InstallWebWorkerInterop(false, cancellationToken); - - reg = cancellationToken.Register(() => + // receive callback when the cancellation is requested + _cancellationRegistration = _cancellationToken.Register(() => { - synchronizationContext.Send(static o => - { - JSHostImplementation.UninstallWebWorkerInterop(); - }, synchronizationContext); - SendWhenException(parentContext, tcs, new OperationCanceledException(cancellationToken)); + // this could be executing on any thread + PropagateCompletionAndDispose(Task.FromException(new OperationCanceledException(_cancellationToken))); }); + // JSSynchronizationContext also registers to _cancellationToken + _jsSynchronizationContext = JSSynchronizationContext.InstallWebWorkerInterop(false, _cancellationToken); + var childScheduler = TaskScheduler.FromCurrentSynchronizationContext(); - Task res = body(); - // This code is exiting thread main() before all promises are resolved. - // the continuation is executed by setTimeout() callback of the thread. - res.ContinueWith(t => + if (_bodyRes != null) + { + _result = _bodyRes(); + } + else { - SendWhenDone(parentContext, tcs, res); - JSHostImplementation.UninstallWebWorkerInterop(); - reg?.Dispose(); - }, childScheduler); + _result = _bodyVoid!(); + } + // This code is exiting thread ThreadMain() before all promises are resolved. + // the continuation is executed by setTimeout() callback of the WebWorker thread. + _result.ContinueWith(PropagateCompletionAndDispose, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, childScheduler); } catch (Exception ex) { - SendWhenException(parentContext, tcs, ex); - reg?.Dispose(); - JSHostImplementation.UninstallWebWorkerInterop(); + PropagateCompletionAndDispose(Task.FromException(ex)); } - - }); - JSHostImplementation.SetHasExternalEventLoop(thread); - thread.Start(); - return tcs.Task; - } - - #region posting result to the original thread when handling exception - - private static void PostWhenCancellation(SynchronizationContext ctx, TaskCompletionSource tcs) - { - try - { - ctx.Send((_) => tcs.SetCanceled(), null); } - catch (Exception e) - { - Environment.FailFast("JSWebWorker.RunAsync failed", e); - } - } - private static void PostWhenCancellation(SynchronizationContext ctx, TaskCompletionSource tcs) - { - try - { - ctx.Send((_) => tcs.SetCanceled(), null); - } - catch (Exception e) + // run actions on correct thread + private void PropagateCompletionAndDispose(Task result) { - Environment.FailFast("JSWebWorker.RunAsync failed", e); - } - } + _result = result; - private static void SendWhenDone(SynchronizationContext ctx, TaskCompletionSource tcs, Task done) - { - try - { - ctx.Send((_) => + _cancellationRegistration?.Dispose(); + _cancellationRegistration = null; + + if (Thread.CurrentThread == _thread) { - PropagateCompletion(tcs, done); - }, null); - } - catch (Exception e) - { - Environment.FailFast("JSWebWorker.RunAsync failed", e); - } - } + // if cancelation was requested, the JSSynchronizationContext will stop the thread + if (_jsSynchronizationContext != null && !_cancellationToken.IsCancellationRequested) + { + // this will lead to __emscripten_thread_exit() later + _jsSynchronizationContext.UninstallWebWorkerInterop(); + } + _jsSynchronizationContext = null; - private static void SendWhenException(SynchronizationContext ctx, TaskCompletionSource tcs, Exception ex) - { - try - { - ctx.Send((_) => tcs.SetException(ex), null); - } - catch (Exception e) - { - Environment.FailFast("JSWebWorker.RunAsync failed", e); - } - } + // propagate the result on thread pool, rather than the JSWebWorker + Task.Run(PropagateCompletion); + } + else + { + // if cancelation was requested, the JSSynchronizationContext will stop the thread + if (_jsSynchronizationContext != null && !_cancellationToken.IsCancellationRequested) + { + // we can only uninstall JS interop on it's own thread + _jsSynchronizationContext.Post(static o => + { + var jsSynchronizationContext = (JSSynchronizationContext)o!; + // this will lead to __emscripten_thread_exit() later + jsSynchronizationContext?.UninstallWebWorkerInterop(); + }, _jsSynchronizationContext); + } + _jsSynchronizationContext = null; - private static void SendWhenException(SynchronizationContext ctx, TaskCompletionSource tcs, Exception ex) - { - try - { - ctx.Send((_) => tcs.SetException(ex), null); - } - catch (Exception e) - { - Environment.FailFast("JSWebWorker.RunAsync failed", e); - } - } + PropagateCompletion(); + } - private static void SendWhenDone(SynchronizationContext ctx, TaskCompletionSource tcs, Task done) - { - try - { - ctx.Send((_) => - { - PropagateCompletion(tcs, done); - }, null); - } - catch (Exception e) - { - Environment.FailFast("JSWebWorker.RunAsync failed", e); + Dispose(); } - } - internal static void PropagateCompletion(TaskCompletionSource tcs, Task done) - { - if (done.IsFaulted) + private void PropagateCompletion() { - if (done.Exception is AggregateException ag && ag.InnerException != null) + if (_result!.IsFaulted) { - tcs.SetException(ag.InnerException); + if (_result.Exception is AggregateException ag && ag.InnerException != null) + { + _taskCompletionSource.TrySetException(ag.InnerException); + } + else + { + _taskCompletionSource.TrySetException(_result.Exception); + } + } + else if (_result.IsCanceled) + { + _taskCompletionSource.TrySetCanceled(); } else { - tcs.SetException(done.Exception); + if (_bodyRes != null) + { + _taskCompletionSource.TrySetResult(((Task)_result).Result); + } + else + { + _taskCompletionSource.TrySetResult(default!); + } } } - else if (done.IsCanceled) - tcs.SetCanceled(); - else - tcs.SetResult(done.Result); - } - internal static void PropagateCompletion(TaskCompletionSource tcs, Task done) - { - if (done.IsFaulted) + private void Dispose(bool disposing) { - if (done.Exception is AggregateException ag && ag.InnerException != null) + lock (this) { - tcs.SetException(ag.InnerException); + if (_isDisposed) + { + return; + } + _isDisposed = true; } - else + + if (disposing) + { + _cancellationRegistration?.Dispose(); + _cancellationRegistration = null; + } + + if (_jsSynchronizationContext != null) { - tcs.SetException(done.Exception); + Environment.FailFast("expected the thread to be disposed"); } } - else if (done.IsCanceled) - tcs.SetCanceled(); - else - tcs.SetResult(); - } - #endregion + ~JSWebWorkerInstance() + { + Dispose(disposing: false); + } + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } } } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj index 004b8b11e1e13..3e6c61c71a141 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System.Runtime.InteropServices.JavaScript.Tests.csproj @@ -16,7 +16,7 @@ - false + false $(WasmXHarnessMonoArgs) --setenv=XHARNESS_LOG_TEST_START=true diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs index 602875d8c3321..2bbb1a0b77b69 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs @@ -58,7 +58,6 @@ public static IEnumerable GetTargetThreads2x() public async Task Executor_Cancellation(Executor executor) { var cts = new CancellationTokenSource(TimeoutMilliseconds); - await Task.Delay(10); TaskCompletionSource ready = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var canceledTask = executor.Execute(() => @@ -68,33 +67,178 @@ public async Task Executor_Cancellation(Executor executor) return never.Task; }, cts.Token); + await ready.Task; + cts.Cancel(); - switch (executor.Type) + await Assert.ThrowsAsync(() => canceledTask); + } + + [Theory, MemberData(nameof(GetTargetThreads))] + public async Task JSDelay_Cancellation(Executor executor) + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + TaskCompletionSource ready = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var canceledTask = executor.Execute(async () => { - case ExecutorType.Main: - case ExecutorType.NewThread: - await Assert.ThrowsAsync(() => canceledTask); - break; - case ExecutorType.ThreadPool: - case ExecutorType.JSWebWorker: - await Assert.ThrowsAsync(() => canceledTask); - break; + await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); - } + var never = WebWorkerTestHelper.JSDelay(TimeoutMilliseconds * 1000); + ready.SetResult(); + await never; + }, cts.Token); + + await ready.Task; + + cts.Cancel(); + + await Assert.ThrowsAsync(() => canceledTask); } + [Fact] + public async Task JSSynchronizationContext_Send_Post_Items_Cancellation() + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + + ManualResetEventSlim blocker=new ManualResetEventSlim(false); + TaskCompletionSource never = new TaskCompletionSource(); + SynchronizationContext capturedSynchronizationContext = null; + TaskCompletionSource jswReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource sendReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource postReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var canceledTask = JSWebWorker.RunAsync(() => + { + capturedSynchronizationContext = SynchronizationContext.Current; + jswReady.SetResult(); + + // blocking the worker, so that JSSynchronizationContext could enqueue next tasks + blocker.Wait(); + + return never.Task; + }, cts.Token); + + await jswReady.Task; + Assert.Equal("System.Runtime.InteropServices.JavaScript.JSSynchronizationContext", capturedSynchronizationContext!.GetType().FullName); + + var shouldNotHitSend = false; + var shouldNotHitPost = false; + var hitAfterPost = false; + + var canceledSend = Task.Run(() => + { + // this will be blocked until blocker.Set() + sendReady.SetResult(); + capturedSynchronizationContext.Send(_ => + { + // then it should get canceled and not executed + shouldNotHitSend = true; + }, null); + return Task.CompletedTask; + }); + + var canceledPost = Task.Run(() => + { + postReady.SetResult(); + capturedSynchronizationContext.Post(_ => + { + // then it should get canceled and not executed + shouldNotHitPost = true; + }, null); + hitAfterPost = true; + return Task.CompletedTask; + }); + + // make sure that jobs got the chance to enqueue + await sendReady.Task; + await postReady.Task; + await Task.Delay(100); + + // this could should be delivered immediately + cts.Cancel(); + + // this will unblock the current pending work item + blocker.Set(); + + await Assert.ThrowsAsync(() => canceledSend); + await canceledPost; // this shouldn't throw + + Assert.False(shouldNotHitSend); + Assert.False(shouldNotHitPost); + Assert.True(hitAfterPost); + } + + [Fact] + public async Task JSSynchronizationContext_Send_Post_To_Canceled() + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + + TaskCompletionSource never = new TaskCompletionSource(); + SynchronizationContext capturedSynchronizationContext = null; + TaskCompletionSource jswReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + JSObject capturedGlobalThis=null; + + var canceledTask = JSWebWorker.RunAsync(() => + { + capturedSynchronizationContext = SynchronizationContext.Current; + capturedGlobalThis = JSHost.GlobalThis; + jswReady.SetResult(); + return never.Task; + }, cts.Token); + + await jswReady.Task; + Assert.Equal("System.Runtime.InteropServices.JavaScript.JSSynchronizationContext", capturedSynchronizationContext!.GetType().FullName); + + cts.Cancel(); + + // give it chance to dispose the thread + await Task.Delay(100); + + Assert.True(capturedGlobalThis.IsDisposed); + + var shouldNotHitSend = false; + var shouldNotHitPost = false; + + Assert.Throws(() => + { + capturedGlobalThis.HasProperty("document"); + }); + + Assert.Throws(() => + { + capturedSynchronizationContext.Send(_ => + { + // then it should get canceled and not executed + shouldNotHitSend = true; + }, null); + }); + + Assert.Throws(() => + { + capturedSynchronizationContext.Post(_ => + { + // then it should get canceled and not executed + shouldNotHitPost = true; + }, null); + }); + + Assert.False(shouldNotHitSend); + Assert.False(shouldNotHitPost); + } + + [Theory, MemberData(nameof(GetTargetThreads))] public async Task Executor_Propagates(Executor executor) { var cts = new CancellationTokenSource(TimeoutMilliseconds); - - var canceledTask = executor.Execute(() => + bool hit = false; + var failedTask = executor.Execute(() => { + hit = true; throw new InvalidOperationException("Test"); }, cts.Token); - var ex = await Assert.ThrowsAsync(() => canceledTask); + var ex = await Assert.ThrowsAsync(() => failedTask); + Assert.True(hit); Assert.Equal("Test", ex.Message); } @@ -264,12 +408,12 @@ private async Task ActionsInDifferentThreads(Executor executor1, Executor exe { await e2Job(r1); - doneTCS.SetResult(); }, cts.Token); try { await e2; + doneTCS.SetResult(); await e1; } catch (Exception) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs index 39e1c5649dd98..91fa40695e1be 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTestHelper.cs @@ -166,7 +166,7 @@ Task wrapExecute() case ExecutorType.Main: return RunOnTargetAsync(MainSynchronizationContext, wrapExecute, cancellationToken); case ExecutorType.ThreadPool: - return Task.Run(wrapExecute, cancellationToken); + return RunOnThreadPool(wrapExecute, cancellationToken); case ExecutorType.NewThread: return RunOnNewThread(wrapExecute, cancellationToken); case ExecutorType.JSWebWorker: @@ -275,6 +275,38 @@ public async Task StickyAwait(Task task, CancellationToken cancellationToken) AssertTargetThread(); } + public static Task RunOnThreadPool(Func job, CancellationToken cancellationToken) + { + TaskCompletionSource done = new TaskCompletionSource(); + var reg = cancellationToken.Register(() => + { + done.TrySetException(new OperationCanceledException(cancellationToken)); + }); + Task.Run(job, cancellationToken).ContinueWith(result => + { + if (result.IsFaulted) + { + if (result.Exception is AggregateException ag && ag.InnerException != null) + { + done.TrySetException(ag.InnerException); + } + else + { + done.TrySetException(result.Exception); + } + } + else if (result.IsCanceled) + { + done.TrySetCanceled(cancellationToken); + } + else + { + done.TrySetResult(); + } + }, TaskContinuationOptions.ExecuteSynchronously); + return done.Task; + } + public static Task RunOnNewThread(Func job, CancellationToken cancellationToken) { TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); From 226ecb8603387a6c094988c6bc7afaa66092d559 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 10 Jan 2024 21:06:52 +0100 Subject: [PATCH 5/8] more --- .../InteropServices/JavaScript/JSWebWorker.cs | 3 +- .../JavaScript/WebWorkerTest.cs | 60 +++++++++++++++++-- 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs index 114b7992dfa46..57feeb95a47ee 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs @@ -213,7 +213,8 @@ private void Dispose(bool disposing) if (_jsSynchronizationContext != null) { - Environment.FailFast("expected the thread to be disposed"); + // this should not happen + Environment.FailFast($"JSWebWorker was disposed while running, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}"); } } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs index 2bbb1a0b77b69..31352864753af 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/tests/System.Runtime.InteropServices.JavaScript.UnitTests/System/Runtime/InteropServices/JavaScript/WebWorkerTest.cs @@ -17,18 +17,16 @@ namespace System.Runtime.InteropServices.JavaScript.Tests // TODO test: // JSExport 2x // JSExport async - // GC + finalizer + dispose // lock // thread allocation, many threads // ProxyContext flow, child thread, child task // use JSObject after JSWebWorker finished, especially HTTP - // JSWebWorker JS setTimeout till after close // WS on JSWebWorker - // Yield will hit event loop 3x // HTTP continue on TP // event pipe // FS - // unit test for problem **7)** + // JS setTimeout till after JSWebWorker close + // synchronous .Wait for JS setTimeout on the same thread -> deadlock problem **7)** public class WebWorkerTest { @@ -83,7 +81,7 @@ public async Task JSDelay_Cancellation(Executor executor) { await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); - var never = WebWorkerTestHelper.JSDelay(TimeoutMilliseconds * 1000); + var never = WebWorkerTestHelper.JSDelay(int.MaxValue); ready.SetResult(); await never; }, cts.Token); @@ -225,6 +223,56 @@ public async Task JSSynchronizationContext_Send_Post_To_Canceled() Assert.False(shouldNotHitPost); } + [Fact] + public async Task JSWebWorker_Abandon_Running() + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + + TaskCompletionSource never = new TaskCompletionSource(); + TaskCompletionSource ready = new TaskCompletionSource(); + +#pragma warning disable CS4014 + // intentionally not awaiting it + JSWebWorker.RunAsync(() => + { + ready.SetResult(); + return never.Task; + }, cts.Token); +#pragma warning restore CS4014 + + await ready.Task; + + // it should not get collected + GC.Collect(); + + // it should not prevent mono and the test suite from exiting + } + + [Fact] + public async Task JSWebWorker_Abandon_Running_JS() + { + var cts = new CancellationTokenSource(TimeoutMilliseconds); + + TaskCompletionSource ready = new TaskCompletionSource(); + +#pragma warning disable CS4014 + // intentionally not awaiting it + JSWebWorker.RunAsync(async () => + { + await WebWorkerTestHelper.CreateDelay(); + var never = WebWorkerTestHelper.JSDelay(int.MaxValue); + ready.SetResult(); + await never; + }, cts.Token); +#pragma warning restore CS4014 + + await ready.Task; + + // it should not get collected + GC.Collect(); + + // it should not prevent mono and the test suite from exiting + } [Theory, MemberData(nameof(GetTargetThreads))] public async Task Executor_Propagates(Executor executor) @@ -338,7 +386,7 @@ await executor.Execute(async () => { await executor.StickyAwait(WebWorkerTestHelper.CreateDelay(), cts.Token); - await WebWorkerTestHelper.JSDelay(1).ConfigureAwait(true); + await WebWorkerTestHelper.JSDelay(10).ConfigureAwait(true); executor.AssertAwaitCapturedContext(); }, cts.Token); From 15a32954b874e0e760ccec915c18c125663a347f Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 10 Jan 2024 21:21:07 +0100 Subject: [PATCH 6/8] feedback --- .../InteropServices/JavaScript/JSSynchronizationContext.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs index e78923d2839f4..193899dc6792d 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs @@ -112,6 +112,7 @@ public JSSynchronizationContext(bool isMainThread, CancellationToken cancellatio while (Queue.Reader.TryRead(out var item)) { + // the Post is checking _isCancellationRequested after .Wait() item.Signal?.Set(); } }); @@ -126,7 +127,7 @@ internal JSSynchronizationContext(JSProxyContext proxyContext, WorkItemQueueType public override SynchronizationContext CreateCopy() { - return new JSSynchronizationContext(ProxyContext, Queue, _ScheduleJSPump); + return this; } // this must be called from the worker thread From 7fed5b3a8af68785053e7adb2ac3433c97c3fc33 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Wed, 10 Jan 2024 21:22:52 +0100 Subject: [PATCH 7/8] deadcode --- .../InteropServices/JavaScript/JSSynchronizationContext.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs index 193899dc6792d..f7fbb7fb5a79c 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs @@ -118,13 +118,6 @@ public JSSynchronizationContext(bool isMainThread, CancellationToken cancellatio }); } - internal JSSynchronizationContext(JSProxyContext proxyContext, WorkItemQueueType queue, Action dataIsAvailable) - { - ProxyContext = proxyContext; - Queue = queue; - _ScheduleJSPump = dataIsAvailable; - } - public override SynchronizationContext CreateCopy() { return this; From df5794670e9fb5235660f54987842f48012bb649 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Thu, 11 Jan 2024 12:21:42 +0100 Subject: [PATCH 8/8] feedback --- .../InteropServices/JavaScript/JSWebWorker.cs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs index 57feeb95a47ee..cb9016471923e 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSWebWorker.cs @@ -51,14 +51,14 @@ internal sealed class JSWebWorkerInstance : IDisposable private CancellationTokenRegistration? _cancellationRegistration; private Func>? _bodyRes; private Func? _bodyVoid; - private Task? _result; + private Task? _resultTask; private bool _isDisposed; public JSWebWorkerInstance(Func>? bodyRes, Func? bodyVoid, CancellationToken cancellationToken) { _taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _thread = new Thread(ThreadMain); - _result = null; + _resultTask = null; _cancellationToken = cancellationToken; _cancellationRegistration = null; _bodyRes = bodyRes; @@ -71,10 +71,16 @@ public Task Start() if (JSProxyContext.MainThreadContext.IsCurrentThread()) { // give browser chance to load more threads - JavaScriptImports.ThreadAvailable().ContinueWith(_ => + // until there at least one thread loaded, it doesn't make sense to `Start` + // because that would also hang, but in a way blocking the UI thread, much worse. + JavaScriptImports.ThreadAvailable().ContinueWith(t => { - _thread.Start(); - }, CancellationToken.None, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Current); + if (t.IsCompletedSuccessfully) + { + _thread.Start(); + } + return t; + }, _cancellationToken, TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Current); } else { @@ -106,15 +112,15 @@ private void ThreadMain() var childScheduler = TaskScheduler.FromCurrentSynchronizationContext(); if (_bodyRes != null) { - _result = _bodyRes(); + _resultTask = _bodyRes(); } else { - _result = _bodyVoid!(); + _resultTask = _bodyVoid!(); } // This code is exiting thread ThreadMain() before all promises are resolved. // the continuation is executed by setTimeout() callback of the WebWorker thread. - _result.ContinueWith(PropagateCompletionAndDispose, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, childScheduler); + _resultTask.ContinueWith(PropagateCompletionAndDispose, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, childScheduler); } catch (Exception ex) { @@ -125,7 +131,7 @@ private void ThreadMain() // run actions on correct thread private void PropagateCompletionAndDispose(Task result) { - _result = result; + _resultTask = result; _cancellationRegistration?.Dispose(); _cancellationRegistration = null; @@ -166,18 +172,18 @@ private void PropagateCompletionAndDispose(Task result) private void PropagateCompletion() { - if (_result!.IsFaulted) + if (_resultTask!.IsFaulted) { - if (_result.Exception is AggregateException ag && ag.InnerException != null) + if (_resultTask.Exception is AggregateException ag && ag.InnerException != null) { _taskCompletionSource.TrySetException(ag.InnerException); } else { - _taskCompletionSource.TrySetException(_result.Exception); + _taskCompletionSource.TrySetException(_resultTask.Exception); } } - else if (_result.IsCanceled) + else if (_resultTask.IsCanceled) { _taskCompletionSource.TrySetCanceled(); } @@ -185,7 +191,7 @@ private void PropagateCompletion() { if (_bodyRes != null) { - _taskCompletionSource.TrySetResult(((Task)_result).Result); + _taskCompletionSource.TrySetResult(((Task)_resultTask).Result); } else {