Skip to content

Commit

Permalink
Implement PAT for tcp protocol && add unit/integration tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex committed Oct 14, 2023
1 parent 11c70a8 commit 70196f0
Show file tree
Hide file tree
Showing 11 changed files with 673 additions and 343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ namespace Iggy_SDK.Contracts.Http;
public sealed class CreatePersonalAccessTokenRequest
{
public required string Name { get; init; }
public uint Expiry { get; init; }
public uint? Expiry { get; init; }
}
23 changes: 22 additions & 1 deletion Iggy_SDK/Contracts/Tcp/TcpContracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Iggy_SDK.Enums;
using Iggy_SDK.Extensions;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
using Iggy_SDK.Messages;
using System.Buffers.Binary;
using System.Runtime.CompilerServices;
Expand All @@ -15,6 +14,28 @@ namespace Iggy_SDK.Contracts.Tcp;
//TODO - write unit tests for all the user related contracts
internal static class TcpContracts
{
internal static byte[] LoginWithPersonalAccessToken(LoginWithPersonalAccessToken request)
{
Span<byte> bytes = stackalloc byte[5 + request.Token.Length];
bytes[0] = (byte)request.Token.Length;
Encoding.UTF8.GetBytes(request.Token, bytes[1..(1 + request.Token.Length)]);
return bytes.ToArray();
}
internal static byte[] DeletePersonalRequestToken(DeletePersonalAccessTokenRequest request)
{
Span<byte> bytes = stackalloc byte[5 + request.Name.Length];
bytes[0] = (byte)request.Name.Length;
Encoding.UTF8.GetBytes(request.Name, bytes[1..(1 + request.Name.Length)]);
return bytes.ToArray();
}
internal static byte[] CreatePersonalAccessToken(CreatePersonalAccessTokenRequest request)
{
Span<byte> bytes = stackalloc byte[5 + request.Name.Length];
bytes[0] = (byte)request.Name.Length;
Encoding.UTF8.GetBytes(request.Name, bytes[1..(1 + request.Name.Length)]);
BinaryPrimitives.WriteUInt32LittleEndian(bytes[(1 + request.Name.Length)..], request.Expiry ?? 0);
return bytes.ToArray();
}
internal static byte[] GetClient(uint clientId)
{
var bytes = new byte[4];
Expand Down
93 changes: 85 additions & 8 deletions Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,20 +1003,97 @@ public async Task LogoutUser(CancellationToken token = default)
throw new InvalidResponseException($"Invalid response status code: {response.Status}");
}
}
public Task<IReadOnlyList<PersonalAccessTokenResponse>> GetPersonalAccessTokensAsync(CancellationToken token = default)
public async Task<IReadOnlyList<PersonalAccessTokenResponse>> GetPersonalAccessTokensAsync(CancellationToken token = default)
{
throw new NotImplementedException();
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_PERSONAL_ACCESS_TOKENS_CODE);

await _socket.SendAsync(payload, token);

var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _socket.ReceiveAsync(buffer, token);

var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);

if (response.Status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {response.Status}");
}
var responseBuffer = new byte[response.Length];
await _socket.ReceiveAsync(responseBuffer, token);
return BinaryMapper.MapPersonalAccessTokens(responseBuffer);
}
public Task<RawPersonalAccessToken?> CreatePersonalAccessTokenAsync(CreatePersonalAccessTokenRequest request, CancellationToken token = default)
public async Task<RawPersonalAccessToken?> CreatePersonalAccessTokenAsync(CreatePersonalAccessTokenRequest request, CancellationToken token = default)
{
throw new NotImplementedException();
var message = TcpContracts.CreatePersonalAccessToken(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE);

await _socket.SendAsync(payload, token);

var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _socket.ReceiveAsync(buffer, token);

var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);

