Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More WriteGather fixes #109826

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,33 +173,25 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly

try
{
int buffersOffset = 0, firstBufferOffset = 0;
while (true)
long totalBytesToWrite = 0;
for (int i = 0; i < buffersCount; i++)
{
long totalBytesToWrite = 0;

for (int i = buffersOffset; i < buffersCount; i++)
{
ReadOnlyMemory<byte> buffer = buffers[i];
totalBytesToWrite += buffer.Length;

MemoryHandle memoryHandle = buffer.Pin();
Copy link
Member Author

@adamsitnik adamsitnik Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far, for incomplete writes we were pinning the memory for every retry attempt. I am not sure if this can create some kind of edge case bugs, but I think we can do it just once, before we enter the main loop

vectors[i] = new Interop.Sys.IOVector { Base = firstBufferOffset + (byte*)memoryHandle.Pointer, Count = (UIntPtr)(buffer.Length - firstBufferOffset) };
handles[i] = memoryHandle;

firstBufferOffset = 0;
}
ReadOnlyMemory<byte> buffer = buffers[i];
totalBytesToWrite += buffer.Length;

if (totalBytesToWrite == 0)
{
break;
}
MemoryHandle memoryHandle = buffer.Pin();
vectors[i] = new Interop.Sys.IOVector { Base = (byte*)memoryHandle.Pointer, Count = (UIntPtr)buffer.Length };
handles[i] = memoryHandle;
}

int buffersOffset = 0;
while (totalBytesToWrite > 0)
{
long bytesWritten;
Span<Interop.Sys.IOVector> left = vectors.Slice(buffersOffset);
Span<Interop.Sys.IOVector> left = vectors.Slice(buffersOffset, buffersCount - buffersOffset);
fixed (Interop.Sys.IOVector* pinnedVectors = &MemoryMarshal.GetReference(left))
{
bytesWritten = Interop.Sys.PWriteV(handle, pinnedVectors, buffersCount - buffersOffset, fileOffset);
bytesWritten = Interop.Sys.PWriteV(handle, pinnedVectors, left.Length, fileOffset);
}

FileStreamHelpers.CheckFileCall(bytesWritten, handle.Path);
Expand All @@ -211,22 +203,29 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly
// The write completed successfully but for fewer bytes than requested.
// We need to perform next write where the previous one has finished.
fileOffset += bytesWritten;
totalBytesToWrite -= bytesWritten;
// We need to try again for the remainder.
for (int i = 0; i < buffersCount; i++)
while (buffersOffset < buffersCount && bytesWritten > 0)
{
int n = buffers[i].Length;
int n = buffers[buffersOffset].Length;
if (n <= bytesWritten)
{
buffersOffset++;
bytesWritten -= n;
if (bytesWritten == 0)
{
break;
}
buffersOffset++;
}
else
{
firstBufferOffset = (int)(bytesWritten - n);
// A partial write: the vector needs to point to the new offset.
// But that offset needs to be relative to the previous attempt.
// Example: we have a single buffer with 30 bytes and the first write wrote 10.
// The next write should try to write the remaining 20 bytes, but in case it also writes just 10,
// the third attempt should write the last 10 bytes (not 20 again).
Interop.Sys.IOVector current = vectors[buffersOffset];
vectors[buffersOffset] = new Interop.Sys.IOVector
{
Base = current.Base + (int)(bytesWritten),
Count = current.Count - (UIntPtr)(bytesWritten)
};
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
Expand All @@ -13,6 +14,7 @@
namespace System.IO.Tests
{
[SkipOnPlatform(TestPlatforms.Browser, "async file IO is not supported on browser")]
[Collection(nameof(DisableParallelization))] // don't run in parallel, as some of these tests use a LOT of resources
public class RandomAccess_WriteGatherAsync : RandomAccess_Base<ValueTask>
{
protected override ValueTask MethodUnderTest(SafeFileHandle handle, byte[] bytes, long fileOffset)
Expand Down Expand Up @@ -151,21 +153,6 @@ public async Task NoInt32OverflowForLargeInputs(bool asyncFile, bool asyncMethod
const int BufferSize = int.MaxValue / 1000;
const long FileSize = (long)BufferCount * BufferSize;
string filePath = GetTestFilePath();
ReadOnlyMemory<byte> writeBuffer = RandomNumberGenerator.GetBytes(BufferSize);
List<ReadOnlyMemory<byte>> writeBuffers = Enumerable.Repeat(writeBuffer, BufferCount).ToList();
List<Memory<byte>> readBuffers = new List<Memory<byte>>(BufferCount);

try
{
for (int i = 0; i < BufferCount; i++)
{
readBuffers.Add(new byte[BufferSize]);
}
}
catch (OutOfMemoryException)
{
throw new SkipTestException("Not enough memory.");
}

FileOptions options = asyncFile ? FileOptions.Asynchronous : FileOptions.None; // we need to test both code paths
options |= FileOptions.DeleteOnClose;
Expand All @@ -180,29 +167,86 @@ public async Task NoInt32OverflowForLargeInputs(bool asyncFile, bool asyncMethod
throw new SkipTestException("Not enough disk space.");
}

long fileOffset = 0, bytesRead = 0;
try
using (sfh)
{
if (asyncMethod)
ReadOnlyMemory<byte> writeBuffer = RandomNumberGenerator.GetBytes(BufferSize);
List<ReadOnlyMemory<byte>> writeBuffers = Enumerable.Repeat(writeBuffer, BufferCount).ToList();

List<NativeMemoryManager> memoryManagers = new List<NativeMemoryManager>(BufferCount);
List<Memory<byte>> readBuffers = new List<Memory<byte>>(BufferCount);

try
{
await RandomAccess.WriteAsync(sfh, writeBuffers, fileOffset);
bytesRead = await RandomAccess.ReadAsync(sfh, readBuffers, fileOffset);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a test bug, the test assumed that the read won't ever be a partial read

try
{
for (int i = 0; i < BufferCount; i++)
{
// We are using native memory here to get OOM as soon as possible.
NativeMemoryManager nativeMemoryManager = new(BufferSize);
memoryManagers.Add(nativeMemoryManager);
readBuffers.Add(nativeMemoryManager.Memory);
}
}
catch (OutOfMemoryException)
{
throw new SkipTestException("Not enough memory.");
}

await Verify(asyncMethod, FileSize, sfh, writeBuffer, writeBuffers, readBuffers);
}
else
finally
{
RandomAccess.Write(sfh, writeBuffers, fileOffset);
bytesRead = RandomAccess.Read(sfh, readBuffers, fileOffset);
foreach (IDisposable memoryManager in memoryManagers)
{
memoryManager.Dispose();
}
}
}
finally
{
sfh.Dispose(); // delete the file ASAP to avoid running out of resources in CI
}

Assert.Equal(FileSize, bytesRead);
for (int i = 0; i < BufferCount; i++)
static async Task Verify(bool asyncMethod, long FileSize, SafeFileHandle sfh, ReadOnlyMemory<byte> writeBuffer, List<ReadOnlyMemory<byte>> writeBuffers, List<Memory<byte>> readBuffers)
{
Assert.Equal(writeBuffer, readBuffers[i]);
if (asyncMethod)
{
await RandomAccess.WriteAsync(sfh, writeBuffers, 0);
}
else
{
RandomAccess.Write(sfh, writeBuffers, 0);
}

Assert.Equal(FileSize, RandomAccess.GetLength(sfh));

long fileOffset = 0;
while (fileOffset < FileSize)
{
long bytesRead = asyncMethod
? await RandomAccess.ReadAsync(sfh, readBuffers, fileOffset)
: RandomAccess.Read(sfh, readBuffers, fileOffset);

Assert.InRange(bytesRead, 0, FileSize);

while (bytesRead > 0)
{
Memory<byte> readBuffer = readBuffers[0];
if (bytesRead >= readBuffer.Length)
{
AssertExtensions.SequenceEqual(writeBuffer.Span, readBuffer.Span);

bytesRead -= readBuffer.Length;
fileOffset += readBuffer.Length;

readBuffers.RemoveAt(0);
}
else
{
// A read has finished somewhere in the middle of one of the read buffers.
// Example: buffer had 30 bytes and only 10 were read.
// We don't read the missing part, but try to read the whole buffer again.
// It's not optimal from a performance perspective, but it keeps the test logic simple.
break;
}
}
}
}
}

Expand Down
47 changes: 38 additions & 9 deletions src/native/libs/System.Native/pal_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -1950,19 +1950,48 @@ int32_t SystemNative_PWrite(intptr_t fd, void* buffer, int32_t bufferSize, int64
}

#if (HAVE_PREADV || HAVE_PWRITEV) && !defined(TARGET_WASM)
static int GetAllowedVectorCount(int32_t vectorCount)
static int GetAllowedVectorCount(IOVector* vectors, int32_t vectorCount)
{
#if defined(IOV_MAX)
const int IovMax = IOV_MAX;
#else
// In theory all the platforms that we support define IOV_MAX,
// but we want to be extra safe and provide a fallback
// in case it turns out to not be true.
// 16 is low, but supported on every platform.
const int IovMax = 16;
#endif

int allowedCount = (int)vectorCount;

#if defined(IOV_MAX)
if (IOV_MAX < allowedCount)
// We need to respect the limit of items that can be passed in iov.
// In case of writes, the managed code is responsible for handling incomplete writes.
// In case of reads, we simply return the number of bytes read and it's up to the users.
if (IovMax < allowedCount)
{
// We need to respect the limit of items that can be passed in iov.
// In case of writes, the managed code is reponsible for handling incomplete writes.
// In case of reads, we simply returns the number of bytes read and it's up to the users.
allowedCount = IOV_MAX;
allowedCount = IovMax;
}

#if defined(TARGET_APPLE)
// On macOS, preadv and pwritev can fail with EINVAL when the total length
// of all vectors overflows a signed 32-bit integer (i.e. exceeds INT_MAX).
size_t totalLength = 0;
for (int i = 0; i < allowedCount; i++)
{
assert(INT_MAX >= vectors[i].Count);

totalLength += vectors[i].Count;

if (totalLength > INT_MAX)
{
allowedCount = i;
break;
}
}
#else
(void)vectors;
#endif

return allowedCount;
}
#endif // (HAVE_PREADV || HAVE_PWRITEV) && !defined(TARGET_WASM)
Expand All @@ -1975,7 +2004,7 @@ int64_t SystemNative_PReadV(intptr_t fd, IOVector* vectors, int32_t vectorCount,
int64_t count = 0;
int fileDescriptor = ToFileDescriptor(fd);
#if HAVE_PREADV && !defined(TARGET_WASM) // preadv is buggy on WASM
int allowedVectorCount = GetAllowedVectorCount(vectorCount);
int allowedVectorCount = GetAllowedVectorCount(vectors, vectorCount);
while ((count = preadv(fileDescriptor, (struct iovec*)vectors, allowedVectorCount, (off_t)fileOffset)) < 0 && errno == EINTR);
#else
int64_t current;
Expand Down Expand Up @@ -2016,7 +2045,7 @@ int64_t SystemNative_PWriteV(intptr_t fd, IOVector* vectors, int32_t vectorCount
int64_t count = 0;
int fileDescriptor = ToFileDescriptor(fd);
#if HAVE_PWRITEV && !defined(TARGET_WASM) // pwritev is buggy on WASM
int allowedVectorCount = GetAllowedVectorCount(vectorCount);
int allowedVectorCount = GetAllowedVectorCount(vectors, vectorCount);
while ((count = pwritev(fileDescriptor, (struct iovec*)vectors, allowedVectorCount, (off_t)fileOffset)) < 0 && errno == EINTR);
#else
int64_t current;
Expand Down
Loading