Skip to content

Commit

Permalink
[DEVEX-222] Added first working, but not yet complete JSON built-in s…
Browse files Browse the repository at this point in the history
…erialization

It's not fully ready, as it has hardcoded schema serializer. Main question will be how much from schema registry I need to move here.
  • Loading branch information
oskardudycz committed Jan 23, 2025
1 parent 8e27549 commit 001f1a0
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 49 deletions.
1 change: 1 addition & 0 deletions src/Kurrent.Client/Core/KurrentClientSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public partial class KurrentClientSettings {
/// </summary>
public TimeSpan? DefaultDeadline { get; set; } = TimeSpan.FromSeconds(10);

// TODO: This should be a dictionary or class with registered serializers for different types
public KurrentClientSerializationSettings Serialization = KurrentClientSerializationSettings.Default();
}
}
23 changes: 17 additions & 6 deletions src/Kurrent.Client/Core/ResolvedEvent.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client.Serialization;

namespace EventStore.Client {
/// <summary>
/// A structure representing a single event or a resolved link event.
Expand Down Expand Up @@ -43,23 +46,31 @@ public readonly struct ResolvedEvent {
/// </summary>
public bool IsResolved => Link != null && Event != null;

readonly ISchemaSerializer _serializer;

/// <summary>
/// Constructs a new <see cref="ResolvedEvent"/>.
/// </summary>
/// <param name="event"></param>
/// <param name="link"></param>
/// <param name="commitPosition"></param>
public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) {
Event = @event;
Link = link;
/// <param name="serializer"></param>
public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition, ISchemaSerializer serializer) {
Event = @event;
Link = link;
_serializer = serializer;
OriginalPosition = commitPosition.HasValue
? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition)
: new Position?();
}

public bool TryDeserialize(out object o) {
o = true;
return true;
#if NET48
public bool TryDeserialize(out object? deserialized) {
#else
public bool TryDeserialize([NotNullWhen(true)] out object? deserialized) {
#endif
deserialized = _serializer.Deserialize(OriginalEvent.Data, OriginalEvent.EventType);
return deserialized != null;
}
}
}
52 changes: 52 additions & 0 deletions src/Kurrent.Client/Core/Serialization/EventTypeMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Collections.Concurrent;

namespace Kurrent.Client.Tests.Streams.Serialization;

// TODO: Discuss how to proceed with that and whether to move the Schema Registry code here
// The scanning part and registration seems to be more robust there
// I used this for simplicity
public interface IEventTypeMapper {
void AddCustomMap<T>(string eventTypeName);
void AddCustomMap(Type eventType, string eventTypeName);
string ToName<TEventType>();
string ToName(Type eventType);
Type? ToType(string eventTypeName);
}

public class EventTypeMapper : IEventTypeMapper {
public static readonly EventTypeMapper Instance = new();

private readonly ConcurrentDictionary<string, Type?> typeMap = new();
private readonly ConcurrentDictionary<Type, string> typeNameMap = new();

public void AddCustomMap<T>(string eventTypeName) => AddCustomMap(typeof(T), eventTypeName);

public void AddCustomMap(Type eventType, string eventTypeName)
{
typeNameMap.AddOrUpdate(eventType, eventTypeName, (_, typeName) => typeName);
typeMap.AddOrUpdate(eventTypeName, eventType, (_, type) => type);
}

public string ToName<TEventType>() => ToName(typeof(TEventType));

public string ToName(Type eventType) => typeNameMap.GetOrAdd(eventType, _ =>
{
var eventTypeName = eventType.FullName!;

typeMap.TryAdd(eventTypeName, eventType);

return eventTypeName;
});

public Type? ToType(string eventTypeName) => typeMap.GetOrAdd(eventTypeName, _ =>
{
var type = TypeProvider.GetFirstMatchingTypeFromCurrentDomainAssembly(eventTypeName);

if (type == null)
return null;

typeNameMap.TryAdd(type, eventTypeName);

return type;
});
}
7 changes: 7 additions & 0 deletions src/Kurrent.Client/Core/Serialization/ISchemaSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace EventStore.Client.Serialization;

public interface ISchemaSerializer {
public (ReadOnlyMemory<byte> Bytes, string typeName) Serialize(object value);

public object? Deserialize(ReadOnlyMemory<byte> data, string typeName);
}
7 changes: 7 additions & 0 deletions src/Kurrent.Client/Core/Serialization/ISerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace EventStore.Client.Serialization;

