Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Add System.IO.Pipelines API #27007

Merged
merged 14 commits into from
Feb 14, 2018
Merged

Conversation

pakrym
Copy link

@pakrym pakrym commented Feb 9, 2018

94% coverage of new API

Please check csprojes, configuration.props and other infrastructure files. It's my first time adding a package to corefx so I may have done some mistakes there.

@pakrym
Copy link
Author

pakrym commented Feb 9, 2018

Not sure why System.IO.Pipelines ref doesn't pick up System.Memory reference on the first build.

/// </summary>
public sealed class Pipe
{
private const int SegmentPoolSize = 16;
Copy link
Member

@KrzysztofCwalina KrzysztofCwalina Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable in PipeOptions? The number is kind of small

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided to try const first with the idea that if you have more you can increase minimum block size and have larger blocks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make it into an option, but it's ok to punt for later.


private readonly MemoryPool<byte> _pool;
private readonly int _minimumSegmentSize;
private readonly long _maximumSizeHigh;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these the backpressure thresholds? If yes, could you rename to use the same terminology as the PipeOptions properties.

ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}

if (minimumSize < 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is zero ok? Why would we allow that? In fact, whenever I write code using our IOutput APIs I need to have a special branch for empty buffer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero is whatever is availible at the moment but not empty

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 is also whatever is available at the moment but not zero, correct? If yes, then default being one and zero throwing would be more transparent API.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, one is "I need to write one byte", zero is for "whatever is availible"

{
BufferSegment nextSegment = CreateSegmentUnsynchronized();

nextSegment.SetMemory(_pool.Rent(Math.Max(_minimumSegmentSize, minimumSize)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to call rent under the lock? Since pool is an abstraction, we don't know what rent does and so it can cause deadlocks or other synchronization problems.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to do everything around it in lock. It might deadlock if pool would try to access the same pipe object on different thread which is not very common situation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let's see if we can optimized the locks later.


lock (_sync)
{
BufferSegment segment = _writingHead ?? AllocateWriteHeadUnsynchronized(minimumSize);
Copy link
Member

@KrzysztofCwalina KrzysztofCwalina Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be optimize, if the default scheduler is used? ... in which case the reader is reading on the same thread.
I could imagine doing all but allocations of new segments without a lock if the reader continuations are called on the same thread.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if scheduler is inline you don't have any guarantees what thread would call into pipe

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but at that point you could do just a simple interlocked read/write, correct? Anyway, I think the code is good as is, I just noticed how much heavy locking we do and it looks like we could optimize this code in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually really hard to remove the locking, we tried several time and failed, each time we ended up locking more 😄 . That said, I'm happy to look at this again once we have lots of tests in place.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let's see if we can optimized the locks later.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a valid use case for Pipe to have multiple consumers or producers anyway? Seems like that leads to a lot of undefined behavior down the road.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, pipe is single producer single consumer.

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void Advance(int bytesWritten)
{
if (_writingHead == null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can writing head be null? Is the exception going to propagate to user code? If yes, why?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you call Advance without calling GetMemory()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the exception will explain the problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exceptions need to be reviewed. They are from before the IOutput refactor.

/// Create new instance of <see cref="PipeAwaiter{T}"/> that wraps async operation implemented by <paramref name="awaiter"/>
/// </summary>
public PipeAwaiter(IPipeAwaiter<T> awaiter)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Debug.Assert that the awaiter is not null


using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("System.IO.Pipelines.Tests, PublicKey=002400000480000094000000060200000024000052534131000400000100010015c01ae1f50e8cc09ba9eac9147cf8fd9fce2cfe9f8dce4f7301c4132ca9fb50ce8cbf1df4dc18dd4d210e4345c744ecb3365ed327efdbc52603faa5e21daa11234c8c4a73e51f03bf192544581ebe107adee3a34928e39d04e524a9ce729d5090bfd7dad9d10c722c0def9ccc08ff0a03790e48bcd1f9b6c476063e1966a1c4")]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid this. If tests need to extract internals (and this practice should be used with discretion), it should use Reflection to get just what's needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really? The ASP.NET uses this pattern for tests all the time. In fact thats the only valid use of internals visible to IMO.


namespace System.IO.Pipelines
{
internal class ThrowHelper
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declare as "internal static class" to avoid creation of unnecessary constructor.


namespace System.IO.Pipelines
{
internal class BufferSegment : IMemoryList<byte>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a class deriving from this. Can this class be sealed?


namespace System.IO.Pipelines
{
internal class PipeCompletionCallbacks
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a class deriving from this. Can this class be sealed?

{
public override void Schedule(Action action)
{
#if NETCOREAPP2_1
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This #if maze is too hard to follow. I think it'd be better to have conditionally imported files named ThreadPoolScheduler.NetCoreApp21.cs, ThreadPoolScheduler.NetStandard2.cs instead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth it? They don't even intersect, three small completely different versions depending on the target.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lack of commonality is exactly why separate files is preferable to #if's. #if's are our least-preferred mechanism for version-specific coding - mainly to be used when 95% of a file is the same between versions and only a handful of lines need to be tweaked. Here, almost every material line is conditional on something - it's very hard to follow.

/// Requests the <see cref="Memory{Byte}"/> of at least <paramref name="minimumLength"/> in size.
/// If <paramref name="minimumLength"/> is equal to <code>0</code>, currently available memory would get returned.
/// </summary>
Memory<byte> GetMemory(int minimumLength = 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should state in the contract whether returning an empty buffer is ok or not. And I would vote that it's not ok to return empty (at which point we should add some tests to validate it.

Copy link
Author

@pakrym pakrym Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not ok to return empty. I'll add some.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good.

[DebuggerDisplay("IsCompleted: {" + nameof(IsCompleted) + "}")]
internal struct PipeCompletion
{
private static readonly ArrayPool<PipeCompletionCallback> CompletionCallbackPool = ArrayPool<PipeCompletionCallback>.Shared;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static field names should be prefixed with "s_" (saw various instances of this throughout.)

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateArgumentNullException(ExceptionArgument argument) { return new ArgumentNullException(argument.ToString()); }

public static void ThrowInvalidOperationException_NotWritingNoAlloc()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the rest of this file should be packed like the first part (use one-line formatting for each method with blank lines only between each group of paired methods.) It's much easier to see the pattern and to figure out what you need to select for copy-pasting when adding a new ThrowHelper.

internal enum ResultFlags : byte
{
None = 0,
Canceled = 1,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For [Flags] enums, it's usually clearer to use hex notation for the values.

public bool IsActive => _state == State.Active;
}

internal enum State: byte
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be a private enum defined inside PipeReaderState (assuming the DebuggerDisplay magic would still work)

// might be using tailspace
if (consumed.Index == returnEnd.Length && _writingHead != returnEnd)
{
var nextBlock = returnEnd.NextSegment;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-compliant use of "var"

}
}

private sealed class DefaultPipeReader : PipeReader, IPipeAwaiter<ReadResult>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are non-trivial-sized nested classes in a file that's already quite large. Please move these two to separate files.

public bool IsCanceled { get { throw null; } }
public bool IsCompleted { get { throw null; } }
}
public partial interface IDuplexPipe : System.IDisposable
Copy link
Member

@davidfowl davidfowl Feb 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets get rid of IDisposable. Not critical for this PR but we'll need to make more changes.

public partial interface IBufferWriter<T>
{
void Advance(int count);
System.Memory<byte> GetMemory(int minimumLength = 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug here these should be T not byte

@@ -273,6 +280,7 @@ public static partial class BuffersExtensions
public static void CopyTo<T>(this System.Buffers.ReadOnlySequence<T> sequence, System.Span<T> destination) { }
public static System.Nullable<System.SequencePosition> PositionOf<T>(this System.Buffers.ReadOnlySequence<T> sequence, T value) where T : System.IEquatable<T> { throw null; }
public static T[] ToArray<T>(this System.Buffers.ReadOnlySequence<T> sequence) { throw null; }
public static void Write<T>(this System.Buffers.IBufferWriter<T> bufferWriter, ReadOnlySpan<byte> source) { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, this should be ReadOnlySpan<T>

<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
<Compile Include="System\IO\Pipelines\ThrowHelper.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'=='netcoreapp2.1'">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition should look like Condition="'$(TargetGroup)'=='netcoreapp'" instead

<Reference Include="System.Resources.ResourceManager" />
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Threading.Tasks" />
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are also missing a reference to System.Threading.ThreadPool in order to get the WaitCallback

ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
}

lock (_sync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these locks expressions will now throw compiler error CS0656 saying:

System\IO\Pipelines\Pipe.cs(130,13): error CS0656: Missing compiler required member 'System.Threading.Monitor.Exit' [F:\git\corefx\src\System.IO.Pipelines\src\System.IO.Pipelines.csproj]
System\IO\Pipelines\Pipe.cs(130,13): error CS0656: Missing compiler required member 'System.Threading.Monitor.Enter' [F:\git\corefx\src\System.IO.Pipelines\src\System.IO.Pipelines.csproj]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by adding System.Threading reference.

@ghost
Copy link

ghost commented Feb 13, 2018 via email


while (source.Length > 0)
{
int writeSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: int writeSize = destination.Length;
And then we can remove the else condition.

while (source.Length > 0)
{
int writeSize;
if (destination.Length == 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside from the first time through the loop, this will always be true (since destination = default). Consider special casing the first iteration outside the loop.

writeSize = destination.Length;
}

bufferWriter.Advance(writeSize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: advance after the copyto

Copy link
Member

@ahsonkhan ahsonkhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, lgtm

Copy link
Member

@joperezr joperezr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming you plan on adding the netstandard1.1 stuff later this LGTM

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.