Skip to content

Commit

Permalink
Small tweaks to PipeReader behaviors (dotnet/corefx#27596)
Browse files Browse the repository at this point in the history
* Small tweaks to PipeReader behaviors
- Throw if AdvanceTo is called after completing the reader
- Don't throw if Complete is called without AdvanceTo
- Swapped the reading and writing exceptions
- Added AdvanceReader that takes BufferSegment and int, this cleans up
the API a bit as we touch SequencePosition in less places.



Commit migrated from dotnet/corefx@c6018ef
  • Loading branch information
davidfowl authored Mar 1, 2018
1 parent 257087d commit 2c0eae5
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public void ResetMemory()
AvailableMemory = default;
}

internal OwnedMemory<byte> OwnedMemory => _ownedMemory;

public Memory<byte> AvailableMemory { get; private set; }

public int Length => End - Start;
Expand Down
37 changes: 26 additions & 11 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,25 @@ internal void CompleteWriter(Exception exception)
}
}

internal void AdvanceReader(SequencePosition consumed)
internal void AdvanceReader(in SequencePosition consumed)
{
AdvanceReader(consumed, consumed);
}

internal void AdvanceReader(SequencePosition consumed, SequencePosition examined)
internal void AdvanceReader(in SequencePosition consumed, in SequencePosition examined)
{
// If the reader is completed
if (_readerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

// TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data
// directly casting only works because the type value in ReadOnlySequenceSegment is 0
AdvanceReader((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
}

internal void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
BufferSegment returnStart = null;
BufferSegment returnEnd = null;
Expand All @@ -346,26 +359,24 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
lock (_sync)
{
var examinedEverything = false;
if (examined.GetObject() == _commitHead)
if (examinedSegment == _commitHead)
{
examinedEverything = _commitHead != null ? examined.GetInteger() == _commitHeadIndex - _commitHead.Start : examined.GetInteger() == 0;
examinedEverything = _commitHead != null ? examinedIndex == _commitHeadIndex - _commitHead.Start : examinedIndex == 0;
}

if (consumed.GetObject() != null)
if (consumedSegment != null)
{
if (_readHead == null)
{
ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
return;
}

var consumedSegment = (BufferSegment)consumed.GetObject();

returnStart = _readHead;
returnEnd = consumedSegment;

// Check if we crossed _maximumSizeLow and complete backpressure
long consumedBytes = new ReadOnlySequence<byte>(returnStart, _readHeadIndex, consumedSegment, consumed.GetInteger()).Length;
long consumedBytes = new ReadOnlySequence<byte>(returnStart, _readHeadIndex, consumedSegment, consumedIndex).Length;
long oldLength = _length;
_length -= consumedBytes;

Expand All @@ -378,7 +389,7 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
// Check if we consumed entire last segment
// if we are going to return commit head we need to check that there is no writing operation that
// might be using tailspace
if (consumed.GetInteger() == returnEnd.Length && _writingHead != returnEnd)
if (consumedIndex == returnEnd.Length && _writingHead != returnEnd)
{
BufferSegment nextBlock = returnEnd.NextSegment;
if (_commitHead == returnEnd)
Expand All @@ -394,7 +405,7 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
else
{
_readHead = consumedSegment;
_readHeadIndex = consumed.GetInteger();
_readHeadIndex = consumedIndex;
}
}

Expand Down Expand Up @@ -431,11 +442,15 @@ internal void CompleteReader(Exception exception)

lock (_sync)
{
// If we're reading, treat clean up that state before continuting
if (_readingState.IsActive)
{
ThrowHelper.ThrowInvalidOperationException_CompleteReaderActiveReader();
_readingState.End();
}

// REVIEW: We should consider cleaning up all of the allocated memory
// on the reader side now.

completionCallbacks = _readerCompletion.TryComplete(exception);
awaitable = _writerAwaitable.Complete();
writerCompleted = _writerCompletion.IsCompleted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,11 @@ internal static class ThrowHelper
public static void ThrowInvalidOperationException_NoWritingAllowed() => throw CreateInvalidOperationException_NoWritingAllowed();

[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoWritingAllowed() => new InvalidOperationException(SR.ReadingAfterCompleted);
public static Exception CreateInvalidOperationException_NoWritingAllowed() => new InvalidOperationException(SR.WritingAfterCompleted);

public static void ThrowInvalidOperationException_NoReadingAllowed() => throw CreateInvalidOperationException_NoReadingAllowed();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException(SR.WritingAfterCompleted);

public static void ThrowInvalidOperationException_CompleteReaderActiveReader() => throw CreateInvalidOperationException_CompleteReaderActiveReader();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_CompleteReaderActiveReader() => new InvalidOperationException(SR.CannotCompleteWhileReading);
public static Exception CreateInvalidOperationException_NoReadingAllowed() => new InvalidOperationException(SR.ReadingAfterCompleted);

public static void ThrowInvalidOperationException_AdvancingPastBufferSize() => throw CreateInvalidOperationException_AdvancingPastBufferSize();
[MethodImpl(MethodImplOptions.NoInlining)]
Expand Down
30 changes: 30 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,36 @@ public void FlushAsyncReturnsCompletedIfReaderCompletes()
Assert.True(flushAsync.IsCompleted);
FlushResult flushResult = flushAsync.GetResult();
Assert.True(flushResult.IsCompleted);

writableBuffer = _pipe.Writer.WriteEmpty(64);
flushAsync = writableBuffer.FlushAsync();
flushResult = flushAsync.GetResult();

Assert.True(flushResult.IsCompleted);
Assert.True(flushAsync.IsCompleted);
}

[Fact]
public async Task FlushAsyncReturnsCompletedIfReaderCompletesWithoutAdvance()
{
PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(64);
PipeAwaiter<FlushResult> flushAsync = writableBuffer.FlushAsync();

Assert.False(flushAsync.IsCompleted);

ReadResult result = await _pipe.Reader.ReadAsync();
_pipe.Reader.Complete();

Assert.True(flushAsync.IsCompleted);
FlushResult flushResult = flushAsync.GetResult();
Assert.True(flushResult.IsCompleted);

writableBuffer = _pipe.Writer.WriteEmpty(64);
flushAsync = writableBuffer.FlushAsync();
flushResult = flushAsync.GetResult();

Assert.True(flushResult.IsCompleted);
Assert.True(flushAsync.IsCompleted);
}

[Fact]
Expand Down
60 changes: 55 additions & 5 deletions src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -149,15 +150,62 @@ public async Task AdvanceWithGetPositionCrossingIntoWriteHeadWorks()
}

[Fact]
public async Task CompleteReaderThrowsIfReadInProgress()
public async Task CompleteReaderAfterFlushWithoutAdvancingDoesNotThrow()
{
await _pipe.Writer.FlushAsync();
ReadResult result = await _pipe.Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;

_pipe.Reader.Complete();
}

[Fact]
public async Task ResetAfterCompleteReaderAndWriterWithoutAdvancingClearsEverything()
{
_pipe.Writer.WriteEmpty(4094);
_pipe.Writer.WriteEmpty(4094);
await _pipe.Writer.FlushAsync();
ReadResult result = await _pipe.Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;

SequenceMarshal.TryGetReadOnlySequenceSegment(
buffer,
out ReadOnlySequenceSegment<byte> start,
out int startIndex,
out ReadOnlySequenceSegment<byte> end,
out int endIndex);

var startSegment = (BufferSegment)start;
var endSegment = (BufferSegment)end;
Assert.NotNull(startSegment.OwnedMemory);
Assert.NotNull(endSegment.OwnedMemory);

_pipe.Reader.Complete();

// Nothing cleaned up
Assert.NotNull(startSegment.OwnedMemory);
Assert.NotNull(endSegment.OwnedMemory);

_pipe.Writer.Complete();

// Should be cleaned up now
Assert.Null(startSegment.OwnedMemory);
Assert.Null(endSegment.OwnedMemory);

_pipe.Reset();
}

[Fact]
public async Task AdvanceAfterCompleteThrows()
{
await _pipe.Writer.WriteAsync(new byte[1]);
ReadResult result = await _pipe.Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;

Assert.Throws<InvalidOperationException>(() => _pipe.Reader.Complete());
_pipe.Reader.Complete();

_pipe.Reader.AdvanceTo(buffer.Start, buffer.Start);
var exception = Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(buffer.End));
Assert.Equal("Reading is not allowed after reader was completed.", exception.Message);
}

[Fact]
Expand Down Expand Up @@ -438,13 +486,15 @@ public async Task ReadingCanBeCanceled()
cts.Token.Register(() => { _pipe.Writer.Complete(new OperationCanceledException(cts.Token)); });

Task ignore = Task.Run(
async () => {
async () =>
{
await Task.Delay(1000);
cts.Cancel();
});

await Assert.ThrowsAsync<OperationCanceledException>(
async () => {
async () =>
{
ReadResult result = await _pipe.Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
});
Expand Down

0 comments on commit 2c0eae5

Please sign in to comment.