Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SseFormatter #109832

Merged
merged 13 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@

namespace System.Net.ServerSentEvents
{
public static partial class SseFormatter
{
public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable<System.Net.ServerSentEvents.SseItem<string>> source, System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.Threading.Tasks.Task WriteAsync<T>(System.Collections.Generic.IAsyncEnumerable<System.Net.ServerSentEvents.SseItem<T>> source, System.IO.Stream destination, System.Action<System.Buffers.IBufferWriter<byte>, System.Net.ServerSentEvents.SseItem<T>> itemFormatter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
}
public delegate T SseItemParser<out T>(string eventType, System.ReadOnlySpan<byte> data);
public readonly partial struct SseItem<T>
{
private readonly T _Data_k__BackingField;
private readonly object _dummy;
private readonly int _dummyPrimitive;
public SseItem(T data, string? eventType) { throw null; }
public SseItem(T data, string? eventType = null) { throw null; }
public T Data { get { throw null; } }
public string? EventId { get { throw null; } init { } }
public string EventType { get { throw null; } }
}
public static partial class SseParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
<Compile Include="System.Net.ServerSentEvents.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<Compile Include="$(CoreLibSharedDir)System\Runtime\CompilerServices\IsExternalInit.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<PackageReference Include="System.Memory" Version="$(SystemMemoryVersion)" />
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\src\Microsoft.Bcl.AsyncInterfaces.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,7 @@
<data name="InvalidOperation_EnumerateOnlyOnce" xml:space="preserve">
<value>The enumerable may be enumerated only once.</value>
</data>
<data name="ArgumentException_MustNotContainLineBreaks" xml:space="preserve">
<value>Parameter should not contain any line breaks.</value>
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ System.Net.ServerSentEvents.SseParser</PackageDescription>
</PropertyGroup>

<ItemGroup>
<Compile Include="System\Net\ServerSentEvents\Helpers.cs" />
<Compile Include="System\Net\ServerSentEvents\PooledByteBufferWriter.cs" />
<Compile Include="System\Net\ServerSentEvents\SseFormatter.cs" />
<Compile Include="System\Net\ServerSentEvents\SseParser_1.cs" />
<Compile Include="System\Net\ServerSentEvents\SseItem.cs" />
<Compile Include="System\Net\ServerSentEvents\SseItemParser.cs" />
<Compile Include="System\Net\ServerSentEvents\SseParser.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<Compile Include="$(CoreLibSharedDir)System\Runtime\CompilerServices\IsExternalInit.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\src\Microsoft.Bcl.AsyncInterfaces.csproj" />

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net.ServerSentEvents
{
internal static class Helpers
{
public static unsafe void WriteAsUtf8String(this IBufferWriter<byte> bufferWriter, ReadOnlySpan<char> value)
{
if (value.IsEmpty)
{
return;
}

int maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length);
Span<byte> buffer = bufferWriter.GetSpan(maxByteCount);
int bytesWritten;
#if NET
bytesWritten = Encoding.UTF8.GetBytes(value, buffer);
#else
fixed (char* chars = value)
fixed (byte* bytes = buffer)
{
bytesWritten = Encoding.UTF8.GetBytes(chars, value.Length, bytes, maxByteCount);
}
#endif
bufferWriter.Advance(bytesWritten);
}

public static void ValidateParameterDoesNotContainLineBreaks(string? input, string paramName)
{
if (input?.Contains('\n') is true)
{
Throw(paramName);
static void Throw(string parameterName) => throw new ArgumentException(SR.ArgumentException_MustNotContainLineBreaks, parameterName);
}
}

#if !NET
public static bool Contains(this string text, char character) => text.IndexOf(character) >= 0;

public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
{
return new ValueTask(stream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken));
}
else
{
return WriteAsyncUsingPooledBuffer(stream, buffer, cancellationToken);

static async ValueTask WriteAsyncUsingPooledBuffer(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
buffer.Span.CopyTo(sharedBuffer);
try
{
await stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
}
finally
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
{
ArrayPool<byte>.Shared.Return(sharedBuffer);
}
}
}
}
#endif
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Diagnostics;

namespace System.Net.ServerSentEvents
{
internal sealed class PooledByteBufferWriter : IBufferWriter<byte>, IDisposable
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
{
private byte[] _rentedBuffer;
private int _index;
private const int MinimumBufferSize = 256;

// Value copied from Array.MaxLength in System.Private.CoreLib/src/libraries/System.Private.CoreLib/src/System/Array.cs.
public const int MaximumBufferSize = 0X7FFFFFC7;

public PooledByteBufferWriter(int initialCapacity = MinimumBufferSize)
{
Debug.Assert(initialCapacity > 0);

_rentedBuffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = 0;
}

public ReadOnlyMemory<byte> WrittenMemory
{
get
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(_index <= _rentedBuffer.Length);
return _rentedBuffer.AsMemory(0, _index);
}
}

public int WrittenCount
{
get
{
Debug.Assert(_rentedBuffer != null);
return _index;
}
}

public int Capacity
{
get
{
Debug.Assert(_rentedBuffer != null);
return _rentedBuffer.Length;
}
}

public void Reset()
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(_index <= _rentedBuffer.Length);

_rentedBuffer.AsSpan(0, _index).Clear();
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
_index = 0;
}

// Returns the rented buffer back to the pool
public void Dispose()
{
if (_rentedBuffer == null)
{
return;
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
}

Reset();
byte[] toReturn = _rentedBuffer;
_rentedBuffer = null!;
ArrayPool<byte>.Shared.Return(toReturn);
}

public void Advance(int count)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(count >= 0);
Debug.Assert(_index <= _rentedBuffer.Length - count);
_index += count;
}

public Memory<byte> GetMemory(int sizeHint = 0)
{
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsMemory(_index);
}

public Span<byte> GetSpan(int sizeHint = 0)
{
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsSpan(_index);
}

private void CheckAndResizeBuffer(int sizeHint)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(sizeHint >= 0);

int currentLength = _rentedBuffer.Length;
int availableSpace = currentLength - _index;

if (sizeHint == 0)
{
sizeHint = MinimumBufferSize;
}

// If we've reached ~1GB written, grow to the maximum buffer
// length to avoid incessant minimal growths causing perf issues.
if (_index >= MaximumBufferSize / 2)
{
sizeHint = Math.Max(sizeHint, MaximumBufferSize - currentLength);
}

if (sizeHint > availableSpace)
{
int growBy = Math.Max(sizeHint, currentLength);

int newSize = currentLength + growBy;

if ((uint)newSize > MaximumBufferSize)
{
newSize = currentLength + sizeHint;
if ((uint)newSize > MaximumBufferSize)
{
Throw();
static void Throw() => throw new OutOfMemoryException();
eiriktsarpalis marked this conversation as resolved.
Show resolved Hide resolved
}
}

byte[] oldBuffer = _rentedBuffer;

_rentedBuffer = ArrayPool<byte>.Shared.Rent(newSize);

Debug.Assert(oldBuffer.Length >= _index);
Debug.Assert(_rentedBuffer.Length >= _index);

Span<byte> oldBufferAsSpan = oldBuffer.AsSpan(0, _index);
oldBufferAsSpan.CopyTo(_rentedBuffer);
oldBufferAsSpan.Clear();
ArrayPool<byte>.Shared.Return(oldBuffer);
}

Debug.Assert(_rentedBuffer.Length - _index > 0);
Debug.Assert(_rentedBuffer.Length - _index >= sizeHint);
}
}
}
Loading
Loading