Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser][MT] - CancellationToken for JSWebWorker #96696

Merged
merged 9 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public static void GetManagedStackTrace(JSMarshalerArgument* arguments_buffer)
// void InstallMainSynchronizationContext()
public static void InstallMainSynchronizationContext()
{
InstallWebWorkerInterop(true);
JSSynchronizationContext.InstallWebWorkerInterop(true, CancellationToken.None);
}

#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,39 +209,6 @@ public static void LoadSatelliteAssembly(byte[] dllBytes)
}

#if FEATURE_WASM_THREADS
public static void InstallWebWorkerInterop(bool isMainThread)
{
var ctx = new JSSynchronizationContext(isMainThread);
ctx.previousSynchronizationContext = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(ctx);

var proxyContext = ctx.ProxyContext;
JSProxyContext.CurrentThreadContext = proxyContext;
JSProxyContext.ExecutionContext = proxyContext;
if (isMainThread)
{
JSProxyContext.MainThreadContext = proxyContext;
}

ctx.AwaitNewData();

Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle);
}

public static void UninstallWebWorkerInterop()
{
var ctx = JSProxyContext.CurrentThreadContext;
if (ctx == null) throw new InvalidOperationException();
var syncContext = ctx.SynchronizationContext;
if (SynchronizationContext.Current == syncContext)
{
SynchronizationContext.SetSynchronizationContext(syncContext.previousSynchronizationContext);
}
JSProxyContext.CurrentThreadContext = null;
JSProxyContext.ExecutionContext = null;
ctx.Dispose();
}

[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "external_eventloop")]
private static extern ref bool GetThreadExternalEventloop(Thread @this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle)
{
throw new InvalidOperationException("ReleasePromiseHolder expected PromiseHolder " + holderGCHandle);
}
holder.IsDisposed = true;
}
else
{
Expand All @@ -367,9 +368,9 @@ public unsafe void ReleasePromiseHolder(nint holderGCHandle)
{
throw new InvalidOperationException("ReleasePromiseHolder expected PromiseHolder" + holderGCHandle);
}
holder.IsDisposed = true;
handle.Free();
}
holder.IsDisposed = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading;
using System.Threading.Channels;
using System.Runtime.CompilerServices;
using System.Collections.Generic;
using WorkItemQueueType = System.Threading.Channels.Channel<System.Runtime.InteropServices.JavaScript.JSSynchronizationContext.WorkItem>;

namespace System.Runtime.InteropServices.JavaScript
Expand All @@ -21,13 +20,16 @@ namespace System.Runtime.InteropServices.JavaScript
internal sealed class JSSynchronizationContext : SynchronizationContext
{
internal readonly JSProxyContext ProxyContext;
private readonly Action _DataIsAvailable;// don't allocate Action on each call to UnsafeOnCompleted
private readonly Action _ScheduleJSPump;// don't allocate Action on each call to UnsafeOnCompleted
private readonly WorkItemQueueType Queue;

internal SynchronizationContext? previousSynchronizationContext;
internal bool _isDisposed;
internal bool _isCancellationRequested;
internal bool _isRunning;
private CancellationTokenRegistration _cancellationTokenRegistration;

internal readonly struct WorkItem
internal struct WorkItem
{
public readonly SendOrPostCallback Callback;
public readonly object? Data;
Expand All @@ -41,38 +43,110 @@ public WorkItem(SendOrPostCallback callback, object? data, ManualResetEventSlim?
}
}

public JSSynchronizationContext(bool isMainThread)
// this need to be called from JSWebWorker or UI thread
public static JSSynchronizationContext InstallWebWorkerInterop(bool isMainThread, CancellationToken cancellationToken)
{
ProxyContext = new JSProxyContext(isMainThread, this);
Queue = Channel.CreateUnbounded<WorkItem>(new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true });
_DataIsAvailable = DataIsAvailable;
var ctx = new JSSynchronizationContext(isMainThread, cancellationToken);
ctx.previousSynchronizationContext = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(ctx);

var proxyContext = ctx.ProxyContext;
JSProxyContext.CurrentThreadContext = proxyContext;
JSProxyContext.ExecutionContext = proxyContext;
if (isMainThread)
{
JSProxyContext.MainThreadContext = proxyContext;
}

ctx.AwaitNewData();

Interop.Runtime.InstallWebWorkerInterop(proxyContext.ContextHandle);

return ctx;
}

internal JSSynchronizationContext(JSProxyContext proxyContext, WorkItemQueueType queue, Action dataIsAvailable)
// this need to be called from JSWebWorker thread
internal void UninstallWebWorkerInterop()
{
ProxyContext = proxyContext;
Queue = queue;
_DataIsAvailable = dataIsAvailable;
if (_isDisposed)
{
return;
}
if (!_isRunning)
{
return;
}

var jsProxyContext = JSProxyContext.AssertIsInteropThread();
if (jsProxyContext != ProxyContext)
{
Environment.FailFast($"UninstallWebWorkerInterop failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}");
}
if (SynchronizationContext.Current == this)
{
SynchronizationContext.SetSynchronizationContext(this.previousSynchronizationContext);
}

// this will runtimeKeepalivePop()
// and later maybeExit() -> __emscripten_thread_exit()
jsProxyContext.Dispose();

JSProxyContext.CurrentThreadContext = null;
JSProxyContext.ExecutionContext = null;
_isRunning = false;
Dispose();
}

