Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Expose and roll out ValueTask extensibility
Browse files Browse the repository at this point in the history
This commit does several things:
- Exposes the new `ValueTask` extensibility model being added in coreclr.  The ValueTask-related files will separately be mirrored over to corefx to enable the netstandard build of System.Threading.Tasks.Extensions.
- Adapts all `Stream`-derived types to return `ValueTask` instead of `Task` from `WriteAsync`.
- Changes the new `WebSocket` `SendAsync` method to return `ValueTask` instead of `Task`, and updates the `ManagedWebSocket` implementation accordingly.  Most `SendAsync`s on `ManagedWebSocket` should now return a `ValueTask` that's either completed synchronously (no allocation) or using a pooled object.  It now uses the underlying transport's new `WriteAsync` overload that returns `ValueTask`.
- Switches more uses of `ReadAsync` and `WriteAsync` over to the new overloads, including in Process, DeflateStream, BrotliStream, File, HttpClient, SslStream, WebClient, BufferedStream, CryptoStream,
- Removed some unnecessary array clearing from various routines using ArrayPool (after the clearing was added we changed our minds and decided clearing was only necessary in very specific circumstances)
- Implements a custom `IValueTaskSource` in Socket, such that async receives and sends become allocation-free (ammortized).  `NetworkStream` then inherits this functionality, such that its new `ReadAsync` and `WriteAsync` are also allocation-free (in the unbounded channel implementations; we can subsequently add it in for bounded).
- Implements a custom `IValueTaskSource` in System.Threading.Channels, such that reading and writing are ammortized allocation-free up to one concurrent reader and writer.
- A few random things I noticed as I was going through, e.g. missing ConfigureAwait, some incorrect synchronization in tests, etc.
- Adds a ton of new tests, mainly in System.Threading.Tasks.Extensions, System.Threading.Channels, and System.Net.Sockets.
  • Loading branch information
stephentoub committed Mar 1, 2018
1 parent 7c5d4bd commit b52bb27
Show file tree
Hide file tree
Showing 88 changed files with 3,633 additions and 1,129 deletions.
2 changes: 1 addition & 1 deletion src/Common/src/System/IO/DelegatingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
return _innerStream.WriteAsync(buffer, offset, count, cancellationToken);
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return _innerStream.WriteAsync(source, cancellationToken);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Common/src/System/IO/ReadOnlyMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio
{
StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
return _content.Length > _position ?
destination.WriteAsync(_content.Slice(_position), cancellationToken) :
destination.WriteAsync(_content.Slice(_position), cancellationToken).AsTask() :
Task.CompletedTask;
}

Expand Down
99 changes: 43 additions & 56 deletions src/Common/src/System/Net/WebSockets/ManagedWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ public static ManagedWebSocket CreateFromConnectedStream(
/// </summary>
private readonly Utf8MessageState _utf8TextState = new Utf8MessageState();
/// <summary>
/// Semaphore used to ensure that calls to SendFrameAsync don't run concurrently. While <see cref="_lastSendAsync"/>
/// is used to fail if a caller tries to issue another SendAsync while a previous one is running, internally
/// we use SendFrameAsync as an implementation detail, and it should not cause user requests to SendAsync to fail,
/// nor should such internal usage be allowed to run concurrently with other internal usage or with SendAsync.
/// Semaphore used to ensure that calls to SendFrameAsync don't run concurrently.
/// </summary>
private readonly SemaphoreSlim _sendFrameAsyncLock = new SemaphoreSlim(1, 1);

Expand Down Expand Up @@ -145,15 +142,10 @@ public static ManagedWebSocket CreateFromConnectedStream(
/// </summary>
private bool _lastSendWasFragment;
/// <summary>
/// The task returned from the last SendAsync operation to not complete synchronously.
/// If this is not null and not completed when a subsequent SendAsync is issued, an exception occurs.
/// </summary>
private Task _lastSendAsync;
/// <summary>
/// The task returned from the last ReceiveAsync operation to not complete synchronously.
/// The task returned from the last ReceiveAsync(ArraySegment, ...) operation to not complete synchronously.
/// If this is not null and not completed when a subsequent ReceiveAsync is issued, an exception occurs.
/// </summary>
private Task _lastReceiveAsync;
private Task _lastReceiveAsync = Task.CompletedTask;

/// <summary>Lock used to protect update and check-and-update operations on _state.</summary>
private object StateUpdateLock => _abortSource;
Expand Down Expand Up @@ -262,10 +254,10 @@ public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType m

WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer));

