Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added headers for both tcp and http transports #4

Merged
merged 5 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion Iggy_SDK.sln.DotSettings.user
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=184dd178_002Dd35b_002D4913_002Da4be_002Ded1b141ea05b/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="Read_DeserializeValidJson_ReturnsMessageResponseList #4" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Project Location="/home/numinex/projects/Iggy_SDK/Iggy_SDK_Tests" Presentation="&amp;lt;Iggy_SDK_Tests&amp;gt;" /&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=3c583216_002D7c7a_002D463d_002Da1a9_002D26e77cd2812c/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="Read_DeserializeValidJson_ReturnsMessageResponseList #3" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Read_DeserializeValidJson_ReturnsMessageResponseList&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=42ff71a6_002D2fdf_002D45cd_002Dade6_002D42a7bca41220/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="TcpContracts_UpdateOffset_HasCorrectBytes" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.ContractTests.TcpContract.TcpContracts_UpdateOffset_HasCorrectBytes&lt;/TestId&gt;
Expand All @@ -11,6 +19,11 @@
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MessageStreamTests.HttpMessageStream&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MapperTests.BinaryMapper.MapMessagesTMessage_ReturnsValidMessageResponse&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=4ce624bd_002D9053_002D4ac2_002D9c2e_002Df7fc4613a7dd/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="Read_DeserializeValidJson_ReturnsMessageResponseList" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Read_DeserializeValidJson_ReturnsMessageResponseList&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=594bddbd_002D4029_002D4993_002D9e7a_002D647e1a5e4e6b/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="CreateTopicAsync_Returns201Created_WhenSuccessfullyCreated" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
Expand All @@ -28,14 +41,22 @@
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MessageStreamTests.MessageStreamUnitTests&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=c6e7cdb2_002Da359_002D47ac_002D93bd_002De335f8b8d26f/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="CreateTopicAsync_Returns201Created_WhenSuccessfullyCreated #2" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=be76a793_002D1a58_002D4171_002D970f_002D2f5c4c31c5d6/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="Read_DeserializeValidJson_ReturnsMessageResponseList #2" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Read_DeserializeValidJson_ReturnsMessageResponseList&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=c6e7cdb2_002Da359_002D47ac_002D93bd_002De335f8b8d26f/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="CreateTopicAsync_Returns201Created_WhenSuccessfullyCreated #2" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MessageStreamTests.HttpMessageStream&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MapperTests.BinaryMapper&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.ToSnakeCaseMessagePolicy&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.ContractTests.TcpContract.TcpContracts_MessageSendRequest_HasCorrectBytes&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.ContractTests.TcpContract.TcpContracts_GetOffset_HasCorrectBytes&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.ContractTests.TcpContract&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.MessageConverter_ShouldConvertMessageCorrectly&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Write_ConvertsHttpMessageToJson&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Read_DeserializeValidJson_ReturnsMessageResponseList&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fdae582f_002D649c_002D46a6_002D8ad5_002D876f70bf8246/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
Expand All @@ -53,5 +74,7 @@
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MessageStreamTests.HttpMessageStream&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.ContractTests.TcpContract&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MapperTests.BinaryMapper&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.MessageConverter_ShouldConvertMessageCorrectly&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Write_ConvertsHttpMessageToJson&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String></wpf:ResourceDictionary>
3 changes: 3 additions & 0 deletions Iggy_SDK/Contracts/Http/MessageResponse.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Iggy_SDK.Headers;

namespace Iggy_SDK.Contracts.Http;

public sealed class MessageResponse
Expand All @@ -6,4 +8,5 @@ public sealed class MessageResponse
public required ulong Timestamp { get; init; }
public Guid Id { get; init; }
public required byte[] Payload { get; init; }
public Dictionary<HeaderKey, HeaderValue>? Headers { get; init; }
}
3 changes: 3 additions & 0 deletions Iggy_SDK/Contracts/Http/MessageResponseGeneric.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@

using Iggy_SDK.Headers;

namespace Iggy_SDK.Contracts.Http;

public sealed class MessageResponse<T>
{
public required ulong Offset { get; init; }
public required ulong Timestamp { get; init; }
public Guid Id { get; init; }
public Dictionary<HeaderKey, HeaderValue>? Headers { get; init; }
public required T Message { get; init; }
}
133 changes: 123 additions & 10 deletions Iggy_SDK/Contracts/Tcp/TcpContracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Text;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
using Iggy_SDK.Messages;

