Skip to content

Commit

Permalink
[browser][MT] - CancellationToken for JSWebWorker (dotnet#96696)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelsavara authored and tmds committed Jan 23, 2024
1 parent 2a93b35 commit e5a6844
Show file tree
Hide file tree
Showing 8 changed files with 687 additions and 314 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public static void GetManagedStackTrace(JSMarshalerArgument* arguments_buffer)
// void InstallMainSynchronizationContext()
public static void InstallMainSynchronizationContext()
{
InstallWebWorkerInterop(true);
JSSynchronizationContext.InstallWebWorkerInterop(true, CancellationToken.None);
}

#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,39 +209,6 @@ public static void LoadSatelliteAssembly(byte[] dllBytes)
}

#if FEATURE_WASM_THREADS
public static void InstallWebWorkerInterop(bool isMainThread)
{
var ctx = new JSSynchronizationContext(isMainThread);
ctx.previousSynchronizationContext = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(ctx);

var proxyContext = ctx.ProxyContext;
JSProxyContext.CurrentThreadContext = proxyContext;
JSProxyContext.ExecutionContext = proxyContext;
if (isMainThread)
{
JSProxyContext.MainThreadContext = proxyContext;
}

ctx.AwaitNewData();

Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle);
}

public static void UninstallWebWorkerInterop()
{
var ctx = JSProxyContext.CurrentThreadContext;
if (ctx == null) throw new InvalidOperationException();
var syncContext = ctx.SynchronizationContext;
if (SynchronizationContext.Current == syncContext)
{
SynchronizationContext.SetSynchronizationContext(syncContext.previousSynchronizationContext);
}
JSProxyContext.CurrentThreadContext = null;
JSProxyContext.ExecutionContext = null;
ctx.Dispose();
}

[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "external_eventloop")]
private static extern ref bool GetThreadExternalEventloop(Thread @this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle)
{
throw new InvalidOperationException("ReleasePromiseHolder expected PromiseHolder " + holderGCHandle);
}
holder.IsDisposed = true;
}
else
{
Expand All @@ -367,9 +368,9 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle)
{
throw new InvalidOperationException("ReleasePromiseHolder expected PromiseHolder" + holderGCHandle);
}
holder.IsDisposed = true;
handle.Free();
}
holder.IsDisposed = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading;
using System.Threading.Channels;
using System.Runtime.CompilerServices;
using System.Collections.Generic;
using WorkItemQueueType = System.Threading.Channels.Channel<System.Runtime.InteropServices.JavaScript.JSSynchronizationContext.WorkItem>;

namespace System.Runtime.InteropServices.JavaScript
Expand All @@ -21,13 +20,16 @@ namespace System.Runtime.InteropServices.JavaScript
internal sealed class JSSynchronizationContext : SynchronizationContext
{
internal readonly JSProxyContext ProxyContext;
private readonly Action _DataIsAvailable;// don't allocate Action on each call to UnsafeOnCompleted
private readonly Action _ScheduleJSPump;// don't allocate Action on each call to UnsafeOnCompleted
private readonly WorkItemQueueType Queue;

internal SynchronizationContext? previousSynchronizationContext;
internal bool _isDisposed;
internal bool _isCancellationRequested;
internal bool _isRunning;
private CancellationTokenRegistration _cancellationTokenRegistration;

internal readonly struct WorkItem
internal struct WorkItem
{
public readonly SendOrPostCallback Callback;
public readonly object? Data;
Expand All @@ -41,38 +43,110 @@ public WorkItem(SendOrPostCallback callback, object? data, ManualResetEventSlim?
}
}

public JSSynchronizationContext(bool isMainThread)
// this need to be called from JSWebWorker or UI thread
public static JSSynchronizationContext InstallWebWorkerInterop(bool isMainThread, CancellationToken cancellationToken)
{
ProxyContext = new JSProxyContext(isMainThread, this);
Queue = Channel.CreateUnbounded<WorkItem>(new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true });
_DataIsAvailable = DataIsAvailable;
var ctx = new JSSynchronizationContext(isMainThread, cancellationToken);
ctx.previousSynchronizationContext = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(ctx);

var proxyContext = ctx.ProxyContext;
JSProxyContext.CurrentThreadContext = proxyContext;
JSProxyContext.ExecutionContext = proxyContext;
if (isMainThread)
{
JSProxyContext.MainThreadContext = proxyContext;
}

ctx.AwaitNewData();

Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle);

return ctx;
}

internal JSSynchronizationContext(JSProxyContext proxyContext, WorkItemQueueType queue, Action dataIsAvailable)
// this need to be called from JSWebWorker thread
internal void UninstallWebWorkerInterop()
{
ProxyContext = proxyContext;
Queue = queue;
_DataIsAvailable = dataIsAvailable;
if (_isDisposed)
{
return;
}
if (!_isRunning)
{
return;
}

var jsProxyContext = JSProxyContext.AssertIsInteropThread();
if (jsProxyContext != ProxyContext)
{
Environment.FailFast($"UninstallWebWorkerInterop failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}");
}
if (SynchronizationContext.Current == this)
{
SynchronizationContext.SetSynchronizationContext(this.previousSynchronizationContext);
}

// this will runtimeKeepalivePop()
// and later maybeExit() -> __emscripten_thread_exit()
jsProxyContext.Dispose();

JSProxyContext.CurrentThreadContext = null;
JSProxyContext.ExecutionContext = null;
_isRunning = false;
Dispose();
}

