Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Expose and roll out ValueTask extensibility #27497

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Common/src/System/IO/DelegatingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
return _innerStream.WriteAsync(buffer, offset, count, cancellationToken);
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return _innerStream.WriteAsync(source, cancellationToken);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Common/src/System/IO/ReadOnlyMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio
{
StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
return _content.Length > _position ?
destination.WriteAsync(_content.Slice(_position), cancellationToken) :
destination.WriteAsync(_content.Slice(_position), cancellationToken).AsTask() :
Task.CompletedTask;
}

Expand Down
99 changes: 43 additions & 56 deletions src/Common/src/System/Net/WebSockets/ManagedWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ public static ManagedWebSocket CreateFromConnectedStream(
/// </summary>
private readonly Utf8MessageState _utf8TextState = new Utf8MessageState();
/// <summary>
/// Semaphore used to ensure that calls to SendFrameAsync don't run concurrently. While <see cref="_lastSendAsync"/>
/// is used to fail if a caller tries to issue another SendAsync while a previous one is running, internally
/// we use SendFrameAsync as an implementation detail, and it should not cause user requests to SendAsync to fail,
/// nor should such internal usage be allowed to run concurrently with other internal usage or with SendAsync.
/// Semaphore used to ensure that calls to SendFrameAsync don't run concurrently.
/// </summary>
private readonly SemaphoreSlim _sendFrameAsyncLock = new SemaphoreSlim(1, 1);

Expand Down Expand Up @@ -145,15 +142,10 @@ public static ManagedWebSocket CreateFromConnectedStream(
/// </summary>
private bool _lastSendWasFragment;
/// <summary>
/// The task returned from the last SendAsync operation to not complete synchronously.
/// If this is not null and not completed when a subsequent SendAsync is issued, an exception occurs.
/// </summary>
private Task _lastSendAsync;
/// <summary>
/// The task returned from the last ReceiveAsync operation to not complete synchronously.
/// The task returned from the last ReceiveAsync(ArraySegment, ...) operation to not complete synchronously.
/// If this is not null and not completed when a subsequent ReceiveAsync is issued, an exception occurs.
/// </summary>
private Task _lastReceiveAsync;
private Task _lastReceiveAsync = Task.CompletedTask;

/// <summary>Lock used to protect update and check-and-update operations on _state.</summary>
private object StateUpdateLock => _abortSource;
Expand Down Expand Up @@ -262,10 +254,10 @@ public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType m

WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer));

return SendPrivateAsync((ReadOnlyMemory<byte>)buffer, messageType, endOfMessage, cancellationToken);
return SendPrivateAsync((ReadOnlyMemory<byte>)buffer, messageType, endOfMessage, cancellationToken).AsTask();
}

