Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented ReadAtLeastAsync #51979

Merged
merged 6 commits into from
May 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions src/libraries/System.IO.Pipelines/System.IO.Pipelines.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
Microsoft Visual Studio Solution File, Format Version 12.00

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.31223.327
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestUtilities", "..\Common\tests\TestUtilities\TestUtilities.csproj", "{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Win32.Registry", "..\Microsoft.Win32.Registry\ref\Microsoft.Win32.Registry.csproj", "{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0}"
Expand All @@ -23,18 +27,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{A7DEEB5B-C33
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StreamConformanceTests", "..\Common\tests\StreamConformanceTests\StreamConformanceTests.csproj", "{1E18A8F9-3602-4948-8764-BC7C85840B80}"
EndProject
Global
GlobalSection(NestedProjects) = preSolution
{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{1B97BE49-ACAD-4DC9-B195-FE23C3DE5D08} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{DC148B85-05DB-43B2-B2AF-958C305C2C0A} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{C6EA39FC-B265-4B3E-B5BA-9D0D2C601691} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{E0AFC8E8-C697-40B3-A8AD-52788AE1B753} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{90FCF3EB-F36D-4D39-8A4A-623497F54700} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{1DD520A2-ED75-4889-848E-DBDC7BE41873} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
{FFDC73EC-53C9-4B6E-B468-C477039742D8} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
EndGlobalSection
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
Expand Down Expand Up @@ -76,10 +71,26 @@ Global
{90FCF3EB-F36D-4D39-8A4A-623497F54700}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90FCF3EB-F36D-4D39-8A4A-623497F54700}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90FCF3EB-F36D-4D39-8A4A-623497F54700}.Release|Any CPU.Build.0 = Release|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E18A8F9-3602-4948-8764-BC7C85840B80}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{DC148B85-05DB-43B2-B2AF-958C305C2C0A} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{1DD520A2-ED75-4889-848E-DBDC7BE41873} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
{1B97BE49-ACAD-4DC9-B195-FE23C3DE5D08} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
{C6EA39FC-B265-4B3E-B5BA-9D0D2C601691} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{FFDC73EC-53C9-4B6E-B468-C477039742D8} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}
{E0AFC8E8-C697-40B3-A8AD-52788AE1B753} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{90FCF3EB-F36D-4D39-8A4A-623497F54700} = {A7DEEB5B-C332-4307-852F-8859096F4DCD}
{1E18A8F9-3602-4948-8764-BC7C85840B80} = {AB8B533C-817A-4010-9FFD-0206D41307D0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2034EAE9-84E4-42C3-8C1F-AB515D313D5E}
EndGlobalSection
Expand Down
4 changes: 3 additions & 1 deletion src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ protected PipeReader() { }
public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.Buffers.ReadOnlySequence<byte> sequence) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
[System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
public virtual void OnWriterCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAtLeastAsync(int minimumSize, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAtLeastAsyncCore(int minimumSize, System.Threading.CancellationToken cancellationToken) { throw null; }
public abstract bool TryRead(out System.IO.Pipelines.ReadResult result);
}
public abstract partial class PipeScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public DefaultPipeReader(Pipe pipe)

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) => _pipe.ReadAsync(cancellationToken);

protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken) => _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken);

public override void AdvanceTo(SequencePosition consumed) => _pipe.AdvanceReader(consumed);

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _pipe.AdvanceReader(consumed, examined);
Expand Down
77 changes: 71 additions & 6 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public sealed partial class Pipe
// The extent of the bytes available to the PipeReader to consume
private BufferSegment? _readTail;
private int _readTailIndex;
private int _minimumReadBytes;

// The write head which is the extent of the PipeWriter's written bytes
private BufferSegment? _writingHead;
Expand Down Expand Up @@ -274,7 +275,7 @@ internal bool CommitUnsynchronized()
if (_unflushedBytes == 0)
{
// Nothing written to commit
return true;
return false;
}

// Update the writing head
Expand All @@ -288,8 +289,18 @@ internal bool CommitUnsynchronized()
long oldLength = _unconsumedBytes;
_unconsumedBytes += _unflushedBytes;

// Do not reset if reader is complete
if (PauseWriterThreshold > 0 &&
bool resumeReader = true;

if (_unconsumedBytes < _minimumReadBytes)
{
// Don't yield the reader if we haven't written enough
resumeReader = false;
}
// We only apply back pressure if the reader isn't paused. This is important
// because if it is blocked then this could cause a deadlock (if resumeReader is false).
// If we are resuming the reader, then we can look at the pause threshold to know
// if we should pause the writer.
else if (PauseWriterThreshold > 0 &&
oldLength < PauseWriterThreshold &&
_unconsumedBytes >= PauseWriterThreshold &&
!_readerCompletion.IsCompleted)
Expand All @@ -300,7 +311,7 @@ internal bool CommitUnsynchronized()
_unflushedBytes = 0;
_writingHeadBytesBuffered = 0;

return false;
return resumeReader;
}

internal void Advance(int bytes)
Expand Down Expand Up @@ -346,7 +357,7 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)

private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
var wasEmpty = CommitUnsynchronized();
var completeReader = CommitUnsynchronized();

