Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Efficient RandomAccess async I/O on the thread pool. #55123

Merged
merged 10 commits into from
Jul 4, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,34 @@ namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private ValueTaskSource? _reusableValueTaskSource; // reusable ValueTaskSource that is currently NOT being used
private OverlappedValueTaskSource? _reusableOverlappedValueTaskSource; // reusable OverlappedValueTaskSource that is currently NOT being used

// Rent the reusable ValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the FileStream is being used concurrently).
internal ValueTaskSource GetValueTaskSource() => Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
// Rent the reusable OverlappedValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the SafeFileHandle is being used concurrently).
internal OverlappedValueTaskSource GetOverlappedValueTaskSource() =>
Interlocked.Exchange(ref _reusableOverlappedValueTaskSource, null) ?? new OverlappedValueTaskSource(this);

protected override bool ReleaseHandle()
{
bool result = Interop.Kernel32.CloseHandle(handle);

Interlocked.Exchange(ref _reusableValueTaskSource, null)?.Dispose();
Interlocked.Exchange(ref _reusableOverlappedValueTaskSource, null)?.Dispose();

return result;
}

private void TryToReuse(ValueTaskSource source)
private void TryToReuse(OverlappedValueTaskSource source)
{
source._source.Reset();

if (Interlocked.CompareExchange(ref _reusableValueTaskSource, source, null) is not null)
if (Interlocked.CompareExchange(ref _reusableOverlappedValueTaskSource, source, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}

/// <summary>Reusable IValueTaskSource for FileStream ValueTask-returning async operations.</summary>
internal sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
/// <summary>Reusable IValueTaskSource for RandomAccess async operations based on Overlapped I/O.</summary>
internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

Expand All @@ -55,7 +56,7 @@ internal sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTask
/// </summary>
internal ulong _result;

internal ValueTaskSource(SafeFileHandle fileHandle)
internal OverlappedValueTaskSource(SafeFileHandle fileHandle)
{
_fileHandle = fileHandle;
_source.RunContinuationsAsynchronously = true;
Expand Down Expand Up @@ -112,7 +113,7 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
{
_cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
{
ValueTaskSource vts = (ValueTaskSource)s!;
OverlappedValueTaskSource vts = (OverlappedValueTaskSource)s!;
if (!vts._fileHandle.IsInvalid)
{
try
Expand Down Expand Up @@ -156,7 +157,7 @@ internal void ReleaseResources()
// done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
// responsible for calling Complete and for passing the necessary data between parties.

/// <summary>Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation.</summary>
/// <summary>Invoked when the async operation finished being scheduled.</summary>
internal void FinishedScheduling()
{
// Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
Expand All @@ -172,7 +173,7 @@ internal void FinishedScheduling()
/// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
OverlappedValueTaskSource? vts = (OverlappedValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(vts is not null);
Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private ThreadPoolValueTaskSource? _reusableThreadPoolValueTaskSource; // reusable ThreadPoolValueTaskSource that is currently NOT being used

// Rent the reusable ThreadPoolValueTaskSource, or create a new one to use if we couldn't get one (which
// should only happen on first use or if the SafeFileHandle is being used concurrently).
internal ThreadPoolValueTaskSource GetThreadPoolValueTaskSource() =>
Interlocked.Exchange(ref _reusableThreadPoolValueTaskSource, null) ?? new ThreadPoolValueTaskSource(this);

private void TryToReuse(ThreadPoolValueTaskSource source)
{
Interlocked.CompareExchange(ref _reusableThreadPoolValueTaskSource, source, null);
}

/// <summary>
/// A reusable <see cref="IValueTaskSource"/> implementation that
/// queues asynchronous <see cref="RandomAccess"/> operations to
/// be completed synchronously on the thread pool.
/// </summary>
internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTaskSource<int>, IValueTaskSource<long>
{
private readonly SafeFileHandle _fileHandle;
private ManualResetValueTaskSourceCore<long> _source;
private Operation _operation = Operation.None;
private ExecutionContext? _context;
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved

// These fields store the parameters for the operation.
// The first two are common for all kinds of operations.
private long _fileOffset;
private CancellationToken _cancellationToken;
// Used by simple reads and writes. Will be unsafely cast to a memory when performing a read.
private ReadOnlyMemory<byte> _singleSegment;
private IReadOnlyList<Memory<byte>>? _readScatterBuffers;
private IReadOnlyList<ReadOnlyMemory<byte>>? _writeGatherBuffers;

internal ThreadPoolValueTaskSource(SafeFileHandle fileHandle)
{
_fileHandle = fileHandle;
}

[Conditional("DEBUG")]
private void ValidateInvariants()
{
Operation op = _operation;
Debug.Assert(op == Operation.None, $"An operation was queued before the previous {op}'s completion.");
}

private long GetResultAndRelease(short token)
{
try
{
return _source.GetResult(token);
}
finally
{
_source.Reset();
_fileHandle.TryToReuse(this);
}
}

public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_source.OnCompleted(continuation, state, token, flags);
int IValueTaskSource<int>.GetResult(short token) => (int) GetResultAndRelease(token);
teo-tsirpanis marked this conversation as resolved.
Show resolved Hide resolved
long IValueTaskSource<long>.GetResult(short token) => GetResultAndRelease(token);

private void ExecuteInternal()
{
Debug.Assert(_operation >= Operation.Read && _operation <= Operation.WriteGather);

long result = 0;
Exception? exception = null;
try
{
// This is the operation's last chance to be canceled.
if (_cancellationToken.IsCancellationRequested)
{
exception = new OperationCanceledException(_cancellationToken);
}
else
{
switch (_operation)
{
case Operation.Read:
Memory<byte> writableSingleSegment = MemoryMarshal.AsMemory(_singleSegment);
result = RandomAccess.ReadAtOffset(_fileHandle, writableSingleSegment.Span, _fileOffset);
break;
case Operation.Write:
result = RandomAccess.WriteAtOffset(_fileHandle, _singleSegment.Span, _fileOffset);
break;
case Operation.ReadScatter:
Debug.Assert(_readScatterBuffers != null);
result = RandomAccess.ReadScatterAtOffset(_fileHandle, _readScatterBuffers, _fileOffset);
break;
case Operation.WriteGather:
Debug.Assert(_writeGatherBuffers != null);
result = RandomAccess.WriteGatherAtOffset(_fileHandle, _writeGatherBuffers, _fileOffset);
break;
}
}
}
catch (Exception e)
{
exception = e;
}
finally
{
_operation = Operation.None;
_context = null;
_cancellationToken = default;
_singleSegment = default;
_readScatterBuffers = null;
_writeGatherBuffers = null;
}

if (exception == null)
{
_source.SetResult(result);
}
else
{
_source.SetException(exception);
}
}

void IThreadPoolWorkItem.Execute()
{
if (_context == null || _context.IsDefault)
{
ExecuteInternal();
}
else
{
ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.ExecuteInternal(), this);
}
}

private void QueueToThreadPool()
{
_context = ExecutionContext.Capture();
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
}

public ValueTask<int> QueueRead(Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.Read;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<int>(this, _source.Version);
}

public ValueTask<int> QueueWrite(ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.Write;
_singleSegment = buffer;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<int>(this, _source.Version);
}

public ValueTask<long> QueueReadScatter(IReadOnlyList<Memory<byte>> buffers, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.ReadScatter;
_readScatterBuffers = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<long>(this, _source.Version);
}

public ValueTask<long> QueueWriteGather(IReadOnlyList<ReadOnlyMemory<byte>> buffers, long fileOffset, CancellationToken cancellationToken)
{
ValidateInvariants();

_operation = Operation.WriteGather;
_writeGatherBuffers = buffers;
_fileOffset = fileOffset;
_cancellationToken = cancellationToken;
QueueToThreadPool();

return new ValueTask<long>(this, _source.Version);
}

private enum Operation : byte
{
None,
Read,
Write,
ReadScatter,
WriteGather
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Microsoft.Win32.SafeHandles
{
public sealed class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
// not using bool? as it's not thread safe
private volatile NullableBool _canSeek = NullableBool.Undefined;
Expand All @@ -23,11 +23,6 @@ private SafeFileHandle(bool ownsHandle)
SetHandle(new IntPtr(-1));
}

public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : this(ownsHandle)
{
SetHandle(preexistingHandle);
}

public bool IsAsync { get; private set; }

internal bool CanSeek => !IsClosed && GetCanSeek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ public SafeFileHandle() : base(true)
{
}

public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle)
{
SetHandle(preexistingHandle);
}

public bool IsAsync => (GetFileOptions() & FileOptions.Asynchronous) != 0;

internal bool CanSeek => !IsClosed && GetFileType() == Interop.Kernel32.FileTypes.FILE_TYPE_DISK;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
{
public SafeFileHandle(IntPtr preexistingHandle, bool ownsHandle) : base(ownsHandle)
{
SetHandle(preexistingHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\CriticalHandleZeroOrMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeHandleMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeHandleZeroOrMinusOneIsInvalid.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.ThreadPoolValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeWaitHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\AccessViolationException.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Action.cs" />
Expand Down Expand Up @@ -1517,7 +1519,7 @@
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetModuleFileName.cs">
<Link>Common\Interop\Windows\Kernel32\Interop.GetModuleFileName.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs">
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs">
<Link>Common\Interop\Windows\Kernel32\Interop.GetOverlappedResult.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetProcessMemoryInfo.cs">
Expand Down Expand Up @@ -1775,7 +1777,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Internal\Console.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Internal\Win32\RegistryKey.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.ValueTaskSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFileHandle.OverlappedValueTaskSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeFindHandle.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeRegistryHandle.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Microsoft\Win32\SafeHandles\SafeRegistryHandle.Windows.cs" />
Expand Down
Loading