Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser] WS & HTTP clients more async #95483

Merged
merged 6 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1391,8 +1391,20 @@ await server.AcceptConnectionAsync(async connection =>
{
await connection.ReadRequestDataAsync();
tcs2.SetResult(true);
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
try
{
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
}
catch (IOException ex)
{
// when testing in the browser, we are using the WebSocket for the loopback
// it could get disconnected after the cancellation above, earlier than the server-side gets chance to write the response
if (!(ex.InnerException is InvalidOperationException ivd) || !ivd.Message.Contains("The WebSocket is not connected"))
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
{
throw;
}
}
await tcs.Task;
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessa
{
bool? allowAutoRedirect = _isAllowAutoRedirectTouched ? AllowAutoRedirect : null;
#if FEATURE_WASM_THREADS
return JSHost.CurrentOrMainJSSynchronizationContext.Send(() =>
return JSHost.CurrentOrMainJSSynchronizationContext.Post(() =>
{
#endif
return Impl(request, cancellationToken, allowAutoRedirect);
Expand Down Expand Up @@ -365,7 +365,7 @@ private Task WriteAsyncCore(ReadOnlyMemory<byte> buffer, CancellationToken cance
{
cancellationToken.ThrowIfCancellationRequested();
#if FEATURE_WASM_THREADS
return _transformStream.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken));
return _transformStream.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken));
#else
return Impl(this, buffer, cancellationToken);
#endif
Expand Down Expand Up @@ -474,24 +474,25 @@ public void Dispose()
return;

#if FEATURE_WASM_THREADS
FetchResponse?.SynchronizationContext.Send(static (WasmFetchResponse self) =>
FetchResponse?.SynchronizationContext.Post(static (WasmFetchResponse self) =>
{
lock (self.ThisLock)
{
if (self._isDisposed)
return;
self._isDisposed = true;
self._abortRegistration.Dispose();
self._abortController.Dispose();
if (!self.FetchResponse!.IsDisposed)
if (!self._isDisposed)
{
BrowserHttpInterop.AbortResponse(self.FetchResponse);
self._isDisposed = true;
self._abortRegistration.Dispose();
self._abortController.Dispose();
if (!self.FetchResponse!.IsDisposed)
{
BrowserHttpInterop.AbortResponse(self.FetchResponse);
}
self.FetchResponse.Dispose();
self.FetchResponse = null;
}
self.FetchResponse.Dispose();
self.FetchResponse = null;
return Task.CompletedTask;
}
}, this);

#else
_isDisposed = true;
_abortRegistration.Dispose();
Expand Down Expand Up @@ -521,7 +522,7 @@ public BrowserHttpContent(WasmFetchResponse fetchResponse)
_fetchResponse = fetchResponse;
}

// TODO alocate smaller buffer and call multiple times
// TODO allocate smaller buffer and call multiple times
private async ValueTask<byte[]> GetResponseData(CancellationToken cancellationToken)
{
Task<int> promise;
Expand Down Expand Up @@ -557,12 +558,13 @@ protected override Task<Stream> CreateContentReadStreamAsync()
{
_fetchResponse.ThrowIfDisposed();
#if FEATURE_WASM_THREADS
return _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this));
return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this));
#else
return Impl(this);
#endif
static async Task<Stream> Impl(BrowserHttpContent self)
{
self._fetchResponse.ThrowIfDisposed();
byte[] data = await self.GetResponseData(CancellationToken.None).ConfigureAwait(true);
return new MemoryStream(data, writable: false);
}
Expand All @@ -576,13 +578,14 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext?
ArgumentNullException.ThrowIfNull(stream, nameof(stream));
_fetchResponse.ThrowIfDisposed();
#if FEATURE_WASM_THREADS
return _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this, stream, cancellationToken));
return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, stream, cancellationToken));
#else
return Impl(this, stream, cancellationToken);
#endif

static async Task Impl(BrowserHttpContent self, Stream stream, CancellationToken cancellationToken)
{
self._fetchResponse.ThrowIfDisposed();
byte[] data = await self.GetResponseData(cancellationToken).ConfigureAwait(true);
await stream.WriteAsync(data, cancellationToken).ConfigureAwait(true);
}
Expand Down Expand Up @@ -621,13 +624,14 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
ArgumentNullException.ThrowIfNull(buffer, nameof(buffer));
_fetchResponse.ThrowIfDisposed();
#if FEATURE_WASM_THREADS
return await _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true);
return await _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true);
#else
return await Impl(this, buffer, cancellationToken).ConfigureAwait(true);
#endif

static async Task<int> Impl(WasmHttpReadStream self, Memory<byte> buffer, CancellationToken cancellationToken)
{
self._fetchResponse.ThrowIfDisposed();
Task<int> promise;
using (Buffers.MemoryHandle handle = buffer.Pin())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ internal Task ConnectAsync(Uri uri, List<string>? requestedSubProtocols, Cancell
}
}, null);