public JSSynchronizationContext(bool isMainThread, CancellationToken cancellationToken)
{
ProxyContext = new JSProxyContext(isMainThread, this);
Queue = Channel.CreateUnbounded<WorkItem>(new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true });
_ScheduleJSPump = ScheduleJSPump;

// receive callback (on any thread) that cancelation is requested
_cancellationTokenRegistration = cancellationToken.Register(() =>
{

_isCancellationRequested = true;
Queue.Writer.TryComplete();

while (Queue.Reader.TryRead(out var item))
{
// the Post is checking _isCancellationRequested after .Wait()
item.Signal?.Set();
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

public override SynchronizationContext CreateCopy()
{
return new JSSynchronizationContext(ProxyContext, Queue, _DataIsAvailable);
return this;
}

// this must be called from the worker thread
internal void AwaitNewData()
{
if (_isDisposed)
{
// FIXME: there could be abandoned work, but here we have no way how to propagate the failure
// ObjectDisposedException.ThrowIf(_isDisposed, this);
return;
}
if (_isCancellationRequested)
{
UninstallWebWorkerInterop();
return;
}
_isRunning = true;

var vt = Queue.Reader.WaitToReadAsync();
if (_isCancellationRequested)
{
UninstallWebWorkerInterop();
return;
}

if (vt.IsCompleted)
{
DataIsAvailable();
ScheduleJSPump();
return;
}

Expand All @@ -81,10 +155,10 @@ internal void AwaitNewData()
// fire a callback that will schedule a background job to pump the queue on the main thread.
var awaiter = vt.AsTask().ConfigureAwait(false).GetAwaiter();
// UnsafeOnCompleted avoids spending time flowing the execution context (we don't need it.)
awaiter.UnsafeOnCompleted(_DataIsAvailable);
awaiter.UnsafeOnCompleted(_ScheduleJSPump);
}

private unsafe void DataIsAvailable()
private unsafe void ScheduleJSPump()
{
// While we COULD pump here, we don't want to. We want the pump to happen on the next event loop turn.
// Otherwise we could get a chain where a pump generates a new work item and that makes us pump again, forever.
Expand All @@ -94,10 +168,23 @@ private unsafe void DataIsAvailable()
public override void Post(SendOrPostCallback d, object? state)
{
ObjectDisposedException.ThrowIf(_isDisposed, this);
if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}

var workItem = new WorkItem(d, state, null);
if (!Queue.Writer.TryWrite(workItem))
{
if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}
ObjectDisposedException.ThrowIf(_isDisposed, this);
Environment.FailFast($"JSSynchronizationContext.Post failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}");
}
}

// This path can only run when threading is enabled
Expand All @@ -117,9 +204,24 @@ public override void Send(SendOrPostCallback d, object? state)
{
var workItem = new WorkItem(d, state, signal);
if (!Queue.Writer.TryWrite(workItem))
{
if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}
ObjectDisposedException.ThrowIf(_isDisposed, this);

Environment.FailFast($"JSSynchronizationContext.Send failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {Environment.StackTrace}");
}

signal.Wait();

if (_isCancellationRequested)
{
// propagate the cancellation to the caller
throw new OperationCanceledException(_cancellationTokenRegistration.Token);
}
}
}

Expand All @@ -138,10 +240,9 @@ private static void BackgroundJobHandler()

private void Pump()
{
if (_isDisposed)
if (_isDisposed || _isCancellationRequested)
{
// FIXME: there could be abandoned work, but here we have no way how to propagate the failure
// ObjectDisposedException.ThrowIf(_isDisposed, this);
UninstallWebWorkerInterop();
return;
}
try
Expand All @@ -159,17 +260,20 @@ private void Pump()
{
item.Signal?.Set();
}
if (_isDisposed || _isCancellationRequested)
{
UninstallWebWorkerInterop();
return;
}
}
// if anything throws unhandled exception, we will abort the program
// otherwise, we could schedule another round
AwaitNewData();
}
catch (Exception e)
{
Environment.FailFast($"JSSynchronizationContext.BackgroundJobHandler failed, ManagedThreadId: {Environment.CurrentManagedThreadId}. {Environment.NewLine} {e.StackTrace}");
}
finally
{
// If an item throws, we want to ensure that the next pump gets scheduled appropriately regardless.
if (!_isDisposed) AwaitNewData();
}
}

private void Dispose(bool disposing)
Expand All @@ -178,10 +282,11 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Queue.Writer.Complete();
Queue.Writer.TryComplete();
}
previousSynchronizationContext = null;
_isDisposed = true;
_cancellationTokenRegistration.Dispose();
previousSynchronizationContext = null;
}
}

Expand Down
Loading
Loading