Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Fix semantics of ArrayMemoryPool #27615

Merged
merged 3 commits into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void SetMemory(OwnedMemory<byte> buffer)
public void SetMemory(OwnedMemory<byte> ownedMemory, int start, int end, bool readOnly = false)
{
_ownedMemory = ownedMemory;
_ownedMemory.Retain();

AvailableMemory = _ownedMemory.Memory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public static class TestWriterExtensions
{
public static PipeWriter WriteEmpty(this PipeWriter writer, int count)
{
writer.GetMemory(count);
writer.GetSpan(count).Slice(0, count).Fill(0);
writer.Advance(count);
return writer;
}
Expand Down
35 changes: 29 additions & 6 deletions src/System.IO.Pipelines/tests/PipePoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class PipePoolTests
private class DisposeTrackingBufferPool : TestMemoryPool
{
public int ReturnedBlocks { get; set; }
public int DisposedBlocks { get; set; }
public int CurrentlyRentedBlocks { get; set; }

public override OwnedMemory<byte> Rent(int size)
Expand All @@ -26,10 +27,12 @@ protected override void Dispose(bool disposing)

private class DisposeTrackingOwnedMemory : OwnedMemory<byte>
{
private readonly byte[] _array;
private byte[] _array;

private readonly DisposeTrackingBufferPool _bufferPool;

private int _refCount = 1;

public DisposeTrackingOwnedMemory(byte[] array, DisposeTrackingBufferPool bufferPool)
{
_array = array;
Expand All @@ -49,9 +52,9 @@ public override Span<byte> Span
}
}

public override bool IsDisposed { get; }
public override bool IsDisposed => _array == null;

protected override bool IsRetained => true;
protected override bool IsRetained => _refCount > 0;

public override MemoryHandle Pin(int byteOffset = 0)
{
Expand All @@ -68,18 +71,26 @@ protected override bool TryGetArray(out ArraySegment<byte> arraySegment)

protected override void Dispose(bool disposing)
{
throw new NotImplementedException();
if (IsRetained)
{
throw new InvalidOperationException();
}
_bufferPool.DisposedBlocks++;

_array = null;
}

public override bool Release()
{
_bufferPool.ReturnedBlocks++;
_bufferPool.CurrentlyRentedBlocks--;
_refCount--;
return IsRetained;
}

public override void Retain()
{
_refCount++;
}
}
}
Expand All @@ -102,6 +113,8 @@ public async Task AdvanceToEndReturnsAllBlocks()
pipe.Reader.AdvanceTo(readResult.Buffer.End);

Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
Assert.Equal(3, pool.ReturnedBlocks);
}

[Fact]
Expand All @@ -128,6 +141,10 @@ public async Task CanWriteAfterReturningMultipleBlocks()

// Try writing more
await pipe.Writer.WriteAsync(new byte[writeSize]);

Assert.Equal(1, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
Assert.Equal(2, pool.ReturnedBlocks);
}

[Fact]
Expand All @@ -141,10 +158,12 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce()
readerWriter.Writer.Complete();
readerWriter.Reader.Complete();
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);

readerWriter.Writer.Complete();
readerWriter.Reader.Complete();
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
Expand Down Expand Up @@ -174,24 +193,28 @@ public void ReturnsWriteHeadOnComplete()
{
var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(512);

pipe.Reader.Complete();
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
public void ReturnsWriteHeadWhenRequestingLargerBlock()
{
var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false));
var memory = pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(4096);

pipe.Reader.Complete();
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(2, pool.ReturnedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}

[Fact]
Expand Down
4 changes: 4 additions & 0 deletions src/System.IO.Pipelines/tests/TestMemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public PooledMemory(OwnedMemory<byte> ownedMemory, TestMemoryPool pool)
_ownedMemory = ownedMemory;
_pool = pool;
_leaser = Environment.StackTrace;
_referenceCount = 1;
}

~PooledMemory()
Expand All @@ -75,12 +76,15 @@ public override MemoryHandle Pin(int byteOffset = 0)
public override void Retain()
{
_pool.CheckDisposed();
_ownedMemory.Retain();
Interlocked.Increment(ref _referenceCount);
}

