Skip to content

Commit

Permalink
Implement persistent subscriptions to $all:
Browse files Browse the repository at this point in the history
Update persistent subscriptions proto file

Make StreamPosition and Position implement new interface: IPosition
Convert `StreamPosition StartFrom` to `IPosition? StartFrom` in PersistentSubscriptionSettings

Move SystemStreams.cs from src/EventStore.Client.Streams to src/EventStore.Client and add the $all stream
Move SystemMetadata class to a separate file in the EventStore.Client.Streams project

Added: Create persistent subscriptions to $all

Added: Update persistent subscriptions to $all

Added: Delete persistent subscriptions to $all

Added: Read from persistent subscriptions to $all

Delete no longer valid test: create_on_all_stream.the_completion_fails_with_invalid_stream

Update existing tests to use SystemStreams.AllStream/SystemRoles.All instead of "$all"

Move all persistent subscription to stream tests to SubscriptionToStream directory

Move stream persistent subscription tests to EventStore.Client.SubscriptionToStream namespace

Added tests: create persistent subscriptions to $all

Added tests: update persistent subscriptions to $all

Added tests: delete persistent subscriptions to $all

Added tests: connect to persistent subscriptions to $all

Added tests: ACK/NAK for persistent subscriptions to $all

Add the following wrappers to the persistent subscription client: CreateToAllAsync, DeleteToAllAsync, UpdateToAllAsync, SubscribeToAllAsync

Update tests to use wrappers: CreateToAllAsync, DeleteToAllAsync, UpdateToAllAsync, SubscribeToAllAsync
  • Loading branch information
shaan1337 committed May 5, 2021
1 parent 0745490 commit fdb6281
Show file tree
Hide file tree
Showing 102 changed files with 2,480 additions and 170 deletions.
71 changes: 65 additions & 6 deletions src/EventStore.Client.Common/protos/persistentsubscriptions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ message ReadReq {
}

