Skip to content

Commit

Permalink
[DEVEX-222] Added overloads of Append methods that take regular events
Browse files Browse the repository at this point in the history
They currently use dummy serialisation
  • Loading branch information
oskardudycz committed Jan 23, 2025
1 parent f0e35bd commit 33e2f5a
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 22 deletions.
134 changes: 112 additions & 22 deletions src/Kurrent.Client/Streams/KurrentClient.Append.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using Google.Protobuf;
using EventStore.Client.Streams;
Expand All @@ -14,6 +15,75 @@

namespace EventStore.Client {
public partial class KurrentClient {
/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="expectedState">The expected <see cref="StreamState"/> of the stream to append to.</param>
/// <param name="events">Messages to append to the stream.</param>
/// <param name="configureOperationOptions">An <see cref="Action{KurrentClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="deadline"></param>
/// <param name="userCredentials">The <see cref="UserCredentials"/> for the operation.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<IWriteResult> AppendToStreamAsync(
string streamName,
StreamState expectedState,
IEnumerable<object> events,
Action<KurrentClientOperationOptions>? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
IEnumerable<EventData> serializedEvents = events.Select(
_ => new EventData(Uuid.NewUuid(), "dummy", Encoding.UTF8.GetBytes("""{"dummy":"data"}"""))
).AsEnumerable()!; // Yes, I know 😅

return AppendToStreamAsync(
streamName,
expectedState,
serializedEvents,
configureOperationOptions,
deadline,
userCredentials,
cancellationToken
);
}

/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
/// <param name="streamName">The name of the stream to append events to.</param>
/// <param name="expectedRevision">The expected <see cref="StreamRevision"/> of the stream to append to.</param>
/// <param name="events">Messages to append to the stream.</param>
/// <param name="configureOperationOptions">An <see cref="Action{KurrentClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="deadline"></param>
/// <param name="userCredentials">The <see cref="UserCredentials"/> for the operation.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<IWriteResult> AppendToStreamAsync(
string streamName,
StreamRevision expectedRevision,
IEnumerable<object> events,
Action<KurrentClientOperationOptions>? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
IEnumerable<EventData>
serializedEvents = events.Select(_ => null as EventData).AsEnumerable()!; // Yes, I know 😅

return AppendToStreamAsync(
streamName,
expectedRevision,
serializedEvents,
configureOperationOptions,
deadline,
userCredentials,
cancellationToken
);
}

/// <summary>
/// Appends events asynchronously to a stream.
/// </summary>
Expand Down Expand Up @@ -114,16 +184,28 @@ ValueTask<IWriteResult> AppendToStreamInternal(
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.Kurrent.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithRequiredTag(
TelemetryTags.Kurrent.Stream,
header.Options.StreamIdentifier.StreamName.ToStringUtf8()
)
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);
.WithOptionalTag(
TelemetryTags.Database.User,
userCredentials?.Username ?? Settings.DefaultCredentials?.Username
);

return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);
return KurrentClientDiagnostics.ActivitySource.TraceClientOperation(
Operation,
TracingConstants.Operations.Append,
tags
);

async ValueTask<IWriteResult> Operation() {
using var call = new StreamsClient(channelInfo.CallInvoker)
.Append(KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
.Append(
KurrentCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

await call.RequestStream
.WriteAsync(header)
Expand Down Expand Up @@ -160,11 +242,13 @@ await call.RequestStream
}

IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
var currentRevision = response.Success.CurrentRevisionOptionCase
== AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);

var position = response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
var position = response.Success.PositionOptionCase
== AppendResp.Types.Success.PositionOptionOneofCase.Position
? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition)
: default;

Expand All @@ -181,7 +265,8 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, KurrentClientOperationOptions operationOptions
) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase
== CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;

Expand All @@ -193,7 +278,8 @@ IWriteResult HandleWrongExpectedRevision(
);

