Skip to content

Commit

Permalink
Implemented ReadAtLeastAsync (#51979)
Browse files Browse the repository at this point in the history
- Implemented the PipeReader implementation and override the method on Pipe.
- The Pipe implementation has some caveats. Calling ReadAtLeast messes with the pipe back pressure since it's trying to prevent deadlocks. This means that if ReadAsync is called with a big threshold, the writer will write as fast as it can ignoring the PauseThreshold up until the reader is unblocked. In a sense, the call to ReadAtLeast async overrides the pause threshold for that one read.
- Added tests
  • Loading branch information
davidfowl authored May 1, 2021
1 parent 5ed9ca4 commit a46f7e9
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 19 deletions.
35 changes: 23 additions & 12 deletions src/libraries/System.IO.Pipelines/System.IO.Pipelines.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
Microsoft Visual Studio Solution File, Format Version 12.00

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.31223.327
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestUtilities", "..\Common\tests\TestUtilities\TestUtilities.csproj", "{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Win32.Registry", "..\Microsoft.Win32.Registry\ref\Microsoft.Win32.Registry.csproj", "{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0}"
Expand All @@ -23,18 +27,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{A7DEEB5B-C33
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StreamConformanceTests", "..\Common\tests\StreamConformanceTests\StreamConformanceTests.csproj", "{1E18A8F9-3602-4948-8764-BC7C85840B80}"
EndProject
Global
GlobalSection(NestedProjects) = preSolution
{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{1B97BE49-ACAD-4DC9-B195-FE23C3DE5D08} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{DC148B85-05DB-43B2-B2AF-958C305C2C0A} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{C6EA39FC-B265-4B3E-B5BA-9D0D2C601691} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{E0AFC8E8-C697-40B3-A8AD-52788AE1B753} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{90FCF3EB-F36D-4D39-8A4A-623497F54700} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{1DD520A2-ED75-4889-848E-DBDC7BE41873} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
{FFDC73EC-53C9-4B6E-B468-C477039742D8} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
EndGlobalSection
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
Expand Down Expand Up @@ -76,10 +71,26 @@ Global
{90FCF3EB-F36D-4D39-8A4A-623497F54700}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90FCF3EB-F36D-4D39-8A4A-623497F54700}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90FCF3EB-F36D-4D39-8A4A-623497F54700}.Release|Any CPU.Build.0 = Release|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{DC148B85-05DB-43B2-B2AF-958C305C2C0A} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{1DD520A2-ED75-4889-848E-DBDC7BE41873} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
{1B97BE49-ACAD-4DC9-B195-FE23C3DE5D08} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{C6EA39FC-B265-4B3E-B5BA-9D0D2C601691} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{FFDC73EC-53C9-4B6E-B468-C477039742D8} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
{E0AFC8E8-C697-40B3-A8AD-52788AE1B753} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{90FCF3EB-F36D-4D39-8A4A-623497F54700} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{1E18A8F9-3602-4948-8764-BC7C85840B80} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2034EAE9-84E4-42C3-8C1F-AB515D313D5E}
EndGlobalSection
Expand Down
4 changes: 3 additions & 1 deletion src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ protected PipeReader() { }
public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.Buffers.ReadOnlySequence<byte> sequence) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
[System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
public virtual void OnWriterCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAtLeastAsync(int minimumSize, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAtLeastAsyncCore(int minimumSize, System.Threading.CancellationToken cancellationToken) { throw null; }
public abstract bool TryRead(out System.IO.Pipelines.ReadResult result);
}
public abstract partial class PipeScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public DefaultPipeReader(Pipe pipe)

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) => _pipe.ReadAsync(cancellationToken);

protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken) => _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken);

public override void AdvanceTo(SequencePosition consumed) => _pipe.AdvanceReader(consumed);

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _pipe.AdvanceReader(consumed, examined);
Expand Down
77 changes: 71 additions & 6 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public sealed partial class Pipe
// The extent of the bytes available to the PipeReader to consume
private BufferSegment? _readTail;
private int _readTailIndex;
private int _minimumReadBytes;

// The write head which is the extent of the PipeWriter's written bytes
private BufferSegment? _writingHead;
Expand Down Expand Up @@ -274,7 +275,7 @@ internal bool CommitUnsynchronized()
if (_unflushedBytes == 0)
{
// Nothing written to commit
return true;
return false;
}

// Update the writing head
Expand All @@ -288,8 +289,18 @@ internal bool CommitUnsynchronized()
long oldLength = _unconsumedBytes;
_unconsumedBytes += _unflushedBytes;

// Do not reset if reader is complete
if (PauseWriterThreshold > 0 &&
bool resumeReader = true;

if (_unconsumedBytes < _minimumReadBytes)
{
// Don't yield the reader if we haven't written enough
resumeReader = false;
}
// We only apply back pressure if the reader isn't paused. This is important
// because if it is blocked then this could cause a deadlock (if resumeReader is false).
// If we are resuming the reader, then we can look at the pause threshold to know
// if we should pause the writer.
else if (PauseWriterThreshold > 0 &&
oldLength < PauseWriterThreshold &&
_unconsumedBytes >= PauseWriterThreshold &&
!_readerCompletion.IsCompleted)
Expand All @@ -300,7 +311,7 @@ internal bool CommitUnsynchronized()
_unflushedBytes = 0;
_writingHeadBytesBuffered = 0;

return false;
return resumeReader;
}

