Skip to content

Commit

Permalink
Implemented MessageExpiry feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex committed Aug 29, 2023
1 parent ab59f4e commit 62f456b
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 52 deletions.
5 changes: 4 additions & 1 deletion Iggy_SDK.sln.DotSettings.user
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
<TestId>xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.ToSnakeCaseMessagePolicy</TestId>
<TestId>xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MessageStreamTests.MessageStreamUnitTests.CreateTopicAsync_Returns201Created_WhenSuccessfullyCreated</TestId>
</TestAncestor>
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=84f674e3_002D0649_002D4ca4_002D8b7f_002D4077673c82b2/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="Session" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Project Location="/home/numinex/projects/Iggy_SDK/Iggy_SDK_Tests" Presentation="&amp;lt;Iggy_SDK_Tests&amp;gt;" /&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=940baf9d_002D8d34_002D4b30_002D94a7_002Dbe4712069236/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="SendMessageAsync_ReturnsNull_Failure" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
Expand All @@ -73,6 +76,6 @@
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Read_DeserializeValidJson_ReturnsMessageResponseList&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fdae582f_002D649c_002D46a6_002D8ad5_002D876f70bf8246/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=fdae582f_002D649c_002D46a6_002D8ad5_002D876f70bf8246/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;Project Location="/home/numinex/projects/Iggy_SDK/Iggy_SDK_Tests" Presentation="&amp;lt;Iggy_SDK_Tests&amp;gt;" /&gt;
&lt;/SessionState&gt;</s:String></wpf:ResourceDictionary>
1 change: 1 addition & 0 deletions Iggy_SDK/Contracts/Http/TopicRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ public sealed class TopicRequest
{
public required int TopicId { get; init; }
public required string Name { get; init; }
public int? MessageExpiry { get; init; }
public required int PartitionsCount{ get; init; }
}
5 changes: 5 additions & 0 deletions Iggy_SDK/Contracts/Http/TopicResponse.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
using System.Text.Json.Serialization;
using Iggy_SDK.JsonConfiguration;

namespace Iggy_SDK.Contracts.Http;

[JsonConverter(typeof(TopicResponseConverter))]
public sealed class TopicResponse
{
public required int Id { get; init; }
public required string Name { get; init; }
public required ulong SizeBytes { get; init; }
public int MessageExpiry { get; init; }
public required ulong MessagesCount { get; init; }
public required int PartitionsCount { get; init; }
public IEnumerable<PartitionContract>? Partitions { get; init; }
Expand Down
11 changes: 7 additions & 4 deletions Iggy_SDK/Contracts/Tcp/TcpContracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ private static byte[] GetBytesFromHeader(HeaderKey headerKey, HeaderValue header
}
internal static byte[] CreateStream(StreamRequest request)
{
Span<byte> bytes = stackalloc byte[4 + request.Name.Length];
Span<byte> bytes = stackalloc byte[4 + request.Name.Length + 1];
BinaryPrimitives.WriteInt32LittleEndian(bytes[..4] , request.StreamId);
Encoding.UTF8.GetBytes(request.Name, bytes[4..]);
bytes[4] = (byte)request.Name.Length;
Encoding.UTF8.GetBytes(request.Name, bytes[5..]);
return bytes.ToArray();
}

Expand Down Expand Up @@ -272,13 +273,15 @@ internal static byte[] GetGroup(Identifier streamId, Identifier topicId, int gro

internal static byte[] CreateTopic(Identifier streamId, TopicRequest request)
{
Span<byte> bytes = stackalloc byte[2 + streamId.Length + 8 + request.Name.Length];
Span<byte> bytes = stackalloc byte[2 + streamId.Length + 13 + request.Name.Length];
WriteBytesFromIdentifierToSpan(streamId, bytes);
var streamIdBytesLength = 2 + streamId.Length;
BinaryPrimitives.WriteInt32LittleEndian(bytes[streamIdBytesLength..(streamIdBytesLength + 4)], request.TopicId);
int position = 4 + streamIdBytesLength;
BinaryPrimitives.WriteInt32LittleEndian(bytes[position..(position + 4)], request.PartitionsCount);
Encoding.UTF8.GetBytes(request.Name, bytes[(position + 4)..]);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 4)..(position + 8)], request.MessageExpiry ?? 0);
bytes[position + 8] = (byte)request.Name.Length;
Encoding.UTF8.GetBytes(request.Name, bytes[(position + 9)..]);
return bytes.ToArray();
}

Expand Down
2 changes: 1 addition & 1 deletion Iggy_SDK/Factory/MessageStreamFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private static TcpMessageStream CreateTcpMessageStream(IMessageStreamConfigurato
});

