diff --git a/src/Kurrent.Client/Core/ResolvedEvent.cs b/src/Kurrent.Client/Core/ResolvedEvent.cs index 27cc72154..085eb0c1c 100644 --- a/src/Kurrent.Client/Core/ResolvedEvent.cs +++ b/src/Kurrent.Client/Core/ResolvedEvent.cs @@ -1,3 +1,6 @@ +using System.Diagnostics.CodeAnalysis; +using EventStore.Client.Serialization; + namespace EventStore.Client { /// /// A structure representing a single event or a resolved link event. @@ -43,23 +46,31 @@ public readonly struct ResolvedEvent { /// public bool IsResolved => Link != null && Event != null; + readonly ISchemaSerializer _serializer; + /// /// Constructs a new . /// /// /// /// - public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) { - Event = @event; - Link = link; + /// + public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition, ISchemaSerializer serializer) { + Event = @event; + Link = link; + _serializer = serializer; OriginalPosition = commitPosition.HasValue ? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition) : new Position?(); } - public bool TryDeserialize(out object o) { - o = true; - return true; +#if NET48 + public bool TryDeserialize(out object? deserialized) { +#else + public bool TryDeserialize([NotNullWhen(true)] out object? deserialized) { +#endif + deserialized = _serializer.Deserialize(OriginalEvent.Data, OriginalEvent.EventType); + return deserialized != null; } } } diff --git a/src/Kurrent.Client/Core/Serialization/EventTypeMapper.cs b/src/Kurrent.Client/Core/Serialization/EventTypeMapper.cs new file mode 100644 index 000000000..14310b31c --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/EventTypeMapper.cs @@ -0,0 +1,52 @@ +using System.Collections.Concurrent; + +namespace Kurrent.Client.Tests.Streams.Serialization; + +// TODO: Discuss how to proceed with that and whether to move the Schema Registry code here +// The scanning part and registration seems to be more robust there +// I used this for simplicity +public interface IEventTypeMapper { + void AddCustomMap(string eventTypeName); + void AddCustomMap(Type eventType, string eventTypeName); + string ToName(); + string ToName(Type eventType); + Type? ToType(string eventTypeName); +} + +public class EventTypeMapper : IEventTypeMapper { + public static readonly EventTypeMapper Instance = new(); + + private readonly ConcurrentDictionary typeMap = new(); + private readonly ConcurrentDictionary typeNameMap = new(); + + public void AddCustomMap(string eventTypeName) => AddCustomMap(typeof(T), eventTypeName); + + public void AddCustomMap(Type eventType, string eventTypeName) + { + typeNameMap.AddOrUpdate(eventType, eventTypeName, (_, typeName) => typeName); + typeMap.AddOrUpdate(eventTypeName, eventType, (_, type) => type); + } + + public string ToName() => ToName(typeof(TEventType)); + + public string ToName(Type eventType) => typeNameMap.GetOrAdd(eventType, _ => + { + var eventTypeName = eventType.FullName!; + + typeMap.TryAdd(eventTypeName, eventType); + + return eventTypeName; + }); + + public Type? ToType(string eventTypeName) => typeMap.GetOrAdd(eventTypeName, _ => + { + var type = TypeProvider.GetFirstMatchingTypeFromCurrentDomainAssembly(eventTypeName); + + if (type == null) + return null; + + typeNameMap.TryAdd(type, eventTypeName); + + return type; + }); +} diff --git a/src/Kurrent.Client/Core/Serialization/ISchemaSerializer.cs b/src/Kurrent.Client/Core/Serialization/ISchemaSerializer.cs new file mode 100644 index 000000000..d89091e14 --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/ISchemaSerializer.cs @@ -0,0 +1,7 @@ +namespace EventStore.Client.Serialization; + +public interface ISchemaSerializer { + public (ReadOnlyMemory Bytes, string typeName) Serialize(object value); + + public object? Deserialize(ReadOnlyMemory data, string typeName); +} diff --git a/src/Kurrent.Client/Core/Serialization/ISerializer.cs b/src/Kurrent.Client/Core/Serialization/ISerializer.cs new file mode 100644 index 000000000..224a4b0bd --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/ISerializer.cs @@ -0,0 +1,7 @@ +namespace EventStore.Client.Serialization; + +public interface ISerializer { + public ReadOnlyMemory Serialize(object value); + + public object? Deserialize(ReadOnlyMemory data, Type type); +} diff --git a/src/Kurrent.Client/Core/Serialization/SchemaSerializer.cs b/src/Kurrent.Client/Core/Serialization/SchemaSerializer.cs new file mode 100644 index 000000000..0bbcf683e --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/SchemaSerializer.cs @@ -0,0 +1,17 @@ +using Kurrent.Client.Tests.Streams.Serialization; + +namespace EventStore.Client.Serialization; + +public class SchemaSerializer(ISerializer serializer, IEventTypeMapper eventTypeMapper) : ISchemaSerializer { + public (ReadOnlyMemory Bytes, string typeName) Serialize(object value) { + var eventType = eventTypeMapper.ToName(value.GetType()); + var bytes = serializer.Serialize(value); + + return (bytes, eventType); + } + + public object? Deserialize(ReadOnlyMemory data, string typeName) { + var clrType = eventTypeMapper.ToType(typeName); + return clrType != null ? serializer.Deserialize(data, clrType) : null; + } +} diff --git a/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs new file mode 100644 index 000000000..39f4c85ec --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/SystemTextJsonSerializer.cs @@ -0,0 +1,14 @@ +using System.Text; +using System.Text.Json; +using EventStore.Client.Serialization; + +namespace Kurrent.Client.Core.Serialization; + +public class SystemTextJsonSerializer: ISerializer { + public ReadOnlyMemory Serialize(object value) { + return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value)); + } + public object? Deserialize(ReadOnlyMemory data, Type type) { + return JsonSerializer.Deserialize(data.Span, type); + } +} diff --git a/src/Kurrent.Client/Core/Serialization/TypeProvider.cs b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs new file mode 100644 index 000000000..a4d7ec20e --- /dev/null +++ b/src/Kurrent.Client/Core/Serialization/TypeProvider.cs @@ -0,0 +1,26 @@ +using System.Reflection; + +namespace Kurrent.Client.Tests.Streams.Serialization; + +public static class TypeProvider +{ + public static Type? GetTypeFromAnyReferencingAssembly(string typeName) + { + var referencedAssemblies = Assembly.GetEntryAssembly()? + .GetReferencedAssemblies() + .Select(a => a.FullName); + + if (referencedAssemblies == null) + return null; + + return AppDomain.CurrentDomain.GetAssemblies() + .Where(a => referencedAssemblies.Contains(a.FullName)) + .SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName)) + .FirstOrDefault(); + } + + public static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string typeName) => + AppDomain.CurrentDomain.GetAssemblies() + .SelectMany(a => a.GetTypes().Where(x => x.FullName == typeName || x.Name == typeName)) + .FirstOrDefault(); +} diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs index 779413111..3e727c08c 100644 --- a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Read.cs @@ -1,6 +1,7 @@ using System.Threading.Channels; using EventStore.Client.PersistentSubscriptions; using EventStore.Client.Diagnostics; +using EventStore.Client.Serialization; using Grpc.Core; using static EventStore.Client.PersistentSubscriptions.PersistentSubscriptions; @@ -128,6 +129,7 @@ public PersistentSubscriptionResult SubscribeToStream( new() { Options = readOptions }, Settings, userCredentials, + _schemaSerializer, cancellationToken ); } @@ -221,9 +223,13 @@ async IAsyncEnumerable GetMessages() { } internal PersistentSubscriptionResult( - string streamName, string groupName, + string streamName, + string groupName, Func> selectChannelInfo, - ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials, + ReadReq request, + KurrentClientSettings settings, + UserCredentials? userCredentials, + ISchemaSerializer schemaSerializer, CancellationToken cancellationToken ) { StreamName = streamName; @@ -260,7 +266,7 @@ async Task PumpMessages() { response.SubscriptionConfirmation.SubscriptionId ), Event => new PersistentSubscriptionMessage.Event( - ConvertToResolvedEvent(response), + ConvertToResolvedEvent(response, schemaSerializer), response.Event.CountCase switch { ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount, _ => null @@ -363,13 +369,14 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params ResolvedEvent[] resolvedEvents) => Nack(action, reason, Array.ConvertAll(resolvedEvents, re => re.OriginalEvent.EventId)); - static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new( + static ResolvedEvent ConvertToResolvedEvent(ReadResp response, ISchemaSerializer schemaSerializer) => new( ConvertToEventRecord(response.Event.Event)!, ConvertToEventRecord(response.Event.Link), response.Event.PositionCase switch { ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition, _ => null - } + }, + schemaSerializer ); Task AckInternal(params Uuid[] eventIds) { diff --git a/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs new file mode 100644 index 000000000..d31abd6b3 --- /dev/null +++ b/src/Kurrent.Client/PersistentSubscriptions/KurrentPersistentSubscriptionsClient.Serialization.cs @@ -0,0 +1,13 @@ +using EventStore.Client.Serialization; +using Kurrent.Client.Core.Serialization; +using Kurrent.Client.Tests.Streams.Serialization; + +namespace EventStore.Client { + public partial class KurrentPersistentSubscriptionsClient { + // TODO: Resolve based on options + ISchemaSerializer _schemaSerializer = new SchemaSerializer( + new SystemTextJsonSerializer(), + EventTypeMapper.Instance + ); + } +} diff --git a/src/Kurrent.Client/Streams/KurrentClient.Append.cs b/src/Kurrent.Client/Streams/KurrentClient.Append.cs index b0b80a8cc..533c43522 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Append.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Append.cs @@ -30,6 +30,7 @@ public Task AppendToStreamAsync( string streamName, StreamState expectedState, IEnumerable events, + // TODO: I don't like those numerous options, but I'd prefer to tackle that in a dedicated PR Action? configureOperationOptions = null, TimeSpan? deadline = null, UserCredentials? userCredentials = null, @@ -65,13 +66,18 @@ public Task AppendToStreamAsync( string streamName, StreamRevision expectedRevision, IEnumerable events, + // TODO: I don't like those numerous options, but I'd prefer to tackle that in a dedicated PR Action? configureOperationOptions = null, TimeSpan? deadline = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default ) { - IEnumerable - serializedEvents = events.Select(_ => null as EventData).AsEnumerable()!; // Yes, I know 😅 + var serializedEvents = events.Select( + @event => { + var (bytes, typeName) = _schemaSerializer.Serialize(@event); + return new EventData(Uuid.NewUuid(), typeName, bytes); + } + ).AsEnumerable(); return AppendToStreamAsync( streamName, @@ -83,7 +89,7 @@ public Task AppendToStreamAsync( cancellationToken ); } - + /// /// Appends events asynchronously to a stream. /// @@ -184,16 +190,28 @@ ValueTask AppendToStreamInternal( CancellationToken cancellationToken ) { var tags = new ActivityTagsCollection() - .WithRequiredTag(TelemetryTags.Kurrent.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8()) + .WithRequiredTag( + TelemetryTags.Kurrent.Stream, + header.Options.StreamIdentifier.StreamName.ToStringUtf8() + ) .WithGrpcChannelServerTags(channelInfo) .WithClientSettingsServerTags(Settings) - .WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username); + .WithOptionalTag( + TelemetryTags.Database.User, + userCredentials?.Username ?? Settings.DefaultCredentials?.Username + ); - return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags); + return KurrentClientDiagnostics.ActivitySource.TraceClientOperation( + Operation, + TracingConstants.Operations.Append, + tags + ); async ValueTask Operation() { using var call = new StreamsClient(channelInfo.CallInvoker) - .Append(KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)); + .Append( + KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); await call.RequestStream .WriteAsync(header) @@ -230,11 +248,13 @@ await call.RequestStream } IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { - var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream + var currentRevision = response.Success.CurrentRevisionOptionCase + == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream ? StreamRevision.None : new StreamRevision(response.Success.CurrentRevision); - var position = response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position + var position = response.Success.PositionOptionCase + == AppendResp.Types.Success.PositionOptionOneofCase.Position ? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition) : default; @@ -251,7 +271,8 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) { IWriteResult HandleWrongExpectedRevision( AppendResp response, AppendReq header, KurrentClientOperationOptions operationOptions ) { - var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision + var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase + == CurrentRevisionOptionOneofCase.CurrentRevision ? new StreamRevision(response.WrongExpectedVersion.CurrentRevision) : StreamRevision.None; @@ -263,7 +284,8 @@ IWriteResult HandleWrongExpectedRevision( ); if (operationOptions.ThrowOnAppendFailure) { - if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision) { + if (response.WrongExpectedVersion.ExpectedRevisionOptionCase + == ExpectedRevisionOptionOneofCase.ExpectedRevision) { throw new WrongExpectedVersionException( header.Options.StreamIdentifier!, new StreamRevision(response.WrongExpectedVersion.ExpectedRevision), @@ -285,7 +307,8 @@ IWriteResult HandleWrongExpectedRevision( ); } - var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision + var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase + == ExpectedRevisionOptionOneofCase.ExpectedRevision ? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision) : StreamRevision.None; @@ -297,7 +320,7 @@ IWriteResult HandleWrongExpectedRevision( } class StreamAppender : IDisposable { - readonly KurrentClientSettings _settings; + readonly KurrentClientSettings _settings; readonly CancellationToken _cancellationToken; readonly Action _onException; readonly Channel _channel; @@ -372,8 +395,7 @@ async ValueTask Operation() { try { foreach (var appendRequest in GetRequests(events, options, correlationId)) await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false); - } - catch (ChannelClosedException ex) { + } catch (ChannelClosedException ex) { // channel is closed, our tcs won't necessarily get completed, don't wait for it. throw ex.InnerException ?? ex; } @@ -403,8 +425,7 @@ async Task Duplex(ValueTask channelInfoTask) { _ = Task.Run(Receive, _cancellationToken); _isUsable.TrySetResult(true); - } - catch (Exception ex) { + } catch (Exception ex) { _isUsable.TrySetException(ex); _onException(ex); } @@ -414,7 +435,8 @@ async Task Duplex(ValueTask channelInfoTask) { async Task Send() { if (_call is null) return; - await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) + await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false); await _call.RequestStream.CompleteAsync().ConfigureAwait(false); @@ -424,20 +446,22 @@ async Task Receive() { if (_call is null) return; try { - await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) { - if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) { + await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken) + .ConfigureAwait(false)) { + if (!_pendingRequests.TryRemove( + Uuid.FromDto(response.CorrelationId), + out var writeResult + )) { continue; // TODO: Log? } try { writeResult.TrySetResult(response.ToWriteResult()); - } - catch (Exception ex) { + } catch (Exception ex) { writeResult.TrySetException(ex); } } - } - catch (Exception ex) { + } catch (Exception ex) { // signal that no tcs added to _pendingRequests after this point will necessarily complete _channel.Writer.TryComplete(ex); @@ -450,7 +474,9 @@ async Task Receive() { } } - IEnumerable GetRequests(IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId) { + IEnumerable GetRequests( + IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId + ) { var batchSize = 0; var first = true; var correlationIdDto = correlationId.ToDto(); diff --git a/src/Kurrent.Client/Streams/KurrentClient.Read.cs b/src/Kurrent.Client/Streams/KurrentClient.Read.cs index 0523d9516..d30da9bc0 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Read.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Read.cs @@ -1,4 +1,5 @@ using System.Threading.Channels; +using EventStore.Client.Serialization; using EventStore.Client.Streams; using Grpc.Core; using static EventStore.Client.Streams.ReadResp; @@ -91,6 +92,7 @@ public ReadAllStreamResult ReadAllAsync( Settings, deadline, userCredentials, + _schemaSerializer, cancellationToken ); } @@ -139,8 +141,12 @@ async IAsyncEnumerable GetMessages() { } internal ReadAllStreamResult( - Func> selectCallInvoker, ReadReq request, - KurrentClientSettings settings, TimeSpan? deadline, UserCredentials? userCredentials, + Func> selectCallInvoker, + ReadReq request, + KurrentClientSettings settings, + TimeSpan? deadline, + UserCredentials? userCredentials, + ISchemaSerializer schemaSerializer, CancellationToken cancellationToken ) { var callOptions = KurrentCallOptions.CreateStreaming( @@ -172,7 +178,7 @@ async Task PumpMessages() { await _channel.Writer.WriteAsync( response.ContentCase switch { StreamNotFound => StreamMessage.NotFound.Instance, - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, schemaSerializer)), FirstStreamPosition => new StreamMessage.FirstStreamPosition(new StreamPosition(response.FirstStreamPosition)), LastStreamPosition => new StreamMessage.LastStreamPosition(new StreamPosition(response.LastStreamPosition)), LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( @@ -267,6 +273,7 @@ public ReadStreamResult ReadStreamAsync( Settings, deadline, userCredentials, + _schemaSerializer, cancellationToken ); } @@ -338,8 +345,12 @@ async IAsyncEnumerable GetMessages() { public Task ReadState { get; } internal ReadStreamResult( - Func> selectCallInvoker, ReadReq request, - KurrentClientSettings settings, TimeSpan? deadline, UserCredentials? userCredentials, + Func> selectCallInvoker, + ReadReq request, + KurrentClientSettings settings, + TimeSpan? deadline, + UserCredentials? userCredentials, + ISchemaSerializer schemaSerializer, CancellationToken cancellationToken ) { var callOptions = KurrentCallOptions.CreateStreaming( @@ -391,7 +402,7 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo await _channel.Writer.WriteAsync( response.ContentCase switch { StreamNotFound => StreamMessage.NotFound.Instance, - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, schemaSerializer)), ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition( new StreamPosition(response.FirstStreamPosition) ), @@ -442,14 +453,15 @@ public async IAsyncEnumerator GetAsyncEnumerator( } } - static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) => + static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent, ISchemaSerializer schemaSerializer) => new ResolvedEvent( ConvertToEventRecord(readEvent.Event)!, ConvertToEventRecord(readEvent.Link), readEvent.PositionCase switch { ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => readEvent.CommitPosition, _ => null - } + }, + schemaSerializer ); static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) => diff --git a/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs b/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs new file mode 100644 index 000000000..9ca19b6f6 --- /dev/null +++ b/src/Kurrent.Client/Streams/KurrentClient.Serialization.cs @@ -0,0 +1,13 @@ +using EventStore.Client.Serialization; +using Kurrent.Client.Core.Serialization; +using Kurrent.Client.Tests.Streams.Serialization; + +namespace EventStore.Client { + public partial class KurrentClient { + // TODO: Resolve based on options + ISchemaSerializer _schemaSerializer = new SchemaSerializer( + new SystemTextJsonSerializer(), + EventTypeMapper.Instance + ); + } +} diff --git a/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs b/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs index 92adb172b..f162cb1c0 100644 --- a/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs +++ b/src/Kurrent.Client/Streams/KurrentClient.Subscriptions.cs @@ -1,5 +1,6 @@ using System.Threading.Channels; using EventStore.Client.Diagnostics; +using EventStore.Client.Serialization; using EventStore.Client.Streams; using Grpc.Core; @@ -64,6 +65,7 @@ public StreamSubscriptionResult SubscribeToAll( }, Settings, userCredentials, + _schemaSerializer, cancellationToken ); @@ -122,6 +124,7 @@ public StreamSubscriptionResult SubscribeToStream( }, Settings, userCredentials, + _schemaSerializer, cancellationToken ); @@ -175,7 +178,10 @@ async IAsyncEnumerable GetMessages() { internal StreamSubscriptionResult( Func> selectChannelInfo, - ReadReq request, KurrentClientSettings settings, UserCredentials? userCredentials, + ReadReq request, + KurrentClientSettings settings, + UserCredentials? userCredentials, + ISchemaSerializer schemaSerializer, CancellationToken cancellationToken ) { _request = request; @@ -208,7 +214,7 @@ async Task PumpMessages() { StreamMessage subscriptionMessage = response.ContentCase switch { Confirmation => new StreamMessage.SubscriptionConfirmation(response.Confirmation.SubscriptionId), - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event, schemaSerializer)), FirstStreamPosition => new StreamMessage.FirstStreamPosition(new StreamPosition(response.FirstStreamPosition)), LastStreamPosition => new StreamMessage.LastStreamPosition(new StreamPosition(response.LastStreamPosition)), LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( diff --git a/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs b/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs index 15d151102..efbaed125 100644 --- a/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs +++ b/test/Kurrent.Client.Tests/Streams/Serialization/SerializationTests.cs @@ -8,9 +8,33 @@ public class SerializationTests : KurrentPermanentTests public SerializationTests(ITestOutputHelper output, KurrentPermanentFixture fixture) : base(output, fixture) { Fixture.ClientSettings.Serialization = KurrentClientSerializationSettings.Default(); } + + [RetryFact] + public async Task appends_with_revision_serializes_using_default_json_serialization() { + var stream = $"{Fixture.GetStreamName()}_{StreamState.Any}"; + + var events = GenerateEvents(); + + var writeResult = await Fixture.Streams.AppendToStreamAsync( + stream, + StreamRevision.None, + events + ); + + Assert.Equal(new(0), writeResult.NextExpectedStreamRevision); + + var resolvedEvents = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 2).ToListAsync(); + Assert.Single(resolvedEvents); + + var resolvedEvent = resolvedEvents.Single(); + + Assert.True(resolvedEvent.TryDeserialize(out var message)); + Assert.Equal(events.First(), message); + } + [RetryFact] - public async Task appends_and_reads_with_default_serialization() { + public async Task appends_with_stream_state_serializes_using_default_json_serialization() { var stream = $"{Fixture.GetStreamName()}_{StreamState.Any}"; var writeResult = await Fixture.Streams.AppendToStreamAsync( @@ -18,6 +42,8 @@ public async Task appends_and_reads_with_default_serialization() { StreamState.Any, GenerateEvents() ); + + var events = GenerateEvents(); Assert.Equal(new(0), writeResult.NextExpectedStreamRevision); @@ -28,16 +54,17 @@ public async Task appends_and_reads_with_default_serialization() { Assert.True(resolvedEvent.TryDeserialize(out var message)); Assert.NotNull(message); + Assert.Equal(events.First(), message); } - private IEnumerable GenerateEvents(int count = 1) => + private List GenerateEvents(int count = 1) => Enumerable.Range(0, count) .Select( _ => new UserRegistered( Guid.NewGuid(), new Address(Guid.NewGuid().ToString(), Guid.NewGuid().GetHashCode()) ) - ); + ).ToList(); public record Address(string Street, int Number);