Expand All @@ -25,7 +26,7 @@ internal static void GetMessages(Span<byte> bytes, MessageFetchRequest request)
bytes[position + 17] = request.AutoCommit ? (byte)1 : (byte)0;
}
internal static void CreateMessage(Span<byte> bytes, Identifier streamId, Identifier topicId,
Partitioning partitioning, IEnumerable<Message> messages)
Partitioning partitioning, IList<Message> messages)
{
WriteBytesFromStreamAndTopicIdToSpan(streamId , topicId , bytes);
int streamTopicIdPosition = 2 + streamId.Length + 2 + topicId.Length;
Expand All @@ -43,42 +44,75 @@ internal static void CreateMessage(Span<byte> bytes, Identifier streamId, Identi
}
private static Span<byte> HandleMessagesEnumerable(int position, IEnumerable<Message> messages, Span<byte> bytes)
{
Span<byte> emptyHeaders = stackalloc byte[4];

foreach (var message in messages)
{
var idSlice = bytes[position..(position + 16)];
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(idSlice), message.Id);

if (message.Headers is not null)
{
var headersBytes = GetHeadersBytes(message.Headers);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], headersBytes.Length);
headersBytes.CopyTo(bytes[(position + 20)..(position + 20 + headersBytes.Length)]);
position += headersBytes.Length + 20;
}
else
{
emptyHeaders.CopyTo(bytes[(position + 16)..(position + 16 + emptyHeaders.Length)]);
position += 20;
}

BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], message.Payload.Length);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position )..(position + 4)], message.Payload.Length);
var payloadBytes = message.Payload;
var slice = bytes[(position + 20)..];
var slice = bytes[(position + 4)..];
payloadBytes.CopyTo(slice);
position += payloadBytes.Length + 20;
position += payloadBytes.Length + 4;
}

return bytes;
}
private static Span<byte> HandleMessagesArray(int position, Message[] messages, Span<byte> bytes)
{
Span<byte> emptyHeaders = stackalloc byte[4];

ref var start = ref MemoryMarshal.GetArrayDataReference(messages);
ref var end = ref Unsafe.Add(ref start, messages.Length);
while (Unsafe.IsAddressLessThan(ref start, ref end))
{
var idSlice = bytes[position..(position + 16)];
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(idSlice), start.Id);

BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], start.Payload.Length);
if (start.Headers is not null)
{
var headersBytes = GetHeadersBytes(start.Headers);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], headersBytes.Length);
headersBytes.CopyTo(bytes[(position + 20)..(position + 20 + headersBytes.Length)]);
position += headersBytes.Length + 20;
}
else
{
emptyHeaders.CopyTo(bytes[(position + 16)..(position + 16 + emptyHeaders.Length)]);
position += 20;
}

BinaryPrimitives.WriteInt32LittleEndian(bytes[(position )..(position + 4)], start.Payload.Length);
var payloadBytes = start.Payload;
var slice = bytes[(position + 20)..];
var slice = bytes[(position + 4)..];
payloadBytes.CopyTo(slice);
position += payloadBytes.Length + 20;
position += payloadBytes.Length + 4;

start = ref Unsafe.Add(ref start, 1);
}

return bytes;
}

private static Span<byte> HandleMessagesList(int position, List<Message> messages, Span<byte> bytes)
{
Span<byte> emptyHeaders = stackalloc byte[4];

Span<Message> listAsSpan = CollectionsMarshal.AsSpan(messages);
ref var start = ref MemoryMarshal.GetReference(listAsSpan);
ref var end = ref Unsafe.Add(ref start, listAsSpan.Length);
Expand All @@ -87,17 +121,96 @@ private static Span<byte> HandleMessagesList(int position, List<Message> message
var idSlice = bytes[position..(position + 16)];
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(idSlice), start.Id);

BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], start.Payload.Length);
if (start.Headers is not null)
{
var headersBytes = GetHeadersBytes(start.Headers);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], headersBytes.Length);
headersBytes.CopyTo(bytes[(position + 20)..(position + 20 + headersBytes.Length)]);
position += headersBytes.Length + 20;
}
else
{
emptyHeaders.CopyTo(bytes[(position + 16)..(position + 16 + emptyHeaders.Length)]);
position += 20;
}

BinaryPrimitives.WriteInt32LittleEndian(bytes[(position)..(position + 4)], start.Payload.Length);
var payloadBytes = start.Payload;
var slice = bytes[(position + 20)..];
var slice = bytes[(position + 4)..];
payloadBytes.CopyTo(slice);
position += payloadBytes.Length + 20;
position += payloadBytes.Length + 4;

start = ref Unsafe.Add(ref start, 1);
}

return bytes;
}

private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue> headers)
{
var headersLength = headers.Sum(header => 4 + header.Key.Value.Length + 1 + 4 + header.Value.Value.Length);
Span<byte> headersBytes = stackalloc byte[headersLength];
int position = 0;
foreach (var (headerKey, headerValue) in headers)
{
var headerBytes = GetBytesFromHeader(headerKey, headerValue);
headerBytes.CopyTo(headersBytes[position..(position + headerBytes.Length)]);
position += headerBytes.Length;

}
return headersBytes.ToArray();
}