public override bool Release()
{
_pool.CheckDisposed();
_ownedMemory.Release();

int newRefCount = Interlocked.Decrement(ref _referenceCount);

if (newRefCount < 0)
Expand Down
1 change: 1 addition & 0 deletions src/System.Memory/src/System.Memory.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
<Reference Include="System.Runtime.CompilerServices.Unsafe" />
<Reference Include="System.Threading" />
<Reference Condition="'$(TargetGroup)' != 'netstandard1.1'" Include="System.Numerics.Vectors" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' == 'true'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// See the LICENSE file in the project root for more information.

using System.Runtime.InteropServices;
using System.Threading;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add new line


#if !netstandard
using Internal.Runtime.CompilerServices;
#else
Expand All @@ -21,13 +23,14 @@ private sealed class ArrayMemoryPoolBuffer : OwnedMemory<T>
public ArrayMemoryPoolBuffer(int size)
{
_array = ArrayPool<T>.Shared.Rent(size);
_refCount = 1;
}

public sealed override int Length => _array.Length;

public sealed override bool IsDisposed => _array == null;

protected sealed override bool IsRetained => _refCount > 0;
protected sealed override bool IsRetained => Volatile.Read(ref _refCount) > 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephentoub, does it need to be interlocked read?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it need to be interlocked read?

No, it's only 32-bit, so there's no risk of tearing and Interlocked.Read isn't needed. Volatile.Read may not even be necessary depending on what it's used for, but it's unlikely to hurt and so might as well be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed that it's an int.


public sealed override Span<T> Span
{
Expand Down Expand Up @@ -79,22 +82,30 @@ public sealed override MemoryHandle Pin(int byteOffset = 0)

public sealed override void Retain()
{
if (IsDisposed)
ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();

_refCount++;
while (true)
{
int currentCount = Volatile.Read(ref _refCount);
if (currentCount <= 0) ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();
if (Interlocked.CompareExchange(ref _refCount, currentCount + 1, currentCount) == currentCount) break;
}
}

public sealed override bool Release()
{
if (IsDisposed)
ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();

int newRefCount = --_refCount;
if (newRefCount < 0)
ThrowHelper.ThrowInvalidOperationException();

return newRefCount != 0;
while (true)
{
int currentCount = Volatile.Read(ref _refCount);
if (currentCount <= 0) ThrowHelper.ThrowObjectDisposedException_ArrayMemoryPoolBuffer();
if (Interlocked.CompareExchange(ref _refCount, currentCount - 1, currentCount) == currentCount)
{
if (currentCount == 1)
{
Dispose();
return false;
}
return true;
}
}
}
}
}
Expand Down
37 changes: 34 additions & 3 deletions src/System.Memory/tests/MemoryPool/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static void DisposingTheSharedPoolIsANop()
using (OwnedMemory<int> block = mp.Rent(10))
{
Assert.True(block.Length >= 10);
block.Release();
}
}

Expand All @@ -55,6 +56,7 @@ public static void MemoryPoolSpan()
Assert.Equal((IntPtr)newMemoryHandle.Pointer, (IntPtr)pSpan);
}
}
block.Release();
}
}

Expand All @@ -77,6 +79,7 @@ public static void MemoryPoolPin(int byteOffset)
Assert.Equal((IntPtr)pSpan, ((IntPtr)newMemoryHandle.Pointer) - byteOffset);
}
}
block.Release();
}
}

Expand Down Expand Up @@ -109,7 +112,7 @@ public static void MemoryPoolPinOffsetAtEnd()
{
return; // The pool gave us a very large block - too big to compute the byteOffset needed to carry out this test. Skip.
}

using (MemoryHandle newMemoryHandle = block.Pin(byteOffset: byteOffset))
{
unsafe
Expand Down Expand Up @@ -177,6 +180,7 @@ public static void EachRentalIsUniqueUntilDisposed()

foreach (OwnedMemory<int> prior in priorBlocks)
{
prior.Release();
prior.Dispose();
}
}
Expand All @@ -187,6 +191,7 @@ public static void RentWithDefaultSize()
using (OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(minBufferSize: -1))
{
Assert.True(block.Length >= 1);
block.Release();
}
}

Expand Down Expand Up @@ -224,6 +229,7 @@ public static void MemoryPoolTryGetArray()
Assert.Equal((IntPtr)pSpan, (IntPtr)pArray);
}
}
block.Release();
}
}

Expand All @@ -243,10 +249,13 @@ public static void RefCounting()
moreToGo = block.Release();
Assert.True(moreToGo);

moreToGo = block.Release();
Assert.True(moreToGo);

moreToGo = block.Release();
Assert.False(moreToGo);

Assert.Throws<InvalidOperationException>(() => block.Release());
Assert.Throws<ObjectDisposedException>(() => block.Release());
}
}

Expand All @@ -255,7 +264,7 @@ public static void IsDisposed()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
Assert.False(block.IsDisposed);
block.Dispose();
block.Release();
Assert.True(block.IsDisposed);
block.Dispose();
Assert.True(block.IsDisposed);
Expand All @@ -265,6 +274,7 @@ public static void IsDisposed()
public static void ExtraDisposesAreIgnored()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
block.Dispose();
}
Expand All @@ -273,6 +283,7 @@ public static void ExtraDisposesAreIgnored()
public static void NoSpanAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Span.DontBox());
}
Expand All @@ -281,6 +292,7 @@ public static void NoSpanAfterDispose()
public static void NoRetainAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Retain());
}
Expand All @@ -289,6 +301,7 @@ public static void NoRetainAfterDispose()
public static void NoRelease_AfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Release());
}
Expand All @@ -297,6 +310,7 @@ public static void NoRelease_AfterDispose()
public static void NoPinAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => block.Pin());
}
Expand All @@ -306,9 +320,26 @@ public static void NoTryGetArrayAfterDispose()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
Memory<int> memory = block.Memory;
block.Release();
block.Dispose();
Assert.Throws<ObjectDisposedException>(() => MemoryMarshal.TryGetArray(memory, out ArraySegment<int> arraySegment));
}

[Fact]
public static void IsRetainedWhenReturned()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
Assert.False(block.Release());
}

[Fact]
public static void IsDisposedWhenReleased()
{
OwnedMemory<int> block = MemoryPool<int>.Shared.Rent(42);
block.Release();

Assert.True(block.IsDisposed);
}
}
}