Skip to content

Commit

Permalink
Efficient RandomAccess async I/O on the thread pool. (#55123)
Browse files Browse the repository at this point in the history
* Factor cross-platform SafeFileHandle code in a common file.

There isn't much actually.

* Write a reusable IValueTaskSource to queue async-over-sync RandomAccess I/O on the thread pool.

And use it in the RandomAccess.ScheduleSync methods instead of wrapping Task.Factory.StartNew in a ValueTask.

* Reduce ThreadPoolValueTaskSource's field count.

* Rename SafeFileHandle.ValueTaskSource to OverlappedValueTaskSource.

* Address most PR feedback.

* Run continuations synchronously and ensure the task's result is set only once.

* Address more PR feedback.

* Flow the ExecutionContext in ThreadPoolValueTaskSource.

* Set the ExecutionContext to null afterwards.

* Use separate fields for the two vectored I/O operations, avoiding a cast.
  • Loading branch information
teo-tsirpanis committed Jul 4, 2021
1 parent 607f98c commit 0696727
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,34 @@ namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private ValueTaskSource? _reusableValueTaskSource; // reusable ValueTaskSource that is currently NOT being used
private OverlappedValueTaskSource? _reusableOverlappedValueTaskSource; // reusable OverlappedValueTaskSource that is currently NOT being used

// Rent the reusable ValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the FileStream is being used concurrently).
internal ValueTaskSource GetValueTaskSource() => Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
// Rent the reusable OverlappedValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the SafeFileHandle is being used concurrently).
internal OverlappedValueTaskSource GetOverlappedValueTaskSource() =>
Interlocked.Exchange(ref _reusableOverlappedValueTaskSource, null) ?? new OverlappedValueTaskSource(this);

protected override bool ReleaseHandle()
{
bool result = Interop.Kernel32.CloseHandle(handle);

Interlocked.Exchange(ref _reusableValueTaskSource, null)?.Dispose();
Interlocked.Exchange(ref _reusableOverlappedValueTaskSource, null)?.Dispose();

return result;
}

private void TryToReuse(ValueTaskSource source)
private void TryToReuse(OverlappedValueTaskSource source)
{
source._source.Reset();

if (Interlocked.CompareExchange(ref _reusableValueTaskSource, source, null) is not null)
if (Interlocked.CompareExchange(ref _reusableOverlappedValueTaskSource, source, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}

/// <summary>Reusable IValueTaskSource for FileStream ValueTask-returning async operations.</summary>
internal sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
/// <summary>Reusable IValueTaskSource for RandomAccess async operations based on Overlapped I/O.</summary>
internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

Expand All @@ -55,7 +56,7 @@ internal sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTask
/// </summary>
internal ulong _result;

internal ValueTaskSource(SafeFileHandle fileHandle)
internal OverlappedValueTaskSource(SafeFileHandle fileHandle)
{
_fileHandle = fileHandle;
_source.RunContinuationsAsynchronously = true;
Expand Down Expand Up @@ -112,7 +113,7 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
{
_cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
{
ValueTaskSource vts = (ValueTaskSource)s!;
OverlappedValueTaskSource vts = (OverlappedValueTaskSource)s!;
if (!vts._fileHandle.IsInvalid)
{
try
Expand Down Expand Up @@ -156,7 +157,7 @@ internal void ReleaseResources()
// done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
// responsible for calling Complete and for passing the necessary data between parties.

/// <summary>Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation.</summary>
/// <summary>Invoked when the async operation finished being scheduled.</summary>
internal void FinishedScheduling()
{
// Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
Expand All @@ -172,7 +173,7 @@ internal void FinishedScheduling()
/// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
OverlappedValueTaskSource? vts = (OverlappedValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(vts is not null);
Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private ThreadPoolValueTaskSource? _reusableThreadPoolValueTaskSource; // reusable ThreadPoolValueTaskSource that is currently NOT being used

// Rent the reusable ThreadPoolValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the SafeFileHandle is being used concurrently).
internal ThreadPoolValueTaskSource GetThreadPoolValueTaskSource() =>
Interlocked.Exchange(ref _reusableThreadPoolValueTaskSource, null) ?? new ThreadPoolValueTaskSource(this);

private void TryToReuse(ThreadPoolValueTaskSource source)
{
Interlocked.CompareExchange(ref _reusableThreadPoolValueTaskSource, source, null);
}

/// <summary>
/// A reusable <see cref="IValueTaskSource"/> implementation that
/// queues asynchronous <see cref="RandomAccess"/> operations to
/// be completed synchronously on the thread pool.
/// </summary>
internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTaskSource<int>, IValueTaskSource<long>
{
private readonly SafeFileHandle _fileHandle;
private ManualResetValueTaskSourceCore<long> _source;
private Operation _operation = Operation.None;
private ExecutionContext? _context;

// These fields store the parameters for the operation.
// The first two are common for all kinds of operations.
private long _fileOffset;
private CancellationToken _cancellationToken;
// Used by simple reads and writes. Will be unsafely cast to a memory when performing a read.
private ReadOnlyMemory<byte> _singleSegment;
private IReadOnlyList<Memory<byte>>? _readScatterBuffers;
private IReadOnlyList<ReadOnlyMemory<byte>>? _writeGatherBuffers;

internal ThreadPoolValueTaskSource(SafeFileHandle fileHandle)
{
_fileHandle = fileHandle;
}

[Conditional("DEBUG")]
private void ValidateInvariants()
{
Operation op = _operation;
Debug.Assert(op == Operation.None, $"An operation was queued before the previous {op}'s completion.");
}

private long GetResultAndRelease(short token)
{
try
{
return _source.GetResult(token);
}
finally
{
_source.Reset();
_fileHandle.TryToReuse(this);
}
}

public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_source.OnCompleted(continuation, state, token, flags);
int IValueTaskSource<int>.GetResult(short token) => (int) GetResultAndRelease(token);
long IValueTaskSource<long>.GetResult(short token) => GetResultAndRelease(token);

private void ExecuteInternal()
{
Debug.Assert(_operation >= Operation.Read && _operation <= Operation.WriteGather);

long result = 0;
Exception? exception = null;
try
{
// This is the operation's last chance to be canceled.
if (_cancellationToken.IsCancellationRequested)
{
exception = new OperationCanceledException(_cancellationToken);
}
else
{
switch (_operation)
{
case Operation.Read:
Memory<byte> writableSingleSegment = MemoryMarshal.AsMemory(_singleSegment);
result = RandomAccess.ReadAtOffset(_fileHandle, writableSingleSegment.Span, _fileOffset);
break;
case Operation.Write:
result = RandomAccess.WriteAtOffset(_fileHandle, _singleSegment.Span, _fileOffset);
break;
case Operation.ReadScatter:
Debug.Assert(_readScatterBuffers != null);
result = RandomAccess.ReadScatterAtOffset(_fileHandle, _readScatterBuffers, _fileOffset);
break;
case Operation.WriteGather:
Debug.Assert(_writeGatherBuffers != null);
result = RandomAccess.WriteGatherAtOffset(_fileHandle, _writeGatherBuffers, _fileOffset);
break;
}
}
}
catch (Exception e)
{
exception = e;
}
finally
{
_operation = Operation.None;
_context = null;
_cancellationToken = default;
_singleSegment = default;
_readScatterBuffers = null;
_writeGatherBuffers = null;
}

if (exception == null)
{
_source.SetResult(result);
}
else
{
_source.SetException(exception);
}
}

void IThreadPoolWorkItem.Execute()
{
if (_context == null || _context.IsDefault)
{
ExecuteInternal();
}
else
{
ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.ExecuteInternal(), this);
}
}

private void QueueToThreadPool()
{
_context = ExecutionContext.Capture();
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
}

public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.Read;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<int>(this, _source.Version);
}

public ValueTask<int> QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.Write;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<int>(this, _source.Version);
}

public ValueTask<long> QueueReadScatter(IReadOnlyList<Memory<byte>> buffers, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.ReadScatter;
_readScatterBuffers = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<long>(this, _source.Version);
}

public ValueTask<long> QueueWriteGather(IReadOnlyList<ReadOnlyMemory<byte>> buffers, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.WriteGather;
_writeGatherBuffers = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<long>(this, _source.Version);
}

private enum Operation : byte
{
None,
Read,
Write,
ReadScatter,
WriteGather
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Microsoft.Win32.SafeHandles
{
public sealed class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
// not using bool? as it's not thread safe
private volatile NullableBool _canSeek = NullableBool.Undefined;
Expand All @@ -23,11 +23,6 @@ private SafeFileHandle(bool ownsHandle)
SetHandle(new IntPtr(-1));
}

public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : this(ownsHandle)
{
SetHandle(preexistingHandle);
}

public bool IsAsync { get; private set; }

internal bool CanSeek => !IsClosed && GetCanSeek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ public SafeFileHandle() : base(true)
{
}

public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle)
{
SetHandle(preexistingHandle);
}

public bool IsAsync => (GetFileOptions() & FileOptions.Asynchronous) != 0;

internal bool CanSeek => !IsClosed && GetFileType() == Interop.Kernel32.FileTypes.FILE_TYPE_DISK;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle)
{
SetHandle(preexistingHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\CriticalHandleZeroOrMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeHandleMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeHandleZeroOrMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.ThreadPoolValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeWaitHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\AccessViolationException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Action.cs" />
Expand Down Expand Up @@ -1517,7 +1519,7 @@
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetModuleFileName.cs">
<Link>Common\Interop\Windows\Kernel32\Interop.GetModuleFileName.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs">
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs">
<Link>Common\Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetProcessMemoryInfo.cs">
Expand Down Expand Up @@ -1775,7 +1777,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Internal\Console.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Internal\Win32\RegistryKey.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.ValueTaskSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.OverlappedValueTaskSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFindHandle.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeRegistryHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeRegistryHandle.Windows.cs" />
Expand Down
Loading

0 comments on commit 0696727

Please sign in to comment.