Skip to content

Commit

Permalink
Solution cleanup and todos setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex committed Aug 25, 2023
1 parent 737240c commit b68b991
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 39 deletions.
1 change: 1 addition & 0 deletions Iggy_SDK.sln.DotSettings.user
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=0f4657e0_002Da261_002D4925_002D80bd_002Dca9a229a0627/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" IsActive="True" Name="MapMessages_NoHeaders_ReturnsValidMessageResponses #2" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
&lt;TestAncestor&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MapperTests.BinaryMapper.MapMessages_NoHeaders_ReturnsValidMessageResponses&lt;/TestId&gt;
&lt;TestId&gt;xUnit::8514D555-35CF-4538-97D3-DC42A182B694::net7.0::Iggy_SDK_Tests.MapperTests.BinaryMapper.MapMessagesTMessage_NoHeaders_ReturnsValidMessageResponse&lt;/TestId&gt;
&lt;/TestAncestor&gt;
&lt;/SessionState&gt;</s:String>
<s:String x:Key="/Default/Environment/UnitTesting/UnitTestSessionStore/Sessions/=184dd178_002Dd35b_002D4913_002Da4be_002Ded1b141ea05b/@EntryIndexedValue">&lt;SessionState ContinuousTestingMode="0" Name="Read_DeserializeValidJson_ReturnsMessageResponseList #4" xmlns="urn:schemas-jetbrains-com:jetbrains-ut-session"&gt;
Expand Down
1 change: 1 addition & 0 deletions Iggy_SDK/Configuration/IMessageStreamConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Iggy_SDK.Configuration;

//TODO - refactor this to be more expressive (e.g nested configurators etc..)
public interface IMessageStreamConfigurator
{
public string BaseAdress { get; set; }
Expand Down
1 change: 1 addition & 0 deletions Iggy_SDK/Contracts/Http/MessageResponse.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Iggy_SDK.Enums;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;

Expand Down
1 change: 1 addition & 0 deletions Iggy_SDK/Contracts/Http/MessageResponseGeneric.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

using Iggy_SDK.Enums;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;

Expand Down
31 changes: 16 additions & 15 deletions Iggy_SDK/Contracts/Tcp/TcpContracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ internal static void GetMessages(Span<byte> bytes, MessageFetchRequest request)

bytes[position + 17] = request.AutoCommit ? (byte)1 : (byte)0;
}
internal static void GetMessagesLazy(Span<byte> bytes, MessageFetchRequest request)
{
bytes[0] = GetConsumerTypeByte(request.Consumer.Type);
BinaryPrimitives.WriteInt32LittleEndian(bytes[1..5], request.Consumer.Id);
WriteBytesFromStreamAndTopicIdToSpan(request.StreamId, request.TopicId, bytes, 5);
var position = 5 + 2 + request.StreamId.Length + 2 + request.TopicId.Length;
BinaryPrimitives.WriteInt32LittleEndian(bytes[position..(position + 4)], request.PartitionId);
bytes[position + 4] = GetPollingStrategyByte(request.PollingStrategy.Kind);
BinaryPrimitives.WriteUInt64LittleEndian(bytes[(position + 5)..(position + 13)], request.PollingStrategy.Value);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 13)..(position + 17)], 1);

