diff --git a/Iggy_SDK.sln.DotSettings.user b/Iggy_SDK.sln.DotSettings.user
index 2ec56de..2256bcf 100644
--- a/Iggy_SDK.sln.DotSettings.user
+++ b/Iggy_SDK.sln.DotSettings.user
@@ -49,6 +49,9 @@
<TestId>xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.ToSnakeCaseMessagePolicy</TestId>
<TestId>xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MessageStreamTests.MessageStreamUnitTests.CreateTopicAsync_Returns201Created_WhenSuccessfullyCreated</TestId>
</TestAncestor>
+</SessionState>
+ <SessionState ContinuousTestingMode="0" IsActive="True" Name="Session" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session">
+ <Project Location="/home/numinex/projects/Iggy_SDK/Iggy_SDK_Tests" Presentation="<Iggy_SDK_Tests>" />
</SessionState>
<SessionState ContinuousTestingMode="0" Name="SendMessageAsync_ReturnsNull_Failure" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session">
<TestAncestor>
@@ -73,6 +76,6 @@
<TestId>xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.UtilityTests.JsonConverterTests.Read_DeserializeValidJson_ReturnsMessageResponseList</TestId>
</TestAncestor>
</SessionState>
- <SessionState ContinuousTestingMode="0" IsActive="True" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session">
+ <SessionState ContinuousTestingMode="0" Name="GetTopicByIdAsync_Returns200Ok_WhenFound" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session">
<Project Location="/home/numinex/projects/Iggy_SDK/Iggy_SDK_Tests" Presentation="<Iggy_SDK_Tests>" />
</SessionState>
\ No newline at end of file
diff --git a/Iggy_SDK/Contracts/Http/TopicRequest.cs b/Iggy_SDK/Contracts/Http/TopicRequest.cs
index d6c8647..6f9513c 100644
--- a/Iggy_SDK/Contracts/Http/TopicRequest.cs
+++ b/Iggy_SDK/Contracts/Http/TopicRequest.cs
@@ -4,5 +4,6 @@ public sealed class TopicRequest
{
public required int TopicId { get; init; }
public required string Name { get; init; }
+ public int? MessageExpiry { get; init; }
public required int PartitionsCount{ get; init; }
}
\ No newline at end of file
diff --git a/Iggy_SDK/Contracts/Http/TopicResponse.cs b/Iggy_SDK/Contracts/Http/TopicResponse.cs
index 40d16f3..5d731e0 100644
--- a/Iggy_SDK/Contracts/Http/TopicResponse.cs
+++ b/Iggy_SDK/Contracts/Http/TopicResponse.cs
@@ -1,10 +1,15 @@
+using System.Text.Json.Serialization;
+using Iggy_SDK.JsonConfiguration;
+
namespace Iggy_SDK.Contracts.Http;
+[JsonConverter(typeof(TopicResponseConverter))]
public sealed class TopicResponse
{
public required int Id { get; init; }
public required string Name { get; init; }
public required ulong SizeBytes { get; init; }
+ public int MessageExpiry { get; init; }
public required ulong MessagesCount { get; init; }
public required int PartitionsCount { get; init; }
public IEnumerable? Partitions { get; init; }
diff --git a/Iggy_SDK/Contracts/Tcp/TcpContracts.cs b/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index b1bad7d..f0f420c 100644
--- a/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -214,9 +214,10 @@ private static byte[] GetBytesFromHeader(HeaderKey headerKey, HeaderValue header
}
internal static byte[] CreateStream(StreamRequest request)
{
- Span bytes = stackalloc byte[4 + request.Name.Length];
+ Span bytes = stackalloc byte[4 + request.Name.Length + 1];
BinaryPrimitives.WriteInt32LittleEndian(bytes[..4] , request.StreamId);
- Encoding.UTF8.GetBytes(request.Name, bytes[4..]);
+ bytes[4] = (byte)request.Name.Length;
+ Encoding.UTF8.GetBytes(request.Name, bytes[5..]);
return bytes.ToArray();
}
@@ -272,13 +273,15 @@ internal static byte[] GetGroup(Identifier streamId, Identifier topicId, int gro
internal static byte[] CreateTopic(Identifier streamId, TopicRequest request)
{
- Span bytes = stackalloc byte[2 + streamId.Length + 8 + request.Name.Length];
+ Span bytes = stackalloc byte[2 + streamId.Length + 13 + request.Name.Length];
WriteBytesFromIdentifierToSpan(streamId, bytes);
var streamIdBytesLength = 2 + streamId.Length;
BinaryPrimitives.WriteInt32LittleEndian(bytes[streamIdBytesLength..(streamIdBytesLength + 4)], request.TopicId);
int position = 4 + streamIdBytesLength;
BinaryPrimitives.WriteInt32LittleEndian(bytes[position..(position + 4)], request.PartitionsCount);
- Encoding.UTF8.GetBytes(request.Name, bytes[(position + 4)..]);
+ BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 4)..(position + 8)], request.MessageExpiry ?? 0);
+ bytes[position + 8] = (byte)request.Name.Length;
+ Encoding.UTF8.GetBytes(request.Name, bytes[(position + 9)..]);
return bytes.ToArray();
}
diff --git a/Iggy_SDK/Factory/MessageStreamFactory.cs b/Iggy_SDK/Factory/MessageStreamFactory.cs
index 2953629..bfb4e49 100644
--- a/Iggy_SDK/Factory/MessageStreamFactory.cs
+++ b/Iggy_SDK/Factory/MessageStreamFactory.cs
@@ -51,7 +51,7 @@ private static TcpMessageStream CreateTcpMessageStream(IMessageStreamConfigurato
});
var messageStream = new TcpMessageStream(socket, channel);
- var messageBus = new MessageBus(socket);
+ var messageBus = new MessageInvoker(socket);
var messageDispatcher = new MessageSenderDispatcher(sendMessagesOptions, channel, messageBus);
messageDispatcher.Start();
diff --git a/Iggy_SDK/JsonConfiguration/TopicResponseConverter.cs b/Iggy_SDK/JsonConfiguration/TopicResponseConverter.cs
new file mode 100644
index 0000000..d03b4a5
--- /dev/null
+++ b/Iggy_SDK/JsonConfiguration/TopicResponseConverter.cs
@@ -0,0 +1,85 @@
+using System.ComponentModel;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using Iggy_SDK.Contracts.Http;
+using Iggy_SDK.Extensions;
+
+namespace Iggy_SDK.JsonConfiguration;
+
+public sealed class TopicResponseConverter : JsonConverter
+{
+ private readonly JsonSerializerOptions _options;
+ public TopicResponseConverter()
+ {
+ _options = new JsonSerializerOptions();
+ _options.PropertyNamingPolicy = new ToSnakeCaseNamingPolicy();
+ }
+
+ public override TopicResponse? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ using var doc = JsonDocument.ParseValue(ref reader);
+ var root = doc.RootElement;
+ var id = root.GetProperty(nameof(TopicResponse.Id).ToSnakeCase()).GetInt32();
+ var name = root.GetProperty(nameof(TopicResponse.Name).ToSnakeCase()).GetString();
+ var sizeBytes = root.GetProperty(nameof(TopicResponse.SizeBytes).ToSnakeCase()).GetUInt64();
+ var messageExpiryProperty = root.GetProperty(nameof(TopicResponse.MessageExpiry).ToSnakeCase());
+
+ var messageExpiry = messageExpiryProperty.ValueKind switch
+ {
+ JsonValueKind.Null => 0,
+ JsonValueKind.Number => messageExpiryProperty.GetInt32(),
+ _ => throw new InvalidEnumArgumentException("Error Wrong JsonValueKind when deserializing MessageExpiry")
+ };
+ var messagesCount = root.GetProperty(nameof(TopicResponse.MessagesCount).ToSnakeCase()).GetUInt64();
+ var partitionsCount = root.GetProperty(nameof(TopicResponse.PartitionsCount).ToSnakeCase()).GetInt32();
+ root.TryGetProperty(nameof(TopicResponse.Partitions).ToSnakeCase(), out var partitionsProperty);
+ var partitions = partitionsProperty.ValueKind switch
+ {
+ JsonValueKind.Null => null,
+ JsonValueKind.Undefined => null,
+ JsonValueKind.Array => DeserializePartitions(partitionsProperty),
+ _ => throw new InvalidEnumArgumentException("Error Wrong JsonValueKind when deserializing Partitions")
+ };
+ return new TopicResponse
+ {
+ Id = id,
+ Name = name!,
+ SizeBytes = sizeBytes,
+ MessageExpiry = messageExpiry,
+ MessagesCount = messagesCount,
+ PartitionsCount = partitionsCount,
+ Partitions = partitions
+ };
+
+ }
+ private IEnumerable DeserializePartitions(JsonElement partitionsElement)
+ {
+ return JsonSerializer.Deserialize>(partitionsElement.GetRawText(), _options)!;
+ }
+
+ public override void Write(Utf8JsonWriter writer, TopicResponse value, JsonSerializerOptions options)
+ {
+ writer.WriteStartObject();
+
+ writer.WriteNumber(nameof(TopicResponse.Id).ToSnakeCase(), value.Id);
+ writer.WriteString(nameof(TopicResponse.Name).ToSnakeCase(), value.Name);
+ writer.WriteNumber(nameof(TopicResponse.SizeBytes).ToSnakeCase(), value.SizeBytes);
+ writer.WriteNumber(nameof(TopicResponse.MessageExpiry).ToSnakeCase(), value.MessageExpiry);
+ writer.WriteNumber(nameof(TopicResponse.MessagesCount).ToSnakeCase(), value.MessagesCount);
+ writer.WriteNumber(nameof(TopicResponse.PartitionsCount).ToSnakeCase(), value.PartitionsCount);
+
+ if (value.Partitions != null)
+ {
+ writer.WriteStartArray(nameof(TopicResponse.Partitions).ToSnakeCase());
+
+ foreach (var partition in value.Partitions)
+ {
+ var partitionJson = JsonSerializer.Serialize(partition, options);
+ writer.WriteRawValue(partitionJson);
+ }
+
+ writer.WriteEndArray();
+ }
+ writer.WriteEndObject();
+ }
+}
\ No newline at end of file
diff --git a/Iggy_SDK/Mappers/BinaryMapper.cs b/Iggy_SDK/Mappers/BinaryMapper.cs
index fc0da83..0273211 100644
--- a/Iggy_SDK/Mappers/BinaryMapper.cs
+++ b/Iggy_SDK/Mappers/BinaryMapper.cs
@@ -303,7 +303,7 @@ internal static IReadOnlyList MapStreams(ReadOnlySpan payl
while (position < length)
{
- (StreamResponse stream, int readBytes) = MapToStreams(payload, position);
+ (StreamResponse stream, int readBytes) = MapToStream(payload, position);
streams.Add(stream);
position += readBytes;
}
@@ -341,10 +341,10 @@ private static (StreamResponse stream, int readBytes) MapToStream(ReadOnlySpan payload, int position)
+ /*private static (StreamResponse stream, int readBytes) MapToStreams(ReadOnlySpan payload, int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
int topicsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 8)..(position + 16)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 16)..(position + 24)]);
- int nameLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 24)..(position + 28)]);
- string name = Encoding.UTF8.GetString(payload[(position + 28)..(position + 28 + nameLength)]);
- int readBytes = 4 + 4 + 8 + 8 + 4 + nameLength;
+ int nameLength = (int)payload[24];
+ string name = Encoding.UTF8.GetString(payload[(position + 25)..(position + 25 + nameLength)]);
+ int readBytes = 4 + 4 + 8 + 8 + 1 + nameLength;
var stream = new StreamResponse
{
@@ -373,7 +373,7 @@ private static (StreamResponse stream, int readBytes) MapToStreams(ReadOnlySpan<
};
return (stream, readBytes);
- }
+ }*/
internal static IReadOnlyList MapTopics(ReadOnlySpan payload)
{
@@ -410,6 +410,7 @@ internal static TopicResponse MapTopic(ReadOnlySpan payload)
Name = topic.Name,
PartitionsCount = topic.PartitionsCount,
Partitions = partitions,
+ MessageExpiry = topic.MessageExpiry,
MessagesCount = topic.MessagesCount,
SizeBytes = topic.SizeBytes
};
@@ -419,17 +420,18 @@ private static (TopicResponse topic, int readBytes) MapToTopic(ReadOnlySpan _responseBuffer = new(new byte[BufferSizes.ExpectedResponseSize]);
- public MessageBus(Socket socket)
+ public MessageInvoker(Socket socket)
{
_socket = socket;
}
diff --git a/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs b/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs
index e755d75..7afc22e 100644
--- a/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs
+++ b/Iggy_SDK/MessagesDispatcher/MessageSenderDispatcher.cs
@@ -14,12 +14,12 @@ internal sealed class MessageSenderDispatcher
private readonly PeriodicTimer _timer;
private Task? _timerTask;
private readonly CancellationTokenSource _cts = new();
- private readonly MessageBus _bus;
+ private readonly MessageInvoker _bus;
private readonly Channel _channel;
private readonly int _maxMessages;
internal MessageSenderDispatcher(SendMessageConfigurator sendMessagesOptions, Channel channel,
- MessageBus bus)
+ MessageInvoker bus)
{
_timer = new PeriodicTimer(sendMessagesOptions.PollingInterval);
_bus = bus;
@@ -142,6 +142,7 @@ public async Task StopAsync()
{
return;
}
+ _timer.Dispose();
_cts.Cancel();
await _timerTask;
_cts.Dispose();
diff --git a/Iggy_SDK_Tests/ContractTests/TcpContract.cs b/Iggy_SDK_Tests/ContractTests/TcpContract.cs
index 70878eb..9382a5a 100644
--- a/Iggy_SDK_Tests/ContractTests/TcpContract.cs
+++ b/Iggy_SDK_Tests/ContractTests/TcpContract.cs
@@ -118,11 +118,11 @@ public void TcpContracts_CreateStream_HasCorrectBytes()
var result = TcpContracts.CreateStream(request).AsSpan();
// Assert
- int expectedBytesLength = sizeof(int) + request.Name.Length;
+ int expectedBytesLength = sizeof(int) + request.Name.Length + 1;
Assert.Equal(expectedBytesLength, result.Length);
- Assert.Equal(request.StreamId, BitConverter.ToInt32(result[..4]));
- Assert.Equal(request.Name, Encoding.UTF8.GetString(result[4..]));
+ Assert.Equal(request.StreamId, BitConverter.ToInt32(result[..5]));
+ Assert.Equal(request.Name, Encoding.UTF8.GetString(result[5..]));
}
[Fact]
@@ -283,7 +283,7 @@ public void TcpContracts_CreateTopic_HasCorrectBytes()
var result = TcpContracts.CreateTopic(streamId, request).AsSpan();
// Assert
- int expectedBytesLength = 2 + streamId.Length + 8 + request.Name.Length;
+ int expectedBytesLength = 2 + streamId.Length + 8 + request.Name.Length + 4 + 1;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
@@ -291,7 +291,9 @@ public void TcpContracts_CreateTopic_HasCorrectBytes()
Assert.Equal(streamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(request.TopicId, BitConverter.ToInt32(result[6..10]));
Assert.Equal(request.PartitionsCount, BitConverter.ToInt32(result[10..14]));
- Assert.Equal(request.Name, Encoding.UTF8.GetString(result[14..]));
+ Assert.Equal(request.MessageExpiry, BitConverter.ToInt32(result[14..18]));
+ Assert.Equal(request.Name.Length , (int)result[18]);
+ Assert.Equal(request.Name, Encoding.UTF8.GetString(result[19..]));
}
diff --git a/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs b/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs
index d1fc3c1..4454589 100644
--- a/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs
+++ b/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs
@@ -165,10 +165,11 @@ public void MapStream_ReturnsValidStreamResponse()
// Arrange
var ( id, topicsCount, sizeBytes, messagesCount, name) = StreamFactory.CreateStreamsResponseFields();
byte[] streamPayload = BinaryFactory.CreateStreamPayload(id, topicsCount, name, sizeBytes, messagesCount);
- var (topicId1, partitionsCount1, topicName1, topicSizeBytes1 ,messagesCountTopic1) =
+ var (topicId1, partitionsCount1, topicName1, messageExpiry1, topicSizeBytes1 ,messagesCountTopic1) =
TopicFactory.CreateTopicResponseFields();
byte[] topicPayload1 = BinaryFactory.CreateTopicPayload(topicId1,
partitionsCount1,
+ messageExpiry1,
topicName1,
topicSizeBytes1,
messagesCountTopic1);
@@ -204,12 +205,12 @@ public void MapStream_ReturnsValidStreamResponse()
public void MapTopics_ReturnsValidTopicsResponses()
{
// Arrange
- var (id1, partitionsCount1, name1, sizeBytesTopic1, messagesCountTopic1) =
+ var (id1, partitionsCount1, name1, messageExpiry1, sizeBytesTopic1, messagesCountTopic1) =
TopicFactory.CreateTopicResponseFields();
- byte[] payload1 = BinaryFactory.CreateTopicPayload(id1, partitionsCount1, name1, sizeBytesTopic1, messagesCountTopic1);
- var (id2, partitionsCount2, name2, sizeBytesTopic2, messagesCountTopic2) =
+ byte[] payload1 = BinaryFactory.CreateTopicPayload(id1, partitionsCount1, messageExpiry1, name1, sizeBytesTopic1, messagesCountTopic1);
+ var (id2, partitionsCount2, name2, messageExpiry2, sizeBytesTopic2, messagesCountTopic2) =
TopicFactory.CreateTopicResponseFields();
- byte[] payload2 = BinaryFactory.CreateTopicPayload(id2, partitionsCount2, name2, sizeBytesTopic2, messagesCountTopic2 );
+ byte[] payload2 = BinaryFactory.CreateTopicPayload(id2, partitionsCount2, messageExpiry2, name2, sizeBytesTopic2, messagesCountTopic2 );
byte[] combinedPayload = new byte[payload1.Length + payload2.Length];
payload1.CopyTo(combinedPayload.AsSpan());
@@ -241,8 +242,8 @@ public void MapTopics_ReturnsValidTopicsResponses()
public void MapTopic_ReturnsValidTopicResponse()
{
// Arrange
- var (topicId, partitionsCount, topicName, sizeBytes, messagesCount) = TopicFactory.CreateTopicResponseFields();
- byte[] topicPayload = BinaryFactory.CreateTopicPayload(topicId, partitionsCount, topicName, sizeBytes, messagesCount);
+ var (topicId, partitionsCount, topicName, messageExpiry, sizeBytes, messagesCount) = TopicFactory.CreateTopicResponseFields();
+ byte[] topicPayload = BinaryFactory.CreateTopicPayload(topicId, partitionsCount, messageExpiry, topicName, sizeBytes, messagesCount);
byte[] combinedPayload = new byte[topicPayload.Length];
topicPayload.CopyTo(combinedPayload.AsSpan());
diff --git a/Iggy_SDK_Tests/Utils/BinaryFactory.cs b/Iggy_SDK_Tests/Utils/BinaryFactory.cs
index 9126e73..4018b26 100644
--- a/Iggy_SDK_Tests/Utils/BinaryFactory.cs
+++ b/Iggy_SDK_Tests/Utils/BinaryFactory.cs
@@ -35,28 +35,29 @@ internal static byte[] CreateMessagePayload(ulong offset, ulong timestamp,int he
internal static byte[] CreateStreamPayload(int id, int topicsCount, string name, ulong sizeBytes, ulong messagesCount)
{
var nameBytes = Encoding.UTF8.GetBytes(name);
- var totalSize = 4 + 4 + 8 + 8 + 4 + nameBytes.Length;
+ var totalSize = 4 + 4 + 8 + 8 + 1 + nameBytes.Length;
var payload = new byte[totalSize];
BinaryPrimitives.WriteInt32LittleEndian(payload, id);
BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(4), topicsCount);
BinaryPrimitives.WriteUInt64LittleEndian(payload.AsSpan(8), sizeBytes);
BinaryPrimitives.WriteUInt64LittleEndian(payload.AsSpan(16), messagesCount);
- BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(24), nameBytes.Length);
- nameBytes.CopyTo(payload.AsSpan(28));
+ payload[24] = (byte)nameBytes.Length;
+ nameBytes.CopyTo(payload.AsSpan(25));
return payload;
}
- internal static byte[] CreateTopicPayload(int id, int partitionsCount, string name, ulong sizeBytes, ulong messagesCount)
+ internal static byte[] CreateTopicPayload(int id, int partitionsCount,int messageExpiry, string name, ulong sizeBytes, ulong messagesCount)
{
var nameBytes = Encoding.UTF8.GetBytes(name);
- var totalSize = 4 + 4 + 8 + 8 + 4 + nameBytes.Length;
+ var totalSize = 4 + 4 + 4 + 8 + 8 + 1 + nameBytes.Length;
var payload = new byte[totalSize];
BinaryPrimitives.WriteInt32LittleEndian(payload, id);
BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(4), partitionsCount);
- BinaryPrimitives.WriteUInt64LittleEndian(payload.AsSpan(8), sizeBytes);
- BinaryPrimitives.WriteUInt64LittleEndian(payload.AsSpan(16), messagesCount);
- BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(24), nameBytes.Length);
- nameBytes.CopyTo(payload.AsSpan(28));
+ BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(8),messageExpiry);
+ BinaryPrimitives.WriteUInt64LittleEndian(payload.AsSpan(12), sizeBytes);
+ BinaryPrimitives.WriteUInt64LittleEndian(payload.AsSpan(20), messagesCount);
+ payload[28] = (byte)nameBytes.Length;
+ nameBytes.CopyTo(payload.AsSpan(29));
return payload;
}
diff --git a/Iggy_SDK_Tests/Utils/Topics/TopicFactory.cs b/Iggy_SDK_Tests/Utils/Topics/TopicFactory.cs
index 635a1b1..7a51aa4 100644
--- a/Iggy_SDK_Tests/Utils/Topics/TopicFactory.cs
+++ b/Iggy_SDK_Tests/Utils/Topics/TopicFactory.cs
@@ -5,15 +5,16 @@ namespace Iggy_SDK_Tests.Utils.Topics;
internal static class TopicFactory
{
- internal static (int topicId, int partitionsCount, string topicName, ulong sizeBytes, ulong messagesCount)
+ internal static (int topicId, int partitionsCount, string topicName,int messageExpriy, ulong sizeBytes, ulong messagesCount)
CreateTopicResponseFields()
{
int topicId = Random.Shared.Next(1,69);
int partitionsCount = Random.Shared.Next(1,69);
string topicName = "Topic "+Random.Shared.Next(1,69);
+ int messageExpiry = Random.Shared.Next(1,69);
ulong sizeBytes = (ulong)Random.Shared.Next(1, 69);
ulong messagesCount = (ulong)Random.Shared.Next(69, 42069);
- return (topicId, partitionsCount, topicName, sizeBytes, messagesCount);
+ return (topicId, partitionsCount, topicName, messageExpiry, sizeBytes, messagesCount);
}
internal static TopicRequest CreateTopicRequest()
{
@@ -21,6 +22,7 @@ internal static TopicRequest CreateTopicRequest()
{
Name = "Test Topic" + Random.Shared.Next(1,69),
TopicId = Random.Shared.Next(1,10),
+ MessageExpiry = Random.Shared.Next(1,69),
PartitionsCount = Random.Shared.Next(1,10)
};
}
@@ -33,6 +35,7 @@ internal static TopicResponse CreateTopicsResponse()
Id = Random.Shared.Next(1, 10),
Name = "Test Topic" + Random.Shared.Next(1, 69),
MessagesCount = (ulong)Random.Shared.Next(1, 10),
+ MessageExpiry = Random.Shared.Next(1,69),
PartitionsCount = Random.Shared.Next(1, 10),
SizeBytes = (ulong)Random.Shared.Next(1, 10),
Partitions = new List
diff --git a/Iggy_Sample_Consumer/Program.cs b/Iggy_Sample_Consumer/Program.cs
index a23a67f..f0a7bce 100644
--- a/Iggy_Sample_Consumer/Program.cs
+++ b/Iggy_Sample_Consumer/Program.cs
@@ -12,10 +12,10 @@
var jsonOptions = new JsonSerializerOptions();
jsonOptions.PropertyNamingPolicy = new ToSnakeCaseNamingPolicy();
jsonOptions.WriteIndented = true;
-var protocol = Protocol.Http;
+var protocol = Protocol.Tcp;
var bus = MessageStreamFactory.CreateMessageStream(options =>
{
- options.BaseAdress = "http://localhost:3000";
+ options.BaseAdress = "127.0.0.1:8090";
options.Protocol = protocol;
});
diff --git a/Iggy_Sample_Producer/Program.cs b/Iggy_Sample_Producer/Program.cs
index 2c09035..246a89e 100644
--- a/Iggy_Sample_Producer/Program.cs
+++ b/Iggy_Sample_Producer/Program.cs
@@ -43,13 +43,13 @@
await bus.CreateStreamAsync(new StreamRequest
{
StreamId = streamIdVal,
- Name = "Test Producer Stream",
+ Name = "producer-stream",
});
Console.WriteLine($"Creating topic with id:{topicId}");
await bus.CreateTopicAsync(streamId, new TopicRequest
{
- Name = "Test Topic From Producer Sample",
+ Name = "producer-topic",
PartitionsCount = 3,
TopicId = topicIdVal
});