Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add by key batching to fusion. #5934

Merged
merged 2 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Language\src\Language.SyntaxTree\HotChocolate.Language.SyntaxTree.csproj" />
<ProjectReference Include="..\Transport.Sockets\HotChocolate.Transport.Sockets.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
using System;
using System.Collections.Generic;
using HotChocolate.Language;
using static HotChocolate.Transport.Sockets.Client.Properties.SocketClientResources;

namespace HotChocolate.Transport.Sockets.Client;

public readonly struct OperationRequest : IEquatable<OperationRequest>
{
public OperationRequest(
string? query,
string? id,
ObjectValueNode? variables,
ObjectValueNode? extensions)
{
if (query is null && id is null && extensions is null)
{
throw new ArgumentException(OperationRequest_QueryOrPersistedQueryId, nameof(query));
}

Query = query;
Id = id;
VariablesNode = variables;
ExtensionsNode = extensions;
}

public OperationRequest(
string? query = null,
string? id = null,
Expand All @@ -29,8 +47,12 @@ public OperationRequest(

public IReadOnlyDictionary<string, object?>? Variables { get; }

public ObjectValueNode? VariablesNode { get; }

public IReadOnlyDictionary<string, object?>? Extensions { get; }

public ObjectValueNode? ExtensionsNode { get; }

public bool Equals(OperationRequest other)
=> Id == other.Id &&
Query == other.Query &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public DataMessageObserver(string id)
throw _error;
}

_messages.TryDequeue(out IDataMessage? message);
_messages.TryDequeue(out var message);
return message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public async ValueTask InitializeAsync<T>(
CancellationToken cancellationToken = default)
{
var observer = new ConnectionMessageObserver<ConnectionAcceptMessage>(cancellationToken);
using IDisposable subscription = context.Messages.Subscribe(observer);
using var subscription = context.Messages.Subscribe(observer);
await context.Socket.SendConnectionInitMessage(payload, cancellationToken);
await observer.Accepted;
}
Expand All @@ -32,7 +32,7 @@ public async ValueTask<SocketResult> ExecuteAsync(
var id = Guid.NewGuid().ToString("N");
var observer = new DataMessageObserver(id);
var completion = new DataCompletion(context.Socket, id);
IDisposable subscription = context.Messages.Subscribe(observer);
var subscription = context.Messages.Subscribe(observer);

await context.Socket.SendSubscribeMessageAsync(id, request, cancellationToken);

Expand Down Expand Up @@ -62,9 +62,9 @@ public ValueTask OnReceiveAsync(
try
{
document = JsonDocument.Parse(message);
JsonElement root = document.RootElement;
var root = document.RootElement;

if (root.TryGetProperty(TypeProp, out JsonElement typeProp))
if (root.TryGetProperty(TypeProp, out var typeProp))
{
if (typeProp.ValueEquals(Utf8Messages.Ping))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static async ValueTask SendConnectionInitMessage<T>(
jsonWriter.WritePropertyName(PayloadProp);
JsonSerializer.Serialize(jsonWriter, payload, JsonDefaults.SerializerOptions);
}

jsonWriter.WriteEndObject();
await jsonWriter.FlushAsync(ct).ConfigureAwait(false);

Expand All @@ -44,6 +44,7 @@ public static async ValueTask SendSubscribeMessageAsync(
{
using var arrayWriter = new ArrayWriter();
await using var jsonWriter = new Utf8JsonWriter(arrayWriter, JsonDefaults.WriterOptions);

jsonWriter.WriteStartObject();
jsonWriter.WriteString(IdProp, operationSessionId);
jsonWriter.WriteString(TypeProp, Utf8Messages.Subscribe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private CompleteMessage(string id)

public static CompleteMessage From(JsonDocument document)
{
JsonElement root = document.RootElement;
var root = document.RootElement;
var id = root.GetProperty(Utf8MessageProperties.IdProp).GetString()!;
return new CompleteMessage(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ private ErrorMessage(string id, OperationResult payload)

public static ErrorMessage From(JsonDocument document)
{
JsonElement root = document.RootElement;
var root = document.RootElement;
var id = root.GetProperty(Utf8MessageProperties.IdProp).GetString()!;

JsonElement payload = root.GetProperty(Utf8MessageProperties.PayloadProp);
var payload = root.GetProperty(Utf8MessageProperties.PayloadProp);
var result = new OperationResult(document, errors: payload);

return new ErrorMessage(id, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ private NextMessage(string id, OperationResult payload)

public static NextMessage From(JsonDocument document)
{
JsonElement root = document.RootElement;
var root = document.RootElement;
var id = root.GetProperty(IdProp).GetString()!;

JsonElement payload = root.GetProperty(PayloadProp);
var payload = root.GetProperty(PayloadProp);
var result = new OperationResult(
document,
TryGetProperty(payload, DataProp),
Expand All @@ -34,7 +34,7 @@ public static NextMessage From(JsonDocument document)
}

private static JsonElement? TryGetProperty(JsonElement element, ReadOnlySpan<byte> name)
=> element.TryGetProperty(name, out JsonElement property)
=> element.TryGetProperty(name, out var property)
? property
: null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private void Unsubscribe(Subscription subscription)

private void OnNext(IOperationMessage value, ImmutableList<Subscription> subscriptions)
{
foreach (Subscription subscription in subscriptions)
foreach (var subscription in subscriptions)
{
subscription.Observer.OnNext(value);
}
Expand All @@ -43,7 +43,7 @@ private void OnNext(IOperationMessage value, ImmutableList<Subscription> subscri

private void OnError(Exception error, ImmutableList<Subscription> subscriptions)
{
foreach (Subscription subscription in subscriptions)
foreach (var subscription in subscriptions)
{
subscription.Observer.OnError(error);
}
Expand All @@ -53,7 +53,7 @@ private void OnError(Exception error, ImmutableList<Subscription> subscriptions)

private void OnCompleted(ImmutableList<Subscription> subscriptions)
{
foreach (Subscription subscription in subscriptions)
foreach (var subscription in subscriptions)
{
subscription.Observer.OnCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace HotChocolate.Transport.Sockets.Client;

public class SocketClient : ISocket
public sealed class SocketClient : ISocket
{
private static readonly IProtocolHandler[] _protocolHandlers =
{
Expand Down Expand Up @@ -156,7 +156,7 @@ async Task<bool> ISocket.ReadMessageAsync(
socketResult = await _socket.ReceiveAsync(arraySegment, cancellationToken);

// copy message segment to writer.
Memory<byte> memory = writer.GetMemory(socketResult.Count);
var memory = writer.GetMemory(socketResult.Count);
buffer.AsSpan().Slice(0, socketResult.Count).CopyTo(memory.Span);
writer.Advance(socketResult.Count);
read += socketResult.Count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,28 @@ private Operation CreateOperation(
variants[item.Key] = item.Value;
}

// we will complete the selection variants, sets and selections
// without sealing them so that analyzers in this step can fully
// inspect them.
var variantsSpan = variants.AsSpan();
ref var variantsStart = ref GetReference(variantsSpan);
ref var variantsEnd = ref Unsafe.Add(ref variantsStart, variantsSpan.Length);

while (Unsafe.IsAddressLessThan(ref variantsStart, ref variantsEnd))
{
variantsStart.Complete();
variantsStart = ref Unsafe.Add(ref variantsStart, 1);
}

#if NET5_0_OR_GREATER
ref var optSpace = ref GetReference(AsSpan(_operationOptimizers));
var optSpan = AsSpan(_operationOptimizers);
ref var optStart = ref GetReference(optSpan);
ref var optEnd = ref Unsafe.Add(ref optStart, optSpan.Length);

for (var i = 0; i < _operationOptimizers.Count; i++)
while (Unsafe.IsAddressLessThan(ref optStart, ref optEnd))
{
Unsafe.Add(ref optSpace, i).OptimizeOperation(context);
optStart.OptimizeOperation(context);
optStart = ref Unsafe.Add(ref optStart, 1);
}
#else
for (var i = 0; i < _operationOptimizers.Count; i++)
Expand All @@ -224,11 +240,14 @@ private Operation CreateOperation(

CompleteResolvers(schema);

ref var varSpace = ref GetReference(variants.AsSpan());
variantsSpan = variants.AsSpan();
variantsStart = ref GetReference(variantsSpan);
variantsEnd = ref Unsafe.Add(ref variantsStart, variantsSpan.Length);

for (var i = 0; i < _operationOptimizers.Count; i++)
while (Unsafe.IsAddressLessThan(ref variantsStart, ref variantsEnd))
{
Unsafe.Add(ref varSpace, i).Seal();
variantsStart.Seal();
variantsStart = ref Unsafe.Add(ref variantsStart, 1);
}
}

Expand Down
19 changes: 17 additions & 2 deletions src/HotChocolate/Core/src/Execution/Processing/Selection.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using HotChocolate.Execution.Properties;
using HotChocolate.Language;
using HotChocolate.Resolvers;
using HotChocolate.Types;
using Microsoft.Extensions.ObjectPool;

namespace HotChocolate.Execution.Processing;

Expand Down Expand Up @@ -313,6 +311,23 @@ internal void MarkAsStream(long ifCondition)
_flags |= Flags.Stream;
}

/// <summary>
/// Completes the selection without sealing it.
/// </summary>
internal void Complete(ISelectionSet declaringSelectionSet)
{
Debug.Assert(declaringSelectionSet is not null);

if ((_flags & Flags.Sealed) != Flags.Sealed)
{
DeclaringSelectionSet = declaringSelectionSet;
}

Debug.Assert(
ReferenceEquals(declaringSelectionSet, DeclaringSelectionSet),
"Selections can only belong to a single selectionSet.");
}

internal void Seal(ISelectionSet declaringSelectionSet)
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
Expand Down
15 changes: 15 additions & 0 deletions src/HotChocolate/Core/src/Execution/Processing/SelectionSet.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using HotChocolate.Language;

namespace HotChocolate.Execution.Processing;

Expand Down Expand Up @@ -56,6 +57,20 @@ public SelectionSet(
/// <inheritdoc />
public IReadOnlyList<IFragment> Fragments => _fragments;

/// <summary>
/// Completes the selection set without sealing it.
/// </summary>
internal void Complete()
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
{
for (var i = 0; i < _selections.Length; i++)
{
_selections[i].Complete(this);
}
}
}

internal void Seal()
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
Expand Down
Loading