Skip to content

Commit

Permalink
Fix semantics of ArrayMemoryPool (dotnet/corefx#27615)
Browse files Browse the repository at this point in the history
* Fix semantics of ArrayMemoryPool

* More thread safety

* Fix pipes and add tests


Commit migrated from dotnet/corefx@9ce9033
  • Loading branch information
pakrym authored and ahsonkhan committed Mar 6, 2018
1 parent 5d436c4 commit afe60bd
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void SetMemory(OwnedMemory<byte> buffer)
public void SetMemory(OwnedMemory<byte> ownedMemory, int start, int end, bool readOnly = false)
{
_ownedMemory = ownedMemory;
_ownedMemory.Retain();

AvailableMemory = _ownedMemory.Memory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public static class TestWriterExtensions
{
public static PipeWriter WriteEmpty(this PipeWriter writer, int count)
{
writer.GetMemory(count);
writer.GetSpan(count).Slice(0, count).Fill(0);
writer.Advance(count);
return writer;
}
Expand Down
35 changes: 29 additions & 6 deletions src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class PipePoolTests
private class DisposeTrackingBufferPool : TestMemoryPool
{
public int ReturnedBlocks { get; set; }
public int DisposedBlocks { get; set; }
public int CurrentlyRentedBlocks { get; set; }

public override OwnedMemory<byte> Rent(int size)
Expand All @@ -26,10 +27,12 @@ protected override void Dispose(bool disposing)

private class DisposeTrackingOwnedMemory : OwnedMemory<byte>
{
private readonly byte[] _array;
private byte[] _array;

private readonly DisposeTrackingBufferPool _bufferPool;

private int _refCount = 1;

public DisposeTrackingOwnedMemory(byte[] array, DisposeTrackingBufferPool bufferPool)
{
_array = array;
Expand All @@ -49,9 +52,9 @@ public override Span<byte> Span
}
}

public override bool IsDisposed { get; }
public override bool IsDisposed => _array == null;

protected override bool IsRetained => true;
protected override bool IsRetained => _refCount > 0;

public override MemoryHandle Pin(int byteOffset = 0)
{
Expand All @@ -68,18 +71,26 @@ protected override bool TryGetArray(out ArraySegment<byte> arraySegment)

protected override void Dispose(bool disposing)
{
throw new NotImplementedException();
if (IsRetained)
{
throw new InvalidOperationException();
}
_bufferPool.DisposedBlocks++;

_array = null;
}

public override bool Release()
{
_bufferPool.ReturnedBlocks++;
_bufferPool.CurrentlyRentedBlocks--;
_refCount--;
return IsRetained;
}

public override void Retain()
{
_refCount++;
}
}
}
Expand All @@ -102,6 +113,8 @@ public async Task AdvanceToEndReturnsAllBlocks()
pipe.Reader.AdvanceTo(readResult.Buffer.End);

Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
Assert.Equal(3, pool.ReturnedBlocks);
}

[Fact]
Expand All @@ -128,6 +141,10 @@ public async Task CanWriteAfterReturningMultipleBlocks()

// Try writing more
await pipe.Writer.WriteAsync(new byte[writeSize]);

Assert.Equal(1, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
Assert.Equal(2, pool.ReturnedBlocks);
}

[Fact]
Expand All @@ -141,10 +158,12 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce()
readerWriter.Writer.Complete();
readerWriter.Reader.Complete();
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);

readerWriter.Writer.Complete();
readerWriter.Reader.Complete();
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
Expand Down Expand Up @@ -174,24 +193,28 @@ public void ReturnsWriteHeadOnComplete()
{
var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(512);

pipe.Reader.Complete();
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
public void ReturnsWriteHeadWhenRequestingLargerBlock()
{
var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(4096);

pipe.Reader.Complete();
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(2, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
Expand Down
4 changes: 4 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/TestMemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public PooledMemory(OwnedMemory<byte> ownedMemory, TestMemoryPool pool)
_ownedMemory = ownedMemory;
_pool = pool;
_leaser = Environment.StackTrace;
_referenceCount = 1;
}

~PooledMemory()
Expand All @@ -75,12 +76,15 @@ public override MemoryHandle Pin(int byteOffset = 0)
public override void Retain()
{
_pool.CheckDisposed();
_ownedMemory.Retain();
Interlocked.Increment(ref _referenceCount);
}

public override bool Release()
{
_pool.CheckDisposed();
_ownedMemory.Release();

int newRefCount = Interlocked.Decrement(ref _referenceCount);

if (newRefCount < 0)
Expand Down
1 change: 1 addition & 0 deletions src/libraries/System.Memory/src/System.Memory.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
<Reference Include="System.Runtime.CompilerServices.Unsafe" />
<Reference Include="System.Threading" />
<Reference Condition="'$(TargetGroup)' != 'netstandard1.1'" Include="System.Numerics.Vectors" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' == 'true'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// See the LICENSE file in the project root for more information.

using System.Runtime.InteropServices;
using System.Threading;

#if !netstandard
using Internal.Runtime.CompilerServices;
#else
Expand All @@ -21,13 +23,14 @@ private sealed class ArrayMemoryPoolBuffer : OwnedMemory<T>
public ArrayMemoryPoolBuffer(int size)
{
_array = ArrayPool<T>.Shared.Rent(size);
_refCount = 1;
}

public sealed override int Length => _array.Length;

public sealed override bool IsDisposed => _array == null;

protected sealed override bool IsRetained => _refCount > 0;
protected sealed override bool IsRetained => Volatile.Read(ref _refCount) > 0;

public sealed override Span<T> Span
{
Expand Down Expand Up @@ -79,22 +82,30 @@ public sealed override MemoryHandle Pin(int byteOffset = 0)

public sealed override void Retain()
{
if (IsDisposed)
ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();

_refCount++;
while (true)
{
int currentCount = Volatile.Read(ref _refCount);
if (currentCount <= 0) ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();
if (Interlocked.CompareExchange(ref _refCount, currentCount + 1, currentCount) == currentCount) break;
}
}

public sealed override bool Release()
{
if (IsDisposed)
ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();

int newRefCount = --_refCount;
if (newRefCount < 0)
ThrowHelper.ThrowInvalidOperationException();

return newRefCount != 0;
while (true)
{
int currentCount = Volatile.Read(ref _refCount);
if (currentCount <= 0) ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();
if (Interlocked.CompareExchange(ref _refCount, currentCount - 1, currentCount) == currentCount)
{
if (currentCount == 1)
{
Dispose();
return false;
}
return true;
}
}
}
}
}
Expand Down
37 changes: 34 additions & 3 deletions src/libraries/System.Memory/tests/MemoryPool/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static void DisposingTheSharedPoolIsANop()
using (OwnedMemory<int> block = mp.Rent(10))
{
Assert.True(block.Length >= 10);
block.Release();
}
}

Expand All @@ -55,6 +56,7 @@ public static void MemoryPoolSpan()
Assert.Equal((IntPtr)newMemoryHandle.Pointer, (IntPtr)pSpan);
}
}
block.Release();
}
}

