Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [DEVEX-222] Add built-in serialization #329

Draft
wants to merge 4 commits into
base: DEVEX-185-Rebranding
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
namespace EventStore.Client;

public enum SchemaDefinitionType {
Undefined = 0,
Json = 1,
Protobuf = 2,
Avro = 3,
Bytes = 4
}

public abstract class KurrentClientSerializationSettings {
public abstract SchemaDefinitionType SchemaType { get; }

public static KurrentClientSerializationSettings Default(
SchemaDefinitionType schemaDefinitionType = SchemaDefinitionType.Json
) =>
schemaDefinitionType switch {
SchemaDefinitionType.Json => new SystemTextJsonSerializationSettings(),
SchemaDefinitionType.Protobuf => throw new NotImplementedException("Not implemented yet, sorry!"),
SchemaDefinitionType.Avro => throw new NotImplementedException("Not implemented yet, sorry!"),
SchemaDefinitionType.Bytes => new BytesSerializationSettings(),
SchemaDefinitionType.Undefined => throw new NotImplementedException("Not implemented yet, sorry!"),
_ => throw new ArgumentOutOfRangeException(nameof(schemaDefinitionType), schemaDefinitionType, null)
};

public static JSONSerializationSettings Json() =>
new JSONSerializationSettings();

public static BytesSerializationSettings Bytes() =>
new BytesSerializationSettings();

public static CustomSerializationSettings Custom() =>
new CustomSerializationSettings();
}

public class JSONSerializationSettings: KurrentClientSerializationSettings {
public override SchemaDefinitionType SchemaType { get => SchemaDefinitionType.Json; }
}

public class SystemTextJsonSerializationSettings: JSONSerializationSettings {
}

public class BytesSerializationSettings: KurrentClientSerializationSettings {
public override SchemaDefinitionType SchemaType { get => SchemaDefinitionType.Bytes; }
}

public class CustomSerializationSettings: KurrentClientSerializationSettings {
public override SchemaDefinitionType SchemaType { get => SchemaDefinitionType.Undefined; }
}
14 changes: 14 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ public partial class KurrentClientSettings {
public static KurrentClientSettings Create(string connectionString) =>
ConnectionStringParser.Parse(connectionString);

/// <summary>
/// Creates client settings from a connection string with additional configuration
/// </summary>
/// <param name="connectionString"></param>
/// <param name="configure">allows you to make additional customization of client settings</param>
/// <returns></returns>
public static KurrentClientSettings Create(string connectionString, Action<KurrentClientSettings> configure) {
var settings = ConnectionStringParser.Parse(connectionString);

configure(settings);

return settings;
}

private static class ConnectionStringParser {
private const string SchemeSeparator = "://";
private const string UserInfoSeparator = "@";
Expand Down
3 changes: 3 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,8 @@ public partial class KurrentClientSettings {
/// The default deadline for calls. Will not be applied to reads or subscriptions.
/// </summary>
public TimeSpan? DefaultDeadline { get; set; } = TimeSpan.FromSeconds(10);

// TODO: This should be a dictionary or class with registered serializers for different types
public KurrentClientSerializationSettings Serialization = KurrentClientSerializationSettings.Default();
}
}
22 changes: 19 additions & 3 deletions src/Kurrent.Client/Core/ResolvedEvent.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client.Serialization;

namespace EventStore.Client {
/// <summary>
/// A structure representing a single event or a resolved link event.
Expand Down Expand Up @@ -43,18 +46,31 @@ public readonly struct ResolvedEvent {
/// </summary>
public bool IsResolved => Link != null && Event != null;

readonly ISchemaSerializer _serializer;

/// <summary>
/// Constructs a new <see cref="ResolvedEvent"/>.
/// </summary>
/// <param name="event"></param>
/// <param name="link"></param>
/// <param name="commitPosition"></param>
public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) {
Event = @event;
Link = link;
/// <param name="serializer"></param>
public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition, ISchemaSerializer serializer) {
Event = @event;
Link = link;
_serializer = serializer;
OriginalPosition = commitPosition.HasValue
? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition)
: new Position?();
}

#if NET48
public bool TryDeserialize(out object? deserialized) {
#else
public bool TryDeserialize([NotNullWhen(true)] out object? deserialized) {
#endif
deserialized = _serializer.Deserialize(OriginalEvent.Data, OriginalEvent.EventType);
return deserialized != null;
}
}
}
52 changes: 52 additions & 0 deletions src/Kurrent.Client/Core/Serialization/EventTypeMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Collections.Concurrent;

namespace Kurrent.Client.Tests.Streams.Serialization;

// TODO: Discuss how to proceed with that and whether to move the Schema Registry code here
// The scanning part and registration seems to be more robust there
// I used this for simplicity
public interface IEventTypeMapper {
void AddCustomMap<T>(string eventTypeName);
void AddCustomMap(Type eventType, string eventTypeName);
string ToName<TEventType>();
string ToName(Type eventType);
Type? ToType(string eventTypeName);
}

