From 4b2ceeb5a040a9979b1851335e41cca616888c74 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Sat, 17 Apr 2021 17:47:11 -0400 Subject: [PATCH 1/2] Remove pinning in BufferedFileStreamStrategy --- .../Strategies/BufferedFileStreamStrategy.cs | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs index fcd6be4a48804..75f9357f325ac 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs @@ -2,7 +2,8 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; -using System.Runtime.InteropServices; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; @@ -216,7 +217,7 @@ private int ReadSpan(Span destination, ArraySegment arraySegment) } EnsureBufferAllocated(); - n = _strategy.Read(_buffer!, 0, _bufferSize); + n = _strategy.Read(_buffer, 0, _bufferSize); if (n == 0) { @@ -232,7 +233,7 @@ private int ReadSpan(Span destination, ArraySegment arraySegment) { n = destination.Length; } - new ReadOnlySpan(_buffer!, _readPos, n).CopyTo(destination); + new ReadOnlySpan(_buffer, _readPos, n).CopyTo(destination); _readPos += n; // We may have read less than the number of bytes the user asked @@ -291,7 +292,7 @@ private int ReadByteSlow() } EnsureBufferAllocated(); - _readLen = _strategy.Read(_buffer!, 0, _bufferSize); + _readLen = _strategy.Read(_buffer, 0, _bufferSize); _readPos = 0; if (_readLen == 0) @@ -299,7 +300,7 @@ private int ReadByteSlow() return -1; } - return _buffer![_readPos++]; + return _buffer[_readPos++]; } public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -463,7 +464,7 @@ private async ValueTask ReadAsyncSlowPath(Task semaphoreLockTask, Memory 0) { - await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); + await _strategy.WriteAsync(new ReadOnlyMemory(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); _writePos = 0; } @@ -475,7 +476,7 @@ private async ValueTask ReadAsyncSlowPath(Task semaphoreLockTask, Memory(_buffer), cancellationToken).ConfigureAwait(false); bytesFromBuffer = Math.Min(_readLen, buffer.Length); _buffer.AsSpan(0, bytesFromBuffer).CopyTo(buffer.Span); @@ -578,7 +579,7 @@ private void WriteSpan(ReadOnlySpan source, ArraySegment arraySegmen // Copy remaining bytes into buffer, to write at a later date. EnsureBufferAllocated(); - source.CopyTo(_buffer!.AsSpan(_writePos)); + source.CopyTo(_buffer.AsSpan(_writePos)); _writePos = source.Length; } @@ -721,19 +722,19 @@ private async ValueTask WriteAsyncSlowPath(Task semaphoreLockTask, ReadOnlyMemor { if (spaceLeft >= source.Length) { - source.Span.CopyTo(_buffer!.AsSpan(_writePos)); + source.Span.CopyTo(_buffer.AsSpan(_writePos)); _writePos += source.Length; return; } else { - source.Span.Slice(0, spaceLeft).CopyTo(_buffer!.AsSpan(_writePos)); + source.Span.Slice(0, spaceLeft).CopyTo(_buffer.AsSpan(_writePos)); _writePos += spaceLeft; source = source.Slice(spaceLeft); } } - await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); + await _strategy.WriteAsync(new ReadOnlyMemory(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); _writePos = 0; } @@ -751,7 +752,7 @@ private async ValueTask WriteAsyncSlowPath(Task semaphoreLockTask, ReadOnlyMemor // Copy remaining bytes into buffer, to write at a later date. EnsureBufferAllocated(); - source.Span.CopyTo(_buffer!.AsSpan(_writePos)); + source.Span.CopyTo(_buffer.AsSpan(_writePos)); _writePos = source.Length; } finally @@ -834,7 +835,7 @@ private async Task FlushAsyncInternal(CancellationToken cancellationToken) { if (_writePos > 0) { - await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); + await _strategy.WriteAsync(new ReadOnlyMemory(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); _writePos = 0; Debug.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0); return; @@ -888,13 +889,13 @@ private async Task CopyToAsyncCore(Stream destination, int bufferSize, Cancellat { // If there's any read data in the buffer, write it all to the destination stream. Debug.Assert(_writePos == 0, "Write buffer must be empty if there's data in the read buffer"); - await destination.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, _readPos, readBytes), cancellationToken).ConfigureAwait(false); + await destination.WriteAsync(new ReadOnlyMemory(_buffer, _readPos, readBytes), cancellationToken).ConfigureAwait(false); _readPos = _readLen = 0; } else if (_writePos > 0) { // If there's write data in the buffer, flush it back to the underlying stream, as does ReadAsync. - await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); + await _strategy.WriteAsync(new ReadOnlyMemory(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false); _writePos = 0; } @@ -1059,19 +1060,21 @@ private void EnsureCanWrite() } } + [MemberNotNull(nameof(_buffer))] private void EnsureBufferAllocated() { - // BufferedFileStreamStrategy is not intended for multi-threaded use, so no worries about the get/set race on _buffer. - if (_buffer == null) + if (_buffer is null) { AllocateBuffer(); } + } - void AllocateBuffer() // logic kept in a separate method to get EnsureBufferAllocated() inlined - { - _buffer = GC.AllocateUninitializedArray(_bufferSize, - pinned: true); // this allows us to avoid pinning when the buffer is used for the syscalls - } + // TODO https://github.com/dotnet/roslyn/issues/47896: should be local function in EnsureBufferAllocated above. + [MemberNotNull(nameof(_buffer))] + [MethodImpl(MethodImplOptions.NoInlining)] + private void AllocateBuffer() + { + Interlocked.CompareExchange(ref _buffer, GC.AllocateUninitializedArray(_bufferSize), null); } [Conditional("DEBUG")] From 322599808c37487035db46fd1e0a077f7be46ef8 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Sat, 17 Apr 2021 23:13:30 -0400 Subject: [PATCH 2/2] Simplify code/synchronization in ValueTaskSource --- ...ndowsFileStreamStrategy.ValueTaskSource.cs | 206 ++++++-------- .../AsyncWindowsFileStreamStrategy.cs | 269 +++++++----------- .../Strategies/BufferedFileStreamStrategy.cs | 2 +- .../Strategies/FileStreamHelpers.Windows.cs | 3 +- .../src/System/ThrowHelper.cs | 5 +- 5 files changed, 201 insertions(+), 284 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.ValueTaskSource.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.ValueTaskSource.cs index 335d464ee622b..1a7fcbe9f913e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.ValueTaskSource.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.ValueTaskSource.cs @@ -3,47 +3,49 @@ using System.Buffers; using System.Diagnostics; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks.Sources; -using TaskSourceCodes = System.IO.Strategies.FileStreamHelpers.TaskSourceCodes; namespace System.IO.Strategies { internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy { - /// - /// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync. - /// + /// Reusable IValueTaskSource for FileStream ValueTask-returning async operations. private sealed unsafe class ValueTaskSource : IValueTaskSource, IValueTaskSource { internal static readonly IOCompletionCallback s_ioCallback = IOCallback; + internal readonly PreAllocatedOverlapped _preallocatedOverlapped; private readonly AsyncWindowsFileStreamStrategy _strategy; - private MemoryHandle _handle; + internal MemoryHandle _memoryHandle; internal ManualResetValueTaskSourceCore _source; // mutable struct; do not make this readonly private NativeOverlapped* _overlapped; private CancellationTokenRegistration _cancellationRegistration; - private long _result; // Using long since this needs to be used in Interlocked APIs -#if DEBUG - private bool _cancellationHasBeenRegistered; -#endif + /// + /// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed, + /// in which case its value is a packed combination of the error code and number of bytes, or when + /// the read/write call has finished scheduling the async operation. + /// + internal ulong _result; internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy) { _strategy = strategy; - _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null); - _source.RunContinuationsAsynchronously = true; + _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null); } - internal NativeOverlapped* Configure(ReadOnlyMemory memory) + internal void Dispose() { - _result = TaskSourceCodes.NoResult; + ReleaseResources(); + _preallocatedOverlapped.Dispose(); + } - _handle = memory.Pin(); + internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory) + { + _result = 0; + _memoryHandle = memory.Pin(); _overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped); - return _overlapped; } @@ -69,49 +71,45 @@ private int GetResultAndRelease(short token) internal void RegisterForCancellation(CancellationToken cancellationToken) { -#if DEBUG - Debug.Assert(cancellationToken.CanBeCanceled); - Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice"); - _cancellationHasBeenRegistered = true; -#endif - - // Quick check to make sure the IO hasn't completed - if (_overlapped != null) + Debug.Assert(_overlapped != null); + if (cancellationToken.CanBeCanceled) { - // Register the cancellation only if the IO hasn't completed - long packedResult = Interlocked.CompareExchange(ref _result, TaskSourceCodes.RegisteringCancellation, TaskSourceCodes.NoResult); - if (packedResult == TaskSourceCodes.NoResult) + try { - _cancellationRegistration = cancellationToken.UnsafeRegister((s, token) => Cancel(token), this); - - // Switch the result, just in case IO completed while we were setting the registration - packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult); - } - else if (packedResult != TaskSourceCodes.CompletedCallback) - { - // Failed to set the result, IO is in the process of completing - // Attempt to take the packed result - packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult); + _cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) => + { + ValueTaskSource vts = (ValueTaskSource)s!; + if (!vts._strategy._fileHandle.IsInvalid) + { + try + { + Interop.Kernel32.CancelIoEx(vts._strategy._fileHandle, vts._overlapped); + // Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback. + } + catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently + } + }, this); } - - // If we have a callback that needs to be completed - if ((packedResult != TaskSourceCodes.NoResult) && (packedResult != TaskSourceCodes.CompletedCallback) && (packedResult != TaskSourceCodes.RegisteringCancellation)) + catch (OutOfMemoryException) { - CompleteCallback((ulong)packedResult); + // Just in case trying to register OOMs, we ignore it in order to + // protect the higher-level calling code that would proceed to unpin + // memory that might be actively used by an in-flight async operation. } } } - internal void ReleaseNativeResource() + internal void ReleaseResources() { - _handle.Dispose(); + // Unpin any pinned buffer. + _memoryHandle.Dispose(); - // Ensure that cancellation has been completed and cleaned up. + // Ensure that any cancellation callback has either completed or will never run, + // so that we don't try to access an overlapped for this operation after it's already + // been freed. _cancellationRegistration.Dispose(); // Free the overlapped. - // NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory - // (this is why we disposed the registration above). if (_overlapped != null) { _strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped); @@ -119,90 +117,64 @@ internal void ReleaseNativeResource() } } - private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped) - { - ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!; - Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match"); - - // Handle reading from & writing to closed pipes. While I'm not sure - // this is entirely necessary anymore, maybe it's possible for - // an async read on a pipe to be issued and then the pipe is closed, - // returning this error. This may very well be necessary. - ulong packedResult; - if (errorCode != 0 && errorCode != Interop.Errors.ERROR_BROKEN_PIPE && errorCode != Interop.Errors.ERROR_NO_DATA) - { - packedResult = ((ulong)TaskSourceCodes.ResultError | errorCode); - } - else - { - packedResult = ((ulong)TaskSourceCodes.ResultSuccess | numBytes); - } + // After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation, + // and only after that should we allow for completing the operation, as completion needs to factor in work + // done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's + // responsible for calling Complete and for passing the necessary data between parties. - // Stow the result so that other threads can observe it - // And, if no other thread is registering cancellation, continue - if (Interlocked.Exchange(ref valueTaskSource._result, (long)packedResult) == TaskSourceCodes.NoResult) + /// Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation. + internal void FinishedScheduling() + { + // Set the value to 1. If it was already non-0, then the asynchronous operation already completed but + // didn't call Complete, so we call Complete here. The read result value is the data (packed) necessary + // to make the call. + ulong result = Interlocked.Exchange(ref _result, 1); + if (result != 0) { - // Successfully set the state, attempt to take back the callback - if (Interlocked.Exchange(ref valueTaskSource._result, TaskSourceCodes.CompletedCallback) != TaskSourceCodes.NoResult) - { - // Successfully got the callback, finish the callback - valueTaskSource.CompleteCallback(packedResult); - } - // else: Some other thread stole the result, so now it is responsible to finish the callback + Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF); } - // else: Some other thread is registering a cancellation, so it *must* finish the callback } - private void CompleteCallback(ulong packedResult) + /// Invoked when the asynchronous operation has completed asynchronously. + private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped) { - CancellationToken cancellationToken = _cancellationRegistration.Token; - - ReleaseNativeResource(); - - // Unpack the result and send it to the user - long result = (long)(packedResult & TaskSourceCodes.ResultMask); - if (result == TaskSourceCodes.ResultError) + ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped); + Debug.Assert(vts is not null); + Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match"); + + // Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1 + // to ensure the value we're setting is non-zero). If it was already non-0 (the common case), then + // the call site already finished scheduling the async operation, in which case we're ready to complete. + Debug.Assert(numBytes < int.MaxValue); + if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0) { - int errorCode = unchecked((int)(packedResult & uint.MaxValue)); - Exception e; - if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED) - { - CancellationToken ct = cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(canceled: true); - e = new OperationCanceledException(ct); - } - else - { - e = Win32Marshal.GetExceptionForWin32Error(errorCode); - } - e.SetCurrentStackTrace(); - _source.SetException(e); - } - else - { - Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result"); - _source.SetResult((int)(packedResult & uint.MaxValue)); + vts.Complete(errorCode, numBytes); } } - private void Cancel(CancellationToken token) + internal void Complete(uint errorCode, uint numBytes) { - // WARNING: This may potentially be called under a lock (during cancellation registration) - Debug.Assert(_overlapped != null && GetStatus(Version) != ValueTaskSourceStatus.Succeeded, "IO should not have completed yet"); + ReleaseResources(); - // If the handle is still valid, attempt to cancel the IO - if (!_strategy._fileHandle.IsInvalid && - !Interop.Kernel32.CancelIoEx(_strategy._fileHandle, _overlapped)) + switch (errorCode) { - int errorCode = Marshal.GetLastWin32Error(); - - // ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel. - // This probably means that the IO operation has completed. - if (errorCode != Interop.Errors.ERROR_NOT_FOUND) - { - Exception e = new OperationCanceledException(SR.OperationCanceled, Win32Marshal.GetExceptionForWin32Error(errorCode), token); - e.SetCurrentStackTrace(); - _source.SetException(e); - } + case 0: + case Interop.Errors.ERROR_BROKEN_PIPE: + case Interop.Errors.ERROR_NO_DATA: + // Success + _source.SetResult((int)numBytes); + break; + + case Interop.Errors.ERROR_OPERATION_ABORTED: + // Cancellation + CancellationToken ct = _cancellationRegistration.Token; + _source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException()); + break; + + default: + // Failure + _source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode)); + break; } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs index db0201855a099..561bd2de7e97c 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs @@ -31,7 +31,7 @@ public override ValueTask DisposeAsync() ValueTask result = base.DisposeAsync(); Debug.Assert(result.IsCompleted, "the method must be sync, as it performs no flushing"); - Interlocked.Exchange(ref _reusableValueTaskSource, null)?._preallocatedOverlapped.Dispose(); + Interlocked.Exchange(ref _reusableValueTaskSource, null)?.Dispose(); return result; } @@ -42,7 +42,7 @@ protected override void Dispose(bool disposing) // before _preallocatedOverlapped is disposed base.Dispose(disposing); - Interlocked.Exchange(ref _reusableValueTaskSource, null)?._preallocatedOverlapped.Dispose(); + Interlocked.Exchange(ref _reusableValueTaskSource, null)?.Dispose(); } protected override void OnInitFromHandle(SafeFileHandle handle) @@ -133,111 +133,75 @@ private unsafe ValueTask ReadAsyncInternal(Memory destination, Cancel ThrowHelper.ThrowNotSupportedException_UnreadableStream(); } - Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); - - // valueTaskSource is not null when: - // - First time calling ReadAsync in buffered mode - // - Second+ time calling ReadAsync, both buffered or unbuffered - // - On buffered flush, when source memory is also the internal buffer - // valueTaskSource is null when: - // - First time calling ReadAsync in unbuffered mode - ValueTaskSource valueTaskSource = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this); - NativeOverlapped* intOverlapped = valueTaskSource.Configure(destination); - - // Calculate position in the file we should be at after the read is done - long positionBefore = _filePosition; - if (CanSeek) + // Rent the reusable ValueTaskSource, or create a new one to use if we couldn't get one (which + // should only happen on first use or if the FileStream is being used concurrently). + ValueTaskSource vts = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this); + try { - long len = Length; + NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(destination); + Debug.Assert(vts._memoryHandle.Pointer != null); - if (positionBefore + destination.Length > len) + // Calculate position in the file we should be at after the read is done + long positionBefore = _filePosition; + if (CanSeek) { - if (positionBefore <= len) - { - destination = destination.Slice(0, (int)(len - positionBefore)); - } - else + long len = Length; + + if (positionBefore + destination.Length > len) { - destination = default; + destination = positionBefore <= len ? + destination.Slice(0, (int)(len - positionBefore)) : + default; } - } - // Now set the position to read from in the NativeOverlapped struct - // For pipes, we should leave the offset fields set to 0. - intOverlapped->OffsetLow = unchecked((int)positionBefore); - intOverlapped->OffsetHigh = (int)(positionBefore >> 32); - - // When using overlapped IO, the OS is not supposed to - // touch the file pointer location at all. We will adjust it - // ourselves, but only in memory. This isn't threadsafe. - _filePosition += destination.Length; - } + // Now set the position to read from in the NativeOverlapped struct + // For pipes, we should leave the offset fields set to 0. + nativeOverlapped->OffsetLow = unchecked((int)positionBefore); + nativeOverlapped->OffsetHigh = (int)(positionBefore >> 32); - // queue an async ReadFile operation and pass in a packed overlapped - int r = FileStreamHelpers.ReadFileNative(_fileHandle, destination.Span, false, intOverlapped, out int errorCode); - - // ReadFile, the OS version, will return 0 on failure. But - // my ReadFileNative wrapper returns -1. My wrapper will return - // the following: - // On error, r==-1. - // On async requests that are still pending, r==-1 w/ errorCode==ERROR_IO_PENDING - // on async requests that completed sequentially, r==0 - // You will NEVER RELIABLY be able to get the number of bytes - // read back from this call when using overlapped structures! You must - // not pass in a non-null lpNumBytesRead to ReadFile when using - // overlapped structures! This is by design NT behavior. - if (r == -1) - { - // For pipes, when they hit EOF, they will come here. - if (errorCode == Interop.Errors.ERROR_BROKEN_PIPE) - { - // Not an error, but EOF. AsyncFSCallback will NOT be - // called. Call the user callback here. - - // We clear the overlapped status bit for this special case. - // Failure to do so looks like we are freeing a pending overlapped later. - intOverlapped->InternalLow = IntPtr.Zero; - valueTaskSource.ReleaseNativeResource(); - TryToReuse(valueTaskSource); - return new ValueTask(0); + // When using overlapped IO, the OS is not supposed to + // touch the file pointer location at all. We will adjust it + // ourselves, but only in memory. This isn't threadsafe. + _filePosition += destination.Length; } - else if (errorCode != Interop.Errors.ERROR_IO_PENDING) - { - if (!_fileHandle.IsClosed && CanSeek) // Update Position - It could be anywhere. - { - _filePosition = positionBefore; - } - valueTaskSource.ReleaseNativeResource(); - TryToReuse(valueTaskSource); - - if (errorCode == Interop.Errors.ERROR_HANDLE_EOF) - { - ThrowHelper.ThrowEndOfFileException(); - } - else + // Queue an async ReadFile operation. + if (Interop.Kernel32.ReadFile(_fileHandle, (byte*)vts._memoryHandle.Pointer, destination.Length, IntPtr.Zero, nativeOverlapped) == 0) + { + // The operation failed, or it's pending. + int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(_fileHandle); + switch (errorCode) { - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + case Interop.Errors.ERROR_IO_PENDING: + // Common case: IO was initiated, completion will be handled by callback. + // Register for cancellation now that the operation has been initiated. + vts.RegisterForCancellation(cancellationToken); + break; + + case Interop.Errors.ERROR_BROKEN_PIPE: + // EOF on a pipe. Callback will not be called. + // We clear the overlapped status bit for this special case (failure + // to do so looks like we are freeing a pending overlapped later). + nativeOverlapped->InternalLow = IntPtr.Zero; + vts.Dispose(); + return ValueTask.FromResult(0); + + default: + // Error. Callback will not be called. + vts.Dispose(); + return ValueTask.FromException(HandleIOError(positionBefore, errorCode)); } } - else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING - { - // Only once the IO is pending do we register for cancellation - valueTaskSource.RegisterForCancellation(cancellationToken); - } } - else + catch { - // Due to a workaround for a race condition in NT's ReadFile & - // WriteFile routines, we will always be returning 0 from ReadFileNative - // when we do async IO instead of the number of bytes read, - // irregardless of whether the operation completed - // synchronously or asynchronously. We absolutely must not - // set asyncResult._numBytes here, since will never have correct - // results. + vts.Dispose(); + throw; } - return new ValueTask(valueTaskSource, valueTaskSource.Version); + // Completion handled by callback. + vts.FinishedScheduling(); + return new ValueTask(vts, vts.Version); } public override void Write(byte[] buffer, int offset, int count) @@ -256,93 +220,72 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory source, Cancell ThrowHelper.ThrowNotSupportedException_UnwritableStream(); } - Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); - - // valueTaskSource is not null when: - // - First time calling WriteAsync in buffered mode - // - Second+ time calling WriteAsync, both buffered or unbuffered - // - On buffered flush, when source memory is also the internal buffer - // valueTaskSource is null when: - // - First time calling WriteAsync in unbuffered mode - ValueTaskSource valueTaskSource = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this); - NativeOverlapped* intOverlapped = valueTaskSource.Configure(source); - - long positionBefore = _filePosition; - if (CanSeek) + // Rent the reusable ValueTaskSource, or create a new one to use if we couldn't get one (which + // should only happen on first use or if the FileStream is being used concurrently). + ValueTaskSource vts = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this); + try { - // Now set the position to read from in the NativeOverlapped struct - // For pipes, we should leave the offset fields set to 0. - intOverlapped->OffsetLow = (int)positionBefore; - intOverlapped->OffsetHigh = (int)(positionBefore >> 32); - - // When using overlapped IO, the OS is not supposed to - // touch the file pointer location at all. We will adjust it - // ourselves, but only in memory. This isn't threadsafe. - _filePosition += source.Length; - UpdateLengthOnChangePosition(); - } + NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(source); + Debug.Assert(vts._memoryHandle.Pointer != null); - // queue an async WriteFile operation and pass in a packed overlapped - int r = FileStreamHelpers.WriteFileNative(_fileHandle, source.Span, false, intOverlapped, out int errorCode); - - // WriteFile, the OS version, will return 0 on failure. But - // my WriteFileNative wrapper returns -1. My wrapper will return - // the following: - // On error, r==-1. - // On async requests that are still pending, r==-1 w/ errorCode==ERROR_IO_PENDING - // On async requests that completed sequentially, r==0 - // You will NEVER RELIABLY be able to get the number of bytes - // written back from this call when using overlapped IO! You must - // not pass in a non-null lpNumBytesWritten to WriteFile when using - // overlapped structures! This is ByDesign NT behavior. - if (r == -1) - { - // For pipes, when they are closed on the other side, they will come here. - if (errorCode == Interop.Errors.ERROR_NO_DATA) + long positionBefore = _filePosition; + if (CanSeek) { - // Not an error, but EOF. AsyncFSCallback will NOT be called. - // Completing TCS and return cached task allowing the GC to collect TCS. - valueTaskSource.ReleaseNativeResource(); - TryToReuse(valueTaskSource); - return ValueTask.CompletedTask; + // Now set the position to read from in the NativeOverlapped struct + // For pipes, we should leave the offset fields set to 0. + nativeOverlapped->OffsetLow = (int)positionBefore; + nativeOverlapped->OffsetHigh = (int)(positionBefore >> 32); + + // When using overlapped IO, the OS is not supposed to + // touch the file pointer location at all. We will adjust it + // ourselves, but only in memory. This isn't threadsafe. + _filePosition += source.Length; + UpdateLengthOnChangePosition(); } - else if (errorCode != Interop.Errors.ERROR_IO_PENDING) - { - if (!_fileHandle.IsClosed && CanSeek) // Update Position - It could be anywhere. - { - _filePosition = positionBefore; - } - valueTaskSource.ReleaseNativeResource(); - TryToReuse(valueTaskSource); - - if (errorCode == Interop.Errors.ERROR_HANDLE_EOF) + // Queue an async WriteFile operation. + if (Interop.Kernel32.WriteFile(_fileHandle, (byte*)vts._memoryHandle.Pointer, source.Length, IntPtr.Zero, nativeOverlapped) == 0) + { + // The operation failed, or it's pending. + int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(_fileHandle); + if (errorCode == Interop.Errors.ERROR_IO_PENDING) { - ThrowHelper.ThrowEndOfFileException(); + // Common case: IO was initiated, completion will be handled by callback. + // Register for cancellation now that the operation has been initiated. + vts.RegisterForCancellation(cancellationToken); } else { - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + // Error. Callback will not be invoked. + vts.Dispose(); + return errorCode == Interop.Errors.ERROR_NO_DATA ? // EOF on a pipe. IO callback will not be called. + ValueTask.CompletedTask : + ValueTask.FromException(HandleIOError(positionBefore, errorCode)); } } - else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING - { - // Only once the IO is pending do we register for cancellation - valueTaskSource.RegisterForCancellation(cancellationToken); - } } - else + catch + { + vts.Dispose(); + throw; + } + + // Completion handled by callback. + vts.FinishedScheduling(); + return new ValueTask(vts, vts.Version); + } + + private Exception HandleIOError(long positionBefore, int errorCode) + { + if (!_fileHandle.IsClosed && CanSeek) { - // Due to a workaround for a race condition in NT's ReadFile & - // WriteFile routines, we will always be returning 0 from WriteFileNative - // when we do async IO instead of the number of bytes written, - // irregardless of whether the operation completed - // synchronously or asynchronously. We absolutely must not - // set asyncResult._numBytes here, since will never have correct - // results. + // Update Position... it could be anywhere. + _filePosition = positionBefore; } - return new ValueTask(valueTaskSource, valueTaskSource.Version); + return errorCode == Interop.Errors.ERROR_HANDLE_EOF ? + ThrowHelper.CreateEndOfFileException() : + Win32Marshal.GetExceptionForWin32Error(errorCode, _path); } public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; // no buffering = nothing to flush diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs index 75f9357f325ac..c39d2ae49b3c6 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/BufferedFileStreamStrategy.cs @@ -476,7 +476,7 @@ private async ValueTask ReadAsyncSlowPath(Task semaphoreLockTask, Memory(_buffer), cancellationToken).ConfigureAwait(false); + _readLen = await _strategy.ReadAsync(new Memory(_buffer, 0, _bufferSize), cancellationToken).ConfigureAwait(false); bytesFromBuffer = Math.Min(_readLen, buffer.Length); _buffer.AsSpan(0, bytesFromBuffer).CopyTo(buffer.Span); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/FileStreamHelpers.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/FileStreamHelpers.Windows.cs index b35b8d7b01cb7..18079bb923ab6 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/FileStreamHelpers.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/FileStreamHelpers.Windows.cs @@ -227,7 +227,7 @@ internal static long Seek(SafeFileHandle handle, string? path, long offset, Seek return ret; } - private static int GetLastWin32ErrorAndDisposeHandleIfInvalid(SafeFileHandle handle) + internal static int GetLastWin32ErrorAndDisposeHandleIfInvalid(SafeFileHandle handle) { int errorCode = Marshal.GetLastWin32Error(); @@ -340,7 +340,6 @@ internal static unsafe void SetFileLength(SafeFileHandle handle, string? path, l } } - // __ConsoleStream also uses this code. internal static unsafe int ReadFileNative(SafeFileHandle handle, Span bytes, bool syncUsingOverlapped, NativeOverlapped* overlapped, out int errorCode) { Debug.Assert(handle != null, "handle != null"); diff --git a/src/libraries/System.Private.CoreLib/src/System/ThrowHelper.cs b/src/libraries/System.Private.CoreLib/src/System/ThrowHelper.cs index dd2cc2742975d..4c09c210458c9 100644 --- a/src/libraries/System.Private.CoreLib/src/System/ThrowHelper.cs +++ b/src/libraries/System.Private.CoreLib/src/System/ThrowHelper.cs @@ -270,9 +270,12 @@ internal static void ThrowArgumentOutOfRangeException(ExceptionArgument argument [DoesNotReturn] internal static void ThrowEndOfFileException() { - throw new EndOfStreamException(SR.IO_EOF_ReadBeyondEOF); + throw CreateEndOfFileException(); } + internal static Exception CreateEndOfFileException() => + new EndOfStreamException(SR.IO_EOF_ReadBeyondEOF); + [DoesNotReturn] internal static void ThrowInvalidOperationException() {