private Task SendPrivateAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
private ValueTask SendPrivateAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
{
if (messageType != WebSocketMessageType.Text && messageType != WebSocketMessageType.Binary)
{
Expand All @@ -278,21 +270,19 @@ private Task SendPrivateAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType
try
{
WebSocketValidate.ThrowIfInvalidState(_state, _disposed, s_validSendStates);
ThrowIfOperationInProgress(_lastSendAsync);
}
catch (Exception exc)
{
return Task.FromException(exc);
return new ValueTask(Task.FromException(exc));
}

MessageOpcode opcode =
_lastSendWasFragment ? MessageOpcode.Continuation :
messageType == WebSocketMessageType.Binary ? MessageOpcode.Binary :
MessageOpcode.Text;

Task t = SendFrameAsync(opcode, endOfMessage, buffer, cancellationToken);
ValueTask t = SendFrameAsync(opcode, endOfMessage, buffer, cancellationToken);
_lastSendWasFragment = !endOfMessage;
_lastSendAsync = t;
return t;
}

Expand All @@ -307,7 +297,7 @@ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buf
Debug.Assert(!Monitor.IsEntered(StateUpdateLock), $"{nameof(StateUpdateLock)} must never be held when acquiring {nameof(ReceiveAsyncLock)}");
lock (ReceiveAsyncLock) // synchronize with receives in CloseAsync
{
ThrowIfOperationInProgress(_lastReceiveAsync);
ThrowIfOperationInProgress(_lastReceiveAsync.IsCompleted);
Task<WebSocketReceiveResult> t = ReceiveAsyncPrivate<WebSocketReceiveResultGetter,WebSocketReceiveResult>(buffer, cancellationToken).AsTask();
_lastReceiveAsync = t;
return t;
Expand Down Expand Up @@ -362,43 +352,34 @@ public override void Abort()
/// <param name="endOfMessage">The value of the FIN bit for the message.</param>
/// <param name="payloadBuffer">The buffer containing the payload data fro the message.</param>
/// <param name="cancellationToken">The CancellationToken to use to cancel the websocket.</param>
private Task SendFrameAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
private ValueTask SendFrameAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
{
// TODO: #4900 SendFrameAsync should in theory typically complete synchronously, making it fast and allocation free.
// However, due to #4900, it almost always yields, resulting in all of the allocations involved in an async method
// yielding, e.g. the boxed state machine, the Action delegate, the MoveNextRunner, and the resulting Task, plus it's
// common that the awaited operation completes so fast after the await that we may end up allocating an AwaitTaskContinuation
// inside of the TaskAwaiter. Since SendFrameAsync is such a core code path, until that can be fixed, we put some
// optimizations in place to avoid a few of those expenses, at the expense of more complicated code; for the common case,
// this code has fewer than half the number and size of allocations. If/when that issue is fixed, this method should be deleted
// and replaced by SendFrameFallbackAsync, which is the same logic but in a much more easily understand flow.

// If a cancelable cancellation token was provided, that would require registering with it, which means more state we have to
// pass around (the CancellationTokenRegistration), so if it is cancelable, just immediately go to the fallback path.
// Similarly, it should be rare that there are multiple outstanding calls to SendFrameAsync, but if there are, again
// fall back to the fallback path.
return cancellationToken.CanBeCanceled || !_sendFrameAsyncLock.Wait(0) ?
SendFrameFallbackAsync(opcode, endOfMessage, payloadBuffer, cancellationToken) :
new ValueTask(SendFrameFallbackAsync(opcode, endOfMessage, payloadBuffer, cancellationToken)) :
SendFrameLockAcquiredNonCancelableAsync(opcode, endOfMessage, payloadBuffer);
}

/// <summary>Sends a websocket frame to the network. The caller must hold the sending lock.</summary>
/// <param name="opcode">The opcode for the message.</param>
/// <param name="endOfMessage">The value of the FIN bit for the message.</param>
/// <param name="payloadBuffer">The buffer containing the payload data fro the message.</param>
private Task SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer)
private ValueTask SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer)
{
Debug.Assert(_sendFrameAsyncLock.CurrentCount == 0, "Caller should hold the _sendFrameAsyncLock");

// If we get here, the cancellation token is not cancelable so we don't have to worry about it,
// and we own the semaphore, so we don't need to asynchronously wait for it.
Task writeTask = null;
ValueTask writeTask = default;
bool releaseSemaphoreAndSendBuffer = true;
try
{
// Write the payload synchronously to the buffer, then write that buffer out to the network.
int sendBytes = WriteFrameToSendBuffer(opcode, endOfMessage, payloadBuffer.Span);
writeTask = _stream.WriteAsync(_sendBuffer, 0, sendBytes, CancellationToken.None);
writeTask = _stream.WriteAsync(new ReadOnlyMemory<byte>(_sendBuffer, 0, sendBytes));

// If the operation happens to complete synchronously (or, more specifically, by
// the time we get from the previous line to here), release the semaphore, return
Expand All @@ -415,10 +396,10 @@ private Task SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool
}
catch (Exception exc)
{
return Task.FromException(
return new ValueTask(Task.FromException(
exc is OperationCanceledException ? exc :
_state == WebSocketState.Aborted ? CreateOperationCanceledException(exc) :
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc));
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc)));
}
finally
{
Expand All @@ -429,22 +410,26 @@ private Task SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool
}
}

// The write was not yet completed. Create and return a continuation that will
// release the semaphore and translate any exception that occurred.
return writeTask.ContinueWith((t, s) =>
{
var thisRef = (ManagedWebSocket)s;
thisRef._sendFrameAsyncLock.Release();
thisRef.ReleaseSendBuffer();
return new ValueTask(WaitForWriteTaskAsync(writeTask));
}

try { t.GetAwaiter().GetResult(); }
catch (Exception exc) when (!(exc is OperationCanceledException))
{
throw thisRef._state == WebSocketState.Aborted ?
CreateOperationCanceledException(exc) :
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc);
}
}, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
private async Task WaitForWriteTaskAsync(ValueTask writeTask)
{
try
{
await writeTask.ConfigureAwait(false);
}
catch (Exception exc) when (!(exc is OperationCanceledException))
{
throw _state == WebSocketState.Aborted ?
CreateOperationCanceledException(exc) :
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc);
}
finally
{
_sendFrameAsyncLock.Release();
ReleaseSendBuffer();
}
}