public interface ISerializer {
public ReadOnlyMemory<byte> Serialize(object value);

public object? Deserialize(ReadOnlyMemory<byte> data, Type type);
}
17 changes: 17 additions & 0 deletions src/Kurrent.Client/Core/Serialization/SchemaSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Kurrent.Client.Tests.Streams.Serialization;

namespace EventStore.Client.Serialization;

public class SchemaSerializer(ISerializer serializer, IEventTypeMapper eventTypeMapper) : ISchemaSerializer {
public (ReadOnlyMemory<byte> Bytes, string typeName) Serialize(object value) {
var eventType = eventTypeMapper.ToName(value.GetType());
var bytes = serializer.Serialize(value);

return (bytes, eventType);
}

public object? Deserialize(ReadOnlyMemory<byte> data, string typeName) {
var clrType = eventTypeMapper.ToType(typeName);
return clrType != null ? serializer.Deserialize(data, clrType) : null;
}
}
14 changes: 14 additions & 0 deletions src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Text;
using System.Text.Json;
using EventStore.Client.Serialization;

namespace Kurrent.Client.Core.Serialization;

public class SystemTextJsonSerializer: ISerializer {
public ReadOnlyMemory<byte> Serialize(object value) {
return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value));
}
public object? Deserialize(ReadOnlyMemory<byte> data, Type type) {
return JsonSerializer.Deserialize(data.Span, type);
}
}
26 changes: 26 additions & 0 deletions src/Kurrent.Client/Core/Serialization/TypeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Reflection;

namespace Kurrent.Client.Tests.Streams.Serialization;

public static class TypeProvider
{
public static Type? GetTypeFromAnyReferencingAssembly(string typeName)
{
var referencedAssemblies = Assembly.GetEntryAssembly()?
.GetReferencedAssemblies()
.Select(a => a.FullName);

if (referencedAssemblies == null)
return null;

return AppDomain.CurrentDomain.GetAssemblies()
.Where(a => referencedAssemblies.Contains(a.FullName))
.SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName))
.FirstOrDefault();
}

public static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string typeName) =>
AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName))
.FirstOrDefault();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading.Channels;
using EventStore.Client.PersistentSubscriptions;
using EventStore.Client.Diagnostics;
using EventStore.Client.Serialization;
using Grpc.Core;

using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions;
Expand Down Expand Up @@ -128,6 +129,7 @@ public PersistentSubscriptionResult SubscribeToStream(
new() { Options = readOptions },
Settings,
userCredentials,
_schemaSerializer,
cancellationToken
);
}
Expand Down Expand Up @@ -221,9 +223,13 @@ async IAsyncEnumerable<PersistentSubscriptionMessage> GetMessages() {
}

internal PersistentSubscriptionResult(
string streamName, string groupName,
string streamName,
string groupName,
Func<CancellationToken, Task<ChannelInfo>> selectChannelInfo,
ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials,
ReadReq request,
KurrentClientSettings settings,
UserCredentials? userCredentials,
ISchemaSerializer schemaSerializer,
CancellationToken cancellationToken
) {
StreamName = streamName;
Expand Down Expand Up @@ -260,7 +266,7 @@ async Task PumpMessages() {
response.SubscriptionConfirmation.SubscriptionId
),
Event => new PersistentSubscriptionMessage.Event(
ConvertToResolvedEvent(response),
ConvertToResolvedEvent(response, schemaSerializer),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
_ => null
Expand Down Expand Up @@ -363,13 +369,14 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents) =>
Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId));

static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new(
static ResolvedEvent ConvertToResolvedEvent(ReadResp response, ISchemaSerializer schemaSerializer) => new(
ConvertToEventRecord(response.Event.Event)!,
ConvertToEventRecord(response.Event.Link),
response.Event.PositionCase switch {
ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
_ => null
}
},
schemaSerializer
);

Task AckInternal(params Uuid[] eventIds) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using EventStore.Client.Serialization;
using Kurrent.Client.Core.Serialization;
using Kurrent.Client.Tests.Streams.Serialization;

namespace EventStore.Client {
public partial class KurrentPersistentSubscriptionsClient {
// TODO: Resolve based on options
ISchemaSerializer _schemaSerializer = new SchemaSerializer(
new SystemTextJsonSerializer(),
EventTypeMapper.Instance
);
}
}
Loading

0 comments on commit 001f1a0

Please sign in to comment.