diff --git a/src/System.Private.CoreLib/Resources/Strings.resx b/src/System.Private.CoreLib/Resources/Strings.resx index 26f5b66a2f2d..de7eb5ba00f3 100644 --- a/src/System.Private.CoreLib/Resources/Strings.resx +++ b/src/System.Private.CoreLib/Resources/Strings.resx @@ -2620,6 +2620,9 @@ Timeouts are not supported on this stream. + + The Timer was already closed using an incompatible Dispose method. + The given type cannot be boxed. diff --git a/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems b/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems index e8520f213ee9..cdc6c7d62ece 100644 --- a/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems +++ b/src/System.Private.CoreLib/shared/System.Private.CoreLib.Shared.projitems @@ -72,6 +72,8 @@ + + @@ -208,6 +210,7 @@ + @@ -421,6 +424,7 @@ + @@ -632,6 +636,7 @@ + diff --git a/src/System.Private.CoreLib/shared/System/Collections/Generic/IAsyncEnumerable.cs b/src/System.Private.CoreLib/shared/System/Collections/Generic/IAsyncEnumerable.cs new file mode 100644 index 000000000000..b8cd4f26c9a1 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Collections/Generic/IAsyncEnumerable.cs @@ -0,0 +1,11 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Collections.Generic +{ + public interface IAsyncEnumerable + { + IAsyncEnumerator GetAsyncEnumerator(); + } +} diff --git a/src/System.Private.CoreLib/shared/System/Collections/Generic/IAsyncEnumerator.cs b/src/System.Private.CoreLib/shared/System/Collections/Generic/IAsyncEnumerator.cs new file mode 100644 index 000000000000..67b5670a3140 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Collections/Generic/IAsyncEnumerator.cs @@ -0,0 +1,14 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Collections.Generic +{ + public interface IAsyncEnumerator : IAsyncDisposable + { + ValueTask MoveNextAsync(); + T Current { get; } + } +} diff --git a/src/System.Private.CoreLib/shared/System/IAsyncDisposable.cs b/src/System.Private.CoreLib/shared/System/IAsyncDisposable.cs new file mode 100644 index 000000000000..6a1ac3f7b1e2 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/IAsyncDisposable.cs @@ -0,0 +1,13 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System +{ + public interface IAsyncDisposable + { + ValueTask DisposeAsync(); + } +} diff --git a/src/System.Private.CoreLib/shared/System/IO/BinaryWriter.cs b/src/System.Private.CoreLib/shared/System/IO/BinaryWriter.cs index d1a333f419f8..9b4523333e7f 100644 --- a/src/System.Private.CoreLib/shared/System/IO/BinaryWriter.cs +++ b/src/System.Private.CoreLib/shared/System/IO/BinaryWriter.cs @@ -5,6 +5,8 @@ using System.Text; using System.Diagnostics; using System.Buffers; +using System.Threading.Tasks; +using System.Threading; namespace System.IO { @@ -12,7 +14,7 @@ namespace System.IO // primitives to an arbitrary stream. A subclass can override methods to // give unique encodings. // - public class BinaryWriter : IDisposable + public class BinaryWriter : IDisposable, IAsyncDisposable { public static readonly BinaryWriter Null = new BinaryWriter(); @@ -87,6 +89,30 @@ public void Dispose() Dispose(true); } + public virtual ValueTask DisposeAsync() + { + if (GetType() == typeof(BinaryWriter)) + { + if (_leaveOpen) + { + return new ValueTask(OutStream.FlushAsync()); + } + else + { + OutStream.Close(); + return default; + } + } + else + { + // Since this is a derived BinaryWriter, delegate to whatever logic + // the derived implementation already has in Dispose. + return new ValueTask(Task.Factory.StartNew(s => ((BinaryWriter)s).Dispose(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)); + } + } + + // Returns the stream associated with the writer. It flushes all pending // writes before returning. All subclasses should override Flush to // ensure that all buffered data is sent to the stream. diff --git a/src/System.Private.CoreLib/shared/System/IO/FileStream.Unix.cs b/src/System.Private.CoreLib/shared/System/IO/FileStream.Unix.cs index ae4b709ea110..c77d348edfca 100644 --- a/src/System.Private.CoreLib/shared/System/IO/FileStream.Unix.cs +++ b/src/System.Private.CoreLib/shared/System/IO/FileStream.Unix.cs @@ -269,6 +269,12 @@ protected override void Dispose(bool disposing) } } + public override ValueTask DisposeAsync() => + // On Unix, we'll always end up doing what's in Dispose anyway, + // so just delegate to the base to queue it. We maintain an explicit override for + // consistency with Windows, which has a more complicated implementation. + base.DisposeAsync(); + /// Flushes the OS buffer. This does not flush the internal read/write buffer. private void FlushOSBuffer() { diff --git a/src/System.Private.CoreLib/shared/System/IO/FileStream.Windows.cs b/src/System.Private.CoreLib/shared/System/IO/FileStream.Windows.cs index 4f8292bcab17..7dcd6adf3d15 100644 --- a/src/System.Private.CoreLib/shared/System/IO/FileStream.Windows.cs +++ b/src/System.Private.CoreLib/shared/System/IO/FileStream.Windows.cs @@ -251,14 +251,11 @@ protected override void Dispose(bool disposing) { if (_fileHandle != null && !_fileHandle.IsClosed) { - if (_fileHandle.ThreadPoolBinding != null) - _fileHandle.ThreadPoolBinding.Dispose(); - + _fileHandle.ThreadPoolBinding?.Dispose(); _fileHandle.Dispose(); } - if (_preallocatedOverlapped != null) - _preallocatedOverlapped.Dispose(); + _preallocatedOverlapped?.Dispose(); _canSeek = false; @@ -270,6 +267,35 @@ protected override void Dispose(bool disposing) } } + public override ValueTask DisposeAsync() => + GetType() == typeof(FileStream) ? + DisposeAsyncCore() : + base.DisposeAsync(); + + private async ValueTask DisposeAsyncCore() + { + // Same logic as in Dispose(disposing:true), except with async counterparts. + // TODO: https://github.com/dotnet/corefx/issues/32837: FlushAsync does synchronous work. + try + { + if (_fileHandle != null && !_fileHandle.IsClosed && _writePos > 0) + { + await FlushAsyncInternal(default).ConfigureAwait(false); + } + } + finally + { + if (_fileHandle != null && !_fileHandle.IsClosed) + { + _fileHandle.ThreadPoolBinding?.Dispose(); + _fileHandle.Dispose(); + } + + _preallocatedOverlapped?.Dispose(); + _canSeek = false; + } + } + private void FlushOSBuffer() { if (!Interop.Kernel32.FlushFileBuffers(_fileHandle)) @@ -1544,6 +1570,7 @@ private Task FlushAsyncInternal(CancellationToken cancellationToken) if (_fileHandle.IsClosed) throw Error.GetFileNotOpen(); + // TODO: https://github.com/dotnet/corefx/issues/32837 (stop doing this synchronous work). // The always synchronous data transfer between the OS and the internal buffer is intentional // because this is needed to allow concurrent async IO requests. Concurrent data transfer // between the OS and the internal buffer will result in race conditions. Since FlushWrite and diff --git a/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs b/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs index 9bac0d818b12..538448f6383d 100644 --- a/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/MemoryStream.cs @@ -141,6 +141,17 @@ protected override void Dispose(bool disposing) } } + public override ValueTask DisposeAsync() + { + if (GetType() != typeof(MemoryStream)) + { + return base.DisposeAsync(); + } + + Dispose(disposing: true); + return default; + } + // returns a bool saying whether we allocated a new array. private bool EnsureCapacity(int value) { diff --git a/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs b/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs index 94331a2ef826..28385a6b740e 100644 --- a/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/PinnedBufferMemoryStream.cs @@ -15,8 +15,9 @@ ===========================================================*/ using System; -using System.Runtime.InteropServices; using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading.Tasks; namespace System.IO { @@ -56,5 +57,11 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } + + public override ValueTask DisposeAsync() + { + Dispose(disposing: true); + return default; + } } } diff --git a/src/System.Private.CoreLib/shared/System/IO/Stream.cs b/src/System.Private.CoreLib/shared/System/IO/Stream.cs index faeb69fb5418..f947b3f12471 100644 --- a/src/System.Private.CoreLib/shared/System/IO/Stream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/Stream.cs @@ -25,7 +25,7 @@ namespace System.IO { - public abstract partial class Stream : MarshalByRefObject, IDisposable + public abstract partial class Stream : MarshalByRefObject, IDisposable, IAsyncDisposable { public static readonly Stream Null = new NullStream(); @@ -234,6 +234,12 @@ protected virtual void Dispose(bool disposing) // torn down. This is the last code to run on cleanup for a stream. } + public virtual ValueTask DisposeAsync() + { + return new ValueTask(Task.Factory.StartNew(s => ((Stream)s).Close(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)); + } + public abstract void Flush(); public Task FlushAsync() @@ -899,6 +905,8 @@ protected override void Dispose(bool disposing) // Do nothing - we don't want NullStream singleton (static) to be closable } + public override ValueTask DisposeAsync() => default; + public override void Flush() { } @@ -1202,6 +1210,12 @@ protected override void Dispose(bool disposing) } } + public override ValueTask DisposeAsync() + { + lock (_stream) + return _stream.DisposeAsync(); + } + public override void Flush() { lock (_stream) diff --git a/src/System.Private.CoreLib/shared/System/IO/StreamWriter.cs b/src/System.Private.CoreLib/shared/System/IO/StreamWriter.cs index 8d94ac60b989..705cd12639d3 100644 --- a/src/System.Private.CoreLib/shared/System/IO/StreamWriter.cs +++ b/src/System.Private.CoreLib/shared/System/IO/StreamWriter.cs @@ -195,32 +195,67 @@ protected override void Dispose(bool disposing) } finally { - // Dispose of our resources if this StreamWriter is closable. - // Note: Console.Out and other such non closable streamwriters should be left alone - if (!LeaveOpen && _stream != null) + CloseStreamFromDispose(disposing); + } + } + + private void CloseStreamFromDispose(bool disposing) + { + // Dispose of our resources if this StreamWriter is closable. + if (!LeaveOpen && _stream != null) + { + try { - try - { - // Attempt to close the stream even if there was an IO error from Flushing. - // Note that Stream.Close() can potentially throw here (may or may not be - // due to the same Flush error). In this case, we still need to ensure - // cleaning up internal resources, hence the finally block. - if (disposing) - { - _stream.Close(); - } - } - finally + // Attempt to close the stream even if there was an IO error from Flushing. + // Note that Stream.Close() can potentially throw here (may or may not be + // due to the same Flush error). In this case, we still need to ensure + // cleaning up internal resources, hence the finally block. + if (disposing) { - _stream = null; - _byteBuffer = null; - _charBuffer = null; - _encoding = null; - _encoder = null; - _charLen = 0; - base.Dispose(disposing); + _stream.Close(); } } + finally + { + _stream = null; + _byteBuffer = null; + _charBuffer = null; + _encoding = null; + _encoder = null; + _charLen = 0; + base.Dispose(disposing); + } + } + } + + public override ValueTask DisposeAsync() + { + if (GetType() != typeof(StreamWriter)) + { + // Since this is a derived StreamWriter, delegate to whatever logic + // the derived implementation already has in Dispose. + return new ValueTask(Task.Factory.StartNew(s => ((StreamWriter)s).Dispose(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)); + } + + return DisposeAsyncCore(); + } + + private async ValueTask DisposeAsyncCore() + { + Debug.Assert(GetType() == typeof(StreamWriter)); + + // Same logic as in Dispose(true), but async. + try + { + if (_stream != null) + { + await FlushAsync().ConfigureAwait(false); + } + } + finally + { + CloseStreamFromDispose(disposing: true); } } diff --git a/src/System.Private.CoreLib/shared/System/IO/TextWriter.cs b/src/System.Private.CoreLib/shared/System/IO/TextWriter.cs index 99f99b665c99..34cd74957c0a 100644 --- a/src/System.Private.CoreLib/shared/System/IO/TextWriter.cs +++ b/src/System.Private.CoreLib/shared/System/IO/TextWriter.cs @@ -18,7 +18,7 @@ namespace System.IO // // This class is intended for character output, not bytes. // There are methods on the Stream class for writing bytes. - public abstract partial class TextWriter : MarshalByRefObject, IDisposable + public abstract partial class TextWriter : MarshalByRefObject, IDisposable, IAsyncDisposable { public static readonly TextWriter Null = new NullTextWriter(); @@ -79,6 +79,15 @@ public void Dispose() GC.SuppressFinalize(this); } + public virtual ValueTask DisposeAsync() + { + // Since TextWriter is abstract, delegate to whatever logic a derived + // type put in place already in Dispose. The derived type can then + // optionally choose to override this to do better. + return new ValueTask(Task.Factory.StartNew(s => ((TextWriter)s).Dispose(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)); + } + // Clears all buffers for this TextWriter and causes any buffered data to be // written to the underlying device. This default method is empty, but // descendant classes can override the method to provide the appropriate @@ -732,6 +741,32 @@ public override void WriteLine(object value) public override void Write(char value) { } + + public override Task FlushAsync() => Task.CompletedTask; + + public override Task WriteAsync(char value) => Task.CompletedTask; + + public override Task WriteAsync(char[] buffer, int index, int count) => Task.CompletedTask; + + public override Task WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => Task.CompletedTask; + + public override Task WriteAsync(string value) => Task.CompletedTask; + + public override Task WriteAsync(StringBuilder value, CancellationToken cancellationToken = default) => Task.CompletedTask; + + public override Task WriteLineAsync() => Task.CompletedTask; + + public override Task WriteLineAsync(char value) => Task.CompletedTask; + + public override Task WriteLineAsync(char[] buffer, int index, int count) => Task.CompletedTask; + + public override Task WriteLineAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => Task.CompletedTask; + + public override Task WriteLineAsync(string value) => Task.CompletedTask; + + public override Task WriteLineAsync(StringBuilder value, CancellationToken cancellationToken = default) => Task.CompletedTask; + + public override ValueTask DisposeAsync() => default; } public static TextWriter Synchronized(TextWriter writer) @@ -774,6 +809,14 @@ protected override void Dispose(bool disposing) ((IDisposable)_out).Dispose(); } + // [MethodImpl(MethodImplOptions.Synchronized)] + public override ValueTask DisposeAsync() + { + // TODO: https://github.com/dotnet/coreclr/issues/20499 + // Manual synchronization should be replaced by Synchronized. + lock (this) return _out.DisposeAsync(); + } + [MethodImpl(MethodImplOptions.Synchronized)] public override void Flush() => _out.Flush(); diff --git a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs index d4af4cfee370..73e92ad30927 100644 --- a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs +++ b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStream.cs @@ -231,6 +231,17 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } + public override ValueTask DisposeAsync() + { + if (GetType() != typeof(UnmanagedMemoryStream)) + { + return base.DisposeAsync(); + } + + Dispose(disposing: true); + return default; + } + private void EnsureNotClosed() { if (!_isOpen) diff --git a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs index 9a598951ee80..dbc88488b551 100644 --- a/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs +++ b/src/System.Private.CoreLib/shared/System/IO/UnmanagedMemoryStreamWrapper.cs @@ -59,6 +59,11 @@ protected override void Dispose(bool disposing) } } + public override ValueTask DisposeAsync() + { + return _unmanagedStream.DisposeAsync(); + } + public override void Flush() { _unmanagedStream.Flush(); diff --git a/src/System.Private.CoreLib/shared/System/Runtime/CompilerServices/AsyncIteratorMethodBuilder.cs b/src/System.Private.CoreLib/shared/System/Runtime/CompilerServices/AsyncIteratorMethodBuilder.cs new file mode 100644 index 000000000000..7605bdd55e16 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Runtime/CompilerServices/AsyncIteratorMethodBuilder.cs @@ -0,0 +1,77 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Runtime.CompilerServices +{ + /// Represents a builder for asynchronous iterators. + [StructLayout(LayoutKind.Auto)] + public struct AsyncIteratorMethodBuilder + { + // AsyncIteratorMethodBuilder is used by the language compiler as part of generating + // async iterators. For now, the implementation just wraps AsyncTaskMethodBuilder, as + // most of the logic is shared. However, in the future this could be changed and + // optimized. For example, we do need to allocate an object (once) to flow state like + // ExecutionContext, which AsyncTaskMethodBuilder handles, but it handles it by + // allocating a Task-derived object. We could optimize this further by removing + // the Task from the hierarchy, but in doing so we'd also lose a variety of optimizations + // related to it, so we'd need to replicate all of those optimizations (e.g. storing + // that box object directly into a Task's continuation field). + + private AsyncTaskMethodBuilder _methodBuilder; // mutable struct; do not make it readonly + + /// Creates an instance of the struct. + /// The initialized instance. + public static AsyncIteratorMethodBuilder Create() => +#if CORERT + // corert's AsyncTaskMethodBuilder.Create() currently does additional debugger-related + // work, so we need to delegate to it. + new AsyncIteratorMethodBuilder() { _methodBuilder = AsyncTaskMethodBuilder.Create() }; +#else + default; // coreclr's AsyncTaskMethodBuilder.Create just returns default as well +#endif + + /// Invokes on the state machine while guarding the + /// The type of the state machine. + /// The state machine instance, passed by reference. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void MoveNext(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine => +#if CORERT + _methodBuilder.Start(ref stateMachine); +#else + AsyncMethodBuilderCore.Start(ref stateMachine); +#endif + + /// Schedules the state machine to proceed to the next action when the specified awaiter completes. + /// The type of the awaiter. + /// The type of the state machine. + /// The awaiter. + /// The state machine. + public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) + where TAwaiter : INotifyCompletion + where TStateMachine : IAsyncStateMachine => + _methodBuilder.AwaitOnCompleted(ref awaiter, ref stateMachine); + + /// Schedules the state machine to proceed to the next action when the specified awaiter completes. + /// The type of the awaiter. + /// The type of the state machine. + /// The awaiter. + /// The state machine. + public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) + where TAwaiter : ICriticalNotifyCompletion + where TStateMachine : IAsyncStateMachine => + _methodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); + + /// Marks iteration as being completed, whether successfully or otherwise. + public void Complete() => + _methodBuilder.SetResult(); + + /// Gets an object that may be used to uniquely identify this builder to the debugger. + internal object ObjectIdForDebugger => + _methodBuilder.ObjectIdForDebugger; + } +} diff --git a/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs b/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs index 694514ef07e8..983281491778 100644 --- a/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs +++ b/src/System.Private.CoreLib/shared/System/Threading/ExecutionContext.cs @@ -21,6 +21,8 @@ namespace System.Threading { public delegate void ContextCallback(object state); + internal delegate void ContextCallback(ref TState state); + public sealed class ExecutionContext : IDisposable, ISerializable { internal static readonly ExecutionContext Default = new ExecutionContext(isDefault: true); @@ -201,6 +203,85 @@ internal static void RunInternal(ExecutionContext executionContext, ContextCallb edi?.Throw(); } + // Direct copy of the above RunInternal overload, except that it passes the state into the callback strongly-typed and by ref. + internal static void RunInternal(ExecutionContext executionContext, ContextCallback callback, ref TState state) + { + // Note: ExecutionContext.RunInternal is an extremely hot function and used by every await, ThreadPool execution, etc. + // Note: Manual enregistering may be addressed by "Exception Handling Write Through Optimization" + // https://github.com/dotnet/coreclr/blob/master/Documentation/design-docs/eh-writethru.md + + // Enregister variables with 0 post-fix so they can be used in registers without EH forcing them to stack + // Capture references to Thread Contexts + Thread currentThread0 = Thread.CurrentThread; + Thread currentThread = currentThread0; + ExecutionContext previousExecutionCtx0 = currentThread0.ExecutionContext; + + // Store current ExecutionContext and SynchronizationContext as "previousXxx". + // This allows us to restore them and undo any Context changes made in callback.Invoke + // so that they won't "leak" back into caller. + // These variables will cross EH so be forced to stack + ExecutionContext previousExecutionCtx = previousExecutionCtx0; + SynchronizationContext previousSyncCtx = currentThread0.SynchronizationContext; + + if (executionContext != null && executionContext.m_isDefault) + { + // Default is a null ExecutionContext internally + executionContext = null; + } + + if (previousExecutionCtx0 != executionContext) + { + // Restore changed ExecutionContext + currentThread0.ExecutionContext = executionContext; + if ((executionContext != null && executionContext.HasChangeNotifications) || + (previousExecutionCtx0 != null && previousExecutionCtx0.HasChangeNotifications)) + { + // There are change notifications; trigger any affected + OnValuesChanged(previousExecutionCtx0, executionContext); + } + } + + ExceptionDispatchInfo edi = null; + try + { + callback.Invoke(ref state); + } + catch (Exception ex) + { + // Note: we have a "catch" rather than a "finally" because we want + // to stop the first pass of EH here. That way we can restore the previous + // context before any of our callers' EH filters run. + edi = ExceptionDispatchInfo.Capture(ex); + } + + // Re-enregistrer variables post EH with 1 post-fix so they can be used in registers rather than from stack + SynchronizationContext previousSyncCtx1 = previousSyncCtx; + Thread currentThread1 = currentThread; + // The common case is that these have not changed, so avoid the cost of a write barrier if not needed. + if (currentThread1.SynchronizationContext != previousSyncCtx1) + { + // Restore changed SynchronizationContext back to previous + currentThread1.SynchronizationContext = previousSyncCtx1; + } + + ExecutionContext previousExecutionCtx1 = previousExecutionCtx; + ExecutionContext currentExecutionCtx1 = currentThread1.ExecutionContext; + if (currentExecutionCtx1 != previousExecutionCtx1) + { + // Restore changed ExecutionContext back to previous + currentThread1.ExecutionContext = previousExecutionCtx1; + if ((currentExecutionCtx1 != null && currentExecutionCtx1.HasChangeNotifications) || + (previousExecutionCtx1 != null && previousExecutionCtx1.HasChangeNotifications)) + { + // There are change notifications; trigger any affected + OnValuesChanged(currentExecutionCtx1, previousExecutionCtx1); + } + } + + // If exception was thrown by callback, rethrow it now original contexts are restored + edi?.Throw(); + } + internal static void OnValuesChanged(ExecutionContext previousExecutionCtx, ExecutionContext nextExecutionCtx) { Debug.Assert(previousExecutionCtx != nextExecutionCtx); diff --git a/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs b/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs new file mode 100644 index 000000000000..008cab167e06 --- /dev/null +++ b/src/System.Private.CoreLib/shared/System/Threading/Tasks/Sources/ManualResetValueTaskSourceLogic.cs @@ -0,0 +1,273 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; +using System.Runtime.InteropServices; + +namespace System.Threading.Tasks.Sources +{ + /// Provides the core logic for implementing a manual-reset or . + /// + [StructLayout(LayoutKind.Auto)] + public struct ManualResetValueTaskSourceLogic + { + /// + /// The callback to invoke when the operation completes if was called before the operation completed, + /// or if the operation completed before a callback was supplied, + /// or null if a callback hasn't yet been provided and the operation hasn't yet completed. + /// + private Action _continuation; + /// State to pass to . + private object _continuationState; + /// to flow to the callback, or null if no flowing is required. + private ExecutionContext _executionContext; + /// + /// A "captured" or with which to invoke the callback, + /// or null if no special context is required. + /// + private object _capturedContext; + /// Whether the current operation has completed. + private bool _completed; + /// The result with which the operation succeeded, or the default value if it hasn't yet completed or failed. + private TResult _result; + /// The exception with which the operation failed, or null if it hasn't yet completed or completed successfully. + private ExceptionDispatchInfo _error; + /// The current version of this value, used to help prevent misuse. + private short _version; + + /// Gets or sets whether to force continuations to run asynchronously. + /// Continuations may run asynchronously if this is false, but they'll never run synchronously if this is true. + public bool RunContinuationsAsynchronously { get; set; } + + /// Resets to prepare for the next operation. + public void Reset() + { + // Reset/update state for the next use/await of this instance. + _version++; + _completed = false; + _result = default; + _error = null; + _executionContext = null; + _capturedContext = null; + _continuation = null; + _continuationState = null; + } + + /// Completes with a successful result. + /// The result. + public void SetResult(TResult result) + { + _result = result; + SignalCompletion(); + } + + /// Complets with an error. + /// + public void SetException(Exception error) + { + _error = ExceptionDispatchInfo.Capture(error); + SignalCompletion(); + } + + /// Gets the operation version. + public short Version => _version; + + /// Gets the status of the operation. + /// Opaque value that was provided to the 's constructor. + public ValueTaskSourceStatus GetStatus(short token) + { + ValidateToken(token); + return + !_completed ? ValueTaskSourceStatus.Pending : + _error == null ? ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; + } + + /// Gets the result of the operation. + /// Opaque value that was provided to the 's constructor. + public TResult GetResult(short token) + { + ValidateToken(token); + if (!_completed) + { + ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); + } + + _error?.Throw(); + return _result; + } + + /// Schedules the continuation action for this operation. + /// The continuation to invoke when the operation has completed. + /// The state object to pass to when it's invoked. + /// Opaque value that was provided to the 's constructor. + /// The flags describing the behavior of the continuation. + public void OnCompleted(Action continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) + { + if (continuation == null) + { + throw new ArgumentNullException(nameof(continuation)); + } + ValidateToken(token); + + if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) + { + _executionContext = ExecutionContext.Capture(); + } + + if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) + { + SynchronizationContext sc = SynchronizationContext.Current; + if (sc != null && sc.GetType() != typeof(SynchronizationContext)) + { + _capturedContext = sc; + } + else + { + TaskScheduler ts = TaskScheduler.Current; + if (ts != TaskScheduler.Default) + { + _capturedContext = ts; + } + } + } + + // We need to set the continuation state before we swap in the delegate, so that + // if there's a race between this and SetResult/Exception and SetResult/Exception + // sees the _continuation as non-null, it'll be able to invoke it with the state + // stored here. However, this also means that if this is used incorrectly (e.g. + // awaited twice concurrently), _continuationState might get erroneously overwritten. + // To minimize the chances of that, we check preemptively whether _continuation + // is already set to something other than the completion sentinel. + object currentContinuation = _continuation; + if (currentContinuation != null && + !ReferenceEquals(currentContinuation, ManualResetValueTaskSourceLogicShared.s_sentinel)) + { + ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); + } + _continuationState = state; + + Action oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (oldContinuation != null) + { + switch (_capturedContext) + { + case null: + if (_executionContext != null) + { + ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true); + } + else + { + ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true); + } + break; + + case SynchronizationContext sc: + sc.Post(s => + { + var tuple = (Tuple, object>)s; + tuple.Item1(tuple.Item2); + }, Tuple.Create(continuation, state)); + break; + + case TaskScheduler ts: + Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + break; + } + } + } + + /// Ensures that the specified token matches the current version. + /// The token supplied by . + private void ValidateToken(short token) + { + if (token != _version) + { + ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); + } + } + + /// Signals that that the operation has completed. Invoked after the result or error has been set. + private void SignalCompletion() + { + if (_completed) + { + ManualResetValueTaskSourceLogicShared.ThrowInvalidOperationException(); + } + _completed = true; + + if (Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceLogicShared.s_sentinel, null) != null) + { + if (_executionContext != null) + { + ExecutionContext.RunInternal( + _executionContext, + (ref ManualResetValueTaskSourceLogic s) => s.InvokeContinuation(), + ref this); + } + else + { + InvokeContinuation(); + } + } + } + + /// + /// Invokes the continuation with the appropriate captured context / scheduler. + /// This assumes that if is not null we're already + /// running within that . + /// + private void InvokeContinuation() + { + switch (_capturedContext) + { + case null: + if (RunContinuationsAsynchronously) + { + if (_executionContext != null) + { + ThreadPool.QueueUserWorkItem(_continuation, _continuationState, preferLocal: true); + } + else + { + ThreadPool.UnsafeQueueUserWorkItem(_continuation, _continuationState, preferLocal: true); + } + } + else + { + _continuation(_continuationState); + } + break; + + case SynchronizationContext sc: + sc.Post(s => + { + var state = (Tuple, object>)s; + state.Item1(state.Item2); + }, Tuple.Create(_continuation, _continuationState)); + break; + + case TaskScheduler ts: + Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + break; + } + } + } + + internal static class ManualResetValueTaskSourceLogicShared + { + internal static void ThrowInvalidOperationException() => + throw new InvalidOperationException(); + + internal static readonly Action s_sentinel = new Action(s => + { + Debug.Fail("The sentinel delegate should never be invoked."); + ThrowInvalidOperationException(); + }); + } +} diff --git a/src/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs b/src/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs index 815c9ccef18c..4261b89be7c9 100644 --- a/src/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs +++ b/src/System.Private.CoreLib/src/System/Threading/CancellationTokenRegistration.cs @@ -2,6 +2,8 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Threading.Tasks; + namespace System.Threading { /// @@ -10,7 +12,7 @@ namespace System.Threading /// /// To unregister a callback, dispose the corresponding Registration instance. /// - public readonly struct CancellationTokenRegistration : IEquatable, IDisposable + public readonly struct CancellationTokenRegistration : IEquatable, IDisposable, IAsyncDisposable { private readonly long _id; private readonly CancellationTokenSource.CallbackNode _node; @@ -36,6 +38,21 @@ public void Dispose() } } + /// + /// Disposes of the registration and unregisters the target callback from the associated + /// CancellationToken. + /// The returned will complete once the associated callback + /// is unregistered without having executed or once it's finished executing, except + /// in the degenerate case where the callback itself is unregistering itself. + /// + public ValueTask DisposeAsync() + { + CancellationTokenSource.CallbackNode node = _node; + return node != null && !node.Partition.Unregister(_id, node) ? + WaitForCallbackIfNecessaryAsync() : + default; + } + /// /// Gets the with which this registration is associated. If the /// registration isn't associated with a token (such as after the registration has been disposed), @@ -69,12 +86,30 @@ private void WaitForCallbackIfNecessary() !source.IsCancellationCompleted && // Running callbacks hasn't finished. source.ThreadIDExecutingCallbacks != Thread.CurrentThread.ManagedThreadId) // The executing thread ID is not this thread's ID. { - // Callback execution is in progress, the executing thread is different to us and has taken the callback for execution + // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution // so observe and wait until this target callback is no longer the executing callback. source.WaitForCallbackToComplete(_id); } } + private ValueTask WaitForCallbackIfNecessaryAsync() + { + // Same as WaitForCallbackIfNecessary, except returning a task that'll be completed when callbacks complete. + + CancellationTokenSource source = _node.Partition.Source; + if (source.IsCancellationRequested && // Running callbacks has commenced. + !source.IsCancellationCompleted && // Running callbacks hasn't finished. + source.ThreadIDExecutingCallbacks != Thread.CurrentThread.ManagedThreadId) // The executing thread ID is not this thread's ID. + { + // Callback execution is in progress, the executing thread is different from this thread and has taken the callback for execution + // so get a task that'll complete when this target callback is no longer the executing callback. + return source.WaitForCallbackToCompleteAsync(_id); + } + + // Callback is either already completed, won't execute, or the callback itself is calling this. + return default; + } + /// /// Determines whether two CancellationTokenRegistration diff --git a/src/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs b/src/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs index 498501da72ae..c82d7f750e06 100644 --- a/src/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs +++ b/src/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; +using System.Threading.Tasks; namespace System.Threading { @@ -786,6 +787,33 @@ internal void WaitForCallbackToComplete(long id) } } + /// + /// Asynchronously wait for a single callback to complete (or, more specifically, to not be running). + /// It is ok to call this method if the callback has already finished. + /// Calling this method before the target callback has been selected for execution would be an error. + /// + internal ValueTask WaitForCallbackToCompleteAsync(long id) + { + // If the currently executing callback is not the target one, then the target one has already + // completed and we can simply return. This should be the most common case, as the caller + // calls if we're currently canceling but doesn't know what callback is running, if any. + if (ExecutingCallback != id) + { + return default; + } + + // The specified callback is actually running: queue a task that'll poll for the currently + // executing callback to complete. In general scheduling such a work item that polls is a really + // thing to do. However, we expect this to be a rare case (disposing while the associated + // callback is running), and brief when it happens (so the polling will be minimal), and making + // this work with a callback mechanism will add additional cost to other more common cases. + return new ValueTask(Task.Factory.StartNew(s => + { + var state = (Tuple)s; + state.Item1.WaitForCallbackToComplete(state.Item2); + }, Tuple.Create(this, id), CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default)); + } + private sealed class Linked1CancellationTokenSource : CancellationTokenSource { private readonly CancellationTokenRegistration _reg1; diff --git a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs index ee917cfe5249..4b17a128f570 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ThreadPool.cs @@ -1332,6 +1332,22 @@ public static bool QueueUserWorkItem(Action callBack, TState sta return true; } + // TODO: https://github.com/dotnet/corefx/issues/32547. Make public. + internal static bool UnsafeQueueUserWorkItem(Action callBack, TState state, bool preferLocal) + { + if (callBack == null) + { + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack); + } + + EnsureVMInitialized(); + + ThreadPoolGlobals.workQueue.Enqueue( + new QueueUserWorkItemCallback(callBack, state, null), forceGlobal: !preferLocal); + + return true; + } + public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object state) { if (callBack == null) diff --git a/src/System.Private.CoreLib/src/System/Threading/Timer.cs b/src/System.Private.CoreLib/src/System/Threading/Timer.cs index efea457bf0ae..8eb2310c3d55 100644 --- a/src/System.Private.CoreLib/src/System/Threading/Timer.cs +++ b/src/System.Private.CoreLib/src/System/Threading/Timer.cs @@ -9,6 +9,7 @@ using System.Diagnostics.Tracing; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; +using System.Threading.Tasks; namespace System.Threading { @@ -523,10 +524,11 @@ internal sealed class TimerQueueTimer : IThreadPoolWorkItem // after all pending callbacks are complete. We set m_canceled to prevent any callbacks that // are already queued from running. We track the number of callbacks currently executing in // m_callbacksRunning. We set m_notifyWhenNoCallbacksRunning only when m_callbacksRunning - // reaches zero. + // reaches zero. Same applies if Timer.DisposeAsync() is used, except with a Task + // instead of with a provided WaitHandle. private int m_callbacksRunning; private volatile bool m_canceled; - private volatile WaitHandle m_notifyWhenNoCallbacksRunning; + private volatile object m_notifyWhenNoCallbacksRunning; // may be either WaitHandle or Task internal TimerQueueTimer(TimerCallback timerCallback, object state, uint dueTime, uint period, bool flowExecutionContext) @@ -605,10 +607,7 @@ public bool Close(WaitHandle toSignal) m_canceled = true; m_notifyWhenNoCallbacksRunning = toSignal; m_associatedTimerQueue.DeleteTimer(this); - - if (m_callbacksRunning == 0) - shouldSignal = true; - + shouldSignal = m_callbacksRunning == 0; success = true; } } @@ -619,6 +618,62 @@ public bool Close(WaitHandle toSignal) return success; } + public ValueTask CloseAsync() + { + lock (m_associatedTimerQueue) + { + object notifyWhenNoCallbacksRunning = m_notifyWhenNoCallbacksRunning; + + // Mark the timer as canceled if it's not already. + if (m_canceled) + { + if (notifyWhenNoCallbacksRunning is WaitHandle) + { + // A previous call to Close(WaitHandle) stored a WaitHandle. We could try to deal with + // this case by using ThreadPool.RegisterWaitForSingleObject to create a Task that'll + // complete when the WaitHandle is set, but since arbitrary WaitHandle's can be supplied + // by the caller, it could be for an auto-reset event or similar where that caller's + // WaitOne on the WaitHandle could prevent this wrapper Task from completing. We could also + // change the implementation to support storing multiple objects, but that's not pay-for-play, + // and the existing Close(WaitHandle) already discounts this as being invalid, instead just + // returning false if you use it multiple times. Since first calling Timer.Dispose(WaitHandle) + // and then calling Timer.DisposeAsync is not something anyone is likely to or should do, we + // simplify by just failing in that case. + return new ValueTask(Task.FromException(new InvalidOperationException(SR.InvalidOperation_TimerAlreadyClosed))); + } + } + else + { + m_canceled = true; + m_associatedTimerQueue.DeleteTimer(this); + } + + // We've deleted the timer, so if there are no callbacks queued or running, + // we're done and return an already-completed value task. + if (m_callbacksRunning == 0) + { + return default; + } + + Debug.Assert( + notifyWhenNoCallbacksRunning == null || + notifyWhenNoCallbacksRunning is Task); + + // There are callbacks queued or running, so we need to store a Task + // that'll be used to signal the caller when all callbacks complete. Do so as long as + // there wasn't a previous CloseAsync call that did. + if (notifyWhenNoCallbacksRunning == null) + { + var t = new Task((object)null, TaskCreationOptions.RunContinuationsAsynchronously); + m_notifyWhenNoCallbacksRunning = t; + return new ValueTask(t); + } + + // A previous CloseAsync call already hooked up a task. Just return it. + return new ValueTask((Task)notifyWhenNoCallbacksRunning); + } + } + void IThreadPoolWorkItem.Execute() => Fire(); internal void Fire() @@ -651,7 +706,17 @@ internal void Fire() internal void SignalNoCallbacksRunning() { - Interop.Kernel32.SetEvent(m_notifyWhenNoCallbacksRunning.SafeWaitHandle); + object toSignal = m_notifyWhenNoCallbacksRunning; + Debug.Assert(toSignal is WaitHandle || toSignal is Task); + + if (toSignal is WaitHandle wh) + { + Interop.Kernel32.SetEvent(wh.SafeWaitHandle); + } + else + { + ((Task)toSignal).TrySetResult(true); + } } internal void CallCallback() @@ -723,10 +788,17 @@ public bool Close(WaitHandle notifyObject) GC.SuppressFinalize(this); return result; } + + public ValueTask CloseAsync() + { + ValueTask result = m_timer.CloseAsync(); + GC.SuppressFinalize(this); + return result; + } } - public sealed class Timer : MarshalByRefObject, IDisposable + public sealed class Timer : MarshalByRefObject, IDisposable, IAsyncDisposable { private const uint MAX_SUPPORTED_TIMEOUT = (uint)0xfffffffe; @@ -868,5 +940,10 @@ public void Dispose() { m_timer.Close(); } + + public ValueTask DisposeAsync() + { + return m_timer.CloseAsync(); + } } }