Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update position before ReadAsync starts, but fix it after incomplete read #56531

Merged
merged 6 commits into from
Jul 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -92,6 +94,38 @@ public async Task ReadAsyncCanceledFile()
}
}
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access
[InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached
[InlineData(FileShare.None, FileOptions.None)]
[InlineData(FileShare.ReadWrite, FileOptions.None)]
public async Task IncompleteReadCantSetPositionBeyondEndOfFile(FileShare fileShare, FileOptions options)
{
const int fileSize = 10_000;
string filePath = GetTestFilePath();
byte[] content = RandomNumberGenerator.GetBytes(fileSize);
File.WriteAllBytes(filePath, content);

byte[][] buffers = Enumerable.Repeat(Enumerable.Repeat(byte.MaxValue, fileSize * 2).ToArray(), 10).ToArray();

using (FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, fileShare, bufferSize: 0, options))
{
Task<int>[] reads = buffers.Select(buffer => fs.ReadAsync(buffer, 0, buffer.Length)).ToArray();

// the reads were not awaited, it's an anti-pattern and Position can be (0, buffersLength) now:
Assert.InRange(fs.Position, 0, buffers.Sum(buffer => buffer.Length));

await Task.WhenAll(reads);
// but when they are finished, the first buffer should contain valid data:
Assert.Equal(fileSize, reads.First().Result);
AssertExtensions.SequenceEqual(content, buffers.First().AsSpan(0, fileSize));
// and other reads should return 0:
Assert.All(reads.Skip(1), read => Assert.Equal(0, read.Result));
// and the Position must be correct:
Assert.Equal(fileSize, fs.Position);
}
}
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/34582", TestPlatforms.Windows, TargetFrameworkMonikers.Netcoreapp, TestRuntimes.Mono)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Strategies;
using System.Threading;
using System.Threading.Tasks.Sources;

Expand Down Expand Up @@ -45,7 +46,9 @@ internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource<int>,

internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
internal readonly SafeFileHandle _fileHandle;
private AsyncWindowsFileStreamStrategy? _strategy;
internal MemoryHandle _memoryHandle;
private int _bufferSize;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
Expand Down Expand Up @@ -74,9 +77,11 @@ internal static Exception GetIOError(int errorCode, string? path)
? ThrowHelper.CreateEndOfFileException()
: Win32Marshal.GetExceptionForWin32Error(errorCode, path);

internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset)
internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null)
{
_result = 0;
_strategy = strategy;
_bufferSize = memory.Length;
_memoryHandle = memory.Pin();
_overlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
_overlapped->OffsetLow = (int)fileOffset;
Expand Down Expand Up @@ -132,8 +137,9 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
}
}