var messageStream = new TcpMessageStream(socket, channel);
var messageBus = new MessageBus(socket);
var messageBus = new MessageInvoker(socket);
var messageDispatcher = new MessageSenderDispatcher(sendMessagesOptions, channel, messageBus);

messageDispatcher.Start();
Expand Down
85 changes: 85 additions & 0 deletions Iggy_SDK/JsonConfiguration/TopicResponseConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System.ComponentModel;
using System.Text.Json;
using System.Text.Json.Serialization;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Extensions;

namespace Iggy_SDK.JsonConfiguration;

public sealed class TopicResponseConverter : JsonConverter<TopicResponse>
{
private readonly JsonSerializerOptions _options;
public TopicResponseConverter()
{
_options = new JsonSerializerOptions();
_options.PropertyNamingPolicy = new ToSnakeCaseNamingPolicy();
}

public override TopicResponse? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
using var doc = JsonDocument.ParseValue(ref reader);
var root = doc.RootElement;
var id = root.GetProperty(nameof(TopicResponse.Id).ToSnakeCase()).GetInt32();
var name = root.GetProperty(nameof(TopicResponse.Name).ToSnakeCase()).GetString();
var sizeBytes = root.GetProperty(nameof(TopicResponse.SizeBytes).ToSnakeCase()).GetUInt64();
var messageExpiryProperty = root.GetProperty(nameof(TopicResponse.MessageExpiry).ToSnakeCase());

var messageExpiry = messageExpiryProperty.ValueKind switch
{
JsonValueKind.Null => 0,
JsonValueKind.Number => messageExpiryProperty.GetInt32(),
_ => throw new InvalidEnumArgumentException("Error Wrong JsonValueKind when deserializing MessageExpiry")
};
var messagesCount = root.GetProperty(nameof(TopicResponse.MessagesCount).ToSnakeCase()).GetUInt64();
var partitionsCount = root.GetProperty(nameof(TopicResponse.PartitionsCount).ToSnakeCase()).GetInt32();
root.TryGetProperty(nameof(TopicResponse.Partitions).ToSnakeCase(), out var partitionsProperty);
var partitions = partitionsProperty.ValueKind switch
{
JsonValueKind.Null => null,
JsonValueKind.Undefined => null,
JsonValueKind.Array => DeserializePartitions(partitionsProperty),
_ => throw new InvalidEnumArgumentException("Error Wrong JsonValueKind when deserializing Partitions")
};
return new TopicResponse
{
Id = id,
Name = name!,
SizeBytes = sizeBytes,
MessageExpiry = messageExpiry,
MessagesCount = messagesCount,
PartitionsCount = partitionsCount,
Partitions = partitions
};

}
private IEnumerable<PartitionContract> DeserializePartitions(JsonElement partitionsElement)
{
return JsonSerializer.Deserialize<IEnumerable<PartitionContract>>(partitionsElement.GetRawText(), _options)!;
}

public override void Write(Utf8JsonWriter writer, TopicResponse value, JsonSerializerOptions options)
{
writer.WriteStartObject();

writer.WriteNumber(nameof(TopicResponse.Id).ToSnakeCase(), value.Id);
writer.WriteString(nameof(TopicResponse.Name).ToSnakeCase(), value.Name);
writer.WriteNumber(nameof(TopicResponse.SizeBytes).ToSnakeCase(), value.SizeBytes);
writer.WriteNumber(nameof(TopicResponse.MessageExpiry).ToSnakeCase(), value.MessageExpiry);
writer.WriteNumber(nameof(TopicResponse.MessagesCount).ToSnakeCase(), value.MessagesCount);
writer.WriteNumber(nameof(TopicResponse.PartitionsCount).ToSnakeCase(), value.PartitionsCount);

if (value.Partitions != null)
{
writer.WriteStartArray(nameof(TopicResponse.Partitions).ToSnakeCase());

foreach (var partition in value.Partitions)
{
var partitionJson = JsonSerializer.Serialize(partition, options);
writer.WriteRawValue(partitionJson);
}

writer.WriteEndArray();
}
writer.WriteEndObject();
}
}
32 changes: 17 additions & 15 deletions Iggy_SDK/Mappers/BinaryMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ internal static IReadOnlyList<StreamResponse> MapStreams(ReadOnlySpan<byte> payl

while (position < length)
{
(StreamResponse stream, int readBytes) = MapToStreams(payload, position);
(StreamResponse stream, int readBytes) = MapToStream(payload, position);
streams.Add(stream);
position += readBytes;
}
Expand Down Expand Up @@ -341,10 +341,10 @@ private static (StreamResponse stream, int readBytes) MapToStream(ReadOnlySpan<b
int topicsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 8)..(position + 16)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 16)..(position + 24)]);
int nameLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 24)..(position + 28)]);
int nameLength = (int)payload[position + 24];

