Skip to content

Commit

Permalink
make HttpClient.Timeout actually work on WASI
Browse files Browse the repository at this point in the history
This requires passing the `CancellationToken` to `WasiEventLoop` and checking it
before polling.

Signed-off-by: Joel Dice <joel.dice@fermyon.com>
  • Loading branch information
dicej committed Aug 26, 2024
1 parent beabe5a commit 28d758f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ CancellationToken cancellationToken
await Task.WhenAll(
new Task<ITypes.IncomingResponse?>[]
{
SendRequestAsync(outgoingRequest),
SendRequestAsync(outgoingRequest, cancellationToken),
sendContent()
}
)
Expand Down Expand Up @@ -279,7 +279,8 @@ await Task.WhenAll(
}

private static async Task<ITypes.IncomingResponse?> SendRequestAsync(
ITypes.OutgoingRequest request
ITypes.OutgoingRequest request,
CancellationToken cancellationToken
)
{
ITypes.FutureIncomingResponse future;
Expand Down Expand Up @@ -314,7 +315,8 @@ ITypes.OutgoingRequest request
}
else
{
await RegisterWasiPollable(future.Subscribe()).ConfigureAwait(false);
await RegisterWasiPollable(future.Subscribe(), cancellationToken)
.ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -461,7 +463,10 @@ private static async Task SendContentAsync(HttpContent? content, Stream stream)
}
}

private static Task RegisterWasiPollable(IPoll.Pollable pollable)
private static Task RegisterWasiPollable(
IPoll.Pollable pollable,
CancellationToken cancellationToken
)
{
var handle = pollable.Handle;

Expand All @@ -470,12 +475,15 @@ private static Task RegisterWasiPollable(IPoll.Pollable pollable)
pollable.Handle = 0;
GC.SuppressFinalize(pollable);

return CallRegisterWasiPollableHandle((Thread)null!, handle);

return CallRegisterWasiPollableHandle((Thread)null!, handle, cancellationToken);
}

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "RegisterWasiPollableHandle")]
private static extern Task CallRegisterWasiPollableHandle(Thread t, int handle);
private static extern Task CallRegisterWasiPollableHandle(
Thread t,
int handle,
CancellationToken cancellationToken
);

private sealed class InputStream : Stream
{
Expand Down Expand Up @@ -562,7 +570,7 @@ CancellationToken cancellationToken
var buffer = result;
if (buffer.Length == 0)
{
await RegisterWasiPollable(stream.Subscribe())
await RegisterWasiPollable(stream.Subscribe(), cancellationToken)
.ConfigureAwait(false);
}
else
Expand Down Expand Up @@ -699,7 +707,8 @@ CancellationToken cancellationToken
var count = (int)stream.CheckWrite();
if (count == 0)
{
await RegisterWasiPollable(stream.Subscribe()).ConfigureAwait(false);
await RegisterWasiPollable(stream.Subscribe(), cancellationToken)
.ConfigureAwait(false);
}
else if (offset == limit)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public sealed partial class Thread
{
// these methods are temporarily accessed via UnsafeAccessor from generated code until we have it in public API, probably in WASI preview3 and promises
#if TARGET_WASI
internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handle)
internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handle, CancellationToken cancellationToken)
{
return WasiEventLoop.RegisterWasiPollableHandle(handle);
return WasiEventLoop.RegisterWasiPollableHandle(handle, cancellationToken);
}

internal static int PollWasiEventLoopUntilResolved(Task<int> mainTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ internal static class WasiEventLoop
{
private static List<TaskCompletionSource> s_pollables = new();

internal static Task RegisterWasiPollableHandle(int handle)
internal static Task RegisterWasiPollableHandle(
int handle,
CancellationToken cancellationToken
)
{
// note that this is duplicate of the original Pollable
// the original should be neutralized without disposing the handle
var pollableCpy = new IPoll.Pollable(new IPoll.Pollable.THandle(handle));
return RegisterWasiPollable(pollableCpy);
return RegisterWasiPollable(pollableCpy, cancellationToken);
}

internal static Task RegisterWasiPollable(IPoll.Pollable pollable)
internal static Task RegisterWasiPollable(
IPoll.Pollable pollable,
CancellationToken cancellationToken
)
{
var tcs = new TaskCompletionSource(pollable);
var tcs = new TaskCompletionSource((pollable, cancellationToken));
s_pollables.Add(tcs);
return tcs.Task;
}
Expand All @@ -36,27 +42,46 @@ internal static void DispatchWasiEventLoop()
s_pollables = new List<TaskCompletionSource>(pollables.Count);
var arguments = new List<IPoll.Pollable>(pollables.Count);
var indexes = new List<int>(pollables.Count);
var tasksCanceled = false;
for (var i = 0; i < pollables.Count; i++)
{
var tcs = pollables[i];
var pollable = (IPoll.Pollable)tcs.Task.AsyncState!;
arguments.Add(pollable);
indexes.Add(i);
var (pollable, cancellationToken) = ((IPoll.Pollable, CancellationToken))
tcs.Task.AsyncState!;
if (cancellationToken.IsCancellationRequested)
{
tcs.SetException(new TaskCanceledException());
tasksCanceled = true;
}
else
{
arguments.Add(pollable);
indexes.Add(i);
}
}

if (arguments.Count > 0)
{
// this is blocking until at least one pollable resolves
var readyIndexes = PollInterop.Poll(arguments);

var ready = new bool[arguments.Count];
foreach (int readyIndex in readyIndexes)

// If at least one task was canceled, we'll return without
// calling `poll` (i.e. delay calling `poll` until the next
// call to this function) to give any dependent tasks a
// chance to make progress before we block.
if (!tasksCanceled)
{
ready[readyIndex] = true;
arguments[readyIndex].Dispose();
var tcs = pollables[indexes[readyIndex]];
tcs.SetResult();
// this is blocking until at least one pollable resolves
var readyIndexes = PollInterop.Poll(arguments);

foreach (int readyIndex in readyIndexes)
{
ready[readyIndex] = true;
arguments[readyIndex].Dispose();
var tcs = pollables[indexes[readyIndex]];
tcs.SetResult();
}
}

for (var i = 0; i < arguments.Count; ++i)
{
if (!ready[i])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private static unsafe void SetNextTimer(long shortestDueTimeMs, long currentTime

// `SubscribeDuration` expects nanoseconds:
var pollable = MonotonicClockInterop.SubscribeDuration(shortestWaitMs * 1000 * 1000);
Task task = WasiEventLoop.RegisterWasiPollable(pollable);
Task task = WasiEventLoop.RegisterWasiPollable(pollable, new CancellationToken(false));
task.ContinueWith(TimerHandler, TaskScheduler.Default);
}
}
Expand Down

0 comments on commit 28d758f

Please sign in to comment.