private static byte HeaderKindToByte(HeaderKind kind)
{
return kind switch
{
HeaderKind.Raw => 1,
HeaderKind.String => 2,
HeaderKind.Bool => 3,
HeaderKind.Int32 => 6,
HeaderKind.Int64 => 7,
HeaderKind.Int128 => 8,
HeaderKind.Uint32 => 11,
HeaderKind.Uint64 => 12,
HeaderKind.Uint128 => 13,
HeaderKind.Float32 => 14,
HeaderKind.Float64 => 15,
_ => throw new ArgumentOutOfRangeException(nameof(kind), kind, null)
};
}
private static byte[] GetBytesFromHeader(HeaderKey headerKey, HeaderValue headerValue)
{
var headerBytesLength = 4 + headerKey.Value.Length + 1 + 4 + headerValue.Value.Length;
Span<byte> headerBytes = stackalloc byte[headerBytesLength];

BinaryPrimitives.WriteInt32LittleEndian(headerBytes[..4], headerKey.Value.Length);
var headerKeyBytes = Encoding.UTF8.GetBytes(headerKey.Value);
headerKeyBytes.CopyTo(headerBytes[4..(4 + headerKey.Value.Length)]);

headerBytes[4 + headerKey.Value.Length] = headerValue.Kind switch
{
HeaderKind.Raw => 1,
HeaderKind.String => 2,
HeaderKind.Bool => 3,
HeaderKind.Int32 => 6,
HeaderKind.Int64 => 7,
HeaderKind.Int128 => 8,
HeaderKind.Uint32 => 11,
HeaderKind.Uint64 => 12,
HeaderKind.Uint128 => 13,
HeaderKind.Float32 => 14,
HeaderKind.Float64 => 15,
_ => throw new ArgumentOutOfRangeException()
};

BinaryPrimitives.WriteInt32LittleEndian(
headerBytes[(4 + headerKey.Value.Length + 1)..(4 + headerKey.Value.Length + 1 + 4)],
headerValue.Value.Length);
headerValue.Value.CopyTo(headerBytes[(4 + headerKey.Value.Length + 1 + 4)..]);

return headerBytes.ToArray();
}
internal static byte[] CreateStream(StreamRequest request)
{
Span<byte> bytes = stackalloc byte[4 + request.Name.Length];
Expand Down
32 changes: 30 additions & 2 deletions Iggy_SDK/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,31 @@ internal static UInt128 ToUInt128(this Guid g)
{
Span<byte> array = stackalloc byte[16];
MemoryMarshal.TryWrite(array, ref g);
var hi = BinaryPrimitives.ReadUInt64LittleEndian(array[0..8]);
var hi = BinaryPrimitives.ReadUInt64LittleEndian(array[..8]);
var lo = BinaryPrimitives.ReadUInt64LittleEndian(array[8..16]);
return new UInt128(hi, lo);
}
internal static UInt128 ToUInt128(this byte[] bytes)
{
var hi = BinaryPrimitives.ReadUInt64LittleEndian(bytes[..8]);
var lo = BinaryPrimitives.ReadUInt64LittleEndian(bytes[8..16]);
return new UInt128(hi, lo);
}
internal static Int128 ToInt128(this byte[] bytes)
{
var hi = BinaryPrimitives.ReadUInt64LittleEndian(bytes[..8]);
var lo = BinaryPrimitives.ReadUInt64LittleEndian(bytes[8..16]);
return new Int128(hi, lo);
}
internal static byte[] GetBytesFromGuid(this Guid value)
{
Span<byte> bytes = stackalloc byte[16];
MemoryMarshal.TryWrite(bytes, ref value);
return bytes.ToArray();
}
//TODO - remove result span from this method and return span.ToArray instead
internal static byte[] GetBytesFromUInt128(this UInt128 value)
{

Span<byte> result = stackalloc byte[16];
var span = MemoryMarshal.Cast<UInt128, byte>(MemoryMarshal.CreateReadOnlySpan(ref value, 1));
for (int i = 0; i < 16; i++)
Expand All @@ -50,6 +68,16 @@ internal static byte[] GetBytesFromUInt128(this UInt128 value)
}
return result.ToArray();
}
internal static byte[] GetBytesFromInt128(this Int128 value)
{
Span<byte> result = stackalloc byte[16];
var span = MemoryMarshal.Cast<Int128, byte>(MemoryMarshal.CreateReadOnlySpan(ref value, 1));
for (int i = 0; i < 16; i++)
{
result[i] = span[i];
}
return result.ToArray();
}

internal static UInt128 GetUInt128(this JsonElement jsonElement)
{
Expand Down
22 changes: 22 additions & 0 deletions Iggy_SDK/Headers/HeaderKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace Iggy_SDK.Headers;

public sealed class HeaderKey
{
public required string Value { get; init; }

public static HeaderKey New(string val)
{
return new HeaderKey
{
Value = val.Length is 0 or > 255
? throw new ArgumentException("Value has incorrect size, must be between 1 and 255", nameof(val))
: val
};
}

public override string ToString()
{
return Value;
}
}

16 changes: 16 additions & 0 deletions Iggy_SDK/Headers/HeaderKind.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Iggy_SDK.Headers;

public enum HeaderKind
{
Raw,
String,
Bool,
Int32,
Int64,
Int128,
Uint32,
Uint64,
Uint128,
Float32,
Float64
}
Loading