Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Commit

Permalink
Implement ValueTask extensibility
Browse files Browse the repository at this point in the history
This commit adds support for extending `ValueTask<T>` with arbitrary backing sources.  Prior to this change, `ValueTask<T>` could wrap a `T` or a `Task<T>`; now it can also wrap an `IValueTaskSource<T>`, which can be implemented by arbitrary objects to be represented by `ValueTask<T>`.  These objects can then be pooled and reused to minimize allocation.  The commit also adds a non-generic `ValueTask` that can represent void-returning operations, including a `default` synchronous success, `Task`, and `IValueTaskSource`.  For the non-generic `ValueTask`, the commit also includes awaiters and async method builders, so it can be both awaited and used as the return type of an async method.

The rest of the changes fall into a few buckets all related to enabling this support:
- Modifying `AsyncTaskMethodBuilder<TResult>.AwaitUnsafeOnCompleted` to specially recognize any `ValueTask` and utilize either the `Task` or `IValueTaskSource` that backs it to avoid allocating an Action MoveNext method.  If every object awaited in an async method is either a `Task`/`Task<T>` or `ValueTask`/`ValueTask<T>`, regardless of what the `ValueTask`/`ValueTask<T>` wraps, we'll be able to avoid allocating the delegate and only allocate the single state machine object that also serves as the returned object.
- Changing `Stream.WriteAsync` to return `ValueTask` instead of `Task`.  This enables interested overriding stream types to use a reusable/pooled object to avoid `WriteAsync` allocations.
- Modifying Stream.CopyToAsync to use the new `Memory`-based overloads of `ReadAsync` and `WriteAsync`.  This enables the default `CopyToAsync` implementation to take advantage of any pooling done by derived streams, but even without pooling to take advantage of synchronously completing `ReadAsync`s returning `ValueTask<int>`s that contained an `int` rather than an allocated object. (While I was modifying this, I also removed some unnecessary array clearing that we'd added before later deciding it wasn't needed in general.)
- Modifying StreamReader/Writer to use the new `ReadAsync`/`WriteAsync` overloads.
  • Loading branch information
stephentoub committed Feb 28, 2018
1 parent f1fee6d commit 019cdaf
Show file tree
Hide file tree
Showing 17 changed files with 1,271 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\SendOrPostCallback.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\SpinWait.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\SynchronizationLockException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\IValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskCanceledException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskCompletionSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskExtensions.cs" />
Expand Down
12 changes: 6 additions & 6 deletions src/mscorlib/shared/System/IO/FileStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,12 +635,12 @@ private unsafe void WriteNative(ReadOnlySpan<byte> source)
/// <param name="source">The buffer to write data from.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_useAsyncIO);

if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));

if (_fileHandle.IsClosed)
throw Error.GetFileNotOpen();
Expand All @@ -667,11 +667,11 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
_writePos += source.Length;

return Task.CompletedTask;
return default;
}
catch (Exception exc)
{
return Task.FromException(exc);
return new ValueTask(Task.FromException(exc));
}
finally
{
Expand All @@ -682,7 +682,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c

// Otherwise, issue the whole request asynchronously.
_asyncState.ReadOnlyMemory = source;
return waitTask.ContinueWith((t, s) =>
return new ValueTask(waitTask.ContinueWith((t, s) =>
{
// The options available on Unix for writing asynchronously to an arbitrary file
// handle typically amount to just using another thread to do the synchronous write,
Expand All @@ -702,7 +702,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
thisRef.WriteSpan(readOnlyMemory.Span);
}
finally { thisRef._asyncState.Release(); }
}, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
}

/// <summary>Sets the current position of this stream to the given value.</summary>
Expand Down
25 changes: 8 additions & 17 deletions src/mscorlib/shared/System/IO/FileStream.Windows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ unsafe private Task<int> ReadNativeAsync(Memory<byte> destination, int numBuffer
return completionSource.Task;
}

private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_useAsyncIO);
Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
Expand Down Expand Up @@ -1005,7 +1005,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
// completely, we want to do the asynchronous flush/write as part of this operation
// rather than waiting until the next write that fills the buffer.
if (source.Length != remainingBuffer)
return Task.CompletedTask;
return default;

Debug.Assert(_writePos == _bufferLength);
}
Expand Down Expand Up @@ -1051,7 +1051,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
flushTask.IsFaulted ||
flushTask.IsCanceled)
{
return flushTask;
return new ValueTask(flushTask);
}
}

Expand All @@ -1061,10 +1061,10 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
// Finally, issue the write asynchronously, and return a Task that logically
// represents the write operation, including any flushing done.
Task writeTask = WriteAsyncInternalCore(source, cancellationToken);
return
return new ValueTask(
(flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
(writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
Task.WhenAll(flushTask, writeTask);
Task.WhenAll(flushTask, writeTask));
}

private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
Expand Down Expand Up @@ -1319,7 +1319,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
int bufferedBytes = _readLength - _readPos;
if (bufferedBytes > 0)
{
await destination.WriteAsync(GetBuffer(), _readPos, bufferedBytes, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(new ReadOnlyMemory<byte>(GetBuffer(), _readPos, bufferedBytes), cancellationToken).ConfigureAwait(false);
_readPos = _readLength = 0;
}
}
Expand All @@ -1345,7 +1345,6 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that
// we'd likely be unable to use it anyway. Instead, we rent the buffer from a pool.
byte[] copyBuffer = ArrayPool<byte>.Shared.Rent(bufferSize);
bufferSize = 0; // repurpose bufferSize to be the high water mark for the buffer, to avoid an extra field in the state machine

// Allocate an Overlapped we can use repeatedly for all operations
var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer);
Expand Down Expand Up @@ -1452,13 +1451,6 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
{
readAwaitable._position += numBytesRead;
}

