Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
React to everything
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym committed Feb 23, 2018
1 parent 57b86d4 commit 463b7a4
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 25 deletions.
1 change: 0 additions & 1 deletion src/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ protected PipeWriter() { }
public abstract System.IO.Pipelines.PipeAwaiter<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public abstract System.Memory<byte> GetMemory(int minimumLength = 0);
public abstract System.Span<byte> GetSpan(int minimumLength = 0);
public abstract int MaxBufferSize { get; }
public abstract void OnReaderCompleted(System.Action<System.Exception, object> callback, object state);
public virtual System.IO.Pipelines.PipeAwaiter<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public DefaultPipeWriter(Pipe pipe)

public override Span<byte> GetSpan(int lengthHint = 0) => _pipe.GetMemory(lengthHint).Span;

public override int MaxBufferSize => _pipe._pool.MaxBufferSize;

public bool IsCompleted => _pipe.IsFlushAsyncCompleted;

public FlushResult GetResult() => _pipe.GetFlushAsyncResult();
Expand Down
14 changes: 7 additions & 7 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,26 +341,26 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
lock (_sync)
{
var examinedEverything = false;
if (examined.Segment == _commitHead)
if (examined.GetObject() == _commitHead)
{
examinedEverything = _commitHead != null ? examined.Index == _commitHeadIndex - _commitHead.Start : examined.Index == 0;
examinedEverything = _commitHead != null ? examined.GetInteger() == _commitHeadIndex - _commitHead.Start : examined.GetInteger() == 0;
}

if (consumed.Segment != null)
if (consumed.GetObject() != null)
{
if (_readHead == null)
{
ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
return;
}

var consumedSegment = (BufferSegment)consumed.Segment;
var consumedSegment = (BufferSegment)consumed.GetObject();

returnStart = _readHead;
returnEnd = consumedSegment;

// Check if we crossed _maximumSizeLow and complete backpressure
long consumedBytes = new ReadOnlySequence<byte>(returnStart, _readHeadIndex, consumedSegment, consumed.Index).Length;
long consumedBytes = new ReadOnlySequence<byte>(returnStart, _readHeadIndex, consumedSegment, consumed.GetInteger()).Length;
long oldLength = _length;
_length -= consumedBytes;

Expand All @@ -373,7 +373,7 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
// Check if we consumed entire last segment
// if we are going to return commit head we need to check that there is no writing operation that
// might be using tailspace
if (consumed.Index == returnEnd.Length && _writingHead != returnEnd)
if (consumed.GetInteger() == returnEnd.Length && _writingHead != returnEnd)
{
BufferSegment nextBlock = returnEnd.NextSegment;
if (_commitHead == returnEnd)
Expand All @@ -389,7 +389,7 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
else
{
_readHead = consumedSegment;
_readHeadIndex = consumed.Index;
_readHeadIndex = consumed.GetInteger();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/System.IO.Pipelines/tests/PipeWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ public void ThrowsOnAdvanceWithNoMemory()
}

[Fact]
public async Task GetMemory_AjustsToPoolMaxBufferSize()
public void GetMemory_AdjustsToPoolMaxBufferSize()
{
PipeWriter buffer = Pipe.Writer;
var memory = buffer.GetMemory(int.MaxValue);
Assert.True(4096, memory.Lenght);
Assert.Equal(4096, memory.Length);
}
}
}
8 changes: 3 additions & 5 deletions src/System.Memory/src/System/Buffers/BuffersExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,14 @@ public static void Write<T>(this IBufferWriter<T> bufferWriter, ReadOnlySpan<T>

while (source.Length > 0)
{
int writeSize = destination.Length;

if (destination.Length == 0)
{
destination = bufferWriter.GetSpan(source.Length);
}

source.Slice(0, writeSize).CopyTo(destination);
bufferWriter.Advance(writeSize);
source = source.Slice(writeSize);
source.Slice(0, destination.Length).CopyTo(destination);
bufferWriter.Advance(destination.Length);
source = source.Slice(destination.Length);
destination = default;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public void GetPositionPrefersNextSegment()

SequencePosition c1 = buffer.GetPosition(buffer.Start, 50);

Assert.Equal(0, c1.Index);
Assert.Equal(bufferSegment2, c1.Segment);
Assert.Equal(0, c1.GetInteger());
Assert.Equal(bufferSegment2, c1.GetObject());
}

[Fact]
Expand All @@ -53,8 +53,8 @@ public void GetPositionDoesNotCrossOutsideBuffer()

SequencePosition c1 = buffer.GetPosition(buffer.Start, 200);

Assert.Equal(100, c1.Index);
Assert.Equal(bufferSegment2, c1.Segment);
Assert.Equal(100, c1.GetInteger());
Assert.Equal(bufferSegment2, c1.GetObject());
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ public void GetPositionPrefersNextSegment()

SequencePosition c1 = buffer.GetPosition(buffer.Start, 50);

Assert.Equal(0, c1.Index);
Assert.Equal(bufferSegment2, c1.Segment);
Assert.Equal(0, c1.GetInteger());
Assert.Equal(bufferSegment2, c1.GetObject());
}

[Fact]
Expand All @@ -166,8 +166,8 @@ public void GetPositionDoesNotCrossOutsideBuffer()

SequencePosition c1 = buffer.GetPosition(buffer.Start, 200);

Assert.Equal(100, c1.Index);
Assert.Equal(bufferSegment2, c1.Segment);
Assert.Equal(100, c1.GetInteger());
Assert.Equal(bufferSegment2, c1.GetObject());
}

[Fact]
Expand Down

0 comments on commit 463b7a4

Please sign in to comment.