private async Task SendFrameFallbackAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
Expand All @@ -455,7 +440,7 @@ private async Task SendFrameFallbackAsync(MessageOpcode opcode, bool endOfMessag
int sendBytes = WriteFrameToSendBuffer(opcode, endOfMessage, payloadBuffer.Span);
using (cancellationToken.Register(s => ((ManagedWebSocket)s).Abort(), this))
{
await _stream.WriteAsync(_sendBuffer, 0, sendBytes, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(new ReadOnlyMemory<byte>(_sendBuffer, 0, sendBytes), cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exc) when (!(exc is OperationCanceledException))
Expand Down Expand Up @@ -518,12 +503,12 @@ private void SendKeepAliveFrameAsync()
{
// This exists purely to keep the connection alive; don't wait for the result, and ignore any failures.
// The call will handle releasing the lock.
Task t = SendFrameLockAcquiredNonCancelableAsync(MessageOpcode.Ping, true, Memory<byte>.Empty);
ValueTask t = SendFrameLockAcquiredNonCancelableAsync(MessageOpcode.Ping, true, Memory<byte>.Empty);

// "Observe" any exception, ignoring it to prevent the unobserved exception event from being raised.
if (t.Status != TaskStatus.RanToCompletion)
if (!t.IsCompletedSuccessfully)
{
t.ContinueWith(p => { Exception ignored = p.Exception; },
t.AsTask().ContinueWith(p => { Exception ignored = p.Exception; },
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
Expand Down Expand Up @@ -1270,15 +1255,17 @@ private static unsafe int ApplyMask(Span<byte> toMask, int mask, int maskIndex)
}

/// <summary>Aborts the websocket and throws an exception if an existing operation is in progress.</summary>
private void ThrowIfOperationInProgress(Task operationTask, [CallerMemberName] string methodName = null)
private void ThrowIfOperationInProgress(bool operationCompleted, [CallerMemberName] string methodName = null)
{
if (operationTask != null && !operationTask.IsCompleted)
if (!operationCompleted)
{
Abort();
throw new InvalidOperationException(SR.Format(SR.net_Websockets_AlreadyOneOutstandingOperation, methodName));
ThrowOperationInProgress(methodName);
}
}

private void ThrowOperationInProgress(string methodName) => throw new InvalidOperationException(SR.Format(SR.net_Websockets_AlreadyOneOutstandingOperation, methodName));

/// <summary>Creates an OperationCanceledException instance, using a default message and the specified inner exception and token.</summary>
private static Exception CreateOperationCanceledException(Exception innerException, CancellationToken cancellationToken = default(CancellationToken))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> destination, Cancell
return await base.ReadAsync(destination, cancellationToken);
}

public override async Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
WriteHit = true;

Expand Down
5 changes: 2 additions & 3 deletions src/Common/tests/System/Net/Configuration.Certificates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static partial class Certificates
private const string CertificatePassword = "testcertificate";
private const string TestDataFolder = "TestData";

private static Mutex m;
private static readonly Mutex m;
private const int MutexTimeout = 120 * 1000;

static Certificates()
Expand Down Expand Up @@ -59,10 +59,9 @@ private static X509Certificate2Collection GetCertificateCollection(string certif
{
// On Windows, .NET Core applications should not import PFX files in parallel to avoid a known system-level race condition.
// This bug results in corrupting the X509Certificate2 certificate state.
Assert.True(m.WaitOne(MutexTimeout), "Cannot acquire the global certificate mutex.");
try
{
Assert.True(m.WaitOne(MutexTimeout), "Cannot acquire the global certificate mutex.");

var certCollection = new X509Certificate2Collection();
certCollection.Import(Path.Combine(TestDataFolder, certificateFileName), CertificatePassword, X509KeyStorageFlags.DefaultKeySet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private async Task ReadBufferAsync()
{
try
{
int bytesRead = await _stream.ReadAsync(_byteBuffer, 0, _byteBuffer.Length, _cts.Token).ConfigureAwait(false);
int bytesRead = await _stream.ReadAsync(new Memory<byte>(_byteBuffer), _cts.Token).ConfigureAwait(false);
if (bytesRead == 0)
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private async ValueTask<int> FinishReadAsyncMemory(Memory<byte> destination, Can
{
int readBytes = 0;
int iter = 0;
while (readBytes < _buffer.Length && ((iter = await _stream.ReadAsync(_buffer, readBytes, _buffer.Length - readBytes, cancellationToken).ConfigureAwait(false)) > 0))
while (readBytes < _buffer.Length && ((iter = await _stream.ReadAsync(new Memory<byte>(_buffer, readBytes, _buffer.Length - readBytes), cancellationToken).ConfigureAwait(false)) > 0))
{
readBytes += iter;
if (readBytes > _buffer.Length)
Expand Down
Loading