// (and keep track of the maximum number of bytes in the buffer we used, to avoid excessive and unnecessary
// clearing of the buffer before we return it to the pool)
if (numBytesRead > bufferSize)
{
bufferSize = numBytesRead;
}
}
finally
{
Expand All @@ -1479,7 +1471,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
}

// Write out the read data.
await destination.WriteAsync(copyBuffer, 0, (int)readAwaitable._numBytes, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(new ReadOnlyMemory<byte>(copyBuffer, 0, (int)readAwaitable._numBytes), cancellationToken).ConfigureAwait(false);
}
}
finally
Expand All @@ -1488,8 +1480,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
cancellationReg.Dispose();
awaitableOverlapped.Dispose();

Array.Clear(copyBuffer, 0, bufferSize);
ArrayPool<byte>.Shared.Return(copyBuffer, clearArray: false);
ArrayPool<byte>.Shared.Return(copyBuffer);

// Make sure the stream's current position reflects where we ended up
if (!_fileHandle.IsClosed && CanSeek)
Expand Down
8 changes: 4 additions & 4 deletions src/mscorlib/shared/System/IO/FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,10 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
if (IsClosed)
throw Error.GetFileNotOpen();

return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (!_useAsyncIO || GetType() != typeof(FileStream))
{
Expand All @@ -473,7 +473,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
return new ValueTask(Task.FromCanceled<int>(cancellationToken));
}

if (IsClosed)
Expand Down Expand Up @@ -853,7 +853,7 @@ public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes,
if (!IsAsync)
return base.BeginWrite(array, offset, numBytes, callback, state);
else
return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None), callback, state);
return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None).AsTask(), callback, state);
}

public override int EndRead(IAsyncResult asyncResult)
Expand Down
10 changes: 5 additions & 5 deletions src/mscorlib/shared/System/IO/MemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,11 @@ public override Task WriteAsync(Byte[] buffer, int offset, int count, Cancellati
}
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
Expand All @@ -771,15 +771,15 @@ public override Task WriteAsync(Byte[] buffer, int offset, int count, Cancellati
{
Write(source.Span);
}
return Task.CompletedTask;
return default;
}
catch (OperationCanceledException oce)
{
return Task.FromCancellation<VoidTaskResult>(oce);
return new ValueTask(Task.FromCancellation<VoidTaskResult>(oce));
}
catch (Exception exception)
{
return Task.FromException(exception);
return new ValueTask(Task.FromException(exception));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/mscorlib/shared/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, Ca
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos, cancellationToken).ConfigureAwait(false);
int len = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos), cancellationToken).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

if (len == 0)
Expand Down Expand Up @@ -1127,7 +1127,7 @@ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, Ca
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");

_byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length, cancellationToken).ConfigureAwait(false);
_byteLen = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer), cancellationToken).ConfigureAwait(false);

Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

Expand Down Expand Up @@ -1303,7 +1303,7 @@ private async Task<int> ReadBufferAsync()
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos).ConfigureAwait(false);
int len = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos)).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

if (len == 0)
Expand All @@ -1325,7 +1325,7 @@ private async Task<int> ReadBufferAsync()
else
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");
_byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length).ConfigureAwait(false);
_byteLen = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer)).ConfigureAwait(false);
Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! Bug in stream class.");

if (_byteLen == 0) // We're at EOF
Expand Down
4 changes: 2 additions & 2 deletions src/mscorlib/shared/System/IO/StreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,14 +963,14 @@ private static async Task FlushAsyncInternal(StreamWriter _this, bool flushStrea
byte[] preamble = encoding.GetPreamble();
if (preamble.Length > 0)
{
await stream.WriteAsync(preamble, 0, preamble.Length, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(new ReadOnlyMemory<byte>(preamble), cancellationToken).ConfigureAwait(false);
}
}

int count = encoder.GetBytes(charBuffer, 0, charPos, byteBuffer, 0, flushEncoder);
if (count > 0)
{
await stream.WriteAsync(byteBuffer, 0, count, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(new ReadOnlyMemory<byte>(byteBuffer, 0, count), cancellationToken).ConfigureAwait(false);
}

// By definition, calling Flush should flush the stream, but this is
Expand Down
8 changes: 4 additions & 4 deletions src/mscorlib/shared/System/IO/UnmanagedMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,11 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
/// </summary>
/// <param name="buffer">Buffer that will be written.</param>
/// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
Expand All @@ -802,11 +802,11 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
{
Write(source.Span);
}
return Task.CompletedTask;
return default;
}
catch (Exception ex)
{
return Task.FromException(ex);
return new ValueTask(Task.FromException(ex));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
return _unmanagedStream.WriteAsync(buffer, offset, count, cancellationToken);
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
return _unmanagedStream.WriteAsync(source, cancellationToken);
}
Expand Down
Loading

0 comments on commit 019cdaf

Please sign in to comment.