Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser] threads & js cleanup #86278

Merged
merged 10 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,26 @@
using System.Runtime.CompilerServices;
using QueueType = System.Threading.Channels.Channel<System.Runtime.InteropServices.JavaScript.JSSynchronizationContext.WorkItem>;

namespace System.Runtime.InteropServices.JavaScript {
namespace System.Runtime.InteropServices.JavaScript
{
/// <summary>
/// Provides a thread-safe default SynchronizationContext for the browser that will automatically
/// route callbacks to the main browser thread where they can interact with the DOM and other
/// thread-affinity-having APIs like WebSockets, fetch, WebGL, etc.
/// Callbacks are processed during event loop turns via the runtime's background job system.
/// </summary>
internal sealed unsafe class JSSynchronizationContext : SynchronizationContext {
internal sealed unsafe class JSSynchronizationContext : SynchronizationContext
{
public readonly Thread MainThread;

internal readonly struct WorkItem {
internal readonly struct WorkItem
{
public readonly SendOrPostCallback Callback;
public readonly object? Data;
public readonly ManualResetEventSlim? Signal;

public WorkItem (SendOrPostCallback callback, object? data, ManualResetEventSlim? signal) {
public WorkItem(SendOrPostCallback callback, object? data, ManualResetEventSlim? signal)
{
Callback = callback;
Data = data;
Signal = signal;
Expand All @@ -35,31 +39,34 @@ public WorkItem (SendOrPostCallback callback, object? data, ManualResetEventSlim

private static JSSynchronizationContext? MainThreadSynchronizationContext;
private readonly QueueType Queue;
private readonly Action _DataIsAvailable;
private static void* BackgroundJobHandlerPtr = (void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler;
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved

private JSSynchronizationContext (Thread mainThread)
: this (
mainThread,
private JSSynchronizationContext()
: this(
Thread.CurrentThread,
Channel.CreateUnbounded<WorkItem>(
new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true }
)
)
{
}

private JSSynchronizationContext (Thread mainThread, QueueType queue) {
private JSSynchronizationContext(Thread mainThread, QueueType queue)
{
MainThread = mainThread;
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
Queue = queue;
_DataIsAvailable = DataIsAvailable;
}

public override SynchronizationContext CreateCopy () {
public override SynchronizationContext CreateCopy()
{
return new JSSynchronizationContext(MainThread, Queue);
}

private void AwaitNewData () {
private void AwaitNewData()
{
var vt = Queue.Reader.WaitToReadAsync();
if (vt.IsCompleted) {
if (vt.IsCompleted)
{
DataIsAvailable();
return;
}
Expand All @@ -69,31 +76,36 @@ private void AwaitNewData () {
// fire a callback that will schedule a background job to pump the queue on the main thread.
var awaiter = vt.AsTask().ConfigureAwait(false).GetAwaiter();
// UnsafeOnCompleted avoids spending time flowing the execution context (we don't need it.)
awaiter.UnsafeOnCompleted(_DataIsAvailable);
awaiter.UnsafeOnCompleted(DataIsAvailable);
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
}

private void DataIsAvailable () {
private void DataIsAvailable()
{
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
// While we COULD pump here, we don't want to. We want the pump to happen on the next event loop turn.
// Otherwise we could get a chain where a pump generates a new work item and that makes us pump again, forever.
ScheduleBackgroundJob((void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler);
MainThreadScheduleBackgroundJob(BackgroundJobHandlerPtr);
}

public override void Post (SendOrPostCallback d, object? state) {
public override void Post(SendOrPostCallback d, object? state)
{
var workItem = new WorkItem(d, state, null);
if (!Queue.Writer.TryWrite(workItem))
throw new Exception("Internal error");
}

// This path can only run when threading is enabled
#pragma warning disable CA1416
#pragma warning disable CA1416

public override void Send (SendOrPostCallback d, object? state) {
if (Thread.CurrentThread == MainThread) {
public override void Send(SendOrPostCallback d, object? state)
{
if (Thread.CurrentThread == MainThread)
{
d(state);
return;
}

using (var signal = new ManualResetEventSlim(false)) {
using (var signal = new ManualResetEventSlim(false))
{
var workItem = new WorkItem(d, state, signal);
if (!Queue.Writer.TryWrite(workItem))
throw new Exception("Internal error");
Expand All @@ -102,40 +114,46 @@ public override void Send (SendOrPostCallback d, object? state) {
}
}

internal static void Install () {
MainThreadSynchronizationContext ??= new JSSynchronizationContext(Thread.CurrentThread);

internal static void Install()
{
MainThreadSynchronizationContext ??= new JSSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(MainThreadSynchronizationContext);
MainThreadSynchronizationContext.AwaitNewData();
}

[MethodImplAttribute(MethodImplOptions.InternalCall)]
internal static extern unsafe void ScheduleBackgroundJob(void* callback);
internal static extern unsafe void MainThreadScheduleBackgroundJob(void* callback);

#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
private static unsafe void BackgroundJobHandler () {
#pragma warning restore CS3016
// this callback will arrive on the bound thread
private static unsafe void BackgroundJobHandler()
{
MainThreadSynchronizationContext!.Pump();
}

[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
private static unsafe void RequestPumpCallback () {
ScheduleBackgroundJob((void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler);
}

private void Pump () {
try {
while (Queue.Reader.TryRead(out var item)) {
try {
private void Pump()
{
try
{
while (Queue.Reader.TryRead(out var item))
{
try
{
item.Callback(item.Data);
// While we would ideally have a catch block here and do something to dispatch/forward unhandled
// exceptions, the standard threadpool (and thus standard synchronizationcontext) have zero
// error handling, so for consistency with them we do nothing. Don't throw in SyncContext callbacks.
} finally {
}
finally
{
item.Signal?.Set();
}
}
} finally {
}
finally
{
// If an item throws, we want to ensure that the next pump gets scheduled appropriately regardless.
AwaitNewData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public bool Unregister(WaitHandle? waitObject)
}
}

public static partial class ThreadPool
public static unsafe partial class ThreadPool
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
{
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work
Expand All @@ -37,6 +37,7 @@ public static partial class ThreadPool
private const bool IsWorkerTrackingEnabledInConfig = false;

private static bool _callbackQueued;
private static void* BackgroundJobHandlerPtr = (void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler;

public static bool SetMaxThreads(int workerThreads, int completionPortThreads)
{
Expand Down Expand Up @@ -79,7 +80,7 @@ internal static void RequestWorkerThread()
if (_callbackQueued)
return;
_callbackQueued = true;
QueueCallback();
MainThreadScheduleBackgroundJob(BackgroundJobHandlerPtr);
}

internal static void NotifyWorkItemProgress()
Expand Down Expand Up @@ -110,12 +111,13 @@ private static RegisteredWaitHandle RegisterWaitForSingleObject(
throw new PlatformNotSupportedException();
}

[DynamicDependency("Callback")]
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void QueueCallback();
internal static extern unsafe void MainThreadScheduleBackgroundJob(void* callback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would be nice to collect all of these internal calls that are called inside S.P.C into a single class


private static void Callback()
{
#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
#pragma warning restore CS3016
private static unsafe void BackgroundJobHandler () {
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
_callbackQueued = false;
ThreadPoolWorkQueue.Dispatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;

namespace System.Threading
{
Expand All @@ -13,11 +14,12 @@ namespace System.Threading
// Based on TimerQueue.Portable.cs
// Not thread safe
//
internal partial class TimerQueue
internal unsafe partial class TimerQueue
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
{
private static List<TimerQueue>? s_scheduledTimers;
private static List<TimerQueue>? s_scheduledTimersToFire;
private static long s_shortestDueTimeMs = long.MaxValue;
private static void* TimerHandlerPtr = (void*)(delegate* unmanaged[Cdecl]<void>)&TimerHandler;

// this means that it's in the s_scheduledTimers collection, not that it's the one which would run on the next TimeoutCallback
private bool _isScheduled;
Expand All @@ -27,19 +29,20 @@ private TimerQueue(int _)
{
}

[DynamicDependency("TimeoutCallback")]
// This replaces the current pending setTimeout with shorter one
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void SetTimeout(int timeout);
private static extern unsafe void MainThreadScheduleTimer(void* callback, int shortestDueTimeMs);

// Called by mini-wasm.c:mono_set_timeout_exec
private static void TimeoutCallback()
{
#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
#pragma warning restore CS3016
// Called by mini-wasm.c:mono_wasm_execute_timer
private static unsafe void TimerHandler () {
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
// always only have one scheduled at a time
s_shortestDueTimeMs = long.MaxValue;

long currentTimeMs = TickCount64;
ReplaceNextSetTimeout(PumpTimerQueue(currentTimeMs), currentTimeMs);
ReplaceNextTimer(PumpTimerQueue(currentTimeMs), currentTimeMs);
}

// this is called with shortest of timers scheduled on the particular TimerQueue
Expand All @@ -57,13 +60,13 @@ private bool SetTimer(uint actualDuration)

_scheduledDueTimeMs = currentTimeMs + (int)actualDuration;

ReplaceNextSetTimeout(ShortestDueTime(), currentTimeMs);
ReplaceNextTimer(ShortestDueTime(), currentTimeMs);

return true;
}

// shortest time of all TimerQueues
private static void ReplaceNextSetTimeout(long shortestDueTimeMs, long currentTimeMs)
private static void ReplaceNextTimer(long shortestDueTimeMs, long currentTimeMs)
{
if (shortestDueTimeMs == long.MaxValue)
{
Expand All @@ -75,9 +78,8 @@ private static void ReplaceNextSetTimeout(long shortestDueTimeMs, long currentTi
{
s_shortestDueTimeMs = shortestDueTimeMs;
int shortestWait = Math.Max((int)(shortestDueTimeMs - currentTimeMs), 0);
// this would cancel the previous schedule and create shorter one
// it is expensive call
SetTimeout(shortestWait);
// this would cancel the previous schedule and create shorter one, it is expensive callback
MainThreadScheduleTimer(TimerHandlerPtr, shortestWait);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/mono/mono/metadata/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ mono_wasm_gc_finalize_notify (void)
/* use this if we are going to start the finalizer thread on wasm. */
mono_coop_sem_post (&finalizer_sem);
#else
mono_threads_schedule_background_job (mono_runtime_do_background_work);
mono_main_thread_schedule_background_job (mono_runtime_do_background_work);
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion src/mono/mono/metadata/sgen-mono.c
Original file line number Diff line number Diff line change
Expand Up @@ -2897,7 +2897,7 @@ sgen_client_binary_protocol_collection_end (int minor_gc_count, int generation,
void
sgen_client_schedule_background_job (void (*cb)(void))
{
mono_threads_schedule_background_job (cb);
mono_main_thread_schedule_background_job (cb);
}

#endif
Expand Down
Loading