Skip to content
This repository has been archived by the owner on Nov 1, 2020. It is now read-only.

Commit

Permalink
Merge pull request dotnet/coreclr#16618 from stephentoub/valuetaskext…
Browse files Browse the repository at this point in the history
…ensibility

Implement ValueTask extensibility

Signed-off-by: dotnet-bot <dotnet-bot@microsoft.com>
  • Loading branch information
stephentoub authored and dotnet-bot committed Mar 1, 2018
1 parent fdf4790 commit 17aaa07
Show file tree
Hide file tree
Showing 14 changed files with 1,254 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskToApm.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskSchedulerException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ValueTask.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\IValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadAbortException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPriority.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadStart.cs" />
Expand Down
12 changes: 6 additions & 6 deletions src/System.Private.CoreLib/shared/System/IO/FileStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,12 +635,12 @@ private unsafe void WriteNative(ReadOnlySpan<byte> source)
/// <param name="source">The buffer to write data from.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_useAsyncIO);

if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));

if (_fileHandle.IsClosed)
throw Error.GetFileNotOpen();
Expand All @@ -667,11 +667,11 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
_writePos += source.Length;

return Task.CompletedTask;
return default;
}
catch (Exception exc)
{
return Task.FromException(exc);
return new ValueTask(Task.FromException(exc));
}
finally
{
Expand All @@ -682,7 +682,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c

// Otherwise, issue the whole request asynchronously.
_asyncState.ReadOnlyMemory = source;
return waitTask.ContinueWith((t, s) =>
return new ValueTask(waitTask.ContinueWith((t, s) =>
{
// The options available on Unix for writing asynchronously to an arbitrary file
// handle typically amount to just using another thread to do the synchronous write,
Expand All @@ -702,7 +702,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
thisRef.WriteSpan(readOnlyMemory.Span);
}
finally { thisRef._asyncState.Release(); }
}, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
}

/// <summary>Sets the current position of this stream to the given value.</summary>
Expand Down
25 changes: 8 additions & 17 deletions src/System.Private.CoreLib/shared/System/IO/FileStream.Windows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ unsafe private Task<int> ReadNativeAsync(Memory<byte> destination, int numBuffer
return completionSource.Task;
}

private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_useAsyncIO);
Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
Expand Down Expand Up @@ -1005,7 +1005,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
// completely, we want to do the asynchronous flush/write as part of this operation
// rather than waiting until the next write that fills the buffer.
if (source.Length != remainingBuffer)
return Task.CompletedTask;
return default;

Debug.Assert(_writePos == _bufferLength);
}
Expand Down Expand Up @@ -1051,7 +1051,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
flushTask.IsFaulted ||
flushTask.IsCanceled)
{
return flushTask;
return new ValueTask(flushTask);
}
}

Expand All @@ -1061,10 +1061,10 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
// Finally, issue the write asynchronously, and return a Task that logically
// represents the write operation, including any flushing done.
Task writeTask = WriteAsyncInternalCore(source, cancellationToken);
return
return new ValueTask(
(flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
(writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
Task.WhenAll(flushTask, writeTask);
Task.WhenAll(flushTask, writeTask));
}

private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
Expand Down Expand Up @@ -1319,7 +1319,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
int bufferedBytes = _readLength - _readPos;
if (bufferedBytes > 0)
{
await destination.WriteAsync(GetBuffer(), _readPos, bufferedBytes, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(new ReadOnlyMemory<byte>(GetBuffer(), _readPos, bufferedBytes), cancellationToken).ConfigureAwait(false);
_readPos = _readLength = 0;
}
}
Expand All @@ -1345,7 +1345,6 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that
// we'd likely be unable to use it anyway. Instead, we rent the buffer from a pool.
byte[] copyBuffer = ArrayPool<byte>.Shared.Rent(bufferSize);
bufferSize = 0; // repurpose bufferSize to be the high water mark for the buffer, to avoid an extra field in the state machine

// Allocate an Overlapped we can use repeatedly for all operations
var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer);
Expand Down Expand Up @@ -1452,13 +1451,6 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
{
readAwaitable._position += numBytesRead;
}

// (and keep track of the maximum number of bytes in the buffer we used, to avoid excessive and unnecessary
// clearing of the buffer before we return it to the pool)
if (numBytesRead > bufferSize)
{
bufferSize = numBytesRead;
}
}
finally
{
Expand All @@ -1479,7 +1471,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
}