string name = Encoding.UTF8.GetString(payload[(position + 28)..(position + 28 + nameLength)]);
int readBytes = 4 + 4 + 8 + 8 + 4 + nameLength;
string name = Encoding.UTF8.GetString(payload[(position + 25)..(position + 25 + nameLength)]);
int readBytes = 4 + 4 + 8 + 8 + 1 + nameLength;

return (
new StreamResponse
Expand All @@ -353,15 +353,15 @@ private static (StreamResponse stream, int readBytes) MapToStream(ReadOnlySpan<b
}, readBytes);
}

private static (StreamResponse stream, int readBytes) MapToStreams(ReadOnlySpan<byte> payload, int position)
/*private static (StreamResponse stream, int readBytes) MapToStreams(ReadOnlySpan<byte> payload, int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
int topicsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 8)..(position + 16)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 16)..(position + 24)]);
int nameLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 24)..(position + 28)]);
string name = Encoding.UTF8.GetString(payload[(position + 28)..(position + 28 + nameLength)]);
int readBytes = 4 + 4 + 8 + 8 + 4 + nameLength;
int nameLength = (int)payload[24];
string name = Encoding.UTF8.GetString(payload[(position + 25)..(position + 25 + nameLength)]);
int readBytes = 4 + 4 + 8 + 8 + 1 + nameLength;
var stream = new StreamResponse
{
Expand All @@ -373,7 +373,7 @@ private static (StreamResponse stream, int readBytes) MapToStreams(ReadOnlySpan<
};
return (stream, readBytes);
}
}*/

internal static IReadOnlyList<TopicResponse> MapTopics(ReadOnlySpan<byte> payload)
{
Expand Down Expand Up @@ -410,6 +410,7 @@ internal static TopicResponse MapTopic(ReadOnlySpan<byte> payload)
Name = topic.Name,
PartitionsCount = topic.PartitionsCount,
Partitions = partitions,
MessageExpiry = topic.MessageExpiry,
MessagesCount = topic.MessagesCount,
SizeBytes = topic.SizeBytes
};
Expand All @@ -419,17 +420,18 @@ private static (TopicResponse topic, int readBytes) MapToTopic(ReadOnlySpan<byte
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
int partitionsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 8)..(position + 16)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 16)..(position + 24)]);
int nameLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 24)..(position + 28)]);
string name = Encoding.UTF8.GetString(payload[(position + 28)..(position + 28 + nameLength)]);
int readBytes = 4 + 4 + 8 + 8 + 4 + nameLength;
int messageExpiry = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 8)..(position + 12)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 12)..(position + 20)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 20)..(position + 28)]);
int nameLength = (int)payload[position + 28];
string name = Encoding.UTF8.GetString(payload[(position + 29)..(position + 29 + nameLength)]);
int readBytes = 4 + 4 + 4 + 8 + 8 + 1 + nameLength;