message Options {
event_store.client.StreamIdentifier stream_identifier = 1;
oneof stream_option {
event_store.client.StreamIdentifier stream_identifier = 1;
event_store.client.Empty all = 5;
}

string group_name = 2;
int32 buffer_size = 3;
UUIDOption uuid_option = 4;
Expand Down Expand Up @@ -89,14 +93,40 @@ message CreateReq {
Options options = 1;

message Options {
event_store.client.StreamIdentifier stream_identifier = 1;
oneof stream_option {
StreamOptions stream = 4;
AllOptions all = 5;
}
event_store.client.StreamIdentifier stream_identifier = 1 [deprecated=true];
string group_name = 2;
Settings settings = 3;
}

message StreamOptions {
event_store.client.StreamIdentifier stream_identifier = 1;
oneof revision_option {
uint64 revision = 2;
event_store.client.Empty start = 3;
event_store.client.Empty end = 4;
}
}

message AllOptions {
oneof all_option {
Position position = 1;
event_store.client.Empty start = 2;
event_store.client.Empty end = 3;
}
}

message Position {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}

message Settings {
bool resolve_links = 1;
uint64 revision = 2;
uint64 revision = 2 [deprecated = true];
bool extra_statistics = 3;
int32 max_retry_count = 5;
int32 min_checkpoint_count = 7;
Expand Down Expand Up @@ -130,14 +160,40 @@ message UpdateReq {
Options options = 1;

message Options {
event_store.client.StreamIdentifier stream_identifier = 1;
oneof stream_option {
StreamOptions stream = 4;
AllOptions all = 5;
}
event_store.client.StreamIdentifier stream_identifier = 1 [deprecated = true];
string group_name = 2;
Settings settings = 3;
}

message StreamOptions {
event_store.client.StreamIdentifier stream_identifier = 1;
oneof revision_option {
uint64 revision = 2;
event_store.client.Empty start = 3;
event_store.client.Empty end = 4;
}
}

message AllOptions {
oneof all_option {
Position position = 1;
event_store.client.Empty start = 2;
event_store.client.Empty end = 3;
}
}

message Position {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}

message Settings {
bool resolve_links = 1;
uint64 revision = 2;
uint64 revision = 2 [deprecated = true];
bool extra_statistics = 3;
int32 max_retry_count = 5;
int32 min_checkpoint_count = 7;
Expand Down Expand Up @@ -171,7 +227,10 @@ message DeleteReq {
Options options = 1;

message Options {
event_store.client.StreamIdentifier stream_identifier = 1;
oneof stream_option {
event_store.client.StreamIdentifier stream_identifier = 1;
event_store.client.Empty all = 3;
}
string group_name = 2;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,49 @@ partial class EventStorePersistentSubscriptionsClient {
[SystemConsumerStrategies.Pinned] = CreateReq.Types.ConsumerStrategy.Pinned,
};

private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string streamName, StreamPosition position) {
if (position == StreamPosition.Start) {
return new CreateReq.Types.StreamOptions {
StreamIdentifier = streamName,
Start = new Empty()
};
}

if (position == StreamPosition.End) {
return new CreateReq.Types.StreamOptions {
StreamIdentifier = streamName,
End = new Empty()
};
}

return new CreateReq.Types.StreamOptions {
StreamIdentifier = streamName,
Revision = position.ToUInt64()
};
}

private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position) {
if (position == Position.Start) {
return new CreateReq.Types.AllOptions {
Start = new Empty()
};
}

if (position == Position.End) {
return new CreateReq.Types.AllOptions {
End = new Empty()
};
}

return new CreateReq.Types.AllOptions {
Position = new CreateReq.Types.Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
};
}


/// <summary>
/// Creates a persistent subscription.
/// </summary>
Expand All @@ -39,13 +82,29 @@ public async Task CreateAsync(string streamName, string groupName,
throw new ArgumentNullException(nameof(settings));
}

if (streamName != SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is StreamPosition)) {
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream");
}

if (streamName == SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is Position)) {
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
StreamIdentifier = streamName,
Stream = streamName != SystemStreams.AllStream ?
StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null,
All = streamName == SystemStreams.AllStream ?
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End)) : null,
#pragma warning disable 612
StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
GroupName = groupName,
Settings = new CreateReq.Types.Settings {
Revision = settings.StartFrom,
#pragma warning disable 612
Revision = streamName != SystemStreams.AllStream ? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64() : default, /*for backwards compatibility*/
#pragma warning restore 612
CheckpointAfterMs = (int)settings.CheckPointAfter.TotalMilliseconds,
ExtraStatistics = settings.ExtraStatistics,
MessageTimeoutMs = (int)settings.MessageTimeout.TotalMilliseconds,
Expand All @@ -62,5 +121,24 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}

/// <summary>
/// Creates a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,36 @@ partial class EventStorePersistentSubscriptionsClient {
/// <returns></returns>
public async Task DeleteAsync(string streamName, string groupName, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var deleteOptions = new DeleteReq.Types.Options {
GroupName = groupName
};

if (streamName == SystemStreams.AllStream) {
deleteOptions.All = new Empty();
} else {
deleteOptions.StreamIdentifier = streamName;
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).DeleteAsync(new DeleteReq {
Options = new DeleteReq.Types.Options {
StreamIdentifier = streamName,
GroupName = groupName
}
Options = deleteOptions
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}

/// <summary>
/// Deletes a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task DeleteToAllAsync(string groupName, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await DeleteAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,47 @@ public async Task<PersistentSubscription> SubscribeAsync(string streamName, stri
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).Read(EventStoreCallOptions.Create(
Settings, operationOptions, userCredentials, cancellationToken));

return await PersistentSubscription.Confirm(call, new ReadReq.Types.Options {
var readOptions = new ReadReq.Types.Options {
BufferSize = bufferSize,
GroupName = groupName,
StreamIdentifier = streamName,
UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()}
}, autoAck, eventAppeared, subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false);
};

if (streamName == SystemStreams.AllStream) {
readOptions.All = new Empty();
} else {
readOptions.StreamIdentifier = streamName;
}

return await PersistentSubscription.Confirm(call, readOptions, autoAck, eventAppeared,
subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Subscribes to a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="eventAppeared"></param>
/// <param name="subscriptionDropped"></param>
/// <param name="userCredentials"></param>
/// <param name="bufferSize"></param>
/// <param name="autoAck"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<PersistentSubscription> SubscribeToAllAsync(string groupName,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = null,
UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true,
CancellationToken cancellationToken = default) =>
await SubscribeAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventAppeared: eventAppeared,
subscriptionDropped: subscriptionDropped,
userCredentials: userCredentials,
bufferSize: bufferSize,
autoAck: autoAck,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
Loading

0 comments on commit fdb6281

Please sign in to comment.