Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Add initial async iterators support to System.Private.CoreLib #20442

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/System.Private.CoreLib/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -2620,6 +2620,9 @@
<data name="InvalidOperation_TimeoutsNotSupported" xml:space="preserve">
<value>Timeouts are not supported on this stream.</value>
</data>
<data name="InvalidOperation_TimerAlreadyClosed" xml:space="preserve">
<value>The Timer was already closed using an incompatible Dispose method.</value>
</data>
<data name="InvalidOperation_TypeCannotBeBoxed" xml:space="preserve">
<value>The given type cannot be boxed.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\DictionaryEntry.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\ArraySortHelper.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\Dictionary.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\IAsyncEnumerable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\IAsyncEnumerator.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\ICollection.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\ICollectionDebugView.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Collections\Generic\IComparer.cs" />
Expand Down Expand Up @@ -208,6 +210,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Guid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\HashCode.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\HResults.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IAsyncDisposable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IAsyncResult.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\ICloneable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IComparable.cs" />
Expand Down Expand Up @@ -421,6 +424,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Resources\SatelliteContractVersionAttribute.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Resources\UltimateResourceFallbackLocation.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\CompilerServices\AccessedThroughPropertyAttribute.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\CompilerServices\AsyncIteratorMethodBuilder.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\CompilerServices\AsyncMethodBuilderAttribute.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\CompilerServices\AsyncStateMachineAttribute.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\CompilerServices\AsyncValueTaskMethodBuilder.cs" />
Expand Down Expand Up @@ -632,6 +636,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskToApm.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\TaskSchedulerException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\ValueTask.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\ManualResetValueTaskSourceLogic.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\Tasks\Sources\IValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadAbortException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadInterruptedException.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace System.Collections.Generic
{
public interface IAsyncEnumerable<out T>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IAsyncEnumerable [](start = 21, length = 16)

I don't know what level of xml doc is expected. Consider adding (here and other public types).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll add 'em.

{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;

namespace System.Collections.Generic
{
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
ValueTask<bool> MoveNextAsync();
T Current { get; }
}
}
13 changes: 13 additions & 0 deletions src/System.Private.CoreLib/shared/System/IAsyncDisposable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;

namespace System
{
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
}
28 changes: 27 additions & 1 deletion src/System.Private.CoreLib/shared/System/IO/BinaryWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
using System.Text;
using System.Diagnostics;
using System.Buffers;
using System.Threading.Tasks;
using System.Threading;

namespace System.IO
{
// This abstract base class represents a writer that can write
// primitives to an arbitrary stream. A subclass can override methods to
// give unique encodings.
//
public class BinaryWriter : IDisposable
public class BinaryWriter : IDisposable, IAsyncDisposable
{
public static readonly BinaryWriter Null = new BinaryWriter();

Expand Down Expand Up @@ -87,6 +89,30 @@ public void Dispose()
Dispose(true);
}

public virtual ValueTask DisposeAsync()
{
if (GetType() == typeof(BinaryWriter))
{
if (_leaveOpen)
{
return new ValueTask(OutStream.FlushAsync());
}
else
{
OutStream.Close();
return default;
}
}
else
{
// Since this is a derived BinaryWriter, delegate to whatever logic
// the derived implementation already has in Dispose.
return new ValueTask(Task.Factory.StartNew(s => ((BinaryWriter)s).Dispose(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default));
}
}


// Returns the stream associated with the writer. It flushes all pending
// writes before returning. All subclasses should override Flush to
// ensure that all buffered data is sent to the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ protected override void Dispose(bool disposing)
}
}

public override ValueTask DisposeAsync() =>
// On Unix, we'll always end up doing what's in Dispose anyway,
// so just delegate to the base to queue it. We maintain an explicit override for
// consistency with Windows, which has a more complicated implementation.
base.DisposeAsync();

/// <summary>Flushes the OS buffer. This does not flush the internal read/write buffer.</summary>
private void FlushOSBuffer()
{
Expand Down
37 changes: 32 additions & 5 deletions src/System.Private.CoreLib/shared/System/IO/FileStream.Windows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,11 @@ protected override void Dispose(bool disposing)
{
if (_fileHandle != null && !_fileHandle.IsClosed)
{
if (_fileHandle.ThreadPoolBinding != null)
_fileHandle.ThreadPoolBinding.Dispose();

_fileHandle.ThreadPoolBinding?.Dispose();
_fileHandle.Dispose();
}

if (_preallocatedOverlapped != null)
_preallocatedOverlapped.Dispose();
_preallocatedOverlapped?.Dispose();

_canSeek = false;

Expand All @@ -270,6 +267,35 @@ protected override void Dispose(bool disposing)
}
}

public override ValueTask DisposeAsync() =>
GetType() == typeof(FileStream) ?
DisposeAsyncCore() :
base.DisposeAsync();

private async ValueTask DisposeAsyncCore()
{
// Same logic as in Dispose(disposing:true), except with async counterparts.
// TODO: https://github.com/dotnet/corefx/issues/32837: FlushAsync does synchronous work.
try
{
if (_fileHandle != null && !_fileHandle.IsClosed && _writePos > 0)
{
await FlushAsyncInternal(default).ConfigureAwait(false);
}
}
finally
{
if (_fileHandle != null && !_fileHandle.IsClosed)
{
_fileHandle.ThreadPoolBinding?.Dispose();
_fileHandle.Dispose();
}

_preallocatedOverlapped?.Dispose();
_canSeek = false;
}
}

private void FlushOSBuffer()
{
if (!Interop.Kernel32.FlushFileBuffers(_fileHandle))
Expand Down Expand Up @@ -1544,6 +1570,7 @@ private Task FlushAsyncInternal(CancellationToken cancellationToken)
if (_fileHandle.IsClosed)
throw Error.GetFileNotOpen();

// TODO: https://github.com/dotnet/corefx/issues/32837 (stop doing this synchronous work).
// The always synchronous data transfer between the OS and the internal buffer is intentional
// because this is needed to allow concurrent async IO requests. Concurrent data transfer
// between the OS and the internal buffer will result in race conditions. Since FlushWrite and
Expand Down
11 changes: 11 additions & 0 deletions src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ protected override void Dispose(bool disposing)
}
}