if (response.Status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {response.Status}");
}
if (response.Length <= 1)
{
return null;
}
var responseBuffer = new byte[response.Length];
await _socket.ReceiveAsync(responseBuffer, token);
return BinaryMapper.MapRawPersonalAccessToken(responseBuffer);
}
public Task DeletePersonalAccessTokenAsync(DeletePersonalAccessTokenRequest request, CancellationToken token = default)
public async Task DeletePersonalAccessTokenAsync(DeletePersonalAccessTokenRequest request, CancellationToken token = default)
{
throw new NotImplementedException();
var message = TcpContracts.DeletePersonalRequestToken(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PERSONAL_ACCESS_TOKEN_CODE);

await _socket.SendAsync(payload, token);

var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _socket.ReceiveAsync(buffer, token);

var status = TcpMessageStreamHelpers.GetResponseStatus(buffer);

if (status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {status}");
}
}
public Task<AuthResponse?> LoginWithPersonalAccessToken(LoginWithPersonalAccessToken request, CancellationToken token = default)
public async Task<AuthResponse?> LoginWithPersonalAccessToken(LoginWithPersonalAccessToken request, CancellationToken token = default)
{
throw new NotImplementedException();
var message = TcpContracts.LoginWithPersonalAccessToken(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);

await _socket.SendAsync(payload, token);

var buffer = new byte[8];
await _socket.ReceiveAsync(buffer, token);

var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);

if (response.Status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {response.Status}");
}
if (response.Length <= 1)
{
return null;
}
var responseBuffer = new byte[response.Length];
await _socket.ReceiveAsync(responseBuffer, token);
var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..4]);
return new AuthResponse
{
UserId = userId
};
}
}
41 changes: 39 additions & 2 deletions Iggy_SDK/Mappers/BinaryMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,44 @@ namespace Iggy_SDK.Mappers;
internal static class BinaryMapper
{
private const int PROPERTIES_SIZE = 45;

internal static RawPersonalAccessToken MapRawPersonalAccessToken(ReadOnlySpan<byte> payload)
{
var tokenLength = payload[0];
var token = Encoding.UTF8.GetString(payload[1..(1 + tokenLength)]);
return new RawPersonalAccessToken
{
Token = token
};
}
internal static IReadOnlyList<PersonalAccessTokenResponse> MapPersonalAccessTokens(ReadOnlySpan<byte> payload)
{
if (payload.Length == 0)
{
return Array.Empty<PersonalAccessTokenResponse>();
}
var result = new List<PersonalAccessTokenResponse>();
int length = payload.Length;
int position = 0;
while (position < length)
{
var (response, readBytes) = MapToPersonalAccessTokenResponse(payload, position);
result.Add(response);
position += readBytes;
}
return result.AsReadOnly();
}
private static (PersonalAccessTokenResponse response, int position) MapToPersonalAccessTokenResponse(ReadOnlySpan<byte> payload, int position)
{
var nameLength = (int)payload[position];
var name = Encoding.UTF8.GetString(payload[(position + 1)..(1 + position + nameLength)]);
var expiry = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 1 + nameLength)..]);
var readBytes = 1 + nameLength + 8;
return (new PersonalAccessTokenResponse
{
Name = name,
Expiry = expiry == 0 ? null : DateTimeOffsetUtils.FromUnixTimeMicroSeconds(expiry)
}, readBytes);
}
internal static IReadOnlyList<UserResponse> MapUsers(ReadOnlySpan<byte> payload)
{
if (payload.Length == 0)
Expand Down Expand Up @@ -746,7 +783,7 @@ private static (ConsumerGroupMember, int readBytes) MapToMember(ReadOnlySpan<byt
}, 8 + partitionsCount * 4);
}

private static (ConsumerGroupResponse consumerGroup, int readBytes) MapToConsumerGroup(ReadOnlySpan<byte> payload,
private static (ConsumerGroupResponse consumerGroup, int readBytes) MapToConsumerGroup(ReadOnlySpan<byte> payload,
int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
Expand Down
4 changes: 4 additions & 0 deletions Iggy_SDK/Utils/CommandCodes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ internal static class CommandCodes
internal const int CHANGE_PASSWORD_CODE = 37;
internal const int LOGIN_USER_CODE = 38;
internal const int LOGOUT_USER_CODE = 39;
internal const int GET_PERSONAL_ACCESS_TOKENS_CODE = 41;
internal const int CREATE_PERSONAL_ACCESS_TOKEN_CODE = 42;
internal const int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43;
internal const int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44;
internal const int POLL_MESSAGES_CODE = 100;
internal const int SEND_MESSAGES_CODE = 101;
internal const int GET_CONSUMER_OFFSET_CODE = 120;
Expand Down
Loading

0 comments on commit 70196f0

Please sign in to comment.