return SendPrivateAsync((ReadOnlyMemory<byte>)buffer, messageType, endOfMessage, cancellationToken);
return SendPrivateAsync((ReadOnlyMemory<byte>)buffer, messageType, endOfMessage, cancellationToken).AsTask();
}

private Task SendPrivateAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
private ValueTask SendPrivateAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
{
if (messageType != WebSocketMessageType.Text && messageType != WebSocketMessageType.Binary)
{
Expand All @@ -278,21 +270,19 @@ private Task SendPrivateAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType
try
{
WebSocketValidate.ThrowIfInvalidState(_state, _disposed, s_validSendStates);
ThrowIfOperationInProgress(_lastSendAsync);
}
catch (Exception exc)
{
return Task.FromException(exc);
return new ValueTask(Task.FromException(exc));
}

MessageOpcode opcode =
_lastSendWasFragment ? MessageOpcode.Continuation :
messageType == WebSocketMessageType.Binary ? MessageOpcode.Binary :
MessageOpcode.Text;

Task t = SendFrameAsync(opcode, endOfMessage, buffer, cancellationToken);
ValueTask t = SendFrameAsync(opcode, endOfMessage, buffer, cancellationToken);
_lastSendWasFragment = !endOfMessage;
_lastSendAsync = t;
return t;
}

Expand All @@ -307,7 +297,7 @@ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buf
Debug.Assert(!Monitor.IsEntered(StateUpdateLock), $"{nameof(StateUpdateLock)} must never be held when acquiring {nameof(ReceiveAsyncLock)}");
lock (ReceiveAsyncLock) // synchronize with receives in CloseAsync
{
ThrowIfOperationInProgress(_lastReceiveAsync);
ThrowIfOperationInProgress(_lastReceiveAsync.IsCompleted);
Task<WebSocketReceiveResult> t = ReceiveAsyncPrivate<WebSocketReceiveResultGetter,WebSocketReceiveResult>(buffer, cancellationToken).AsTask();
_lastReceiveAsync = t;
return t;
Expand Down Expand Up @@ -362,43 +352,34 @@ public override void Abort()
/// <param name="endOfMessage">The value of the FIN bit for the message.</param>
/// <param name="payloadBuffer">The buffer containing the payload data fro the message.</param>
/// <param name="cancellationToken">The CancellationToken to use to cancel the websocket.</param>
private Task SendFrameAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
private ValueTask SendFrameAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
{
// TODO: #4900 SendFrameAsync should in theory typically complete synchronously, making it fast and allocation free.
// However, due to #4900, it almost always yields, resulting in all of the allocations involved in an async method
// yielding, e.g. the boxed state machine, the Action delegate, the MoveNextRunner, and the resulting Task, plus it's
// common that the awaited operation completes so fast after the await that we may end up allocating an AwaitTaskContinuation
// inside of the TaskAwaiter. Since SendFrameAsync is such a core code path, until that can be fixed, we put some
// optimizations in place to avoid a few of those expenses, at the expense of more complicated code; for the common case,
// this code has fewer than half the number and size of allocations. If/when that issue is fixed, this method should be deleted
// and replaced by SendFrameFallbackAsync, which is the same logic but in a much more easily understand flow.

// If a cancelable cancellation token was provided, that would require registering with it, which means more state we have to
// pass around (the CancellationTokenRegistration), so if it is cancelable, just immediately go to the fallback path.
// Similarly, it should be rare that there are multiple outstanding calls to SendFrameAsync, but if there are, again
// fall back to the fallback path.
return cancellationToken.CanBeCanceled || !_sendFrameAsyncLock.Wait(0) ?
SendFrameFallbackAsync(opcode, endOfMessage, payloadBuffer, cancellationToken) :
new ValueTask(SendFrameFallbackAsync(opcode, endOfMessage, payloadBuffer, cancellationToken)) :
SendFrameLockAcquiredNonCancelableAsync(opcode, endOfMessage, payloadBuffer);
}

/// <summary>Sends a websocket frame to the network. The caller must hold the sending lock.</summary>
/// <param name="opcode">The opcode for the message.</param>
/// <param name="endOfMessage">The value of the FIN bit for the message.</param>
/// <param name="payloadBuffer">The buffer containing the payload data fro the message.</param>
private Task SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer)
private ValueTask SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer)
{
Debug.Assert(_sendFrameAsyncLock.CurrentCount == 0, "Caller should hold the _sendFrameAsyncLock");

// If we get here, the cancellation token is not cancelable so we don't have to worry about it,
// and we own the semaphore, so we don't need to asynchronously wait for it.
Task writeTask = null;
ValueTask writeTask = default;
bool releaseSemaphoreAndSendBuffer = true;
try
{
// Write the payload synchronously to the buffer, then write that buffer out to the network.
int sendBytes = WriteFrameToSendBuffer(opcode, endOfMessage, payloadBuffer.Span);
writeTask = _stream.WriteAsync(_sendBuffer, 0, sendBytes, CancellationToken.None);
writeTask = _stream.WriteAsync(new ReadOnlyMemory<byte>(_sendBuffer, 0, sendBytes));

// If the operation happens to complete synchronously (or, more specifically, by
// the time we get from the previous line to here), release the semaphore, return
Expand All @@ -415,10 +396,10 @@ private Task SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool
}
catch (Exception exc)
{
return Task.FromException(
return new ValueTask(Task.FromException(
exc is OperationCanceledException ? exc :
_state == WebSocketState.Aborted ? CreateOperationCanceledException(exc) :
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc));
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc)));
}
finally
{
Expand All @@ -429,22 +410,26 @@ private Task SendFrameLockAcquiredNonCancelableAsync(MessageOpcode opcode, bool
}
}