public class EventTypeMapper : IEventTypeMapper {
public static readonly EventTypeMapper Instance = new();

private readonly ConcurrentDictionary<string, Type?> typeMap = new();
private readonly ConcurrentDictionary<Type, string> typeNameMap = new();

public void AddCustomMap<T>(string eventTypeName) => AddCustomMap(typeof(T), eventTypeName);

public void AddCustomMap(Type eventType, string eventTypeName)
{
typeNameMap.AddOrUpdate(eventType, eventTypeName, (_, typeName) => typeName);
typeMap.AddOrUpdate(eventTypeName, eventType, (_, type) => type);
}

public string ToName<TEventType>() => ToName(typeof(TEventType));

public string ToName(Type eventType) => typeNameMap.GetOrAdd(eventType, _ =>
{
var eventTypeName = eventType.FullName!;

typeMap.TryAdd(eventTypeName, eventType);

return eventTypeName;
});

public Type? ToType(string eventTypeName) => typeMap.GetOrAdd(eventTypeName, _ =>
{
var type = TypeProvider.GetFirstMatchingTypeFromCurrentDomainAssembly(eventTypeName);

if (type == null)
return null;

typeNameMap.TryAdd(type, eventTypeName);

return type;
});
}
7 changes: 7 additions & 0 deletions src/Kurrent.Client/Core/Serialization/ISchemaSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace EventStore.Client.Serialization;

public interface ISchemaSerializer {
public (ReadOnlyMemory<byte> Bytes, string typeName) Serialize(object value);

public object? Deserialize(ReadOnlyMemory<byte> data, string typeName);
}
7 changes: 7 additions & 0 deletions src/Kurrent.Client/Core/Serialization/ISerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace EventStore.Client.Serialization;

public interface ISerializer {
public ReadOnlyMemory<byte> Serialize(object value);

public object? Deserialize(ReadOnlyMemory<byte> data, Type type);
}
17 changes: 17 additions & 0 deletions src/Kurrent.Client/Core/Serialization/SchemaSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Kurrent.Client.Tests.Streams.Serialization;

namespace EventStore.Client.Serialization;

public class SchemaSerializer(ISerializer serializer, IEventTypeMapper eventTypeMapper) : ISchemaSerializer {
public (ReadOnlyMemory<byte> Bytes, string typeName) Serialize(object value) {
var eventType = eventTypeMapper.ToName(value.GetType());
var bytes = serializer.Serialize(value);

return (bytes, eventType);
}

public object? Deserialize(ReadOnlyMemory<byte> data, string typeName) {
var clrType = eventTypeMapper.ToType(typeName);
return clrType != null ? serializer.Deserialize(data, clrType) : null;
}
}
14 changes: 14 additions & 0 deletions src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Text;
using System.Text.Json;
using EventStore.Client.Serialization;

namespace Kurrent.Client.Core.Serialization;

public class SystemTextJsonSerializer: ISerializer {
public ReadOnlyMemory<byte> Serialize(object value) {
return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value));
}
public object? Deserialize(ReadOnlyMemory<byte> data, Type type) {
return JsonSerializer.Deserialize(data.Span, type);
}
}
26 changes: 26 additions & 0 deletions src/Kurrent.Client/Core/Serialization/TypeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Reflection;

namespace Kurrent.Client.Tests.Streams.Serialization;

public static class TypeProvider
{
public static Type? GetTypeFromAnyReferencingAssembly(string typeName)
{
var referencedAssemblies = Assembly.GetEntryAssembly()?
.GetReferencedAssemblies()
.Select(a => a.FullName);

if (referencedAssemblies == null)
return null;

return AppDomain.CurrentDomain.GetAssemblies()
.Where(a => referencedAssemblies.Contains(a.FullName))
.SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName))
.FirstOrDefault();
}

public static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string typeName) =>
AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName))
.FirstOrDefault();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading.Channels;
using EventStore.Client.PersistentSubscriptions;
using EventStore.Client.Diagnostics;
using EventStore.Client.Serialization;
using Grpc.Core;

using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions;
Expand Down Expand Up @@ -128,6 +129,7 @@ public PersistentSubscriptionResult SubscribeToStream(
new() { Options = readOptions },
Settings,
userCredentials,
_schemaSerializer,
cancellationToken
);
}
Expand Down Expand Up @@ -221,9 +223,13 @@ async IAsyncEnumerable<PersistentSubscriptionMessage> GetMessages() {
}

internal PersistentSubscriptionResult(
string streamName, string groupName,
string streamName,
string groupName,
Func<CancellationToken, Task<ChannelInfo>> selectChannelInfo,
ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials,
ReadReq request,
KurrentClientSettings settings,
UserCredentials? userCredentials,
ISchemaSerializer schemaSerializer,
CancellationToken cancellationToken
) {
StreamName = streamName;
Expand Down Expand Up @@ -260,7 +266,7 @@ async Task PumpMessages() {
response.SubscriptionConfirmation.SubscriptionId
),
Event => new PersistentSubscriptionMessage.Event(
ConvertToResolvedEvent(response),
ConvertToResolvedEvent(response, schemaSerializer),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
_ => null
Expand Down Expand Up @@ -363,13 +369,14 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents) =>
Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId));

static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new(
static ResolvedEvent ConvertToResolvedEvent(ReadResp response, ISchemaSerializer schemaSerializer) => new(
ConvertToEventRecord(response.Event.Event)!,
ConvertToEventRecord(response.Event.Link),
response.Event.PositionCase switch {
ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
_ => null
}
},
schemaSerializer
);

Task AckInternal(params Uuid[] eventIds) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using EventStore.Client.Serialization;
using Kurrent.Client.Core.Serialization;
using Kurrent.Client.Tests.Streams.Serialization;

namespace EventStore.Client {
public partial class KurrentPersistentSubscriptionsClient {
// TODO: Resolve based on options
ISchemaSerializer _schemaSerializer = new SchemaSerializer(
new SystemTextJsonSerializer(),
EventTypeMapper.Instance
);
}
}
Loading
Loading