Skip to content

Commit

Permalink
Support unseekable filestream when ReadAllBytes[Async] (#58434)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Sitnik <adam.sitnik@gmail.com>
  • Loading branch information
lateapexearlyspeed and adamsitnik committed Oct 13, 2021
1 parent 49cf05c commit c2f287c
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 15 deletions.
59 changes: 59 additions & 0 deletions src/libraries/System.IO.FileSystem/tests/File/ReadWriteAllBytes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using System.IO.Pipes;
using Microsoft.DotNet.XUnitExtensions;

namespace System.IO.Tests
{
Expand Down Expand Up @@ -172,5 +175,61 @@ public void ProcFs_NotEmpty(string path)
{
Assert.InRange(File.ReadAllBytes(path).Length, 1, int.MaxValue);
}

[Fact]
[PlatformSpecific(TestPlatforms.Windows)] // DOS device paths (\\.\ and \\?\) are a Windows concept
public async Task ReadAllBytes_NonSeekableFileStream_InWindows()
{
string pipeName = FileSystemTest.GetNamedPipeServerStreamName();
string pipePath = Path.GetFullPath($@"\\.\pipe\{pipeName}");

var namedPipeWriterStream = new NamedPipeServerStream(pipeName, PipeDirection.Out);
var contentBytes = new byte[] { 1, 2, 3 };

using (var cts = new CancellationTokenSource())
{
Task writingServerTask = WaitConnectionAndWritePipeStreamAsync(namedPipeWriterStream, contentBytes, cts.Token);
Task<byte[]> readTask = Task.Run(() => File.ReadAllBytes(pipePath), cts.Token);
cts.CancelAfter(TimeSpan.FromSeconds(50));

await writingServerTask;
byte[] readBytes = await readTask;
Assert.Equal<byte>(contentBytes, readBytes);
}

static async Task WaitConnectionAndWritePipeStreamAsync(NamedPipeServerStream namedPipeWriterStream, byte[] contentBytes, CancellationToken cancellationToken)
{
await using (namedPipeWriterStream)
{
await namedPipeWriterStream.WaitForConnectionAsync(cancellationToken);
await namedPipeWriterStream.WriteAsync(contentBytes, cancellationToken);
}
}
}

[Fact]
[PlatformSpecific(TestPlatforms.AnyUnix & ~TestPlatforms.Browser)]
public async Task ReadAllBytes_NonSeekableFileStream_InUnix()
{
string fifoPath = GetTestFilePath();
Assert.Equal(0, mkfifo(fifoPath, 438 /* 666 in octal */ ));

var contentBytes = new byte[] { 1, 2, 3 };

await Task.WhenAll(
Task.Run(() =>
{
byte[] readBytes = File.ReadAllBytes(fifoPath);
Assert.Equal<byte>(contentBytes, readBytes);
}),
Task.Run(() =>
{
using var fs = new FileStream(fifoPath, FileMode.Open, FileAccess.Write, FileShare.Read);
foreach (byte content in contentBytes)
{
fs.WriteByte(content);
}
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using System.IO.Pipes;
using Microsoft.DotNet.XUnitExtensions;

namespace System.IO.Tests
{
Expand Down Expand Up @@ -186,5 +188,61 @@ public async Task ProcFs_NotEmpty(string path)
{
Assert.InRange((await File.ReadAllBytesAsync(path)).Length, 1, int.MaxValue);
}

[Fact]
[PlatformSpecific(TestPlatforms.Windows)] // DOS device paths (\\.\ and \\?\) are a Windows concept
public async Task ReadAllBytesAsync_NonSeekableFileStream_InWindows()
{
string pipeName = FileSystemTest.GetNamedPipeServerStreamName();
string pipePath = Path.GetFullPath($@"\\.\pipe\{pipeName}");

var namedPipeWriterStream = new NamedPipeServerStream(pipeName, PipeDirection.Out);
var contentBytes = new byte[] { 1, 2, 3 };

using (var cts = new CancellationTokenSource())
{
Task writingServerTask = WaitConnectionAndWritePipeStreamAsync(namedPipeWriterStream, contentBytes, cts.Token);
Task<byte[]> readTask = File.ReadAllBytesAsync(pipePath, cts.Token);
cts.CancelAfter(TimeSpan.FromSeconds(50));

await writingServerTask;
byte[] readBytes = await readTask;
Assert.Equal<byte>(contentBytes, readBytes);
}

static async Task WaitConnectionAndWritePipeStreamAsync(NamedPipeServerStream namedPipeWriterStream, byte[] contentBytes, CancellationToken cancellationToken)
{
await using (namedPipeWriterStream)
{
await namedPipeWriterStream.WaitForConnectionAsync(cancellationToken);
await namedPipeWriterStream.WriteAsync(contentBytes, cancellationToken);
}
}
}

[Fact]
[PlatformSpecific(TestPlatforms.AnyUnix & ~TestPlatforms.Browser)]
public async Task ReadAllBytesAsync_NonSeekableFileStream_InUnix()
{
string fifoPath = GetTestFilePath();
Assert.Equal(0, mkfifo(fifoPath, 438 /* 666 in octal */ ));

var contentBytes = new byte[] { 1, 2, 3 };

await Task.WhenAll(
Task.Run(async () =>
{
byte[] readBytes = await File.ReadAllBytesAsync(fifoPath);
Assert.Equal<byte>(contentBytes, readBytes);
}),
Task.Run(() =>
{
using var fs = new FileStream(fifoPath, FileMode.Open, FileAccess.Write, FileShare.Read);
foreach (byte content in contentBytes)
{
fs.WriteByte(content);
}
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ internal static Exception GetIOError(int errorCode, string? path)
_bufferSize = memory.Length;
_memoryHandle = memory.Pin();
_overlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
_overlapped->OffsetLow = (int)fileOffset;
_overlapped->OffsetHigh = (int)(fileOffset >> 32);
if (_fileHandle.CanSeek)
{
_overlapped->OffsetLow = (int)fileOffset;
_overlapped->OffsetHigh = (int)(fileOffset >> 32);
}
return _overlapped;
}

Expand Down
12 changes: 6 additions & 6 deletions src/libraries/System.Private.CoreLib/src/System/IO/File.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,14 @@ public static byte[] ReadAllBytes(string path)
// bufferSize == 1 used to avoid unnecessary buffer in FileStream
using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 1, FileOptions.SequentialScan))
{
long fileLength = fs.Length;
if (fileLength > int.MaxValue)
long fileLength = 0;
if (fs.CanSeek && (fileLength = fs.Length) > int.MaxValue)
{
throw new IOException(SR.IO_FileTooLong2GB);
}
else if (fileLength == 0)
if (fileLength == 0)
{
// Some file systems (e.g. procfs on Linux) return 0 for length even when there's content.
// Some file systems (e.g. procfs on Linux) return 0 for length even when there's content; also there is non-seekable file stream.
// Thus we need to assume 0 doesn't mean empty.
return ReadAllBytesUnknownLength(fs);
}
Expand Down Expand Up @@ -711,8 +711,8 @@ private static async Task<string> InternalReadAllTextAsync(string path, Encoding
bool returningInternalTask = false;
try
{
long fileLength = fs.Length;
if (fileLength > int.MaxValue)
long fileLength = 0L;
if (fs.CanSeek && (fileLength = fs.Length) > int.MaxValue)
{
var e = new IOException(SR.IO_FileTooLong2GB);
ExceptionDispatchInfo.SetCurrentStackTrace(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private static unsafe int ReadSyncUsingAsyncHandle(SafeFileHandle handle, Span<b

try
{
overlapped = GetNativeOverlappedForAsyncHandle(handle.ThreadPoolBinding!, fileOffset, resetEvent);
overlapped = GetNativeOverlappedForAsyncHandle(handle, fileOffset, resetEvent);

fixed (byte* pinned = &MemoryMarshal.GetReference(buffer))
{
Expand Down Expand Up @@ -171,7 +171,7 @@ private static unsafe void WriteSyncUsingAsyncHandle(SafeFileHandle handle, Read

try
{
overlapped = GetNativeOverlappedForAsyncHandle(handle.ThreadPoolBinding!, fileOffset, resetEvent);
overlapped = GetNativeOverlappedForAsyncHandle(handle, fileOffset, resetEvent);

fixed (byte* pinned = &MemoryMarshal.GetReference(buffer))
{
Expand Down Expand Up @@ -681,15 +681,17 @@ private static async ValueTask WriteGatherAtOffsetMultipleSyscallsAsync(SafeFile
}
}

private static unsafe NativeOverlapped* GetNativeOverlappedForAsyncHandle(ThreadPoolBoundHandle threadPoolBinding, long fileOffset, CallbackResetEvent resetEvent)
private static unsafe NativeOverlapped* GetNativeOverlappedForAsyncHandle(SafeFileHandle handle, long fileOffset, CallbackResetEvent resetEvent)
{
// After SafeFileHandle is bound to ThreadPool, we need to use ThreadPoolBinding
// to allocate a native overlapped and provide a valid callback.
NativeOverlapped* result = threadPoolBinding.AllocateNativeOverlapped(s_callback, resetEvent, null);
NativeOverlapped* result = handle.ThreadPoolBinding!.AllocateNativeOverlapped(s_callback, resetEvent, null);

// For pipes the offsets are ignored by the OS
result->OffsetLow = unchecked((int)fileOffset);
result->OffsetHigh = (int)(fileOffset >> 32);
if (handle.CanSeek)
{
result->OffsetLow = unchecked((int)fileOffset);
result->OffsetHigh = (int)(fileOffset >> 32);
}

// From https://docs.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getoverlappedresult:
// "If the hEvent member of the OVERLAPPED structure is NULL, the system uses the state of the hFile handle to signal when the operation has been completed.
Expand Down

0 comments on commit c2f287c

Please sign in to comment.