internal void ReleaseResources()
private void ReleaseResources()
{
_strategy = null;
// Unpin any pinned buffer.
_memoryHandle.Dispose();

Expand Down Expand Up @@ -187,11 +193,19 @@ private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped*

internal void Complete(uint errorCode, uint numBytes)
{
Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes");

AsyncWindowsFileStreamStrategy? strategy = _strategy;
ReleaseResources();

if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads
{
strategy.OnIncompleteRead(_bufferSize, (int)numBytes);
}

switch (errorCode)
{
case 0:
case Interop.Errors.ERROR_SUCCESS:
case Interop.Errors.ERROR_BROKEN_PIPE:
case Interop.Errors.ERROR_NO_DATA:
case Interop.Errors.ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file)
Expand Down
36 changes: 28 additions & 8 deletions src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,14 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
{
return Task.FromCanceled<int>(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanRead)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnreadableStream();
}

return _strategy.ReadAsync(buffer, offset, count, cancellationToken);
Expand All @@ -294,9 +299,14 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
{
return ValueTask.FromCanceled<int>(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanRead)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnreadableStream();
}

return _strategy.ReadAsync(buffer, cancellationToken);
Expand All @@ -319,9 +329,14 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
{
return Task.FromCanceled(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanWrite)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

return _strategy.WriteAsync(buffer, offset, count, cancellationToken);
Expand All @@ -333,9 +348,14 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
{
return ValueTask.FromCanceled(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanWrite)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

return _strategy.WriteAsync(buffer, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,21 +242,23 @@ internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<b
return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
}

internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null)
{
handle.EnsureThreadPoolBindingInitialized();

SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource();
int errorCode = 0;
try
{
NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset);
NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy);
Debug.Assert(vts._memoryHandle.Pointer != null);

// Queue an async ReadFile operation.
if (Interop.Kernel32.ReadFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0)
{
// The operation failed, or it's pending.
int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
switch (errorCode)
{
case Interop.Errors.ERROR_IO_PENDING:
Expand Down Expand Up @@ -286,6 +288,13 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error
vts.Dispose();
throw;
}
finally
{
if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this function is initializing errorCode to '0' but then checking against 'ERROR_SUCCESS'. Style-wise, it'd be nice if they were consistent.

{
strategy?.OnIncompleteRead(buffer.Length, 0);
}
}

// Completion handled by callback.
vts.FinishedScheduling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,29 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
=> ReadAsyncInternal(destination, cancellationToken);

private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanRead)
if (!CanSeek)
{
ThrowHelper.ThrowNotSupportedException_UnreadableStream();
return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}

long positionBefore = _filePosition;
if (CanSeek)
if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length)
{
long len = Length;
if (positionBefore + destination.Length > len)
{
destination = positionBefore <= len ?
destination.Slice(0, (int)(len - positionBefore)) :
default;
}

// When using overlapped IO, the OS is not supposed to
// touch the file pointer location at all. We will adjust it
// ourselves, but only in memory. This isn't threadsafe.
_filePosition += destination.Length;

// We know for sure that there is nothing to read, so we just return here and avoid a sys-call.
if (destination.IsEmpty && LengthCachingSupported)
{
return ValueTask.FromResult(0);
}
// We know for sure that the file length can be safely cached and it has already been obtained.
// If we have reached EOF we just return here and avoid a sys-call.
return ValueTask.FromResult(0);
}

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, positionBefore, cancellationToken);
// This implementation updates the file position before the operation starts and updates it after incomplete read.
// This is done to keep backward compatibility for concurrent reads.
// It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this);
return vts != null
? new ValueTask<int>(vts, vts.Version)
: (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(positionBefore, errorCode));
: (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -69,35 +58,22 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> WriteAsyncInternal(buffer, cancellationToken);

private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
if (!CanWrite)
{
ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

long positionBefore = _filePosition;
if (CanSeek)
{
// When using overlapped IO, the OS is not supposed to
// touch the file pointer location at all. We will adjust it
// ourselves, but only in memory. This isn't threadsafe.
_filePosition += source.Length;
UpdateLengthOnChangePosition();
}
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, positionBefore, cancellationToken);
(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken);
return vts != null
? new ValueTask(vts, vts.Version)
: (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(positionBefore, errorCode));
: (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
}

private Exception HandleIOError(long positionBefore, int errorCode)
{
if (!_fileHandle.IsClosed && CanSeek)
if (_fileHandle.CanSeek)
{
// Update Position... it could be anywhere.
_filePosition = positionBefore;
Interlocked.Exchange(ref _filePosition, positionBefore);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is precarious regardless of the interlocked, e.g. if someone did kick off concurrent operations, they could all fight to reset this to their starting position. It's not clear there's a "right" answer.


return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy
private readonly FileAccess _access; // What file was opened for.

protected long _filePosition;
protected long _length = -1; // negative means that hasn't been fetched.
private long _appendStart; // When appending, prevent overwriting file.
private long _length = -1; // When the file is locked for writes on Windows ((share & FileShare.Write) == 0) cache file length in-memory, negative means that hasn't been fetched.
private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed and FileShare.Write was not specified when the handle was opened.
private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing.

internal OSFileStreamStrategy(SafeFileHandle handle, FileAccess access)
{
Expand All @@ -44,7 +44,7 @@ internal OSFileStreamStrategy(string path, FileMode mode, FileAccess access, Fil
string fullPath = Path.GetFullPath(path);

_access = access;
_lengthCanBeCached = (share & FileShare.Write) == 0;
_lengthCanBeCached = (share & FileShare.Write) == 0 && (access & FileAccess.Write) == 0;

_fileHandle = SafeFileHandle.Open(fullPath, mode, access, share, options, preallocationSize);

Expand Down Expand Up @@ -96,21 +96,9 @@ public unsafe sealed override long Length
}
}

protected void UpdateLengthOnChangePosition()
{
// Do not update the cached length if the file is not locked
// or if the length hasn't been fetched.
if (!LengthCachingSupported || _length < 0)
{
Debug.Assert(_length < 0);
return;
}

if (_filePosition > _length)
{
_length = _filePosition;
}
}
// in case of concurrent incomplete reads, there can be multiple threads trying to update the position
// at the same time. That is why we are using Interlocked here.
internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephentoub I've used Interlocked here as the new IncompleteReadCantSetPositionBeyondEndOfFile test was simply failing without it. I know that we are never going to be 100% thread safe, but the alternative was to call Length which would result in an extra sys-call.

_filePosition = Math.Min(Length, _filePosition - expectedBytesRead + actualBytesRead);


protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;

Expand Down Expand Up @@ -287,18 +275,8 @@ public sealed override void Write(ReadOnlySpan<byte> buffer)
ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

try
{
RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
}
catch
{
_length = -1; // invalidate cached length
throw;
}

RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
_filePosition += buffer.Length;
UpdateLengthOnChangePosition();
}
}
}
Loading