internal void Advance(int bytes)
Expand Down Expand Up @@ -346,7 +357,7 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)

private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
var wasEmpty = CommitUnsynchronized();
var completeReader = CommitUnsynchronized();

// AttachToken before completing reader awaiter in case cancellationToken is already completed
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
Expand All @@ -367,7 +378,7 @@ private void PrepareFlush(out CompletionData completionData, out ValueTask<Flush
// Complete reader only if new data was pushed into the pipe
// Avoid throwing in between completing the reader and scheduling the callback
// if the intent is to allow pipe to continue reading the data
if (!wasEmpty)
if (completeReader)
{
_readerAwaitable.Complete(out completionData);
}
Expand Down Expand Up @@ -646,6 +657,57 @@ internal void OnReaderCompleted(Action<Exception?, object?> callback, object? st
}
}

internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token)
{
if (_readerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

CompletionData completionData = default;
ValueTask<ReadResult> result;
lock (SyncObj)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);

// If the awaitable is already complete then return the value result directly
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);

// Short circuit if we have the data or if we enter another terminal state
if (_unconsumedBytes >= minimumBytes || readResult.IsCanceled || readResult.IsCompleted)
{
return new ValueTask<ReadResult>(readResult);
}

// We don't have enough data so we need to reset the reader awaitable
_readerAwaitable.SetUncompleted();

// We also need to flip the reading state off
_operationState.EndRead();
}

// If the writer is currently paused and we are about the wait for more data then this would deadlock.
// The writer is paused at the pause threshold but the reader needs a minimum amount in order to make progress.
// We resume the writer so that we can unblock this read.
if (!_writerAwaitable.IsCompleted)
{
_writerAwaitable.Complete(out completionData);
}

// Set the minimum read bytes if we need to wait
_minimumReadBytes = minimumBytes;

// Otherwise it's async
result = new ValueTask<ReadResult>(_reader, token: 0);
}

TrySchedule(WriterScheduler, in completionData);

return result;
}

internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
{
if (_readerCompletion.IsCompleted)
Expand Down Expand Up @@ -889,6 +951,9 @@ private void GetReadResult(out ReadResult result)
{
_operationState.BeginRead();
}

// Reset the minimum read bytes when read yields
_minimumReadBytes = 0;
}

internal ValueTaskSourceStatus GetFlushAsyncStatus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@ public abstract partial class PipeReader
/// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default);

/// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
/// <param name="minimumSize">The minimum length that needs to be buffered in order to for the call to return.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
/// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
/// <remarks>The call returns if the <see cref="System.IO.Pipelines.PipeReader" /> has read the minimumLength specified, or is cancelled or completed.</remarks>
public ValueTask<ReadResult> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default)
{
if (minimumSize < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
}

return ReadAtLeastAsyncCore(minimumSize, cancellationToken);
}

/// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
/// <param name="minimumSize">The minimum length that needs to be buffered in order to for the call to return.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
/// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
/// <remarks>The call returns if the <see cref="System.IO.Pipelines.PipeReader" /> has read the minimumLength specified, or is cancelled or completed.</remarks>
protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
{
while (true)
{
ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;

if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled)
{
return result;
}

// Keep buffering until we get more data
AdvanceTo(buffer.Start, buffer.End);
}
}

/// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed.</summary>
/// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
/// <remarks>The memory for the consumed data will be released and no longer available.
Expand Down
26 changes: 26 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,32 @@ public void FlushAsyncReturnsNonCompletedSizeWhenCommitOverTheLimit()
Assert.False(flushAsync.IsCompleted);
}

[Fact]
public async Task ReadAtLeastAsyncUnblocksWriterIfMinimumlowerThanResumeThreshold()
{
PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold);
ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();
Assert.False(flushAsync.IsCompleted);

ValueTask<ReadResult> readAsync = _pipe.Reader.ReadAtLeastAsync(PauseWriterThreshold * 3);

Assert.False(readAsync.IsCompleted);

// This should unblock the flush
Assert.True(flushAsync.IsCompleted);

for (int i = 0; i < 2; i++)
{
writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold);
flushAsync = writableBuffer.FlushAsync();
Assert.True(flushAsync.IsCompleted);
}

var result = await readAsync;
Assert.Equal(PauseWriterThreshold * 3, result.Buffer.Length);
_pipe.Reader.AdvanceTo(result.Buffer.End);
}

[Fact]
public async Task FlushAsyncThrowsIfReaderCompletedWithException()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines.Tests
{
// This is a PipeReader implementation that does not override any of the virtual methods.
// The intent is to test the base implementation without having to rewrite the base functionality
// of the PipeReader.
public class BasePipeReader : PipeReader
{
private readonly PipeReader _reader;

public BasePipeReader(PipeReader reader)
{
_reader = reader;
}

public override void AdvanceTo(SequencePosition consumed) => _reader.AdvanceTo(consumed);
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _reader.AdvanceTo(consumed, examined);
public override void CancelPendingRead() => _reader.CancelPendingRead();
public override void Complete(Exception? exception = null) => _reader.Complete(exception);
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) => _reader.ReadAsync(cancellationToken);
public override bool TryRead(out ReadResult result) => _reader.TryRead(out result);
}
}
Loading

0 comments on commit a46f7e9

Please sign in to comment.