bytes[position + 17] = request.AutoCommit ? (byte)1 : (byte)0;
}
//TODO - since message is of type IList maybe I can simplife the HandleMessages methods.
internal static void CreateMessage(Span<byte> bytes, Identifier streamId, Identifier topicId,
Partitioning partitioning, IList<Message> messages)
{
Expand Down Expand Up @@ -161,6 +175,7 @@ private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue> headers
return headersBytes.ToArray();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte HeaderKindToByte(HeaderKind kind)
{
return kind switch
Expand Down Expand Up @@ -188,21 +203,7 @@ private static byte[] GetBytesFromHeader(HeaderKey headerKey, HeaderValue header
var headerKeyBytes = Encoding.UTF8.GetBytes(headerKey.Value);
headerKeyBytes.CopyTo(headerBytes[4..(4 + headerKey.Value.Length)]);

headerBytes[4 + headerKey.Value.Length] = headerValue.Kind switch
{
HeaderKind.Raw => 1,
HeaderKind.String => 2,
HeaderKind.Bool => 3,
HeaderKind.Int32 => 6,
HeaderKind.Int64 => 7,
HeaderKind.Int128 => 8,
HeaderKind.Uint32 => 11,
HeaderKind.Uint64 => 12,
HeaderKind.Uint128 => 13,
HeaderKind.Float32 => 14,
HeaderKind.Float64 => 15,
_ => throw new ArgumentOutOfRangeException()
};
headerBytes[4 + headerKey.Value.Length] = HeaderKindToByte(headerValue.Kind);

BinaryPrimitives.WriteInt32LittleEndian(
headerBytes[(4 + headerKey.Value.Length + 1)..(4 + headerKey.Value.Length + 1 + 4)],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Iggy_SDK.Kinds;
namespace Iggy_SDK.Enums;

public enum MessageState
{
Expand Down
3 changes: 0 additions & 3 deletions Iggy_SDK/Factory/MessageStreamFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public static IMessageStream CreateMessageStream(Action<IMessageStreamConfigurat
};
}



private static TcpMessageStream CreateTcpMessageStream(IMessageStreamConfigurator options)
{
var urlPortSplitter = options.BaseAdress.Split(":");
Expand All @@ -39,7 +37,6 @@ private static TcpMessageStream CreateTcpMessageStream(IMessageStreamConfigurato
socket.ReceiveBufferSize = options.ReceiveBufferSize;
return new TcpMessageStream(socket);
}


private static HttpMessageStream CreateHttpMessageStream(IMessageStreamConfigurator options)
{
Expand Down
1 change: 1 addition & 0 deletions Iggy_SDK/JsonConfiguration/MessageResponseConverter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Extensions;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Extensions;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
Expand Down
68 changes: 65 additions & 3 deletions Iggy_SDK/Mappers/BinaryMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
using System.Buffers.Binary;
using System.Text;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
using Iggy_SDK.Utils;

namespace Iggy_SDK.Mappers;
//TODO - major refactor, look for repeating pieces of code (kind maps, etc..)
internal static class BinaryMapper
{
private const int PROPERTIES_SIZE = 45;
Expand All @@ -21,7 +22,69 @@ internal static OffsetResponse MapOffsets(ReadOnlySpan<byte> payload)
ConsumerId = consumerId
};
}


internal static MessageResponse MapMessage(ReadOnlySpan<byte> payload,
Func<byte[], byte[]>? decryptor = null)
{
int length = payload.Length;
int position = 4;
if (position >= length)
{
throw new ArgumentOutOfRangeException();
}

ulong offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[position..(position + 8)]);
var state = payload[position + 8] switch
{
1 => MessageState.Available,
10 => MessageState.Unavailable,
20 => MessageState.Poisoned,
30 => MessageState.MarkedForDeletion,
_ => throw new ArgumentOutOfRangeException()
};
ulong timestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 9)..(position + 17)]);
var id = new Guid(payload[(position + 17)..(position + 33)]);
var checksum = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 33)..(position + 37)]);
int headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 37)..(position + 41)]);

var headers = headersLength switch
{
0 => null,
> 0 => MapHeaders(payload[(position + 41)..(position + 41 + headersLength)]),
< 0 => throw new ArgumentOutOfRangeException()
};
position += headersLength;
uint messageLength = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 41)..(position + 45)]);

int payloadRangeStart = position + PROPERTIES_SIZE;
int payloadRangeEnd = position + PROPERTIES_SIZE + (int)messageLength;

var payloadSlice = payload[payloadRangeStart..payloadRangeEnd];
var messagePayload = ArrayPool<byte>.Shared.Rent(payloadSlice.Length);
var payloadSliceLen = payloadSlice.Length;