public JSSynchronizationContext(bool isMainThread, CancellationToken cancellationToken)
{
ProxyContext = new JSProxyContext(isMainThread, this);
Queue = Channel.CreateUnbounded<WorkItem>(new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true });
_ScheduleJSPump = ScheduleJSPump;

// receive callback (on any thread) that cancelation is requested
_cancellationTokenRegistration = cancellationToken.Register(() =>
{

_isCancellationRequested = true;
Queue.Writer.TryComplete();

while (Queue.Reader.TryRead(out var item))
{
// the Post is checking _isCancellationRequested after .Wait()
item.Signal?.Set();
}
});
}

public override SynchronizationContext CreateCopy()
{
return new JSSynchronizationContext(ProxyContext, Queue, _DataIsAvailable);
return this;
}

// this must be called from the worker thread
internal void AwaitNewData()
{
if (_isDisposed)
{
// FIXME: there could be abandoned work, but here we have no way how to propagate the failure
// ObjectDisposedException.ThrowIf(_isDisposed, this);
return;
}
if (_isCancellationRequested)
{
UninstallWebWorkerInterop();
return;
}
_isRunning = true;

var vt = Queue.Reader.WaitToReadAsync();
if (_isCancellationRequested)
{
UninstallWebWorkerInterop();
return;
}

if (vt.IsCompleted)
{
DataIsAvailable();
ScheduleJSPump();
return;
}

Expand All @@ -81,10 +155,10 @@ internal void AwaitNewData()
// fire a callback that will schedule a background job to pump the queue on the main thread.
var awaiter = vt.AsTask().ConfigureAwait(false).GetAwaiter();
// UnsafeOnCompleted avoids spending time flowing the execution context (we don't need it.)
awaiter.UnsafeOnCompleted(_DataIsAvailable);
awaiter.UnsafeOnCompleted(_ScheduleJSPump);
}

private unsafe void DataIsAvailable()
private unsafe void ScheduleJSPump()
{
// While we COULD pump here, we don't want to. We want the pump to happen on the next event loop turn.
// Otherwise we could get a chain where a pump generates a new work item and that makes us pump again, forever.
Expand All @@ -94,10 +168,23 @@ private unsafe void DataIsAvailable()
public override void Post(SendOrPostCallback d, object? state)
{
ObjectDisposedException.ThrowIf(_isDisposed, this);
if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}

var workItem = new WorkItem(d, state, null);
if (!Queue.Writer.TryWrite(workItem))
{
if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}
ObjectDisposedException.ThrowIf(_isDisposed, this);
Environment.FailFast($"JSSynchronizationContext.Post failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}");
}
}

// This path can only run when threading is enabled
Expand All @@ -117,9 +204,24 @@ public override void Send(SendOrPostCallback d, object? state)
{
var workItem = new WorkItem(d, state, signal);
if (!Queue.Writer.TryWrite(workItem))
{
if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}
ObjectDisposedException.ThrowIf(_isDisposed, this);

Environment.FailFast($"JSSynchronizationContext.Send failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}");
}

signal.Wait();

if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}
}
}

Expand All @@ -138,10 +240,9 @@ private static void BackgroundJobHandler()

private void Pump()
{
if (_isDisposed)
if (_isDisposed || _isCancellationRequested)
{
// FIXME: there could be abandoned work, but here we have no way how to propagate the failure
// ObjectDisposedException.ThrowIf(_isDisposed, this);
UninstallWebWorkerInterop();
return;
}
try
Expand All @@ -159,17 +260,20 @@ private void Pump()
{
item.Signal?.Set();
}
if (_isDisposed || _isCancellationRequested)
{
UninstallWebWorkerInterop();
return;
}
}
// if anything throws unhandled exception, we will abort the program
// otherwise, we could schedule another round
AwaitNewData();
}
catch (Exception e)
{
Environment.FailFast($"JSSynchronizationContext.BackgroundJobHandler failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {e.StackTrace}");
}
finally
{
// If an item throws, we want to ensure that the next pump gets scheduled appropriately regardless.
if (!_isDisposed) AwaitNewData();
}
}

private void Dispose(bool disposing)
Expand All @@ -178,10 +282,11 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Queue.Writer.Complete();
Queue.Writer.TryComplete();
}
previousSynchronizationContext = null;
_isDisposed = true;
_cancellationTokenRegistration.Dispose();
previousSynchronizationContext = null;
}
}

Expand Down
Loading

0 comments on commit e5a6844

Please sign in to comment.