public override ValueTask DisposeAsync()
{
if (GetType() != typeof(MemoryStream))
{
return base.DisposeAsync();
}

Dispose(disposing: true);
return default;
}

// returns a bool saying whether we allocated a new array.
private bool EnsureCapacity(int value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
===========================================================*/

using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

namespace System.IO
{
Expand Down Expand Up @@ -56,5 +57,11 @@ protected override void Dispose(bool disposing)

base.Dispose(disposing);
}

public override ValueTask DisposeAsync()
{
Dispose(disposing: true);
return default;
}
}
}
16 changes: 15 additions & 1 deletion src/System.Private.CoreLib/shared/System/IO/Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace System.IO
{
public abstract partial class Stream : MarshalByRefObject, IDisposable
public abstract partial class Stream : MarshalByRefObject, IDisposable, IAsyncDisposable
{
public static readonly Stream Null = new NullStream();

Expand Down Expand Up @@ -234,6 +234,12 @@ protected virtual void Dispose(bool disposing)
// torn down. This is the last code to run on cleanup for a stream.
}

public virtual ValueTask DisposeAsync()
{
return new ValueTask(Task.Factory.StartNew(s => ((Stream)s).Close(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default));
}

public abstract void Flush();

public Task FlushAsync()
Expand Down Expand Up @@ -899,6 +905,8 @@ protected override void Dispose(bool disposing)
// Do nothing - we don't want NullStream singleton (static) to be closable
}

public override ValueTask DisposeAsync() => default;

public override void Flush()
{
}
Expand Down Expand Up @@ -1202,6 +1210,12 @@ protected override void Dispose(bool disposing)
}
}

public override ValueTask DisposeAsync()
{
lock (_stream)
return _stream.DisposeAsync();
}

public override void Flush()
{
lock (_stream)
Expand Down
79 changes: 57 additions & 22 deletions src/System.Private.CoreLib/shared/System/IO/StreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,32 +195,67 @@ protected override void Dispose(bool disposing)
}
finally
{
// Dispose of our resources if this StreamWriter is closable.
// Note: Console.Out and other such non closable streamwriters should be left alone
if (!LeaveOpen && _stream != null)
CloseStreamFromDispose(disposing);
}
}

private void CloseStreamFromDispose(bool disposing)
{
// Dispose of our resources if this StreamWriter is closable.
if (!LeaveOpen && _stream != null)
{
try
{
try
{
// Attempt to close the stream even if there was an IO error from Flushing.
// Note that Stream.Close() can potentially throw here (may or may not be
// due to the same Flush error). In this case, we still need to ensure
// cleaning up internal resources, hence the finally block.
if (disposing)
{
_stream.Close();
}
}
finally
// Attempt to close the stream even if there was an IO error from Flushing.
// Note that Stream.Close() can potentially throw here (may or may not be
// due to the same Flush error). In this case, we still need to ensure
// cleaning up internal resources, hence the finally block.
if (disposing)
{
_stream = null;
_byteBuffer = null;
_charBuffer = null;
_encoding = null;
_encoder = null;
_charLen = 0;
base.Dispose(disposing);
_stream.Close();
}
}
finally
{
_stream = null;
_byteBuffer = null;
_charBuffer = null;
_encoding = null;
_encoder = null;
_charLen = 0;
base.Dispose(disposing);
}
}
}

public override ValueTask DisposeAsync()
{
if (GetType() != typeof(StreamWriter))
{
// Since this is a derived StreamWriter, delegate to whatever logic
// the derived implementation already has in Dispose.
return new ValueTask(Task.Factory.StartNew(s => ((StreamWriter)s).Dispose(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default));
}

return DisposeAsyncCore();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try / finally in DisposeAsyncCore suggests exceptions are possible. Won't it violate the async dispose pattern to have exceptions escape here vs. returned in ValueTask?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not quite following you. No exceptions should escape here: if any occur inside DisposeAsyncCore, they'll be stored into the ValueTask that's returned, which presumably is the behavior we'd want.

I have been debating with myself the larger question about whether exceptions should be valid at all. In general Dispose isn't supposed to throw, and thus DisposeAsync shouldn't either (or return a ValueTask that might fault with an exception), but in cases where an implementation's Dispose is already throwing, it seems reasonable to keep parity on the DisposeAsync, rather than eating the exceptions in cases where they wouldn't be eaten in Dispose.

Opinion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugg ... I missed that DisposeAsyncCore is async. I only checked this method for it.

Yeah I agree that no exceptions is a great principal but reality doesn't always line up with it.

}

private async ValueTask DisposeAsyncCore()
{
Debug.Assert(GetType() == typeof(StreamWriter));

// Same logic as in Dispose(true), but async.
try
{
if (_stream != null)
{
await FlushAsync().ConfigureAwait(false);
}
}
finally
{
CloseStreamFromDispose(disposing: true);
}
}

Expand Down
Loading