// Write out the read data.
await destination.WriteAsync(copyBuffer, 0, (int)readAwaitable._numBytes, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(new ReadOnlyMemory<byte>(copyBuffer, 0, (int)readAwaitable._numBytes), cancellationToken).ConfigureAwait(false);
}
}
finally
Expand All @@ -1488,8 +1480,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
cancellationReg.Dispose();
awaitableOverlapped.Dispose();

Array.Clear(copyBuffer, 0, bufferSize);
ArrayPool<byte>.Shared.Return(copyBuffer, clearArray: false);
ArrayPool<byte>.Shared.Return(copyBuffer);

// Make sure the stream's current position reflects where we ended up
if (!_fileHandle.IsClosed && CanSeek)
Expand Down
8 changes: 4 additions & 4 deletions src/System.Private.CoreLib/shared/System/IO/FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,10 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
if (IsClosed)
throw Error.GetFileNotOpen();

return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (!_useAsyncIO || GetType() != typeof(FileStream))
{
Expand All @@ -473,7 +473,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
return new ValueTask(Task.FromCanceled<int>(cancellationToken));
}

if (IsClosed)
Expand Down Expand Up @@ -853,7 +853,7 @@ public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes,
if (!IsAsync)
return base.BeginWrite(array, offset, numBytes, callback, state);
else
return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None), callback, state);
return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None).AsTask(), callback, state);
}

public override int EndRead(IAsyncResult asyncResult)
Expand Down
10 changes: 5 additions & 5 deletions src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,11 @@ public override Task WriteAsync(Byte[] buffer, int offset, int count, Cancellati
}
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
Expand All @@ -771,15 +771,15 @@ public override Task WriteAsync(Byte[] buffer, int offset, int count, Cancellati
{
Write(source.Span);
}
return Task.CompletedTask;
return default;
}
catch (OperationCanceledException oce)
{
return Task.FromCancellation<VoidTaskResult>(oce);
return new ValueTask(Task.FromCancellation<VoidTaskResult>(oce));
}
catch (Exception exception)
{
return Task.FromException(exception);
return new ValueTask(Task.FromException(exception));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/System.Private.CoreLib/shared/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, Ca
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos, cancellationToken).ConfigureAwait(false);
int len = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos), cancellationToken).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

if (len == 0)
Expand Down Expand Up @@ -1127,7 +1127,7 @@ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, Ca
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");

_byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length, cancellationToken).ConfigureAwait(false);
_byteLen = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer), cancellationToken).ConfigureAwait(false);

Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

Expand Down Expand Up @@ -1303,7 +1303,7 @@ private async Task<int> ReadBufferAsync()
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos).ConfigureAwait(false);
int len = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos)).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

if (len == 0)
Expand All @@ -1325,7 +1325,7 @@ private async Task<int> ReadBufferAsync()
else
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");
_byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length).ConfigureAwait(false);
_byteLen = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer)).ConfigureAwait(false);
Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! Bug in stream class.");

if (_byteLen == 0) // We're at EOF
Expand Down
4 changes: 2 additions & 2 deletions src/System.Private.CoreLib/shared/System/IO/StreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,14 +963,14 @@ private static async Task FlushAsyncInternal(StreamWriter _this, bool flushStrea
byte[] preamble = encoding.GetPreamble();
if (preamble.Length > 0)
{
await stream.WriteAsync(preamble, 0, preamble.Length, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(new ReadOnlyMemory<byte>(preamble), cancellationToken).ConfigureAwait(false);
}
}

int count = encoder.GetBytes(charBuffer, 0, charPos, byteBuffer, 0, flushEncoder);
if (count > 0)
{
await stream.WriteAsync(byteBuffer, 0, count, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(new ReadOnlyMemory<byte>(byteBuffer, 0, count), cancellationToken).ConfigureAwait(false);
}

// By definition, calling Flush should flush the stream, but this is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,11 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
/// </summary>
/// <param name="buffer">Buffer that will be written.</param>
/// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
Expand All @@ -802,11 +802,11 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
{
Write(source.Span);
}
return Task.CompletedTask;
return default;
}
catch (Exception ex)
{
return Task.FromException(ex);
return new ValueTask(Task.FromException(ex));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
return _unmanagedStream.WriteAsync(buffer, offset, count, cancellationToken);
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
return _unmanagedStream.WriteAsync(source, cancellationToken);
}
Expand Down
Loading

0 comments on commit 17aaa07

Please sign in to comment.