// The write was not yet completed. Create and return a continuation that will
// release the semaphore and translate any exception that occurred.
return writeTask.ContinueWith((t, s) =>
{
var thisRef = (ManagedWebSocket)s;
thisRef._sendFrameAsyncLock.Release();
thisRef.ReleaseSendBuffer();
return new ValueTask(WaitForWriteTaskAsync(writeTask));
}

try { t.GetAwaiter().GetResult(); }
catch (Exception exc) when (!(exc is OperationCanceledException))
{
throw thisRef._state == WebSocketState.Aborted ?
CreateOperationCanceledException(exc) :
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc);
}
}, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
private async Task WaitForWriteTaskAsync(ValueTask writeTask)
{
try
{
await writeTask.ConfigureAwait(false);
}
catch (Exception exc) when (!(exc is OperationCanceledException))
{
throw _state == WebSocketState.Aborted ?
CreateOperationCanceledException(exc) :
new WebSocketException(WebSocketError.ConnectionClosedPrematurely, exc);
}
finally
{
_sendFrameAsyncLock.Release();
ReleaseSendBuffer();
}
}

private async Task SendFrameFallbackAsync(MessageOpcode opcode, bool endOfMessage, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
Expand All @@ -455,7 +440,7 @@ private async Task SendFrameFallbackAsync(MessageOpcode opcode, bool endOfMessag
int sendBytes = WriteFrameToSendBuffer(opcode, endOfMessage, payloadBuffer.Span);
using (cancellationToken.Register(s => ((ManagedWebSocket)s).Abort(), this))
{
await _stream.WriteAsync(_sendBuffer, 0, sendBytes, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(new ReadOnlyMemory<byte>(_sendBuffer, 0, sendBytes), cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exc) when (!(exc is OperationCanceledException))
Expand Down Expand Up @@ -518,12 +503,12 @@ private void SendKeepAliveFrameAsync()
{
// This exists purely to keep the connection alive; don't wait for the result, and ignore any failures.
// The call will handle releasing the lock.
Task t = SendFrameLockAcquiredNonCancelableAsync(MessageOpcode.Ping, true, Memory<byte>.Empty);
ValueTask t = SendFrameLockAcquiredNonCancelableAsync(MessageOpcode.Ping, true, Memory<byte>.Empty);

// "Observe" any exception, ignoring it to prevent the unobserved exception event from being raised.
if (t.Status != TaskStatus.RanToCompletion)
if (!t.IsCompletedSuccessfully)
{
t.ContinueWith(p => { Exception ignored = p.Exception; },
t.AsTask().ContinueWith(p => { Exception ignored = p.Exception; },
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
Expand Down Expand Up @@ -1270,15 +1255,17 @@ private static unsafe int ApplyMask(Span<byte> toMask, int mask, int maskIndex)
}

/// <summary>Aborts the websocket and throws an exception if an existing operation is in progress.</summary>
private void ThrowIfOperationInProgress(Task operationTask, [CallerMemberName] string methodName = null)
private void ThrowIfOperationInProgress(bool operationCompleted, [CallerMemberName] string methodName = null)
{
if (operationTask != null && !operationTask.IsCompleted)
if (!operationCompleted)
{
Abort();
throw new InvalidOperationException(SR.Format(SR.net_Websockets_AlreadyOneOutstandingOperation, methodName));
ThrowOperationInProgress(methodName);
}
}

private void ThrowOperationInProgress(string methodName) => throw new InvalidOperationException(SR.Format(SR.net_Websockets_AlreadyOneOutstandingOperation, methodName));

/// <summary>Creates an OperationCanceledException instance, using a default message and the specified inner exception and token.</summary>
private static Exception CreateOperationCanceledException(Exception innerException, CancellationToken cancellationToken = default(CancellationToken))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> destination, Cancell
return await base.ReadAsync(destination, cancellationToken);
}

public override async Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
WriteHit = true;

Expand Down
5 changes: 2 additions & 3 deletions src/Common/tests/System/Net/Configuration.Certificates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static partial class Certificates
private const string CertificatePassword = "testcertificate";
private const string TestDataFolder = "TestData";

private static Mutex m;
private static readonly Mutex m;
private const int MutexTimeout = 120 * 1000;

static Certificates()
Expand Down Expand Up @@ -59,10 +59,9 @@ private static X509Certificate2Collection GetCertificateCollection(string certif
{
// On Windows, .NET Core applications should not import PFX files in parallel to avoid a known system-level race condition.
// This bug results in corrupting the X509Certificate2 certificate state.
Assert.True(m.WaitOne(MutexTimeout), "Cannot acquire the global certificate mutex.");
try
{
Assert.True(m.WaitOne(MutexTimeout), "Cannot acquire the global certificate mutex.");

var certCollection = new X509Certificate2Collection();
certCollection.Import(Path.Combine(TestDataFolder, certificateFileName), CertificatePassword, X509KeyStorageFlags.DefaultKeySet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private async Task ReadBufferAsync()
{
try
{
int bytesRead = await _stream.ReadAsync(_byteBuffer, 0, _byteBuffer.Length, _cts.Token).ConfigureAwait(false);
int bytesRead = await _stream.ReadAsync(new Memory<byte>(_byteBuffer), _cts.Token).ConfigureAwait(false);
if (bytesRead == 0)
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private async ValueTask<int> FinishReadAsyncMemory(Memory<byte> destination, Can
{
int readBytes = 0;
int iter = 0;
while (readBytes < _buffer.Length && ((iter = await _stream.ReadAsync(_buffer, readBytes, _buffer.Length - readBytes, cancellationToken).ConfigureAwait(false)) > 0))
while (readBytes < _buffer.Length && ((iter = await _stream.ReadAsync(new Memory<byte>(_buffer, readBytes, _buffer.Length - readBytes), cancellationToken).ConfigureAwait(false)) > 0))
{
readBytes += iter;
if (readBytes > _buffer.Length)
Expand Down
Loading

0 comments on commit b52bb27

Please sign in to comment.