return JSHost.CurrentOrMainJSSynchronizationContext.Send(() =>
return JSHost.CurrentOrMainJSSynchronizationContext.Post(() =>
{
return ConnectAsyncCore(cancellationToken);
});
Expand Down Expand Up @@ -167,7 +167,7 @@ public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType m
WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer));

#if FEATURE_WASM_THREADS
return _innerWebSocket!.SynchronizationContext.Send(() =>
return _innerWebSocket!.SynchronizationContext.Post(() =>
{
Task promise;
lock (_thisLock)
Expand Down Expand Up @@ -200,7 +200,7 @@ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buf
WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer));

#if FEATURE_WASM_THREADS
return _innerWebSocket!.SynchronizationContext.Send(() =>
return _innerWebSocket!.SynchronizationContext.Post(() =>
{
Task<WebSocketReceiveResult> promise;
lock (_thisLock)
Expand Down Expand Up @@ -228,7 +228,7 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string?
}

#if FEATURE_WASM_THREADS
return _innerWebSocket!.SynchronizationContext.Send(() =>
return _innerWebSocket!.SynchronizationContext.Post(() =>
{
Task promise;
lock (_thisLock)
Expand All @@ -240,7 +240,7 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string?
{
throw new WebSocketException(WebSocketError.InvalidState, SR.Format(SR.net_WebSockets_InvalidState, state, "Connecting, Open, CloseSent, Aborted"));
}
if(state != WebSocketState.Open && state != WebSocketState.Connecting && state != WebSocketState.Aborted)
if (state != WebSocketState.Open && state != WebSocketState.Connecting && state != WebSocketState.Aborted)
{
return Task.CompletedTask;
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? status
}

#if FEATURE_WASM_THREADS
return _innerWebSocket!.SynchronizationContext.Send(() =>
return _innerWebSocket!.SynchronizationContext.Post(() =>
{
Task promise;
lock (_thisLock)
Expand Down Expand Up @@ -476,13 +476,17 @@ private async Task SendAsyncCore(ArraySegment<byte> buffer, WebSocketMessageType
try
{
var sendTask = BrowserInterop.UnsafeSendSync(_innerWebSocket!, buffer, messageType, endOfMessage);
if (sendTask == null)
if (sendTask != null)
{
// return synchronously
return;
await CancelationHelper(sendTask, cancellationToken, FastState).ConfigureAwait(true);
}

await CancelationHelper(sendTask, cancellationToken, FastState).ConfigureAwait(true);
#if FEATURE_WASM_THREADS
// return synchronously, not supported with MT
else
{
Environment.FailFast("Unexpected synchronous result");
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
}
#endif
}
catch (JSException ex)
{
Expand All @@ -502,19 +506,18 @@ private async Task<WebSocketReceiveResult> ReceiveAsyncCore(ArraySegment<byte> b
using (MemoryHandle pinBuffer = bufferMemory.Pin())
{
var receiveTask = BrowserInterop.ReceiveUnsafeSync(_innerWebSocket!, pinBuffer, bufferMemory.Length);
if (receiveTask == null)
if (receiveTask != null)
{
// return synchronously
#if FEATURE_WASM_THREADS
lock (_thisLock)
{
#endif
return ConvertResponse(this);
await CancelationHelper(receiveTask, cancellationToken, FastState).ConfigureAwait(true);
}
#if FEATURE_WASM_THREADS
} //lock
#endif
// return synchronously, not supported with MT
else
{
Environment.FailFast("Unexpected synchronous result");
}
await CancelationHelper(receiveTask, cancellationToken, FastState).ConfigureAwait(true);
#endif


#if FEATURE_WASM_THREADS
lock (_thisLock)
Expand Down Expand Up @@ -555,8 +558,18 @@ private async Task CloseAsyncCore(WebSocketCloseStatus closeStatus, string? stat
_closeStatus = closeStatus;
_closeStatusDescription = statusDescription;

var closeTask = BrowserInterop.WebSocketClose(_innerWebSocket!, (int)closeStatus, statusDescription, waitForCloseReceived) ?? Task.CompletedTask;
await CancelationHelper(closeTask, cancellationToken, FastState).ConfigureAwait(true);
var closeTask = BrowserInterop.WebSocketClose(_innerWebSocket!, (int)closeStatus, statusDescription, waitForCloseReceived);
if (closeTask != null)
{
await CancelationHelper(closeTask, cancellationToken, FastState).ConfigureAwait(true);
}
#if FEATURE_WASM_THREADS
// return synchronously, not supported with MT
else
{
Environment.FailFast("Unexpected synchronous result");
}
#endif

#if FEATURE_WASM_THREADS
lock (_thisLock)
Expand Down
5 changes: 2 additions & 3 deletions src/libraries/System.Net.WebSockets.Client/tests/CloseTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ await Assert.ThrowsAnyAsync<WebSocketException>(async () =>

[OuterLoop("Uses external servers", typeof(PlatformDetection), nameof(PlatformDetection.LocalEchoServerIsNotAvailable))]
[ConditionalTheory(nameof(WebSocketsSupported)), MemberData(nameof(EchoServers))]
[ActiveIssue("https://github.com/dotnet/runtime/issues/83517", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))]
[SkipOnPlatform(TestPlatforms.Browser, "This never really worked for browser, it was just lucky timing that browser's `close` event was executed in next browser tick, for this test. See also https://github.com/dotnet/runtime/issues/45538")]
public async Task CloseOutputAsync_ClientInitiated_CanReceive_CanClose(Uri server)
{
string message = "Hello WebSockets!";
Expand All @@ -233,8 +233,7 @@ public async Task CloseOutputAsync_ClientInitiated_CanReceive_CanClose(Uri serve
{
var cts = new CancellationTokenSource(TimeOutMilliseconds);

// See issue for Browser websocket differences https://github.com/dotnet/runtime/issues/45538
var closeStatus = PlatformDetection.IsBrowser ? WebSocketCloseStatus.NormalClosure : WebSocketCloseStatus.InvalidPayloadData;
var closeStatus = WebSocketCloseStatus.InvalidPayloadData;
string closeDescription = "CloseOutputAsync_Client_InvalidPayloadData";

await cws.SendAsync(WebSocketData.GetBufferFromText(message), WebSocketMessageType.Text, true, cts.Token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,11 @@ public async Task ZeroByteReceive_CompletesWhenDataAvailable(Uri server)
// Now do a receive to get the payload.
var receiveBuffer = new byte[1];
t = ReceiveAsync(cws, new ArraySegment<byte>(receiveBuffer), ctsDefault.Token);
Assert.Equal(TaskStatus.RanToCompletion, t.Status);
// this is not synchronously possible when the WS client is on another WebWorker
if(!PlatformDetection.IsWasmThreadingSupported)
{
Assert.Equal(TaskStatus.RanToCompletion, t.Status);
}

r = await t;
Assert.Equal(WebSocketMessageType.Binary, r.MessageType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,86 @@ public static TRes Send<TRes>(this SynchronizationContext? self, Func<TRes> body
return value!;
}

public static Task<TRes> Post<TRes>(this SynchronizationContext? self, Func<Task<TRes>> body)
{
if (self == null) return body();
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved

TaskCompletionSource<TRes> tcs = new TaskCompletionSource<TRes>();
self.Post(async (_) =>
{
try
{
var value = await body().ConfigureAwait(false);
tcs.TrySetResult(value);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}

public static Task<TRes> Post<T1, TRes>(this SynchronizationContext? self, Func<T1, Task<TRes>> body, T1 p1)
{
if (self == null) return body(p1);

TaskCompletionSource<TRes> tcs = new TaskCompletionSource<TRes>();
self.Post(async (_) =>
{
try
{
var value = await body(p1).ConfigureAwait(false);
tcs.TrySetResult(value);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}

public static Task Post<T1>(this SynchronizationContext? self, Func<T1, Task> body, T1 p1)
{
if (self == null) return body(p1);

TaskCompletionSource tcs = new TaskCompletionSource();
self.Post(async (_) =>
{
try
{
await body(p1).ConfigureAwait(false);
tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}

public static Task Post(this SynchronizationContext? self, Func<Task> body)
{
if (self == null) return body();

TaskCompletionSource tcs = new TaskCompletionSource();
self.Post(async (_) =>
{
try
{
await body().ConfigureAwait(false);
tcs.TrySetResult();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}

public static TRes Send<T1, TRes>(this SynchronizationContext? self, Func<T1, TRes> body, T1 p1)
{
if (self == null) return body(p1);
Expand Down
6 changes: 6 additions & 0 deletions src/mono/wasm/runtime/cancelable-promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export function wrap_as_cancelable_promise<T>(fn: () => Promise<T>): Controllabl
return promise;
}

export function wrap_as_cancelable<T>(inner: Promise<T>): ControllablePromise<T> {
const { promise, promise_control } = createPromiseController<T>();
inner.then((data) => promise_control.resolve(data)).catch((reason) => promise_control.reject(reason));
return promise;
}

export function mono_wasm_cancel_promise(task_holder_gcv_handle: GCHandle): void {
const holder = _lookup_js_owned_object(task_holder_gcv_handle) as PromiseHolder;
mono_assert(!!holder, () => `Expected Promise for GCVHandle ${task_holder_gcv_handle}`);
Expand Down
Loading
Loading