diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs index d45b53156..c0bf1d172 100644 --- a/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs +++ b/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using System.Runtime.CompilerServices; using System.Text.Json; +using EventStore.Client.Serialization; using Kurrent.Diagnostics; using Kurrent.Diagnostics.Tracing; @@ -81,4 +82,78 @@ static ReadOnlyMemory TryInjectTracingMetadata( return utf8Json; } } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static SerializationMetadata ExtractSerializationMetadata(this ReadOnlyMemory eventMetadata) { + if (eventMetadata.IsEmpty) + return SerializationMetadata.None; + + var reader = new Utf8JsonReader(eventMetadata.Span); + try { + if (!JsonDocument.TryParseValue(ref reader, out var doc)) + return SerializationMetadata.None; + + using (doc) { + if (!doc.RootElement.TryGetProperty( + SerializationMetadata.Constants.MessageTypeAssemblyQualifiedName, + out var messageTypeAssemblyQualifiedName + ) + || !doc.RootElement.TryGetProperty( + SerializationMetadata.Constants.MessageTypeClrTypeName, + out var messageTypeClrTypeName + )) + return SerializationMetadata.None; + + return new SerializationMetadata( + messageTypeAssemblyQualifiedName.GetString(), + messageTypeClrTypeName.GetString() + ); + } + } catch (Exception) { + return SerializationMetadata.None; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ReadOnlyMemory InjectSerializationMetadata( + this ReadOnlyMemory eventMetadata, SerializationMetadata serializationMetadata + ) { + if (serializationMetadata == SerializationMetadata.None || !serializationMetadata.IsValid) + return ReadOnlyMemory.Empty; + + return eventMetadata.IsEmpty + ? JsonSerializer.SerializeToUtf8Bytes(serializationMetadata) + : TryInjectSerializationMetadata(eventMetadata, serializationMetadata).ToArray(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static ReadOnlyMemory TryInjectSerializationMetadata( + this ReadOnlyMemory utf8Json, SerializationMetadata serializationMetadata + ) { + try { + using var doc = JsonDocument.Parse(utf8Json); + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + writer.WriteStartObject(); + + if (doc.RootElement.ValueKind != JsonValueKind.Object) + return utf8Json; + + foreach (var prop in doc.RootElement.EnumerateObject()) + prop.WriteTo(writer); + + writer.WritePropertyName(SerializationMetadata.Constants.MessageTypeAssemblyQualifiedName); + writer.WriteStringValue(serializationMetadata.MessageTypeAssemblyQualifiedName); + writer.WritePropertyName(SerializationMetadata.Constants.MessageTypeClrTypeName); + writer.WriteStringValue(serializationMetadata.MessageTypeClrTypeName); + + writer.WriteEndObject(); + writer.Flush(); + + return stream.ToArray(); + } catch (Exception) { + return utf8Json; + } + } } diff --git a/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs b/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs new file mode 100644 index 000000000..bcf8a1eda --- /dev/null +++ b/src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs @@ -0,0 +1,86 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using EventStore.Client.Serialization; +using Kurrent.Client.Core.Serialization; + +namespace EventStore.Client; + +public enum AutomaticDeserialization { + Disabled = 0, + Enabled = 1 +} + +public class KurrentClientSerializationSettings { + public ISerializer? JsonSerializer { get; set; } + public ISerializer? BytesSerializer { get; set; } + public ContentType DefaultContentType { get; set; } = ContentType.Json; + public AutomaticDeserialization AutomaticDeserialization { get; set; } = AutomaticDeserialization.Disabled; + public IMessageTypeResolutionStrategy? MessageTypeResolutionStrategy { get; set; } + + public IDictionary MessageTypeMap { get; set; } = new Dictionary(); + + public static KurrentClientSerializationSettings Default( + Action? configure = null + ) { + var settings = new KurrentClientSerializationSettings(); + + configure?.Invoke(settings); + + return settings; + } + + public KurrentClientSerializationSettings UseJsonSettings( + SystemTextJsonSerializationSettings jsonSerializationSettings + ) { + JsonSerializer = new SystemTextJsonSerializer(jsonSerializationSettings); + + return this; + } + + public KurrentClientSerializationSettings UseJsonSerializer(ISerializer serializer) { + JsonSerializer = serializer; + + return this; + } + + public KurrentClientSerializationSettings UseBytesSerializer(ISerializer serializer) { + BytesSerializer = serializer; + + return this; + } + + public KurrentClientSerializationSettings UseMessageTypeResolutionStrategy() + where TCustomMessageTypeResolutionStrategy : IMessageTypeResolutionStrategy, new() => + UseMessageTypeResolutionStrategy(new TCustomMessageTypeResolutionStrategy()); + + public KurrentClientSerializationSettings UseMessageTypeResolutionStrategy( + IMessageTypeResolutionStrategy messageTypeResolutionStrategy + ) { + MessageTypeResolutionStrategy = messageTypeResolutionStrategy; + + return this; + } + + public KurrentClientSerializationSettings RegisterMessageType(string typeName) => + RegisterMessageType(typeof(T), typeName); + + public KurrentClientSerializationSettings RegisterMessageType(Type type, string typeName) { + MessageTypeMap.Add(type, typeName); + + return this; + } + + public KurrentClientSerializationSettings RegisterMessageTypes(IDictionary typeMap) { + foreach (var map in typeMap) { + MessageTypeMap.Add(map.Key, map.Value); + } + + return this; + } + + public KurrentClientSerializationSettings EnableAutomaticDeserialization() { + AutomaticDeserialization = AutomaticDeserialization.Enabled; + + return this; + } +} diff --git a/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs b/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs index c730b7b5a..57be0c950 100644 --- a/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs +++ b/src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs @@ -14,6 +14,20 @@ public partial class KurrentClientSettings { public static KurrentClientSettings Create(string connectionString) => ConnectionStringParser.Parse(connectionString); + /// + /// Creates client settings from a connection string with additional configuration + /// + /// + /// allows you to make additional customization of client settings + /// + public static KurrentClientSettings Create(string connectionString, Action configure) { + var settings = ConnectionStringParser.Parse(connectionString); + + configure(settings); + + return settings; + } + private static class ConnectionStringParser { private const string SchemeSeparator = "://"; private const string UserInfoSeparator = "@"; diff --git a/src/Kurrent.Client/Core/KurrentClientSettings.cs b/src/Kurrent.Client/Core/KurrentClientSettings.cs index aed914074..28cd83acb 100644 --- a/src/Kurrent.Client/Core/KurrentClientSettings.cs +++ b/src/Kurrent.Client/Core/KurrentClientSettings.cs @@ -57,5 +57,7 @@ public partial class KurrentClientSettings { /// The default deadline for calls. Will not be applied to reads or subscriptions. /// public TimeSpan? DefaultDeadline { get; set; } = TimeSpan.FromSeconds(10); + + public KurrentClientSerializationSettings Serialization { get; set; } = KurrentClientSerializationSettings.Default(); } } diff --git a/src/Kurrent.Client/Core/ResolvedEvent.cs b/src/Kurrent.Client/Core/ResolvedEvent.cs index 25ca13a78..9e824334a 100644 --- a/src/Kurrent.Client/Core/ResolvedEvent.cs +++ b/src/Kurrent.Client/Core/ResolvedEvent.cs @@ -1,3 +1,7 @@ +using System.Diagnostics.CodeAnalysis; +using EventStore.Client.Serialization; +using Kurrent.Client.Core.Serialization; + namespace EventStore.Client { /// /// A structure representing a single event or a resolved link event. @@ -22,6 +26,13 @@ public readonly struct ResolvedEvent { /// public EventRecord OriginalEvent => Link ?? Event; + /// + /// Returns the deserialized event payload. + /// It will be provided or equal to null, depending on the automatic deserialization settings you choose. + /// If it's null, you can use to deserialize it manually. + /// + public readonly object? DeserializedEvent; + /// /// Position of the if available. /// @@ -49,12 +60,45 @@ public readonly struct ResolvedEvent { /// /// /// - public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) { - Event = @event; - Link = link; + /// + public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) : this( + @event, + link, + null, + commitPosition + ) { } + + /// + /// Constructs a new . + /// + /// + /// + /// + /// + ResolvedEvent( + EventRecord @event, + EventRecord? link, + object? deserializedEvent, + ulong? commitPosition + ) { + Event = @event; + Link = link; + DeserializedEvent = deserializedEvent; OriginalPosition = commitPosition.HasValue ? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition) : new Position?(); } + + public static ResolvedEvent From( + EventRecord @event, + EventRecord? link, + ulong? commitPosition, + IMessageSerializer messageSerializer + ) { + var originalEvent = link ?? @event; + return messageSerializer.TryDeserialize(originalEvent, out var deserialized) + ? new ResolvedEvent(@event, link, deserialized, commitPosition) + : new ResolvedEvent(@event, link, commitPosition); + } } } diff --git a/src/Kurrent.Client/Core/Serialization/ISerializer.cs b/src/Kurrent.Client/Core/Serialization/ISerializer.cs new file mode 100644 index 000000000..224a4b0bd --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/ISerializer.cs @@ -0,0 +1,7 @@ +namespace EventStore.Client.Serialization; + +public interface ISerializer { + public ReadOnlyMemory Serialize(object value); + + public object? Deserialize(ReadOnlyMemory data, Type type); +} diff --git a/src/Kurrent.Client/Core/Serialization/Message.cs b/src/Kurrent.Client/Core/Serialization/Message.cs new file mode 100644 index 000000000..53793be42 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/Message.cs @@ -0,0 +1,42 @@ +using EventStore.Client; + +namespace Kurrent.Client.Core.Serialization; + +public readonly struct Message { + /// + /// The raw bytes of the event data. + /// + public readonly object Data; + + /// + /// The raw bytes of the event metadata. + /// + public readonly object? Metadata; + + /// + /// The of the event, used as part of the idempotent write check. + /// + public readonly Uuid EventId; + + /// + /// Constructs a new . + /// + /// The raw bytes of the event data. + /// The raw bytes of the event metadata. + /// The of the event, used as part of the idempotent write check. + /// + public Message(object data, object? metadata = null, Uuid? eventId = null) { + if (eventId == Uuid.Empty) + throw new ArgumentOutOfRangeException(nameof(eventId)); + + EventId = eventId ?? Uuid.NewUuid(); + Data = data; + Metadata = metadata; + } + + public void Deconstruct(out object data, out object? metadata, out Uuid eventId) { + data = Data; + metadata = Metadata; + eventId = EventId; + } +} diff --git a/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs new file mode 100644 index 000000000..7850a8373 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/MessageSerializer.cs @@ -0,0 +1,90 @@ +using System.Diagnostics.CodeAnalysis; +using EventStore.Client.Diagnostics; +using Kurrent.Client.Core.Serialization; + +namespace EventStore.Client.Serialization; + +using static ContentTypeExtensions; + +public interface IMessageSerializer { + public EventData Serialize(Message value, MessageSerializationContext context); + +#if NET48 + public bool TryDeserialize(EventRecord messageRecord, out object? deserialized); +#else + public bool TryDeserialize(EventRecord messageRecord, [NotNullWhen(true)] out object? deserialized); +#endif +} + +public record MessageSerializationContext( + string StreamName, + ContentType ContentType +) { + public string CategoryName => + // TODO: This is dangerous, as separator can be changed in database settings + StreamName.Split('-').FirstOrDefault() ?? "no_stream_category"; +} + +public static class MessageSerializerExtensions { + public static EventData[] Serialize( + this IMessageSerializer serializer, + IEnumerable messages, + MessageSerializationContext context + ) { + return messages.Select(m => serializer.Serialize(m, context)).ToArray(); + } +} + +public class MessageSerializer(SchemaRegistry schemaRegistry) : IMessageSerializer { + readonly ISerializer _jsonSerializer = + schemaRegistry.GetSerializer(ContentType.Json); + + readonly IMessageTypeResolutionStrategy _messageTypeResolutionStrategy = + schemaRegistry.MessageTypeResolutionStrategy; + + public EventData Serialize(Message message, MessageSerializationContext serializationContext) { + var (data, metadata, eventId) = message; + + var eventType = _messageTypeResolutionStrategy + .ResolveTypeName(message, serializationContext); + + var serializedData = schemaRegistry + .GetSerializer(serializationContext.ContentType) + .Serialize(data); + + var serializedMetadata = metadata != null + ? _jsonSerializer.Serialize(metadata) + : ReadOnlyMemory.Empty; + + var metadataWithSerialization = serializedMetadata + .InjectSerializationMetadata(SerializationMetadata.From(data.GetType())) + .ToArray(); + + return new EventData( + eventId, + eventType, + serializedData, + metadataWithSerialization, + serializationContext.ContentType.ToMessageContentType() + ); + } + +#if NET48 + public bool TryDeserialize(EventRecord messageRecord, out object? deserialized) { +#else + public bool TryDeserialize(EventRecord messageRecord, [NotNullWhen(true)] out object? deserialized) { +#endif + if (!schemaRegistry + .MessageTypeResolutionStrategy + .TryResolveClrType(messageRecord, out var clrType)) { + deserialized = null; + return false; + } + + deserialized = schemaRegistry + .GetSerializer(FromMessageContentType(messageRecord.ContentType)) + .Deserialize(messageRecord.Data, clrType!); + + return deserialized != null; + } +} diff --git a/src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs b/src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs new file mode 100644 index 000000000..3792ff4c2 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/MessageSerializerWrapper.cs @@ -0,0 +1,40 @@ +using System.Diagnostics.CodeAnalysis; +using EventStore.Client; +using EventStore.Client.Serialization; + +namespace Kurrent.Client.Core.Serialization; + +public class MessageSerializerWrapper( + IMessageSerializer messageSerializer, + AutomaticDeserialization automaticDeserialization +): IMessageSerializer { + public EventData Serialize(Message value, MessageSerializationContext context) { + if (automaticDeserialization == AutomaticDeserialization.Disabled) + throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled"); + + return messageSerializer.Serialize(value, context); + } + +#if NET48 + public bool TryDeserialize(EventRecord eventRecord, out object? deserialized) { +#else + public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out object? deserialized) { +#endif + if (automaticDeserialization == AutomaticDeserialization.Disabled) { + deserialized = null; + return false; + } + + return messageSerializer + .TryDeserialize(eventRecord, out deserialized); + } + + public static MessageSerializerWrapper From(KurrentClientSerializationSettings? settings = null) { + settings ??= new KurrentClientSerializationSettings(); + + return new MessageSerializerWrapper( + new MessageSerializer(SchemaRegistry.From(settings)), + settings.AutomaticDeserialization + ); + } +} diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs new file mode 100644 index 000000000..aeb4aefd0 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/MessageTypeMapper.cs @@ -0,0 +1,76 @@ +using System.Collections.Concurrent; + +namespace Kurrent.Client.Tests.Streams.Serialization; + +public interface IMessageTypeRegistry { + void AddType(Type messageType, string messageTypeName); + string? GetTypeName(Type messageType); + string GetOrAddTypeName(Type clrType, Func getTypeName); + Type? GetClrType(string messageTypeName); + Type? GetOrAddClrType(string messageTypeName, Func getClrType); +} + +public class MessageTypeRegistry : IMessageTypeRegistry { + readonly ConcurrentDictionary _typeMap = new(); + readonly ConcurrentDictionary _typeNameMap = new(); + + public void AddType(Type messageType, string messageTypeName) { + _typeNameMap.AddOrUpdate(messageType, messageTypeName, (_, typeName) => typeName); + _typeMap.AddOrUpdate(messageTypeName, messageType, (_, type) => type); + } + + public string? GetTypeName(Type messageType) => +#if NET48 + _typeNameMap.TryGetValue(messageType, out var typeName) ? typeName : null; +#else + _typeNameMap.GetValueOrDefault(messageType); +#endif + + public string GetOrAddTypeName(Type clrType, Func getTypeName) => + _typeNameMap.GetOrAdd( + clrType, + _ => { + var typeName = getTypeName(clrType); + + _typeMap.TryAdd(typeName, clrType); + + return typeName; + } + ); + + public Type? GetClrType(string messageTypeName) => +#if NET48 + _typeMap.TryGetValue(messageTypeName, out var clrType) ? clrType : null; +#else + _typeMap.GetValueOrDefault(messageTypeName); +#endif + + public Type? GetOrAddClrType(string messageTypeName, Func getClrType) => + _typeMap.GetOrAdd( + messageTypeName, + _ => { + var clrType = getClrType(messageTypeName); + + if (clrType == null) + return null; + + _typeNameMap.TryAdd(clrType, messageTypeName); + + return clrType; + } + ); +} + +public static class MessageTypeMapperExtensions { + public static void AddType(this IMessageTypeRegistry messageTypeRegistry, string messageTypeName) => + messageTypeRegistry.AddType(typeof(T), messageTypeName); + + public static void AddTypes(this IMessageTypeRegistry messageTypeRegistry, IDictionary typeMap) { + foreach (var map in typeMap) { + messageTypeRegistry.AddType(map.Key, map.Value); + } + } + + public static string? GetTypeName(this IMessageTypeRegistry messageTypeRegistry) => + messageTypeRegistry.GetTypeName(typeof(TMessageType)); +} diff --git a/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs new file mode 100644 index 000000000..0d7854758 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/MessageTypeResolutionStrategy.cs @@ -0,0 +1,70 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json; +using Kurrent.Client.Tests.Streams.Serialization; +using EventStore.Client.Diagnostics; +using Kurrent.Client.Core.Serialization; + +namespace EventStore.Client.Serialization; + +public interface IMessageTypeResolutionStrategy { + string ResolveTypeName(Message message, MessageSerializationContext serializationContext); + +#if NET48 + bool TryResolveClrType(EventRecord messageRecord, out Type? type); +#else + bool TryResolveClrType(EventRecord messageRecord, [NotNullWhen(true)] out Type? type); +#endif +} + +public class MessageTypeResolutionStrategyWrapper( + IMessageTypeRegistry messageTypeRegistry, + IMessageTypeResolutionStrategy messageTypeResolutionStrategy +) : IMessageTypeResolutionStrategy { + public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) { + return messageTypeRegistry.GetOrAddTypeName( + message.Data.GetType(), + _ => messageTypeResolutionStrategy.ResolveTypeName(message, serializationContext) + ); + } + +#if NET48 + public bool TryResolveClrType(EventRecord messageRecord, out Type? type) { +#else + public bool TryResolveClrType(EventRecord messageRecord, [NotNullWhen(true)] out Type? type) { +#endif + type = messageTypeRegistry.GetOrAddClrType( + messageRecord.EventType, + _ => messageTypeResolutionStrategy.TryResolveClrType(messageRecord, out var resolvedType) + ? resolvedType + : null + ); + + return type != null; + } +} + +public class DefaultMessageTypeResolutionStrategy + : IMessageTypeResolutionStrategy { + public string ResolveTypeName(Message message, MessageSerializationContext serializationContext) => + $"{serializationContext.CategoryName}-{JsonNamingPolicy.SnakeCaseLower.ConvertName(message.Data.GetType().Name.ToLower())}"; + +#if NET48 + public bool TryResolveClrType(EventRecord messageRecord, out Type? type) { +#else + public bool TryResolveClrType(EventRecord messageRecord, [NotNullWhen(true)] out Type? type) { +#endif + var serializationMetadata = messageRecord.Metadata.ExtractSerializationMetadata(); + + if (!serializationMetadata.IsValid) { + type = null; + return false; + } + + type = Type.GetType(serializationMetadata.MessageTypeAssemblyQualifiedName!) + ?? TypeProvider.GetFirstMatchingTypeFromCurrentDomainAssembly( + serializationMetadata.MessageTypeClrTypeName! + ); + + return type != null; + } +} diff --git a/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs new file mode 100644 index 000000000..946c8ae42 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/SchemaRegistry.cs @@ -0,0 +1,61 @@ +using EventStore.Client; +using EventStore.Client.Serialization; +using Kurrent.Client.Tests.Streams.Serialization; + +namespace Kurrent.Client.Core.Serialization; + +using static Constants.Metadata.ContentTypes; + +public enum ContentType { + Json = 1, + + // Protobuf = 2, + // Avro = 3, + Bytes = 4 +} + +public static class ContentTypeExtensions { + public static ContentType FromMessageContentType(string contentType) => + contentType == ApplicationJson + ? ContentType.Json + : ContentType.Bytes; + + public static string ToMessageContentType(this ContentType contentType) => + contentType switch { + ContentType.Json => ApplicationJson, + ContentType.Bytes => ApplicationOctetStream, + _ => throw new ArgumentOutOfRangeException(nameof(contentType), contentType, null) + }; +} + +public class SchemaRegistry( + IDictionary serializers, + IMessageTypeResolutionStrategy messageTypeResolutionStrategy +) { + public IMessageTypeResolutionStrategy MessageTypeResolutionStrategy { get; } = messageTypeResolutionStrategy; + + public ISerializer GetSerializer(ContentType schemaType) => + serializers[schemaType]; + + public static SchemaRegistry From(KurrentClientSerializationSettings settings) { + var messageTypeRegistry = new MessageTypeRegistry(); + messageTypeRegistry.AddTypes(settings.MessageTypeMap); + + var messageTypeResolutionStrategy = new MessageTypeResolutionStrategyWrapper( + messageTypeRegistry, + settings.MessageTypeResolutionStrategy ?? new DefaultMessageTypeResolutionStrategy() + ); + + var serializers = new Dictionary { + { + ContentType.Json, + settings.JsonSerializer ?? new SystemTextJsonSerializer() + }, { + ContentType.Bytes, + settings.BytesSerializer ?? new SystemTextJsonSerializer() + } + }; + + return new SchemaRegistry(serializers, messageTypeResolutionStrategy); + } +} diff --git a/src/Kurrent.Client/Core/Serialization/SerializationMetadata.cs b/src/Kurrent.Client/Core/Serialization/SerializationMetadata.cs new file mode 100644 index 000000000..2ca1834ee --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/SerializationMetadata.cs @@ -0,0 +1,23 @@ +using System.Text.Json.Serialization; + +namespace EventStore.Client.Serialization; + +public record SerializationMetadata( + [property: JsonPropertyName(SerializationMetadata.Constants.MessageTypeAssemblyQualifiedName)] + string? MessageTypeAssemblyQualifiedName, + [property: JsonPropertyName(SerializationMetadata.Constants.MessageTypeClrTypeName)] + string? MessageTypeClrTypeName +) { + public static readonly SerializationMetadata None = new SerializationMetadata(null, null); + + public bool IsValid => + MessageTypeAssemblyQualifiedName != null && MessageTypeClrTypeName != null; + + public static SerializationMetadata From(Type clrType) => + new SerializationMetadata(clrType.AssemblyQualifiedName, clrType.Name); + + public static class Constants { + public const string MessageTypeAssemblyQualifiedName = "$clrTypeAssemblyQualifiedName"; + public const string MessageTypeClrTypeName = "$clrTypeName"; + } +} diff --git a/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs new file mode 100644 index 000000000..f99f20b21 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs @@ -0,0 +1,36 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using EventStore.Client.Serialization; + +namespace Kurrent.Client.Core.Serialization; + +public class SystemTextJsonSerializationSettings { + public static readonly JsonSerializerOptions DefaultJsonSerializerOptions = + new JsonSerializerOptions(JsonSerializerOptions.Default) { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + UnknownTypeHandling = JsonUnknownTypeHandling.JsonNode, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Skip, + NumberHandling = JsonNumberHandling.AllowReadingFromString, + Converters = { + new JsonStringEnumConverter(JsonNamingPolicy.CamelCase), + } + }; + + public JsonSerializerOptions Options { get; set; } = DefaultJsonSerializerOptions; +} + +public class SystemTextJsonSerializer(SystemTextJsonSerializationSettings? options = null) : ISerializer { + readonly JsonSerializerOptions _options = options?.Options ?? SystemTextJsonSerializationSettings.DefaultJsonSerializerOptions; + + public ReadOnlyMemory Serialize(object value) { + return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value, _options)); + } + + public object? Deserialize(ReadOnlyMemory data, Type type) { + return JsonSerializer.Deserialize(data.Span, type, _options); + } +} diff --git a/src/Kurrent.Client/Core/Serialization/TypeProvider.cs b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs new file mode 100644 index 000000000..880a5701c --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs @@ -0,0 +1,26 @@ +using System.Reflection; + +namespace Kurrent.Client.Tests.Streams.Serialization; + +public static class TypeProvider +{ + public static Type? GetTypeFromAnyReferencingAssembly(string typeName) + { + var referencedAssemblies = Assembly.GetEntryAssembly()? + .GetReferencedAssemblies() + .Select(a => a.FullName); + + if (referencedAssemblies == null) + return null; + + return AppDomain.CurrentDomain.GetAssemblies() + .Where(a => referencedAssemblies.Contains(a.FullName)) + .SelectMany(a => a.GetTypes().Where(x => x.AssemblyQualifiedName == typeName || x.Name == typeName)) + .FirstOrDefault(); + } + + public static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string typeName) => + AppDomain.CurrentDomain.GetAssemblies() + .SelectMany(a => a.GetTypes().Where(x => x.AssemblyQualifiedName == typeName || x.Name == typeName)) + .FirstOrDefault(); +} diff --git a/src/Kurrent.Client/Kurrent.Client.csproj b/src/Kurrent.Client/Kurrent.Client.csproj index e6652b773..59ebc0861 100644 --- a/src/Kurrent.Client/Kurrent.Client.csproj +++ b/src/Kurrent.Client/Kurrent.Client.csproj @@ -101,7 +101,7 @@ - + diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs index 779413111..7cac10685 100644 --- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs @@ -1,12 +1,28 @@ using System.Threading.Channels; using EventStore.Client.PersistentSubscriptions; using EventStore.Client.Diagnostics; +using EventStore.Client.Serialization; using Grpc.Core; - +using Kurrent.Client.Core.Serialization; using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions; using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase; namespace EventStore.Client { + public class SubscribeToPersistentSubscriptionOptions { + /// + /// The size of the buffer. + /// + public int BufferSize { get; set; } = 10; + /// + /// The optional settings to customize the default serialization for the specific persistent subscription + /// + public KurrentClientSerializationSettings SerializationSettings { get; set; } = KurrentClientSerializationSettings.Default(); + /// + /// The optional user credentials to perform operation with. + /// + public UserCredentials? UserCredentials { get; set; } = null; + } + partial class KurrentPersistentSubscriptionsClient { /// /// Subscribes to a persistent subscription. @@ -40,6 +56,34 @@ public async Task SubscribeAsync( .ConfigureAwait(false); } + /// + /// Subscribes to a persistent subscription. Messages must be manually acknowledged + /// + /// + /// + /// + public Task SubscribeToStreamAsync( + string streamName, + string groupName, + Func eventAppeared, + Action? subscriptionDropped = null, + UserCredentials? userCredentials = null, + int bufferSize = 10, + CancellationToken cancellationToken = default + ) { + return SubscribeToStreamAsync( + streamName, + groupName, + eventAppeared, + new SubscribeToPersistentSubscriptionOptions { + UserCredentials = userCredentials, + BufferSize = bufferSize + }, + subscriptionDropped, + cancellationToken + ); + } + /// /// Subscribes to a persistent subscription. Messages must be manually acknowledged /// @@ -47,19 +91,20 @@ public async Task SubscribeAsync( /// /// public async Task SubscribeToStreamAsync( - string streamName, string groupName, + string streamName, + string groupName, Func eventAppeared, + SubscribeToPersistentSubscriptionOptions options, Action? subscriptionDropped = null, - UserCredentials? userCredentials = null, int bufferSize = 10, CancellationToken cancellationToken = default ) { return await PersistentSubscription .Confirm( - SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken), + SubscribeToStream(streamName, groupName, options, cancellationToken), eventAppeared, subscriptionDropped ?? delegate { }, _log, - userCredentials, + options.UserCredentials, cancellationToken ) .ConfigureAwait(false); @@ -75,8 +120,35 @@ public async Task SubscribeToStreamAsync( /// The optional . /// public PersistentSubscriptionResult SubscribeToStream( - string streamName, string groupName, int bufferSize = 10, - UserCredentials? userCredentials = null, CancellationToken cancellationToken = default + string streamName, + string groupName, + int bufferSize = 10, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default + ) { + return SubscribeToStream( + streamName, + groupName, + new SubscribeToPersistentSubscriptionOptions { + BufferSize = bufferSize, + UserCredentials = userCredentials + }, + cancellationToken + ); + } + + /// + /// Subscribes to a persistent subscription. Messages must be manually acknowledged. + /// + /// The name of the stream to read events from. + /// The name of the persistent subscription group. + /// Optional settings to configure subscription + /// The optional . + /// + public PersistentSubscriptionResult SubscribeToStream( + string streamName, string groupName, + SubscribeToPersistentSubscriptionOptions options, + CancellationToken cancellationToken = default ) { if (streamName == null) { throw new ArgumentNullException(nameof(streamName)); @@ -94,12 +166,12 @@ public PersistentSubscriptionResult SubscribeToStream( throw new ArgumentException($"{nameof(groupName)} may not be empty.", nameof(groupName)); } - if (bufferSize <= 0) { - throw new ArgumentOutOfRangeException(nameof(bufferSize)); + if (options.BufferSize <= 0) { + throw new ArgumentOutOfRangeException(nameof(options.BufferSize)); } var readOptions = new ReadReq.Types.Options { - BufferSize = bufferSize, + BufferSize = options.BufferSize, GroupName = groupName, UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() } }; @@ -127,7 +199,8 @@ public PersistentSubscriptionResult SubscribeToStream( }, new() { Options = readOptions }, Settings, - userCredentials, + options.UserCredentials, + _messageSerializer, cancellationToken ); } @@ -169,15 +242,15 @@ public PersistentSubscriptionResult SubscribeToAll( /// public class PersistentSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable { - const int MaxEventIdLength = 2000; - - readonly ReadReq _request; - readonly Channel _channel; - readonly CancellationTokenSource _cts; - readonly CallOptions _callOptions; + const int MaxEventIdLength = 2000; + + readonly ReadReq _request; + readonly Channel _channel; + readonly CancellationTokenSource _cts; + readonly CallOptions _callOptions; - AsyncDuplexStreamingCall? _call; - int _messagesEnumerated; + AsyncDuplexStreamingCall? _call; + int _messagesEnumerated; /// /// The server-generated unique identifier for the subscription. @@ -200,30 +273,34 @@ public class PersistentSubscriptionResult : IAsyncEnumerable, IAs public IAsyncEnumerable Messages { get { if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) - throw new InvalidOperationException("Messages may only be enumerated once."); + throw new InvalidOperationException("Messages may only be enumerated once."); return GetMessages(); async IAsyncEnumerable GetMessages() { - try { - await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token)) { - if (message is PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId)) - SubscriptionId = subscriptionId; - - yield return message; - } - } - finally { - _cts.Cancel(); - } + try { + await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token)) { + if (message is PersistentSubscriptionMessage.SubscriptionConfirmation(var subscriptionId + )) + SubscriptionId = subscriptionId; + + yield return message; + } + } finally { + _cts.Cancel(); + } } } } internal PersistentSubscriptionResult( - string streamName, string groupName, + string streamName, + string groupName, Func> selectChannelInfo, - ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials, + ReadReq request, + KurrentClientSettings settings, + UserCredentials? userCredentials, + IMessageSerializer messageSerializer, CancellationToken cancellationToken ) { StreamName = streamName; @@ -247,20 +324,21 @@ CancellationToken cancellationToken async Task PumpMessages() { try { - var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false); - var client = new PersistentSubscriptionsClient(channelInfo.CallInvoker); + var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false); + var client = new PersistentSubscriptionsClient(channelInfo.CallInvoker); _call = client.Read(_callOptions); await _call.RequestStream.WriteAsync(_request).ConfigureAwait(false); - await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token).ConfigureAwait(false)) { + await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token) + .ConfigureAwait(false)) { PersistentSubscriptionMessage subscriptionMessage = response.ContentCase switch { SubscriptionConfirmation => new PersistentSubscriptionMessage.SubscriptionConfirmation( response.SubscriptionConfirmation.SubscriptionId ), Event => new PersistentSubscriptionMessage.Event( - ConvertToResolvedEvent(response), + ConvertToResolvedEvent(response, messageSerializer), response.Event.CountCase switch { ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount, _ => null @@ -292,17 +370,18 @@ async Task PumpMessages() { // The absence of this header leads to an RpcException with the status code 'Cancelled' and the message "No grpc-status found on response". // The switch statement below handles these specific exceptions and translates them into the appropriate // PersistentSubscriptionDroppedByServerException exception. - case RpcException { StatusCode: StatusCode.Unavailable } rex1 when rex1.Status.Detail.Contains("WinHttpException: Error 12030"): + case RpcException { StatusCode: StatusCode.Unavailable } rex1 + when rex1.Status.Detail.Contains("WinHttpException: Error 12030"): case RpcException { StatusCode: StatusCode.Cancelled } rex2 - when rex2.Status.Detail.Contains("No grpc-status found on response"): + when rex2.Status.Detail.Contains("No grpc-status found on response"): ex = new PersistentSubscriptionDroppedByServerException(StreamName, GroupName, ex); break; } #endif if (ex is PersistentSubscriptionNotFoundException) { await _channel.Writer - .WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, cancellationToken) - .ConfigureAwait(false); + .WriteAsync(PersistentSubscriptionMessage.NotFound.Instance, cancellationToken) + .ConfigureAwait(false); _channel.Writer.TryComplete(); return; @@ -360,19 +439,26 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par /// A reason given. /// The s to nak. There should not be more than 2000 to nak at a time. /// The number of resolvedEvents exceeded the limit of 2000. - public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents) => - Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId)); - - static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new( - ConvertToEventRecord(response.Event.Event)!, - ConvertToEventRecord(response.Event.Link), - response.Event.PositionCase switch { - ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition, - _ => null - } - ); + public Task Nack( + PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents + ) => + Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId)); + + static ResolvedEvent ConvertToResolvedEvent( + ReadResp response, + IMessageSerializer messageSerializer + ) => + ResolvedEvent.From( + ConvertToEventRecord(response.Event.Event)!, + ConvertToEventRecord(response.Event.Link), + response.Event.PositionCase switch { + ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition, + _ => null + }, + messageSerializer + ); - Task AckInternal(params Uuid[] eventIds) { + Task AckInternal(params Uuid[] eventIds) { if (eventIds.Length > MaxEventIdLength) { throw new ArgumentException( $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", @@ -393,7 +479,7 @@ Task AckInternal(params Uuid[] eventIds) { ); } - Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) { + Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) { if (eventIds.Length > MaxEventIdLength) { throw new ArgumentException( $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", @@ -422,7 +508,7 @@ Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, ); } - static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) => + static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) => e is null ? null : new EventRecord( @@ -465,10 +551,12 @@ public void Dispose() { } /// - public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { + public async IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = default + ) { await foreach (var message in Messages.WithCancellation(cancellationToken)) { if (message is not PersistentSubscriptionMessage.Event(var resolvedEvent, _)) - continue; + continue; yield return resolvedEvent; } diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs new file mode 100644 index 000000000..283f06c5f --- /dev/null +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs @@ -0,0 +1,7 @@ +using EventStore.Client.Serialization; + +namespace EventStore.Client { + public partial class KurrentPersistentSubscriptionsClient { + readonly IMessageSerializer _messageSerializer; + } +} diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs index 070f32698..358471feb 100644 --- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.cs @@ -1,6 +1,7 @@ using System.Text.Encodings.Web; using System.Threading.Channels; using Grpc.Core; +using Kurrent.Client.Core.Serialization; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -37,6 +38,8 @@ public KurrentPersistentSubscriptionsClient(KurrentClientSettings? settings) : b }) { _log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger(); + + _messageSerializer = MessageSerializerWrapper.From(settings?.Serialization); } private static string UrlEncode(string s) { diff --git a/src/Kurrent.Client/Streams/KurrentClient.Append.cs b/src/Kurrent.Client/Streams/KurrentClient.Append.cs index 39d6f3066..6ea0c5ea0 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Append.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Append.cs @@ -1,11 +1,14 @@ using System.Collections.Concurrent; using System.Diagnostics; +using System.Text; using System.Threading.Channels; using Google.Protobuf; using EventStore.Client.Streams; using Grpc.Core; using Microsoft.Extensions.Logging; using EventStore.Client.Diagnostics; +using EventStore.Client.Serialization; +using Kurrent.Client.Core.Serialization; using Kurrent.Diagnostics; using Kurrent.Diagnostics.Telemetry; using Kurrent.Diagnostics.Tracing; @@ -14,6 +17,82 @@ namespace EventStore.Client { public partial class KurrentClient { + /// + /// Appends events asynchronously to a stream. + /// + /// The name of the stream to append events to. + /// The expected of the stream to append to. + /// Messages to append to the stream. + /// An to configure the operation's options. + /// + /// The for the operation. + /// The optional . + /// + public Task AppendToStreamAsync( + string streamName, + StreamState expectedState, + IEnumerable events, + // TODO: I don't like those numerous options, but I'd prefer to tackle that in a dedicated PR + Action? configureOperationOptions = null, + TimeSpan? deadline = null, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default + ) { + var serializationContext = new MessageSerializationContext( + streamName, + Settings.Serialization.DefaultContentType + ); + var eventsData = _messageSerializer.Serialize(events.Select(e => new Message(e)), serializationContext); + + return AppendToStreamAsync( + streamName, + expectedState, + eventsData, + configureOperationOptions, + deadline, + userCredentials, + cancellationToken + ); + } + + /// + /// Appends events asynchronously to a stream. + /// + /// The name of the stream to append events to. + /// The expected of the stream to append to. + /// Messages to append to the stream. + /// An to configure the operation's options. + /// + /// The for the operation. + /// The optional . + /// + public Task AppendToStreamAsync( + string streamName, + StreamRevision expectedRevision, + IEnumerable events, + // TODO: I don't like those numerous options, but I'd prefer to tackle that in a dedicated PR + Action? configureOperationOptions = null, + TimeSpan? deadline = null, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default + ) { + var serializationContext = new MessageSerializationContext( + streamName, + Settings.Serialization.DefaultContentType + ); + var eventsData = _messageSerializer.Serialize(events.Select(e => new Message(e)), serializationContext); + + return AppendToStreamAsync( + streamName, + expectedRevision, + eventsData, + configureOperationOptions, + deadline, + userCredentials, + cancellationToken + ); + } + /// /// Appends events asynchronously to a stream. /// @@ -114,16 +193,28 @@ ValueTask AppendToStreamInternal( CancellationToken cancellationToken ) { var tags = new ActivityTagsCollection() - .WithRequiredTag(TelemetryTags.Kurrent.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8()) + .WithRequiredTag( + TelemetryTags.Kurrent.Stream, + header.Options.StreamIdentifier.StreamName.ToStringUtf8() + ) .WithGrpcChannelServerTags(channelInfo) .WithClientSettingsServerTags(Settings) - .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username); + .WithOptionalTag( + TelemetryTags.Database.User, + userCredentials?.Username ?? Settings.DefaultCredentials?.Username + ); - return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags); + return KurrentClientDiagnostics.ActivitySource.TraceClientOperation( + Operation, + TracingConstants.Operations.Append, + tags + ); async ValueTask Operation() { using var call = new StreamsClient(channelInfo.CallInvoker) - .Append(KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)); + .Append( + KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); await call.RequestStream .WriteAsync(header) @@ -160,11 +251,13 @@ await call.RequestStream } IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { - var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream + var currentRevision = response.Success.CurrentRevisionOptionCase + == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream ? StreamRevision.None : new StreamRevision(response.Success.CurrentRevision); - var position = response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position + var position = response.Success.PositionOptionCase + == AppendResp.Types.Success.PositionOptionOneofCase.Position ? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition) : default; @@ -181,7 +274,8 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { IWriteResult HandleWrongExpectedRevision( AppendResp response, AppendReq header, KurrentClientOperationOptions operationOptions ) { - var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision + var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase + == CurrentRevisionOptionOneofCase.CurrentRevision ? new StreamRevision(response.WrongExpectedVersion.CurrentRevision) : StreamRevision.None; @@ -193,7 +287,8 @@ IWriteResult HandleWrongExpectedRevision( ); if (operationOptions.ThrowOnAppendFailure) { - if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision) { + if (response.WrongExpectedVersion.ExpectedRevisionOptionCase + == ExpectedRevisionOptionOneofCase.ExpectedRevision) { throw new WrongExpectedVersionException( header.Options.StreamIdentifier!, new StreamRevision(response.WrongExpectedVersion.ExpectedRevision), @@ -215,7 +310,8 @@ IWriteResult HandleWrongExpectedRevision( ); } - var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision + var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase + == ExpectedRevisionOptionOneofCase.ExpectedRevision ? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision) : StreamRevision.None; @@ -227,7 +323,7 @@ IWriteResult HandleWrongExpectedRevision( } class StreamAppender : IDisposable { - readonly KurrentClientSettings _settings; + readonly KurrentClientSettings _settings; readonly CancellationToken _cancellationToken; readonly Action _onException; readonly Channel _channel; @@ -302,8 +398,7 @@ async ValueTask Operation() { try { foreach (var appendRequest in GetRequests(events, options, correlationId)) await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); - } - catch (ChannelClosedException ex) { + } catch (ChannelClosedException ex) { // channel is closed, our tcs won't necessarily get completed, don't wait for it. throw ex.InnerException ?? ex; } @@ -333,8 +428,7 @@ async Task Duplex(ValueTask channelInfoTask) { _ = Task.Run(Receive, _cancellationToken); _isUsable.TrySetResult(true); - } - catch (Exception ex) { + } catch (Exception ex) { _isUsable.TrySetException(ex); _onException(ex); } @@ -344,7 +438,8 @@ async Task Duplex(ValueTask channelInfoTask) { async Task Send() { if (_call is null) return; - await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) + await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); await _call.RequestStream.CompleteAsync().ConfigureAwait(false); @@ -354,20 +449,22 @@ async Task Receive() { if (_call is null) return; try { - await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) { - if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) { + await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) { + if (!_pendingRequests.TryRemove( + Uuid.FromDto(response.CorrelationId), + out var writeResult + )) { continue; // TODO: Log? } try { writeResult.TrySetResult(response.ToWriteResult()); - } - catch (Exception ex) { + } catch (Exception ex) { writeResult.TrySetException(ex); } } - } - catch (Exception ex) { + } catch (Exception ex) { // signal that no tcs added to _pendingRequests after this point will necessarily complete _channel.Writer.TryComplete(ex); @@ -380,7 +477,9 @@ async Task Receive() { } } - IEnumerable GetRequests(IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId) { + IEnumerable GetRequests( + IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId + ) { var batchSize = 0; var first = true; var correlationIdDto = correlationId.ToDto(); diff --git a/src/Kurrent.Client/Streams/KurrentClient.Read.cs b/src/Kurrent.Client/Streams/KurrentClient.Read.cs index 0523d9516..3deee9722 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Read.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Read.cs @@ -1,6 +1,8 @@ using System.Threading.Channels; +using EventStore.Client.Serialization; using EventStore.Client.Streams; using Grpc.Core; +using Kurrent.Client.Core.Serialization; using static EventStore.Client.Streams.ReadResp; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; @@ -91,6 +93,7 @@ public ReadAllStreamResult ReadAllAsync( Settings, deadline, userCredentials, + _messageSerializer, cancellationToken ); } @@ -130,8 +133,7 @@ async IAsyncEnumerable GetMessages() { yield return message; } - } - finally { + } finally { _cts.Cancel(); } } @@ -139,8 +141,12 @@ async IAsyncEnumerable GetMessages() { } internal ReadAllStreamResult( - Func> selectCallInvoker, ReadReq request, - KurrentClientSettings settings, TimeSpan? deadline, UserCredentials? userCredentials, + Func> selectCallInvoker, + ReadReq request, + KurrentClientSettings settings, + TimeSpan? deadline, + UserCredentials? userCredentials, + IMessageSerializer messageSerializer, CancellationToken cancellationToken ) { var callOptions = KurrentCallOptions.CreateStreaming( @@ -157,7 +163,7 @@ CancellationToken cancellationToken if (request.Options.FilterOptionCase == ReadReq.Types.Options.FilterOptionOneofCase.None) request.Options.NoFilter = new(); - + _ = PumpMessages(); return; @@ -167,14 +173,21 @@ async Task PumpMessages() { var callInvoker = await selectCallInvoker(linkedCancellationToken).ConfigureAwait(false); var client = new Streams.Streams.StreamsClient(callInvoker); using var call = client.Read(request, callOptions); + await foreach (var response in call.ResponseStream.ReadAllAsync(linkedCancellationToken) .ConfigureAwait(false)) { await _channel.Writer.WriteAsync( response.ContentCase switch { - StreamNotFound => StreamMessage.NotFound.Instance, - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), - FirstStreamPosition => new StreamMessage.FirstStreamPosition(new StreamPosition(response.FirstStreamPosition)), - LastStreamPosition => new StreamMessage.LastStreamPosition(new StreamPosition(response.LastStreamPosition)), + StreamNotFound => StreamMessage.NotFound.Instance, + Event => new StreamMessage.Event( + ConvertToResolvedEvent(response.Event, messageSerializer) + ), + FirstStreamPosition => new StreamMessage.FirstStreamPosition( + new StreamPosition(response.FirstStreamPosition) + ), + LastStreamPosition => new StreamMessage.LastStreamPosition( + new StreamPosition(response.LastStreamPosition) + ), LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( new Position( response.LastAllStreamPosition.CommitPosition, @@ -188,8 +201,7 @@ await _channel.Writer.WriteAsync( } _channel.Writer.Complete(); - } - catch (Exception ex) { + } catch (Exception ex) { _channel.Writer.TryComplete(ex); } } @@ -200,15 +212,15 @@ public async IAsyncEnumerator GetAsyncEnumerator( CancellationToken cancellationToken = default ) { try { - await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { + await foreach (var message in + _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { if (message is not StreamMessage.Event e) { continue; } yield return e.ResolvedEvent; } - } - finally { + } finally { _cts.Cancel(); } } @@ -267,6 +279,7 @@ public ReadStreamResult ReadStreamAsync( Settings, deadline, userCredentials, + _messageSerializer, cancellationToken ); } @@ -308,7 +321,8 @@ async IAsyncEnumerable GetMessages() { } try { - await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token).ConfigureAwait(false)) { + await foreach (var message in _channel.Reader.ReadAllAsync(_cts.Token) + .ConfigureAwait(false)) { switch (message) { case StreamMessage.FirstStreamPosition(var streamPosition): FirstStreamPosition = streamPosition; @@ -324,8 +338,7 @@ async IAsyncEnumerable GetMessages() { yield return message; } - } - finally { + } finally { _cts.Cancel(); } } @@ -338,8 +351,12 @@ async IAsyncEnumerable GetMessages() { public Task ReadState { get; } internal ReadStreamResult( - Func> selectCallInvoker, ReadReq request, - KurrentClientSettings settings, TimeSpan? deadline, UserCredentials? userCredentials, + Func> selectCallInvoker, + ReadReq request, + KurrentClientSettings settings, + TimeSpan? deadline, + UserCredentials? userCredentials, + IMessageSerializer messageSerializer, CancellationToken cancellationToken ) { var callOptions = KurrentCallOptions.CreateStreaming( @@ -382,8 +399,7 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo .ConfigureAwait(false); tcs.SetResult(Client.ReadState.Ok); - } - else { + } else { tcs.SetResult(Client.ReadState.StreamNotFound); } } @@ -391,7 +407,9 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo await _channel.Writer.WriteAsync( response.ContentCase switch { StreamNotFound => StreamMessage.NotFound.Instance, - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + Event => new StreamMessage.Event( + ConvertToResolvedEvent(response.Event, messageSerializer) + ), ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition( new StreamPosition(response.FirstStreamPosition) ), @@ -411,8 +429,7 @@ await _channel.Writer.WriteAsync( } _channel.Writer.Complete(); - } - catch (Exception ex) { + } catch (Exception ex) { tcs.TrySetException(ex); _channel.Writer.TryComplete(ex); } @@ -424,7 +441,8 @@ public async IAsyncEnumerator GetAsyncEnumerator( CancellationToken cancellationToken = default ) { try { - await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { + await foreach (var message in + _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { if (message is StreamMessage.NotFound) { throw new StreamNotFoundException(StreamName); } @@ -435,21 +453,24 @@ public async IAsyncEnumerator GetAsyncEnumerator( yield return e.ResolvedEvent; } - } - finally { + } finally { _cts.Cancel(); } } } - static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) => - new ResolvedEvent( + static ResolvedEvent ConvertToResolvedEvent( + Types.ReadEvent readEvent, + IMessageSerializer messageSerializer + ) => + ResolvedEvent.From( ConvertToEventRecord(readEvent.Event)!, ConvertToEventRecord(readEvent.Link), readEvent.PositionCase switch { - ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => readEvent.CommitPosition, - _ => null - } + Types.ReadEvent.PositionOneofCase.CommitPosition => readEvent.CommitPosition, + _ => null + }, + messageSerializer ); static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) => diff --git a/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs b/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs new file mode 100644 index 000000000..4a80d600c --- /dev/null +++ b/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs @@ -0,0 +1,7 @@ +using EventStore.Client.Serialization; + +namespace EventStore.Client { + public partial class KurrentClient { + readonly IMessageSerializer _messageSerializer; + } +} diff --git a/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs b/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs index 92adb172b..b381b585b 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs @@ -1,8 +1,9 @@ using System.Threading.Channels; using EventStore.Client.Diagnostics; +using EventStore.Client.Serialization; using EventStore.Client.Streams; using Grpc.Core; - +using Kurrent.Client.Core.Serialization; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; namespace EventStore.Client { @@ -64,6 +65,7 @@ public StreamSubscriptionResult SubscribeToAll( }, Settings, userCredentials, + _messageSerializer, cancellationToken ); @@ -122,6 +124,7 @@ public StreamSubscriptionResult SubscribeToStream( }, Settings, userCredentials, + _messageSerializer, cancellationToken ); @@ -175,7 +178,10 @@ async IAsyncEnumerable GetMessages() { internal StreamSubscriptionResult( Func> selectChannelInfo, - ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials, + ReadReq request, + KurrentClientSettings settings, + UserCredentials? userCredentials, + IMessageSerializer messageSerializer, CancellationToken cancellationToken ) { _request = request; @@ -208,7 +214,7 @@ async Task PumpMessages() { StreamMessage subscriptionMessage = response.ContentCase switch { Confirmation => new StreamMessage.SubscriptionConfirmation(response.Confirmation.SubscriptionId), - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, messageSerializer)), FirstStreamPosition => new StreamMessage.FirstStreamPosition(new StreamPosition(response.FirstStreamPosition)), LastStreamPosition => new StreamMessage.LastStreamPosition(new StreamPosition(response.LastStreamPosition)), LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( diff --git a/src/Kurrent.Client/Streams/KurrentClient.cs b/src/Kurrent.Client/Streams/KurrentClient.cs index 3dccf53ee..0229a7bab 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.cs @@ -1,6 +1,8 @@ using System.Text.Json; using System.Threading.Channels; +using EventStore.Client.Serialization; using Grpc.Core; +using Kurrent.Client.Core.Serialization; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; @@ -23,10 +25,10 @@ public sealed partial class KurrentClient : KurrentClientBase { AllowSynchronousContinuations = true }; - readonly ILogger _log; - Lazy _batchAppenderLazy; - StreamAppender BatchAppender => _batchAppenderLazy.Value; - readonly CancellationTokenSource _disposedTokenSource; + readonly ILogger _log; + Lazy _batchAppenderLazy; + StreamAppender BatchAppender => _batchAppenderLazy.Value; + readonly CancellationTokenSource _disposedTokenSource; static readonly Dictionary> ExceptionMap = new() { [Constants.Exceptions.InvalidTransaction] = ex => new InvalidTransactionException(ex.Message, ex), @@ -66,9 +68,13 @@ public KurrentClient(IOptions options) : this(options.Val /// /// public KurrentClient(KurrentClientSettings? settings = null) : base(settings, ExceptionMap) { - _log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger(); + _log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger(); _disposedTokenSource = new CancellationTokenSource(); - _batchAppenderLazy = new Lazy(CreateStreamAppender); + _batchAppenderLazy = new Lazy(CreateStreamAppender); + + var serializationSettings = settings?.Serialization ?? KurrentClientSerializationSettings.Default(); + + _messageSerializer = MessageSerializerWrapper.From(serializationSettings); } void SwapStreamAppender(Exception ex) => diff --git a/test/Kurrent.Client.Tests.Common/Fixtures/KurrentPermanentFixture.cs b/test/Kurrent.Client.Tests.Common/Fixtures/KurrentPermanentFixture.cs index 0b657b61f..9aa9294ef 100644 --- a/test/Kurrent.Client.Tests.Common/Fixtures/KurrentPermanentFixture.cs +++ b/test/Kurrent.Client.Tests.Common/Fixtures/KurrentPermanentFixture.cs @@ -70,7 +70,8 @@ protected KurrentPermanentFixture(ConfigureFixture configure) { OperationOptions = Options.ClientSettings.OperationOptions, ConnectivitySettings = Options.ClientSettings.ConnectivitySettings, DefaultCredentials = Options.ClientSettings.DefaultCredentials, - DefaultDeadline = Options.ClientSettings.DefaultDeadline + DefaultDeadline = Options.ClientSettings.DefaultDeadline, + Serialization = Options.ClientSettings.Serialization }; InterlockedBoolean WarmUpCompleted { get; } = new InterlockedBoolean(); diff --git a/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs b/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs new file mode 100644 index 000000000..3e0405407 --- /dev/null +++ b/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs @@ -0,0 +1,81 @@ +using EventStore.Client; + +namespace Kurrent.Client.Tests.Streams.Serialization; + +[Trait("Category", "Target:Streams")] +[Trait("Category", "Operation:Append")] +public class SerializationTests(ITestOutputHelper output, SerializationTests.CustomSerializationFixture fixture) + : KurrentPermanentTests(output, fixture) { + [RetryFact] + public async Task appends_with_revision_serializes_using_default_json_serialization() { + var stream = $"{Fixture.GetStreamName()}_{0}"; + + var events = GenerateEvents(); + + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamRevision.None, + events + ); + + Assert.Equal(new(0), writeResult.NextExpectedStreamRevision); + + var resolvedEvents = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start) + .ToListAsync(); + + Assert.Single(resolvedEvents); + + var resolvedEvent = resolvedEvents.Single(); + + Assert.NotNull(resolvedEvent.DeserializedEvent); + Assert.Equal(events.First(), resolvedEvent.DeserializedEvent); + } + + [RetryFact] + public async Task appends_with_stream_state_serializes_using_default_json_serialization() { + var stream = $"{Fixture.GetStreamName()}_{StreamState.Any}"; + + var events = GenerateEvents(); + + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamState.Any, + events + ); + + Assert.Equal(new(0), writeResult.NextExpectedStreamRevision); + + var resolvedEvents = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start) + .ToListAsync(); + + Assert.Single(resolvedEvents); + + var resolvedEvent = resolvedEvents.Single(); + + Assert.NotNull(resolvedEvent.DeserializedEvent); + Assert.Equal(events.First(), resolvedEvent.DeserializedEvent); + } + + private List GenerateEvents(int count = 1) => + Enumerable.Range(0, count) + .Select( + _ => new UserRegistered( + Guid.NewGuid(), + new Address(Guid.NewGuid().ToString(), Guid.NewGuid().GetHashCode()) + ) + ).ToList(); + + public record Address(string Street, int Number); + + public record UserRegistered(Guid UserId, Address Address); + + public class CustomSerializationFixture() : KurrentPermanentFixture( + x => { + x.ClientSettings.Serialization = KurrentClientSerializationSettings + .Default() + .EnableAutomaticDeserialization(); + + return x; + } + ); +}