Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Fix pipes and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym committed Mar 2, 2018
1 parent 4eecbab commit efa461f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void SetMemory(OwnedMemory<byte> ownedMemory, int start, int end, bool re
public void ResetMemory()
{
_ownedMemory.Release();
_ownedMemory.Dispose();
_ownedMemory = null;
AvailableMemory = default;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public static class TestWriterExtensions
{
public static PipeWriter WriteEmpty(this PipeWriter writer, int count)
{
writer.GetMemory(count);
writer.GetSpan(count).Slice(0, count).Fill(0);
writer.Advance(count);
return writer;
}
Expand Down
36 changes: 30 additions & 6 deletions src/System.IO.Pipelines/tests/PipePoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Diagnostics;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -13,6 +14,7 @@ public class PipePoolTests
private class DisposeTrackingBufferPool : TestMemoryPool
{
public int ReturnedBlocks { get; set; }
public int DisposedBlocks { get; set; }
public int CurrentlyRentedBlocks { get; set; }

public override OwnedMemory<byte> Rent(int size)
Expand All @@ -26,10 +28,12 @@ protected override void Dispose(bool disposing)

private class DisposeTrackingOwnedMemory : OwnedMemory<byte>
{
private readonly byte[] _array;
private byte[] _array;

private readonly DisposeTrackingBufferPool _bufferPool;

private int _refCount = 1;

public DisposeTrackingOwnedMemory(byte[] array, DisposeTrackingBufferPool bufferPool)
{
_array = array;
Expand All @@ -49,9 +53,9 @@ public override Span<byte> Span
}
}

public override bool IsDisposed { get; }
public override bool IsDisposed => _array == null;

protected override bool IsRetained => true;
protected override bool IsRetained => _refCount > 0;

public override MemoryHandle Pin(int byteOffset = 0)
{
Expand All @@ -68,18 +72,26 @@ protected override bool TryGetArray(out ArraySegment<byte> arraySegment)

protected override void Dispose(bool disposing)
{
throw new NotImplementedException();
if (IsRetained)
{
throw new InvalidOperationException();
}
_bufferPool.DisposedBlocks++;

_array = null;
}

public override bool Release()
{
_bufferPool.ReturnedBlocks++;
_bufferPool.CurrentlyRentedBlocks--;
_refCount--;
return IsRetained;
}

public override void Retain()
{
_refCount++;
}
}
}
Expand All @@ -102,6 +114,8 @@ public async Task AdvanceToEndReturnsAllBlocks()
pipe.Reader.AdvanceTo(readResult.Buffer.End);

Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
Assert.Equal(3, pool.ReturnedBlocks);
}

[Fact]
Expand All @@ -128,6 +142,10 @@ public async Task CanWriteAfterReturningMultipleBlocks()

// Try writing more
await pipe.Writer.WriteAsync(new byte[writeSize]);

Assert.Equal(1, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
Assert.Equal(2, pool.ReturnedBlocks);
}

[Fact]
Expand All @@ -141,10 +159,12 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce()
readerWriter.Writer.Complete();
readerWriter.Reader.Complete();
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);

readerWriter.Writer.Complete();
readerWriter.Reader.Complete();
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
Expand Down Expand Up @@ -174,24 +194,28 @@ public void ReturnsWriteHeadOnComplete()
{
var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(512);

pipe.Reader.Complete();
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
public void ReturnsWriteHeadWhenRequestingLargerBlock()
{
var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(4096);

pipe.Reader.Complete();
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(2, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
Expand Down
4 changes: 4 additions & 0 deletions src/System.IO.Pipelines/tests/TestMemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public PooledMemory(OwnedMemory<byte> ownedMemory, TestMemoryPool pool)
_ownedMemory = ownedMemory;
_pool = pool;
_leaser = Environment.StackTrace;
_referenceCount = 1;
}

~PooledMemory()
Expand All @@ -74,12 +75,15 @@ public override MemoryHandle Pin(int byteOffset = 0)
public override void Retain()
{
_pool.CheckDisposed();
_ownedMemory.Retain();
Interlocked.Increment(ref _referenceCount);
}

public override bool Release()
{
_pool.CheckDisposed();
_ownedMemory.Release();

int newRefCount = Interlocked.Decrement(ref _referenceCount);

if (newRefCount < 0)
Expand Down
1 change: 1 addition & 0 deletions src/System.Memory/src/System.Memory.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
<Reference Include="System.Runtime.CompilerServices.Unsafe" />
<Reference Include="System.Threading" />
<Reference Condition="'$(TargetGroup)' != 'netstandard1.1'" Include="System.Numerics.Vectors" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' == 'true'">
Expand Down

0 comments on commit efa461f

Please sign in to comment.