Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Implement ValueTask extensibility #16618

Merged
merged 3 commits into from
Mar 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskToApm.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskSchedulerException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ValueTask.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\IValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadAbortException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPriority.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadStart.cs" />
Expand Down
12 changes: 6 additions & 6 deletions src/mscorlib/shared/System/IO/FileStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,12 +635,12 @@ private unsafe void WriteNative(ReadOnlySpan<byte> source)
/// <param name="source">The buffer to write data from.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_useAsyncIO);

if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));

if (_fileHandle.IsClosed)
throw Error.GetFileNotOpen();
Expand All @@ -667,11 +667,11 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
source.Span.CopyTo(new Span<byte>(GetBuffer(), _writePos, source.Length));
_writePos += source.Length;

return Task.CompletedTask;
return default;
}
catch (Exception exc)
{
return Task.FromException(exc);
return new ValueTask(Task.FromException(exc));
}
finally
{
Expand All @@ -682,7 +682,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c

// Otherwise, issue the whole request asynchronously.
_asyncState.ReadOnlyMemory = source;
return waitTask.ContinueWith((t, s) =>
return new ValueTask(waitTask.ContinueWith((t, s) =>
{
// The options available on Unix for writing asynchronously to an arbitrary file
// handle typically amount to just using another thread to do the synchronous write,
Expand All @@ -702,7 +702,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
thisRef.WriteSpan(readOnlyMemory.Span);
}
finally { thisRef._asyncState.Release(); }
}, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}, this, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default));
}

/// <summary>Sets the current position of this stream to the given value.</summary>
Expand Down
25 changes: 8 additions & 17 deletions src/mscorlib/shared/System/IO/FileStream.Windows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ unsafe private Task<int> ReadNativeAsync(Memory<byte> destination, int numBuffer
return completionSource.Task;
}

private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_useAsyncIO);
Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both.");
Expand Down Expand Up @@ -1005,7 +1005,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
// completely, we want to do the asynchronous flush/write as part of this operation
// rather than waiting until the next write that fills the buffer.
if (source.Length != remainingBuffer)
return Task.CompletedTask;
return default;

Debug.Assert(_writePos == _bufferLength);
}
Expand Down Expand Up @@ -1051,7 +1051,7 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
flushTask.IsFaulted ||
flushTask.IsCanceled)
{
return flushTask;
return new ValueTask(flushTask);
}
}

Expand All @@ -1061,10 +1061,10 @@ private Task WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken c
// Finally, issue the write asynchronously, and return a Task that logically
// represents the write operation, including any flushing done.
Task writeTask = WriteAsyncInternalCore(source, cancellationToken);
return
return new ValueTask(
(flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
(writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
Task.WhenAll(flushTask, writeTask);
Task.WhenAll(flushTask, writeTask));
}

private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
Expand Down Expand Up @@ -1319,7 +1319,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
int bufferedBytes = _readLength - _readPos;
if (bufferedBytes > 0)
{
await destination.WriteAsync(GetBuffer(), _readPos, bufferedBytes, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(new ReadOnlyMemory<byte>(GetBuffer(), _readPos, bufferedBytes), cancellationToken).ConfigureAwait(false);
_readPos = _readLength = 0;
}
}
Expand All @@ -1345,7 +1345,6 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that
// we'd likely be unable to use it anyway. Instead, we rent the buffer from a pool.
byte[] copyBuffer = ArrayPool<byte>.Shared.Rent(bufferSize);
bufferSize = 0; // repurpose bufferSize to be the high water mark for the buffer, to avoid an extra field in the state machine

// Allocate an Overlapped we can use repeatedly for all operations
var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer);
Expand Down Expand Up @@ -1452,13 +1451,6 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
{
readAwaitable._position += numBytesRead;
}

// (and keep track of the maximum number of bytes in the buffer we used, to avoid excessive and unnecessary
// clearing of the buffer before we return it to the pool)
if (numBytesRead > bufferSize)
{
bufferSize = numBytesRead;
}
}
finally
{
Expand All @@ -1479,7 +1471,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
}

// Write out the read data.
await destination.WriteAsync(copyBuffer, 0, (int)readAwaitable._numBytes, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(new ReadOnlyMemory<byte>(copyBuffer, 0, (int)readAwaitable._numBytes), cancellationToken).ConfigureAwait(false);
}
}
finally
Expand All @@ -1488,8 +1480,7 @@ private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, Canc
cancellationReg.Dispose();
awaitableOverlapped.Dispose();

Array.Clear(copyBuffer, 0, bufferSize);
ArrayPool<byte>.Shared.Return(copyBuffer, clearArray: false);
ArrayPool<byte>.Shared.Return(copyBuffer);

// Make sure the stream's current position reflects where we ended up
if (!_fileHandle.IsClosed && CanSeek)
Expand Down
8 changes: 4 additions & 4 deletions src/mscorlib/shared/System/IO/FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,10 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
if (IsClosed)
throw Error.GetFileNotOpen();

return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (!_useAsyncIO || GetType() != typeof(FileStream))
{
Expand All @@ -473,7 +473,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
return new ValueTask(Task.FromCanceled<int>(cancellationToken));
}