return (
new TopicResponse
{
Id = id, PartitionsCount = partitionsCount, Name = name, MessagesCount = messagesCount,
SizeBytes = sizeBytes
SizeBytes = sizeBytes, MessageExpiry = messageExpiry
}, readBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@

namespace Iggy_SDK.MessagesDispatcher;

internal sealed class MessageBus
internal sealed class MessageInvoker
{
private readonly Socket _socket;

//TODO - make this readonly
private Memory<byte> _responseBuffer = new(new byte[BufferSizes.ExpectedResponseSize]);

public MessageBus(Socket socket)
public MessageInvoker(Socket socket)
{
_socket = socket;
}
Expand Down
5 changes: 3 additions & 2 deletions Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ internal sealed class MessageSenderDispatcher
private readonly PeriodicTimer _timer;
private Task? _timerTask;
private readonly CancellationTokenSource _cts = new();
private readonly MessageBus _bus;
private readonly MessageInvoker _bus;
private readonly Channel<MessageSendRequest> _channel;
private readonly int _maxMessages;

internal MessageSenderDispatcher(SendMessageConfigurator sendMessagesOptions, Channel<MessageSendRequest> channel,
MessageBus bus)
MessageInvoker bus)
{
_timer = new PeriodicTimer(sendMessagesOptions.PollingInterval);
_bus = bus;
Expand Down Expand Up @@ -142,6 +142,7 @@ public async Task StopAsync()
{
return;
}
_timer.Dispose();
_cts.Cancel();
await _timerTask;
_cts.Dispose();
Expand Down
12 changes: 7 additions & 5 deletions Iggy_SDK_Tests/ContractTests/TcpContract.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ public void TcpContracts_CreateStream_HasCorrectBytes()
var result = TcpContracts.CreateStream(request).AsSpan();

// Assert
int expectedBytesLength = sizeof(int) + request.Name.Length;
int expectedBytesLength = sizeof(int) + request.Name.Length + 1;

Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(request.StreamId, BitConverter.ToInt32(result[..4]));
Assert.Equal(request.Name, Encoding.UTF8.GetString(result[4..]));
Assert.Equal(request.StreamId, BitConverter.ToInt32(result[..5]));
Assert.Equal(request.Name, Encoding.UTF8.GetString(result[5..]));
}

[Fact]
Expand Down Expand Up @@ -283,15 +283,17 @@ public void TcpContracts_CreateTopic_HasCorrectBytes()
var result = TcpContracts.CreateTopic(streamId, request).AsSpan();

// Assert
int expectedBytesLength = 2 + streamId.Length + 8 + request.Name.Length;
int expectedBytesLength = 2 + streamId.Length + 8 + request.Name.Length + 4 + 1;

Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(streamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(streamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(request.TopicId, BitConverter.ToInt32(result[6..10]));
Assert.Equal(request.PartitionsCount, BitConverter.ToInt32(result[10..14]));
Assert.Equal(request.Name, Encoding.UTF8.GetString(result[14..]));
Assert.Equal(request.MessageExpiry, BitConverter.ToInt32(result[14..18]));
Assert.Equal(request.Name.Length , (int)result[18]);
Assert.Equal(request.Name, Encoding.UTF8.GetString(result[19..]));
}


Expand Down
15 changes: 8 additions & 7 deletions Iggy_SDK_Tests/MapperTests/BinaryMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ public void MapStream_ReturnsValidStreamResponse()
// Arrange
var ( id, topicsCount, sizeBytes, messagesCount, name) = StreamFactory.CreateStreamsResponseFields();
byte[] streamPayload = BinaryFactory.CreateStreamPayload(id, topicsCount, name, sizeBytes, messagesCount);
var (topicId1, partitionsCount1, topicName1, topicSizeBytes1 ,messagesCountTopic1) =
var (topicId1, partitionsCount1, topicName1, messageExpiry1, topicSizeBytes1 ,messagesCountTopic1) =
TopicFactory.CreateTopicResponseFields();
byte[] topicPayload1 = BinaryFactory.CreateTopicPayload(topicId1,
partitionsCount1,
messageExpiry1,
topicName1,
topicSizeBytes1,
messagesCountTopic1);
Expand Down Expand Up @@ -204,12 +205,12 @@ public void MapStream_ReturnsValidStreamResponse()
public void MapTopics_ReturnsValidTopicsResponses()
{
// Arrange
var (id1, partitionsCount1, name1, sizeBytesTopic1, messagesCountTopic1) =
var (id1, partitionsCount1, name1, messageExpiry1, sizeBytesTopic1, messagesCountTopic1) =
TopicFactory.CreateTopicResponseFields();
byte[] payload1 = BinaryFactory.CreateTopicPayload(id1, partitionsCount1, name1, sizeBytesTopic1, messagesCountTopic1);
var (id2, partitionsCount2, name2, sizeBytesTopic2, messagesCountTopic2) =
byte[] payload1 = BinaryFactory.CreateTopicPayload(id1, partitionsCount1, messageExpiry1, name1, sizeBytesTopic1, messagesCountTopic1);
var (id2, partitionsCount2, name2, messageExpiry2, sizeBytesTopic2, messagesCountTopic2) =
TopicFactory.CreateTopicResponseFields();
byte[] payload2 = BinaryFactory.CreateTopicPayload(id2, partitionsCount2, name2, sizeBytesTopic2, messagesCountTopic2 );
byte[] payload2 = BinaryFactory.CreateTopicPayload(id2, partitionsCount2, messageExpiry2, name2, sizeBytesTopic2, messagesCountTopic2 );

byte[] combinedPayload = new byte[payload1.Length + payload2.Length];
payload1.CopyTo(combinedPayload.AsSpan());
Expand Down Expand Up @@ -241,8 +242,8 @@ public void MapTopics_ReturnsValidTopicsResponses()
public void MapTopic_ReturnsValidTopicResponse()
{
// Arrange
var (topicId, partitionsCount, topicName, sizeBytes, messagesCount) = TopicFactory.CreateTopicResponseFields();
byte[] topicPayload = BinaryFactory.CreateTopicPayload(topicId, partitionsCount, topicName, sizeBytes, messagesCount);
var (topicId, partitionsCount, topicName, messageExpiry, sizeBytes, messagesCount) = TopicFactory.CreateTopicResponseFields();
byte[] topicPayload = BinaryFactory.CreateTopicPayload(topicId, partitionsCount, messageExpiry, topicName, sizeBytes, messagesCount);

byte[] combinedPayload = new byte[topicPayload.Length];
topicPayload.CopyTo(combinedPayload.AsSpan());
Expand Down
Loading

0 comments on commit 62f456b

Please sign in to comment.