Expand All @@ -77,6 +79,7 @@ public static void MemoryPoolPin(int byteOffset)
Assert.Equal((IntPtr)pSpan, ((IntPtr)newMemoryHandle.Pointer) - byteOffset);
}
}
block.Release();
}
}

Expand Down Expand Up @@ -109,7 +112,7 @@ public static void MemoryPoolPinOffsetAtEnd()
{
return; // The pool gave us a very large block - too big to compute the byteOffset needed to carry out this test. Skip.
}

using (MemoryHandle newMemoryHandle = block.Pin(byteOffset: byteOffset))
{
unsafe
Expand Down Expand Up @@ -177,6 +180,7 @@ public static void EachRentalIsUniqueUntilDisposed()

foreach (OwnedMemory<int> prior in priorBlocks)
{
prior.Release();
prior.Dispose();
}
}
Expand All @@ -187,6 +191,7 @@ public static void RentWithDefaultSize()
using (OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(minBufferSize: -1))
{
Assert.True(block.Length >= 1);
block.Release();
}
}

Expand Down Expand Up @@ -224,6 +229,7 @@ public static void MemoryPoolTryGetArray()
Assert.Equal((IntPtr)pSpan, (IntPtr)pArray);
}
}
block.Release();
}
}

Expand All @@ -243,10 +249,13 @@ public static void RefCounting()
moreToGo = block.Release();
Assert.True(moreToGo);

moreToGo = block.Release();
Assert.True(moreToGo);

moreToGo = block.Release();
Assert.False(moreToGo);

Assert.Throws<InvalidOperationException>(() => block.Release());
Assert.Throws<ObjectDisposedException>(() => block.Release());
}
}

Expand All @@ -255,7 +264,7 @@ public static void IsDisposed()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
Assert.False(block.IsDisposed);
block.Dispose();
block.Release();
Assert.True(block.IsDisposed);
block.Dispose();
Assert.True(block.IsDisposed);
Expand All @@ -265,6 +274,7 @@ public static void IsDisposed()
public static void ExtraDisposesAreIgnored()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
block.Dispose();
}
Expand All @@ -273,6 +283,7 @@ public static void ExtraDisposesAreIgnored()
public static void NoSpanAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Span.DontBox());
}
Expand All @@ -281,6 +292,7 @@ public static void NoSpanAfterDispose()
public static void NoRetainAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Retain());
}
Expand All @@ -289,6 +301,7 @@ public static void NoRetainAfterDispose()
public static void NoRelease_AfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Release());
}
Expand All @@ -297,6 +310,7 @@ public static void NoRelease_AfterDispose()
public static void NoPinAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Pin());
}
Expand All @@ -306,9 +320,26 @@ public static void NoTryGetArrayAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
Memory<int> memory = block.Memory;
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => MemoryMarshal.TryGetArray(memory, out ArraySegment<int> arraySegment));
}

[Fact]
public static void IsRetainedWhenReturned()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
Assert.False(block.Release());
}

[Fact]
public static void IsDisposedWhenReleased()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();

Assert.True(block.IsDisposed);
}
}
}

0 comments on commit afe60bd

Please sign in to comment.