if (IsClosed)
Expand Down Expand Up @@ -853,7 +853,7 @@ public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes,
if (!IsAsync)
return base.BeginWrite(array, offset, numBytes, callback, state);
else
return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None), callback, state);
return TaskToApm.Begin(WriteAsyncInternal(new ReadOnlyMemory<byte>(array, offset, numBytes), CancellationToken.None).AsTask(), callback, state);
}

public override int EndRead(IAsyncResult asyncResult)
Expand Down
10 changes: 5 additions & 5 deletions src/mscorlib/shared/System/IO/MemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,11 @@ public override Task WriteAsync(Byte[] buffer, int offset, int count, Cancellati
}
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
Expand All @@ -771,15 +771,15 @@ public override Task WriteAsync(Byte[] buffer, int offset, int count, Cancellati
{
Write(source.Span);
}
return Task.CompletedTask;
return default;
}
catch (OperationCanceledException oce)
{
return Task.FromCancellation<VoidTaskResult>(oce);
return new ValueTask(Task.FromCancellation<VoidTaskResult>(oce));
}
catch (Exception exception)
{
return Task.FromException(exception);
return new ValueTask(Task.FromException(exception));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/mscorlib/shared/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, Ca
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos, cancellationToken).ConfigureAwait(false);
int len = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos), cancellationToken).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

if (len == 0)
Expand Down Expand Up @@ -1127,7 +1127,7 @@ internal override async ValueTask<int> ReadAsyncInternal(Memory<char> buffer, Ca
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");

_byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length, cancellationToken).ConfigureAwait(false);
_byteLen = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer), cancellationToken).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: tmpByteBuffer.AsMemory() is cleaner (generic-type inference)

here and elsewhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tmpByteBuffer.AsMemory() is cleaner (generic-type inference)

I prefer the explicitness.


Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

Expand Down Expand Up @@ -1303,7 +1303,7 @@ private async Task<int> ReadBufferAsync()
{
Debug.Assert(_bytePos <= _encoding.Preamble.Length, "possible bug in _compressPreamble. Are two threads using this StreamReader at the same time?");
int tmpBytePos = _bytePos;
int len = await tmpStream.ReadAsync(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos).ConfigureAwait(false);
int len = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer, tmpBytePos, tmpByteBuffer.Length - tmpBytePos)).ConfigureAwait(false);
Debug.Assert(len >= 0, "Stream.Read returned a negative number! This is a bug in your stream class.");

if (len == 0)
Expand All @@ -1325,7 +1325,7 @@ private async Task<int> ReadBufferAsync()
else
{
Debug.Assert(_bytePos == 0, "_bytePos can be non zero only when we are trying to _checkPreamble. Are two threads using this StreamReader at the same time?");
_byteLen = await tmpStream.ReadAsync(tmpByteBuffer, 0, tmpByteBuffer.Length).ConfigureAwait(false);
_byteLen = await tmpStream.ReadAsync(new Memory<byte>(tmpByteBuffer)).ConfigureAwait(false);
Debug.Assert(_byteLen >= 0, "Stream.Read returned a negative number! Bug in stream class.");

if (_byteLen == 0) // We're at EOF
Expand Down
4 changes: 2 additions & 2 deletions src/mscorlib/shared/System/IO/StreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,14 +963,14 @@ private static async Task FlushAsyncInternal(StreamWriter _this, bool flushStrea
byte[] preamble = encoding.GetPreamble();
if (preamble.Length > 0)
{
await stream.WriteAsync(preamble, 0, preamble.Length, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(new ReadOnlyMemory<byte>(preamble), cancellationToken).ConfigureAwait(false);
Copy link
Member

@ahsonkhan ahsonkhan Feb 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to explicitly create a ROM here? Would implicit cast work? I don't see an array based stream.WriteAsync overload that has 2 parameters (second one being the cancellation token).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to explicitly create a ROM here?

Being explicit helps me when reading the code to know that I'm using the desired overload. Otherwise I need to start paying attention to how many arguments there are and whether I might be slipping into the wrong one.

}
}

int count = encoder.GetBytes(charBuffer, 0, charPos, byteBuffer, 0, flushEncoder);
if (count > 0)
{
await stream.WriteAsync(byteBuffer, 0, count, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(new ReadOnlyMemory<byte>(byteBuffer, 0, count), cancellationToken).ConfigureAwait(false);
}

// By definition, calling Flush should flush the stream, but this is
Expand Down
8 changes: 4 additions & 4 deletions src/mscorlib/shared/System/IO/UnmanagedMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,11 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
/// </summary>
/// <param name="buffer">Buffer that will be written.</param>
/// <param name="cancellationToken">Token that can be used to cancel the operation.</param>
public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
return new ValueTask(Task.FromCanceled(cancellationToken));
}

try
Expand All @@ -802,11 +802,11 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
{
Write(source.Span);
}
return Task.CompletedTask;
return default;
}
catch (Exception ex)
{
return Task.FromException(ex);
return new ValueTask(Task.FromException(ex));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, Cancel
return _unmanagedStream.WriteAsync(buffer, offset, count, cancellationToken);
}

public override Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
{
return _unmanagedStream.WriteAsync(source, cancellationToken);
}
Expand Down
Loading