// AttachToken before completing reader awaiter in case cancellationToken is already completed
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
Expand All @@ -367,7 +378,7 @@ private void PrepareFlush(out CompletionData completionData, out ValueTask<Flush
// Complete reader only if new data was pushed into the pipe
// Avoid throwing in between completing the reader and scheduling the callback
// if the intent is to allow pipe to continue reading the data
if (!wasEmpty)
if (completeReader)
{
_readerAwaitable.Complete(out completionData);
}
Expand Down Expand Up @@ -646,6 +657,57 @@ internal void OnReaderCompleted(Action<Exception?, object?> callback, object? st
}
}

internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token)
{
if (_readerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

CompletionData completionData = default;
ValueTask<ReadResult> result;
lock (SyncObj)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);

// If the awaitable is already complete then return the value result directly
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);

// Short circuit if we have the data or if we enter another terminal state
if (_unconsumedBytes >= minimumBytes || readResult.IsCanceled || readResult.IsCompleted)
{
return new ValueTask<ReadResult>(readResult);
}

// We don't have enough data so we need to reset the reader awaitable
_readerAwaitable.SetUncompleted();

// We also need to flip the reading state off
_operationState.EndRead();
}

// If the writer is currently paused and we are about the wait for more data then this would deadlock.
// The writer is paused at the pause threshold but the reader needs a minimum amount in order to make progress.
// We resume the writer so that we can unblock this read.
if (!_writerAwaitable.IsCompleted)
{
_writerAwaitable.Complete(out completionData);
}

// Set the minimum read bytes if we need to wait
_minimumReadBytes = minimumBytes;

// Otherwise it's async
result = new ValueTask<ReadResult>(_reader, token: 0);
}

TrySchedule(WriterScheduler, in completionData);

return result;
}

internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
{
if (_readerCompletion.IsCompleted)
Expand Down Expand Up @@ -889,6 +951,9 @@ private void GetReadResult(out ReadResult result)
{
_operationState.BeginRead();
}

// Reset the minimum read bytes when read yields
_minimumReadBytes = 0;
}

internal ValueTaskSourceStatus GetFlushAsyncStatus()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@ public abstract partial class PipeReader
/// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default);

/// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
/// <param name="minimumSize">The minimum length that needs to be buffered in order to for the call to return.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
/// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
/// <remarks>The call returns if the <see cref="System.IO.Pipelines.PipeReader" /> has read the minimumLength specified, or is cancelled or completed.</remarks>
public ValueTask<ReadResult> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default)
{
if (minimumSize < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
}

return ReadAtLeastAsyncCore(minimumSize, cancellationToken);
}

/// <summary>Asynchronously reads a sequence of bytes from the current <see cref="System.IO.Pipelines.PipeReader" />.</summary>
/// <param name="minimumSize">The minimum length that needs to be buffered in order to for the call to return.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see langword="default" />.</param>
/// <returns>A <see cref="System.Threading.Tasks.ValueTask{T}" /> representing the asynchronous read operation.</returns>
/// <remarks>The call returns if the <see cref="System.IO.Pipelines.PipeReader" /> has read the minimumLength specified, or is cancelled or completed.</remarks>
protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
{
while (true)
{
ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;

if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled)
{
return result;
}

// Keep buffering until we get more data
AdvanceTo(buffer.Start, buffer.End);
}
}

/// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed.</summary>
/// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
/// <remarks>The memory for the consumed data will be released and no longer available.
Expand Down
26 changes: 26 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,32 @@ public void FlushAsyncReturnsNonCompletedSizeWhenCommitOverTheLimit()
Assert.False(flushAsync.IsCompleted);
}

[Fact]
public async Task ReadAtLeastAsyncUnblocksWriterIfMinimumlowerThanResumeThreshold()
{
PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold);
ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();
Assert.False(flushAsync.IsCompleted);

ValueTask<ReadResult> readAsync = _pipe.Reader.ReadAtLeastAsync(PauseWriterThreshold * 3);

Assert.False(readAsync.IsCompleted);

// This should unblock the flush
Assert.True(flushAsync.IsCompleted);

for (int i = 0; i < 2; i++)
{
writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold);
flushAsync = writableBuffer.FlushAsync();
Assert.True(flushAsync.IsCompleted);
}

var result = await readAsync;
Assert.Equal(PauseWriterThreshold * 3, result.Buffer.Length);
_pipe.Reader.AdvanceTo(result.Buffer.End);
}

[Fact]
public async Task FlushAsyncThrowsIfReaderCompletedWithException()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines.Tests
{
// This is a PipeReader implementation that does not override any of the virtual methods.
// The intent is to test the base implementation without having to rewrite the base functionality
// of the PipeReader.
public class BasePipeReader : PipeReader
{
private readonly PipeReader _reader;

public BasePipeReader(PipeReader reader)
{
_reader = reader;
}

public override void AdvanceTo(SequencePosition consumed) => _reader.AdvanceTo(consumed);
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _reader.AdvanceTo(consumed, examined);
public override void CancelPendingRead() => _reader.CancelPendingRead();
public override void Complete(Exception? exception = null) => _reader.Complete(exception);
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) => _reader.ReadAsync(cancellationToken);
public override bool TryRead(out ReadResult result) => _reader.TryRead(out result);
}
}
Loading