if (operationOptions.ThrowOnAppendFailure) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision) {
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase
== ExpectedRevisionOptionOneofCase.ExpectedRevision) {
throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
Expand All @@ -215,7 +301,8 @@ IWriteResult HandleWrongExpectedRevision(
);
}

var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase == ExpectedRevisionOptionOneofCase.ExpectedRevision
var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase
== ExpectedRevisionOptionOneofCase.ExpectedRevision
? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision)
: StreamRevision.None;

Expand All @@ -227,7 +314,7 @@ IWriteResult HandleWrongExpectedRevision(
}

class StreamAppender : IDisposable {
readonly KurrentClientSettings _settings;
readonly KurrentClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action<Exception> _onException;
readonly Channel<BatchAppendReq> _channel;
Expand Down Expand Up @@ -302,8 +389,7 @@ async ValueTask<IWriteResult> Operation() {
try {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException ex) {
} catch (ChannelClosedException ex) {
// channel is closed, our tcs won't necessarily get completed, don't wait for it.
throw ex.InnerException ?? ex;
}
Expand Down Expand Up @@ -333,8 +419,7 @@ async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
_ = Task.Run(Receive, _cancellationToken);

_isUsable.TrySetResult(true);
}
catch (Exception ex) {
} catch (Exception ex) {
_isUsable.TrySetException(ex);
_onException(ex);
}
Expand All @@ -344,7 +429,8 @@ async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
async Task Send() {
if (_call is null) return;

await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
.ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);

await _call.RequestStream.CompleteAsync().ConfigureAwait(false);
Expand All @@ -354,20 +440,22 @@ async Task Receive() {
if (_call is null) return;

try {
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken).ConfigureAwait(false)) {
if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) {
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken)
.ConfigureAwait(false)) {
if (!_pendingRequests.TryRemove(
Uuid.FromDto(response.CorrelationId),
out var writeResult
)) {
continue; // TODO: Log?
}

try {
writeResult.TrySetResult(response.ToWriteResult());
}
catch (Exception ex) {
} catch (Exception ex) {
writeResult.TrySetException(ex);
}
}
}
catch (Exception ex) {
} catch (Exception ex) {
// signal that no tcs added to _pendingRequests after this point will necessarily complete
_channel.Writer.TryComplete(ex);

Expand All @@ -380,7 +468,9 @@ async Task Receive() {
}
}

IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppendReq.Types.Options options, Uuid correlationId) {
IEnumerable<BatchAppendReq> GetRequests(
IEnumerable<EventData> events, BatchAppendReq.Types.Options options, Uuid correlationId
) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using EventStore.Client;

namespace Kurrent.Client.Tests.Streams.Serialization;

[Trait("Category", "Target:Streams")]
[Trait("Category", "Operation:Append")]
public class SerializationTests : KurrentPermanentTests<KurrentPermanentFixture> {
public SerializationTests(ITestOutputHelper output, KurrentPermanentFixture fixture) : base(output, fixture) {
Fixture.ClientSettings.Serialization = KurrentClientSerializationSettings.Default();
}

[RetryFact]
public async Task appends_and_reads_with_default_serialization() {
var stream = $"{Fixture.GetStreamName()}_{StreamState.Any}";

var writeResult = await Fixture.Streams.AppendToStreamAsync(
stream,
StreamState.Any,
GenerateEvents()
);

Assert.Equal(new(0), writeResult.NextExpectedStreamRevision);

var count = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 2)
.CountAsync();

Assert.Equal(1, count);
}

private IEnumerable<UserRegistered> GenerateEvents(int count = 1) =>
Enumerable.Range(0, count)
.Select(
_ => new UserRegistered(
Guid.NewGuid(),
new Address(Guid.NewGuid().ToString(), Guid.NewGuid().GetHashCode())
)
);

public record Address(string Street, int Number);

public record UserRegistered(Guid UserId, Address Address);
}

0 comments on commit 33e2f5a

Please sign in to comment.