try
{
payloadSlice.CopyTo(messagePayload.AsSpan()[..payloadSliceLen]);

return new MessageResponse
{
Offset = offset,
Timestamp = timestamp,
Id = id,
Checksum = checksum,
State = state,
Headers = headers,
Payload = decryptor is not null
? decryptor(messagePayload[..payloadSliceLen])
: messagePayload[..payloadSliceLen]
};
}
finally
{
ArrayPool<byte>.Shared.Return(messagePayload);
}
}
internal static IReadOnlyList<MessageResponse> MapMessages(ReadOnlySpan<byte> payload, Func<byte[], byte[]>? decryptor = null)
{
int length = payload.Length;
Expand Down Expand Up @@ -464,7 +527,6 @@ internal static Stats MapStats(ReadOnlySpan<byte> payload)
internal static ConsumerGroupResponse MapConsumerGroup(ReadOnlySpan<byte> payload)
{
(ConsumerGroupResponse consumerGroup, int position) = MapToConsumerGroup(payload, 0);

return consumerGroup;
}
private static (ConsumerGroupResponse consumerGroup, int readBytes) MapToConsumerGroup(ReadOnlySpan<byte> payload,
Expand Down
4 changes: 4 additions & 0 deletions Iggy_SDK/MessageStream/IMessageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Iggy_SDK.MessageStream;

//TODO - look into creating another overload for PollMessages method that will use IAsyncEnumerable as return type.
//TODO - look into making the (de)cryptor and (de)serializer lambdas async
//TODO - create a SendMessage method that uses polling under the hood to collect batch of messages
public interface IMessageClient
{
Task SendMessagesAsync(Identifier streamId, Identifier topicId, MessageSendRequest request, Func<byte[], byte[]>?
Expand All @@ -18,5 +20,7 @@ Task<IReadOnlyList<MessageResponse>> PollMessagesAsync(MessageFetchRequest reque
CancellationToken token = default);
Task<IReadOnlyList<MessageResponse<TMessage>>> PollMessagesAsync<TMessage>(MessageFetchRequest request,
Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default);
// IAsyncEnumerable<MessageResponse> LazyPollMessagesAsync(MessageFetchRequest request, Func<byte[], byte[]>? decryptor = null,
// CancellationToken token = default);

}
72 changes: 67 additions & 5 deletions Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public sealed class TcpMessageStream : IMessageStream, IDisposable
private const int EXPECTED_RESPONSE_SIZE = 8;
private readonly Socket _socket;

private Memory<byte> _buffer = new(new byte[EXPECTED_RESPONSE_SIZE]);
private Memory<byte> _responseBuffer = new(new byte[EXPECTED_RESPONSE_SIZE]);

internal TcpMessageStream(Socket socket)
{
Expand Down Expand Up @@ -243,12 +243,12 @@ public async Task SendMessagesAsync(Identifier streamId, Identifier topicId, Mes
messages);
CreatePayload(payload.AsSpan()[..payloadBufferSize], message.AsSpan()[..messageBufferSize], CommandCodes.SEND_MESSAGES_CODE);

var recv = _socket.ReceiveAsync(_buffer, token);
var recv = _socket.ReceiveAsync(_responseBuffer, token);
await _socket.SendAsync(payload.AsMemory()[..payloadBufferSize], token);

await recv;

var status = GetResponseStatus(_buffer.Span);
var status = GetResponseStatus(_responseBuffer.Span);
if (status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {status}");
Expand Down Expand Up @@ -294,12 +294,12 @@ public async Task SendMessagesAsync<TMessage>(Identifier streamId, Identifier to
messagesToSend);
CreatePayload(payload.AsSpan()[..payloadBufferSize], message.AsSpan()[..messageBufferSize], CommandCodes.SEND_MESSAGES_CODE);

var recv = _socket.ReceiveAsync(_buffer, token);
var recv = _socket.ReceiveAsync(_responseBuffer, token);
await _socket.SendAsync(payload.AsMemory()[..payloadBufferSize], token);

await recv;

var status = GetResponseStatus(_buffer.Span);
var status = GetResponseStatus(_responseBuffer.Span);
if (status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {status}");
Expand All @@ -312,6 +312,7 @@ public async Task SendMessagesAsync<TMessage>(Identifier streamId, Identifier to
ArrayPool<byte>.Shared.Return(payload);
}
}
//TODO - explore simplifying this method since messages is of type IList
private static int CalculateMessageBytesCount(IList<Message> messages)
{
return messages switch
Expand Down Expand Up @@ -428,6 +429,67 @@ public async Task<IReadOnlyList<MessageResponse<TMessage>>> PollMessagesAsync<TM
ArrayPool<byte>.Shared.Return(buffer);
}
}

public async IAsyncEnumerable<MessageResponse> LazyPollMessagesAsync(MessageFetchRequest request, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default)

Check warning on line 433 in Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Async-iterator 'TcpMessageStream.LazyPollMessagesAsync(MessageFetchRequest, Func<byte[], byte[]>?, CancellationToken)' has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed

Check warning on line 433 in Iggy_SDK/MessageStream/Implementations/TcpMessageStream.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Async-iterator 'TcpMessageStream.LazyPollMessagesAsync(MessageFetchRequest, Func<byte[], byte[]>?, CancellationToken)' has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed
{
int messageBufferSize = 18 + 5 + 2 + request.StreamId.Length + 2 + request.TopicId.Length;
int payloadBufferSize = messageBufferSize + 4 + INITIAL_BYTES_LENGTH;
var message = ArrayPool<byte>.Shared.Rent(messageBufferSize);
var payload = ArrayPool<byte>.Shared.Rent(payloadBufferSize);

int pollingCount = 0;
TcpContracts.GetMessagesLazy(message.AsSpan()[..messageBufferSize], request);
CreatePayload(payload, message.AsSpan()[..messageBufferSize], CommandCodes.POLL_MESSAGES_CODE);
while (pollingCount < request.Count || token.IsCancellationRequested)
{
try
{

await _socket.SendAsync(payload.AsMemory()[..payloadBufferSize], token);
}
finally
{
ArrayPool<byte>.Shared.Return(message);
ArrayPool<byte>.Shared.Return(payload);
}

var buffer = ArrayPool<byte>.Shared.Rent(EXPECTED_RESPONSE_SIZE);
try
{
await _socket.ReceiveAsync(buffer.AsMemory()[..EXPECTED_RESPONSE_SIZE], token);

var response = GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
throw new TcpInvalidResponseException();
}

if (response.Length <= 1)
{
yield break;
}

var responseBuffer = ArrayPool<byte>.Shared.Rent(response.Length);

try
{
await _socket.ReceiveAsync(responseBuffer.AsMemory()[..response.Length], token);
yield return BinaryMapper.MapMessage(
responseBuffer.AsSpan()[..response.Length], decryptor);
pollingCount++;
}
finally
{
ArrayPool<byte>.Shared.Return(responseBuffer);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}

public async Task<IReadOnlyList<MessageResponse>> PollMessagesAsync(MessageFetchRequest request,
Func<byte[], byte[]>? decryptor = null, CancellationToken token = default)
{
Expand Down
26 changes: 14 additions & 12 deletions Iggy_Sample_Consumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,20 @@ async Task ConsumeMessages()
envelope.Payload = Encoding.UTF8.GetString(serializedData, 4 + messageTypeLength, serializedData.Length - (4 + messageTypeLength));
return envelope;
};
byte[] key = {
0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6,
0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c,
0xa8, 0x8d, 0x2d, 0x0a, 0x9f, 0x9d, 0xea, 0x43,
0x6c, 0x25, 0x17, 0x13, 0x20, 0x45, 0x78, 0xc8
};
byte[] iv = {
0x5f, 0x8a, 0xe4, 0x78, 0x9c, 0x3d, 0x2b, 0x0f,
0x12, 0x6a, 0x7e, 0x45, 0x91, 0xba, 0xdf, 0x33
};
Func<byte[], byte[]> decryptor = payload =>
Func<byte[], byte[]> decryptor = static payload =>
{
byte[] key =
{
0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6,
0xab, 0xf7, 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c,
0xa8, 0x8d, 0x2d, 0x0a, 0x9f, 0x9d, 0xea, 0x43,
0x6c, 0x25, 0x17, 0x13, 0x20, 0x45, 0x78, 0xc8
};
byte[] iv =
{
0x5f, 0x8a, 0xe4, 0x78, 0x9c, 0x3d, 0x2b, 0x0f,
0x12, 0x6a, 0x7e, 0x45, 0x91, 0xba, 0xdf, 0x33
};
using Aes aes = Aes.Create();
ICryptoTransform decryptor = aes.CreateDecryptor(key, iv);
using MemoryStream memoryStream = new MemoryStream(payload);
Expand All @@ -71,7 +73,7 @@ async Task ConsumeMessages()
{
var messages = await bus.PollMessagesAsync<Envelope>(new MessageFetchRequest
{
Consumer = Consumer.New(1),
Consumer = Consumer.New(consumerId),
Count = 1,
TopicId = topicId,
StreamId = streamId,
Expand Down

0 comments on commit b68b991

Please sign in to comment.