Skip to content

Commit

Permalink
Fix batch collection massages through sockets (#7265)
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaszRozmej authored Jul 17, 2024
1 parent bab117f commit 0315f4a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
Expand Down Expand Up @@ -128,7 +129,7 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to
}

disposeCount.Should().Be(messageCount);
cts.Cancel();
await cts.CancelAsync();

return messageCount;
});
Expand Down Expand Up @@ -223,7 +224,7 @@ async Task<int> ReadMessages(Socket socket, IList<byte[]> receivedMessages, Canc
}
}
stream.Close();
cts.Cancel();
await cts.CancelAsync();

return messageCount;
});
Expand Down Expand Up @@ -298,7 +299,7 @@ public async Task Can_send_multiple_messages(int messageCount)
await client.SendJsonRpcResult(result);
await Task.Delay(100);
}
cts.Cancel();
await cts.CancelAsync();

return messageCount;
});
Expand Down Expand Up @@ -340,7 +341,7 @@ public async Task Can_send_collections(int elements)
await client.SendJsonRpcResult(result);

await Task.Delay(100);
cts.Cancel();
await cts.CancelAsync();
});

await Task.WhenAll(sendCollection, server);
Expand Down Expand Up @@ -380,7 +381,7 @@ public async Task Stops_on_limited_body_size(int maxByteCount)
int sent = await client.SendJsonRpcResult(result);

await Task.Delay(100);
cts.Cancel();
await cts.CancelAsync();

return sent;
});
Expand All @@ -391,6 +392,27 @@ public async Task Stops_on_limited_body_size(int maxByteCount)
Assert.That(received, Is.LessThanOrEqualTo(Math.Min(sent, maxByteCount)));
}

[Test]
public async Task Can_serialize_collection()
{
await using MemoryMessageStream stream = new();
EthereumJsonSerializer ethereumJsonSerializer = new();
using JsonRpcSocketsClient<MemoryMessageStream> client = new(
clientName: "TestClient",
stream: stream,
endpointType: RpcEndpoint.Ws,
jsonRpcProcessor: null!,
jsonRpcLocalStats: new NullJsonRpcLocalStats(),
jsonSerializer: ethereumJsonSerializer,
maxBatchResponseBodySize: 10_000
);
using JsonRpcResult result = JsonRpcResult.Collection(RandomBatchResult(10, 100));
await client.SendJsonRpcResult(result);
stream.Seek(0, SeekOrigin.Begin);
JsonRpcSuccessResponse[]? response = ethereumJsonSerializer.Deserialize<JsonRpcSuccessResponse[]>(stream);
response.Should().NotContainNulls();
}

private static async Task<T> OneShotServer<T>(string uri, Func<WebSocket, Task<T>> func)
{
using HttpListener httpListener = new();
Expand Down
30 changes: 30 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc.Test/MemoryMessageStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO;
using System.Threading.Tasks;
using Nethermind.Sockets;

namespace Nethermind.JsonRpc.Test;

public class MemoryMessageStream : MemoryStream, IMessageBorderPreservingStream
{
private static readonly byte Delimiter = Convert.ToByte('\n');

public Task<ReceiveResult?> ReceiveAsync(ArraySegment<byte> buffer)
{
int read = Read(buffer.AsSpan());
return Task.FromResult<ReceiveResult?>(new ReceiveResult
{
Read = read,
EndOfMessage = read > 0 && buffer[read - 1] == Delimiter
});
}

public Task<int> WriteEndOfMessageAsync()
{
WriteByte(Delimiter);
return Task.FromResult(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public virtual async Task<int> SendJsonRpcResult(JsonRpcResult result)
responseSize += 1;
}
isFirst = false;
responseSize += (int)await _jsonSerializer.SerializeAsync(_stream, result.Response, indented: false);
responseSize += (int)await _jsonSerializer.SerializeAsync(_stream, entry.Response, indented: false);
_ = _jsonRpcLocalStats.ReportCall(entry.Report);

// We reached the limit and don't want to responded to more request in the batch
Expand Down

0 comments on commit 0315f4a

Please sign in to comment.