diff --git a/src/EventStore.Client.Common/protos/persistentsubscriptions.proto b/src/EventStore.Client.Common/protos/persistentsubscriptions.proto index 2f17cb1ee..2f303bb7d 100644 --- a/src/EventStore.Client.Common/protos/persistentsubscriptions.proto +++ b/src/EventStore.Client.Common/protos/persistentsubscriptions.proto @@ -19,7 +19,11 @@ message ReadReq { } message Options { - event_store.client.StreamIdentifier stream_identifier = 1; + oneof stream_option { + event_store.client.StreamIdentifier stream_identifier = 1; + event_store.client.Empty all = 5; + } + string group_name = 2; int32 buffer_size = 3; UUIDOption uuid_option = 4; @@ -89,14 +93,40 @@ message CreateReq { Options options = 1; message Options { - event_store.client.StreamIdentifier stream_identifier = 1; + oneof stream_option { + StreamOptions stream = 4; + AllOptions all = 5; + } + event_store.client.StreamIdentifier stream_identifier = 1 [deprecated=true]; string group_name = 2; Settings settings = 3; } + message StreamOptions { + event_store.client.StreamIdentifier stream_identifier = 1; + oneof revision_option { + uint64 revision = 2; + event_store.client.Empty start = 3; + event_store.client.Empty end = 4; + } + } + + message AllOptions { + oneof all_option { + Position position = 1; + event_store.client.Empty start = 2; + event_store.client.Empty end = 3; + } + } + + message Position { + uint64 commit_position = 1; + uint64 prepare_position = 2; + } + message Settings { bool resolve_links = 1; - uint64 revision = 2; + uint64 revision = 2 [deprecated = true]; bool extra_statistics = 3; int32 max_retry_count = 5; int32 min_checkpoint_count = 7; @@ -130,14 +160,40 @@ message UpdateReq { Options options = 1; message Options { - event_store.client.StreamIdentifier stream_identifier = 1; + oneof stream_option { + StreamOptions stream = 4; + AllOptions all = 5; + } + event_store.client.StreamIdentifier stream_identifier = 1 [deprecated = true]; string group_name = 2; Settings settings = 3; } + message StreamOptions { + event_store.client.StreamIdentifier stream_identifier = 1; + oneof revision_option { + uint64 revision = 2; + event_store.client.Empty start = 3; + event_store.client.Empty end = 4; + } + } + + message AllOptions { + oneof all_option { + Position position = 1; + event_store.client.Empty start = 2; + event_store.client.Empty end = 3; + } + } + + message Position { + uint64 commit_position = 1; + uint64 prepare_position = 2; + } + message Settings { bool resolve_links = 1; - uint64 revision = 2; + uint64 revision = 2 [deprecated = true]; bool extra_statistics = 3; int32 max_retry_count = 5; int32 min_checkpoint_count = 7; @@ -171,7 +227,10 @@ message DeleteReq { Options options = 1; message Options { - event_store.client.StreamIdentifier stream_identifier = 1; + oneof stream_option { + event_store.client.StreamIdentifier stream_identifier = 1; + event_store.client.Empty all = 3; + } string group_name = 2; } } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs index e7b37b8cc..b82011730 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs @@ -14,6 +14,49 @@ partial class EventStorePersistentSubscriptionsClient { [SystemConsumerStrategies.Pinned] = CreateReq.Types.ConsumerStrategy.Pinned, }; + private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string streamName, StreamPosition position) { + if (position == StreamPosition.Start) { + return new CreateReq.Types.StreamOptions { + StreamIdentifier = streamName, + Start = new Empty() + }; + } + + if (position == StreamPosition.End) { + return new CreateReq.Types.StreamOptions { + StreamIdentifier = streamName, + End = new Empty() + }; + } + + return new CreateReq.Types.StreamOptions { + StreamIdentifier = streamName, + Revision = position.ToUInt64() + }; + } + + private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position) { + if (position == Position.Start) { + return new CreateReq.Types.AllOptions { + Start = new Empty() + }; + } + + if (position == Position.End) { + return new CreateReq.Types.AllOptions { + End = new Empty() + }; + } + + return new CreateReq.Types.AllOptions { + Position = new CreateReq.Types.Position { + CommitPosition = position.CommitPosition, + PreparePosition = position.PreparePosition + } + }; + } + + /// /// Creates a persistent subscription. /// @@ -39,13 +82,29 @@ public async Task CreateAsync(string streamName, string groupName, throw new ArgumentNullException(nameof(settings)); } + if (streamName != SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is StreamPosition)) { + throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream"); + } + + if (streamName == SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is Position)) { + throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}"); + } + await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq { Options = new CreateReq.Types.Options { - StreamIdentifier = streamName, + Stream = streamName != SystemStreams.AllStream ? + StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null, + All = streamName == SystemStreams.AllStream ? + AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End)) : null, + #pragma warning disable 612 + StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/ + #pragma warning restore 612 GroupName = groupName, Settings = new CreateReq.Types.Settings { - Revision = settings.StartFrom, + #pragma warning disable 612 + Revision = streamName != SystemStreams.AllStream ? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64() : default, /*for backwards compatibility*/ + #pragma warning restore 612 CheckpointAfterMs = (int)settings.CheckPointAfter.TotalMilliseconds, ExtraStatistics = settings.ExtraStatistics, MessageTimeoutMs = (int)settings.MessageTimeout.TotalMilliseconds, @@ -62,5 +121,24 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); } + + /// + /// Creates a persistent subscription to $all. + /// + /// + /// + /// + /// + /// + public async Task CreateToAllAsync(string groupName, + PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) => + await CreateAsync( + streamName: SystemStreams.AllStream, + groupName: groupName, + settings: settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken) + .ConfigureAwait(false); } } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs index 962acd256..a8ce1c970 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs @@ -15,13 +15,36 @@ partial class EventStorePersistentSubscriptionsClient { /// public async Task DeleteAsync(string streamName, string groupName, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { + var deleteOptions = new DeleteReq.Types.Options { + GroupName = groupName + }; + + if (streamName == SystemStreams.AllStream) { + deleteOptions.All = new Empty(); + } else { + deleteOptions.StreamIdentifier = streamName; + } + await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).DeleteAsync(new DeleteReq { - Options = new DeleteReq.Types.Options { - StreamIdentifier = streamName, - GroupName = groupName - } + Options = deleteOptions }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); } + + /// + /// Deletes a persistent subscription to $all. + /// + /// + /// + /// + /// + public async Task DeleteToAllAsync(string groupName, UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) => + await DeleteAsync( + streamName: SystemStreams.AllStream, + groupName: groupName, + userCredentials: userCredentials, + cancellationToken: cancellationToken) + .ConfigureAwait(false); } } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs index 279231f4a..76afaeef0 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs @@ -57,12 +57,47 @@ public async Task SubscribeAsync(string streamName, stri await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).Read(EventStoreCallOptions.Create( Settings, operationOptions, userCredentials, cancellationToken)); - return await PersistentSubscription.Confirm(call, new ReadReq.Types.Options { + var readOptions = new ReadReq.Types.Options { BufferSize = bufferSize, GroupName = groupName, - StreamIdentifier = streamName, UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()} - }, autoAck, eventAppeared, subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false); + }; + + if (streamName == SystemStreams.AllStream) { + readOptions.All = new Empty(); + } else { + readOptions.StreamIdentifier = streamName; + } + + return await PersistentSubscription.Confirm(call, readOptions, autoAck, eventAppeared, + subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false); } + + /// + /// Subscribes to a persistent subscription to $all. + /// + /// + /// + /// + /// + /// + /// + /// + /// + public async Task SubscribeToAllAsync(string groupName, + Func eventAppeared, + Action? subscriptionDropped = null, + UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true, + CancellationToken cancellationToken = default) => + await SubscribeAsync( + streamName: SystemStreams.AllStream, + groupName: groupName, + eventAppeared: eventAppeared, + subscriptionDropped: subscriptionDropped, + userCredentials: userCredentials, + bufferSize: bufferSize, + autoAck: autoAck, + cancellationToken: cancellationToken) + .ConfigureAwait(false); } } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs index c299deeed..b70669f85 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs @@ -14,6 +14,49 @@ public partial class EventStorePersistentSubscriptionsClient { [SystemConsumerStrategies.Pinned] = UpdateReq.Types.ConsumerStrategy.Pinned, }; + private static UpdateReq.Types.StreamOptions StreamOptionsForUpdateProto(string streamName, StreamPosition position) { + if (position == StreamPosition.Start) { + return new UpdateReq.Types.StreamOptions { + StreamIdentifier = streamName, + Start = new Empty() + }; + } + + if (position == StreamPosition.End) { + return new UpdateReq.Types.StreamOptions { + StreamIdentifier = streamName, + End = new Empty() + }; + } + + return new UpdateReq.Types.StreamOptions { + StreamIdentifier = streamName, + Revision = position.ToUInt64() + }; + } + + private static UpdateReq.Types.AllOptions AllOptionsForUpdateProto(Position position) { + if (position == Position.Start) { + return new UpdateReq.Types.AllOptions { + Start = new Empty() + }; + } + + if (position == Position.End) { + return new UpdateReq.Types.AllOptions { + End = new Empty() + }; + } + + return new UpdateReq.Types.AllOptions { + Position = new UpdateReq.Types.Position { + CommitPosition = position.CommitPosition, + PreparePosition = position.PreparePosition + } + }; + } + + /// /// Updates a persistent subscription. /// @@ -39,13 +82,29 @@ public async Task UpdateAsync(string streamName, string groupName, PersistentSub throw new ArgumentNullException(nameof(settings)); } + if (streamName != SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is StreamPosition)) { + throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream"); + } + + if (streamName == SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is Position)) { + throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}"); + } + await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).UpdateAsync(new UpdateReq { Options = new UpdateReq.Types.Options { - StreamIdentifier = streamName, GroupName = groupName, + Stream = streamName != SystemStreams.AllStream ? + StreamOptionsForUpdateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null, + All = streamName == SystemStreams.AllStream ? + AllOptionsForUpdateProto((Position)(settings.StartFrom ?? Position.End)) : null, + #pragma warning disable 612 + StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/ + #pragma warning restore 612 Settings = new UpdateReq.Types.Settings { - Revision = settings.StartFrom, + #pragma warning disable 612 + Revision = streamName != SystemStreams.AllStream ? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64() : default, /*for backwards compatibility*/ + #pragma warning restore 612 CheckpointAfterMs = (int)settings.CheckPointAfter.TotalMilliseconds, ExtraStatistics = settings.ExtraStatistics, MessageTimeoutMs = (int)settings.MessageTimeout.TotalMilliseconds, @@ -62,5 +121,24 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).UpdateAsync(ne } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); } + + /// + /// Updates a persistent subscription to $all. + /// + /// + /// + /// + /// + /// + public async Task UpdateToAllAsync(string groupName, PersistentSubscriptionSettings settings, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) => + await UpdateAsync( + streamName: SystemStreams.AllStream, + groupName: groupName, + settings: settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken) + .ConfigureAwait(false); } } diff --git a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscriptionSettings.cs b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscriptionSettings.cs index e5f32f001..789413ed3 100644 --- a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscriptionSettings.cs +++ b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscriptionSettings.cs @@ -12,9 +12,9 @@ public sealed class PersistentSubscriptionSettings { public readonly bool ResolveLinkTos; /// - /// Which event position in the stream the subscription should start from. + /// Which event position in the stream or transaction file the subscription should start from. /// - public readonly StreamPosition StartFrom; + public readonly IPosition? StartFrom; /// /// Whether to track latency statistics on this subscription. @@ -22,7 +22,7 @@ public sealed class PersistentSubscriptionSettings { public readonly bool ExtraStatistics; /// - /// The amount of time after which to consider a message as timedout and retried. + /// The amount of time after which to consider a message as timed out and retried. /// public readonly TimeSpan MessageTimeout; @@ -88,14 +88,13 @@ public sealed class PersistentSubscriptionSettings { /// /// /// - public PersistentSubscriptionSettings(bool resolveLinkTos = false, StreamPosition? startFrom = null, + public PersistentSubscriptionSettings(bool resolveLinkTos = false, IPosition? startFrom = null, bool extraStatistics = false, TimeSpan? messageTimeout = null, int maxRetryCount = 500, int liveBufferSize = 500, int readBatchSize = 10, int historyBufferSize = 20, TimeSpan? checkPointAfter = null, int minCheckPointCount = 10, int maxCheckPointCount = 1000, int maxSubscriberCount = 0, string namedConsumerStrategy = SystemConsumerStrategies.RoundRobin) { messageTimeout ??= TimeSpan.FromSeconds(30); checkPointAfter ??= TimeSpan.FromSeconds(2); - startFrom ??= StreamPosition.End; if (messageTimeout.Value < TimeSpan.Zero || messageTimeout.Value.TotalMilliseconds > int.MaxValue) { throw new ArgumentOutOfRangeException( @@ -110,7 +109,7 @@ public PersistentSubscriptionSettings(bool resolveLinkTos = false, StreamPositio } ResolveLinkTos = resolveLinkTos; - StartFrom = startFrom.Value; + StartFrom = startFrom; ExtraStatistics = extraStatistics; MessageTimeout = messageTimeout.Value; MaxRetryCount = maxRetryCount; diff --git a/src/EventStore.Client.Streams/SystemStreams.cs b/src/EventStore.Client.Streams/SystemMetadata.cs similarity index 51% rename from src/EventStore.Client.Streams/SystemStreams.cs rename to src/EventStore.Client.Streams/SystemMetadata.cs index 239c97ea8..7cce81c90 100644 --- a/src/EventStore.Client.Streams/SystemStreams.cs +++ b/src/EventStore.Client.Streams/SystemMetadata.cs @@ -1,53 +1,4 @@ -#nullable enable namespace EventStore.Client { - /// - /// A collection of constants and methods to identify streams. - /// - public static class SystemStreams { - /// - /// A stream containing links pointing to each stream in the EventStoreDB. - /// - public const string StreamsStream = "$streams"; - - /// - /// A stream containing system settings. - /// - public const string SettingsStream = "$settings"; - - /// - /// A stream containing statistics. - /// - public const string StatsStreamPrefix = "$stats"; - - /// - /// Returns True if the stream is a system stream. - /// - /// - /// - public static bool IsSystemStream(string streamId) => streamId.Length != 0 && streamId[0] == '$'; - - /// - /// Returns the metadata stream of the stream. - /// - /// - /// - public static string MetastreamOf(string streamId) => "$$" + streamId; - - /// - /// Returns true if the stream is a metadata stream. - /// - /// - /// - public static bool IsMetastream(string streamId) => streamId[..2] == "$$"; - - /// - /// Returns the original stream of the metadata stream. - /// - /// - /// - public static string OriginalStreamOf(string metastreamId) => metastreamId[2..]; - } - /// ///Constants for information in stream metadata /// @@ -97,23 +48,23 @@ internal static class SystemMetadata { public const string AclDelete = "$d"; /// - /// to read metadata + /// to read metadata /// public const string AclMetaRead = "$mr"; /// - /// to write metadata + /// to write metadata /// public const string AclMetaWrite = "$mw"; /// - /// The user default acl stream + /// The user default acl stream /// public const string UserStreamAcl = "$userStreamAcl"; /// - /// the system stream defaults acl stream + /// the system stream defaults acl stream /// public const string SystemStreamAcl = "$systemStreamAcl"; } diff --git a/src/EventStore.Client/IPosition.cs b/src/EventStore.Client/IPosition.cs new file mode 100644 index 000000000..e3c5da9bc --- /dev/null +++ b/src/EventStore.Client/IPosition.cs @@ -0,0 +1,7 @@ +namespace EventStore.Client { + /// + /// Represents the position in a stream or transaction file + /// + public interface IPosition { + } +} diff --git a/src/EventStore.Client/Position.cs b/src/EventStore.Client/Position.cs index 59d6988f4..77dcd6262 100644 --- a/src/EventStore.Client/Position.cs +++ b/src/EventStore.Client/Position.cs @@ -6,7 +6,7 @@ namespace EventStore.Client { /// A structure referring to a potential logical record position /// in the Event Store transaction file. /// - public readonly struct Position : IEquatable, IComparable { + public readonly struct Position : IEquatable, IComparable, IPosition { /// /// Position representing the start of the transaction file /// diff --git a/src/EventStore.Client/StreamPosition.cs b/src/EventStore.Client/StreamPosition.cs index 99e606898..1ff7aa4da 100644 --- a/src/EventStore.Client/StreamPosition.cs +++ b/src/EventStore.Client/StreamPosition.cs @@ -5,7 +5,7 @@ namespace EventStore.Client { /// /// A structure referring to an 's position within a stream. /// - public readonly struct StreamPosition : IEquatable, IComparable { + public readonly struct StreamPosition : IEquatable, IComparable, IPosition { private readonly ulong _value; /// diff --git a/src/EventStore.Client/SystemStreams.cs b/src/EventStore.Client/SystemStreams.cs new file mode 100644 index 000000000..8cf9b64dc --- /dev/null +++ b/src/EventStore.Client/SystemStreams.cs @@ -0,0 +1,55 @@ +#nullable enable +namespace EventStore.Client { + /// + /// A collection of constants and methods to identify streams. + /// + public static class SystemStreams { + /// + /// A stream containing all events in the EventStoreDB transaction file. + /// + public const string AllStream = "$all"; + + /// + /// A stream containing links pointing to each stream in the EventStoreDB. + /// + public const string StreamsStream = "$streams"; + + /// + /// A stream containing system settings. + /// + public const string SettingsStream = "$settings"; + + /// + /// A stream containing statistics. + /// + public const string StatsStreamPrefix = "$stats"; + + /// + /// Returns True if the stream is a system stream. + /// + /// + /// + public static bool IsSystemStream(string streamId) => streamId.Length != 0 && streamId[0] == '$'; + + /// + /// Returns the metadata stream of the stream. + /// + /// + /// + public static string MetastreamOf(string streamId) => "$$" + streamId; + + /// + /// Returns true if the stream is a metadata stream. + /// + /// + /// + public static bool IsMetastream(string streamId) => streamId[..2] == "$$"; + + /// + /// Returns the original stream of the metadata stream. + /// + /// + /// + public static string OriginalStreamOf(string metastreamId) => metastreamId[2..]; + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/a_nak_in_autoack_mode_drops_the_subscription.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/a_nak_in_autoack_mode_drops_the_subscription.cs new file mode 100644 index 000000000..3c9d695f4 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/a_nak_in_autoack_mode_drops_the_subscription.cs @@ -0,0 +1,56 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class a_nak_in_autoack_mode_drops_the_subscription + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "naktest"; + + + public a_nak_in_autoack_mode_drops_the_subscription(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_dropped() { + var (reason, exception) = await _fixture.SubscriptionDropped.WithTimeout(TimeSpan.FromSeconds(10)); + Assert.Equal(SubscriptionDroppedReason.SubscriberError, reason); + var ex = Assert.IsType(exception); + Assert.Equal("test", ex.Message); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _subscriptionDroppedSource; + + public Task<(SubscriptionDroppedReason reason, Exception exception)> SubscriptionDropped => + _subscriptionDroppedSource.Task; + + public readonly EventData[] Events; + private PersistentSubscription _subscription; + + public Fixture() { + _subscriptionDroppedSource = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); + Events = CreateTestEvents().ToArray(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start), TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + delegate { + throw new Exception("test"); + }, (subscription, reason, ex) => _subscriptionDroppedSource.SetResult((reason, ex)), TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/can_create_duplicate_name_on_different_streams.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/can_create_duplicate_name_on_different_streams.cs new file mode 100644 index 000000000..d3c2ccf9d --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/can_create_duplicate_name_on_different_streams.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class can_create_duplicate_name_on_different_streams + : IClassFixture { + public can_create_duplicate_name_on_different_streams(Fixture fixture) { + _fixture = fixture; + } + + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + + protected override Task When() => + Client.CreateToAllAsync("group3211", + new PersistentSubscriptionSettings(), TestCredentials.Root); + } + + [Fact] + public Task the_completion_succeeds() => + _fixture.Client.CreateAsync("someother", + "group3211", new PersistentSubscriptionSettings(), TestCredentials.Root); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_max_one_client.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_max_one_client.cs new file mode 100644 index 000000000..6f85a766e --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_max_one_client.cs @@ -0,0 +1,41 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_max_one_client + : IClassFixture { + private const string Group = "maxoneclient"; + + private readonly Fixture _fixture; + + public connect_to_existing_with_max_one_client(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_second_subscription_fails_to_connect() { + using var first = await _fixture.Client.SubscribeToAllAsync(Group, + delegate { return Task.CompletedTask; }, + userCredentials: TestCredentials.Root).WithTimeout(); + + var ex = await Assert.ThrowsAsync(async () => { + using var _ = await _fixture.Client.SubscribeToAllAsync(Group, + delegate { return Task.CompletedTask; }, + userCredentials: TestCredentials.Root); + }).WithTimeout(); + + Assert.Equal(SystemStreams.AllStream, ex.StreamName); + Assert.Equal(Group, ex.GroupName); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => + Client.CreateToAllAsync( + Group, + new PersistentSubscriptionSettings(maxSubscriberCount: 1), + TestCredentials.Root); + + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_permissions.cs new file mode 100644 index 000000000..ba17cafa0 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_permissions.cs @@ -0,0 +1,38 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_permissions + : IClassFixture { + private const string Group = "connectwithpermissions"; + + + private readonly Fixture _fixture; + + public connect_to_existing_with_permissions(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_succeeds() { + var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); + using var subscription = await _fixture.Client.SubscribeToAllAsync(Group, + delegate { return Task.CompletedTask; }, (s, reason, ex) => dropped.TrySetResult((reason, ex)), + TestCredentials.Root).WithTimeout(); + Assert.NotNull(subscription); + + await Assert.ThrowsAsync(() => dropped.Task.WithTimeout()); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => + Client.CreateToAllAsync( + Group, + new PersistentSubscriptionSettings(), + TestCredentials.Root); + + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs new file mode 100644 index 000000000..799b3c432 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_beginning.cs @@ -0,0 +1,65 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_start_from_beginning + : IClassFixture { + private readonly Fixture _fixture; + + private const string Group = "startfrombeginning"; + + + + public connect_to_existing_with_start_from_beginning(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_event_zero_as_its_first_event() { + var resolvedEvent = await _fixture.FirstEvent.WithTimeout(TimeSpan.FromSeconds(10)); + Assert.Equal(_fixture.Events[0].Event.EventId, resolvedEvent.Event.EventId); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstEventSource; + public Task FirstEvent => _firstEventSource.Task; + public ResolvedEvent[] Events; + private PersistentSubscription _subscription; + + public Fixture() { + _firstEventSource = new TaskCompletionSource(); + } + + protected override async Task Given() { + //append 10 events to random streams to make sure we have at least 10 events in the transaction file + foreach (var @event in CreateTestEvents(10)) { + await StreamsClient.AppendToStreamAsync(Guid.NewGuid().ToString(), StreamState.NoStream, new []{ @event }); + } + Events = await StreamsClient.ReadAllAsync(Direction.Forwards, Position.Start, 10, userCredentials: TestCredentials.Root).ToArrayAsync(); + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start), TestCredentials.Root); + } + + protected override async Task When() { + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => { + _firstEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstEventSource.TrySetException(ex!); + } + }, TestCredentials.Root); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs new file mode 100644 index 000000000..bba316a72 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set.cs @@ -0,0 +1,66 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_start_from_not_set + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "startfromend1"; + + + + public connect_to_existing_with_start_from_not_set( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_no_non_system_events() { + await Assert.ThrowsAsync(() => _fixture.FirstNonSystemEvent.WithTimeout()); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstNonSystemEventSource; + public Task FirstNonSystemEvent => _firstNonSystemEventSource.Task; + private PersistentSubscription _subscription; + + public Fixture() { + _firstNonSystemEventSource = new TaskCompletionSource(); + } + + protected override async Task Given() { + foreach (var @event in CreateTestEvents(10)) { + await StreamsClient.AppendToStreamAsync("non-system-stream-" + Guid.NewGuid(), + StreamState.Any, new[] {@event}); + } + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(), TestCredentials.Root); + } + + protected override async Task When() { + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => { + if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { + return Task.CompletedTask; + } + _firstNonSystemEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstNonSystemEventSource.TrySetException(ex!); + } + }, TestCredentials.Root); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs new file mode 100644 index 000000000..9582c92b2 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_not_set_then_event_written.cs @@ -0,0 +1,74 @@ +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class + connect_to_existing_with_start_from_not_set_then_event_written + : IClassFixture< + connect_to_existing_with_start_from_not_set_then_event_written + .Fixture> { + private readonly Fixture _fixture; + private const string Group = "startfromnotset2"; + + + public + connect_to_existing_with_start_from_not_set_then_event_written( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_the_written_event_as_its_first_non_system_event() { + var resolvedEvent = await _fixture.FirstNonSystemEvent.WithTimeout(); + Assert.Equal(_fixture.ExpectedEvent.EventId, resolvedEvent.Event.EventId); + Assert.Equal(_fixture.ExpectedStreamId, resolvedEvent.Event.EventStreamId); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstNonSystemEventSource; + public Task FirstNonSystemEvent => _firstNonSystemEventSource.Task; + private PersistentSubscription _subscription; + public readonly EventData ExpectedEvent; + public readonly string ExpectedStreamId; + + public Fixture() { + _firstNonSystemEventSource = new TaskCompletionSource(); + ExpectedEvent = CreateTestEvents(1).First(); + ExpectedStreamId = Guid.NewGuid().ToString(); + } + + protected override async Task Given() { + foreach (var @event in CreateTestEvents(10)) { + await StreamsClient.AppendToStreamAsync("non-system-stream-" + Guid.NewGuid(), + StreamState.Any, new[] {@event}); + } + + await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(), TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => { + if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { + return Task.CompletedTask; + } + _firstNonSystemEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstNonSystemEventSource.TrySetException(ex!); + } + }, TestCredentials.Root); + } + + protected override async Task When() { + await StreamsClient.AppendToStreamAsync(ExpectedStreamId, StreamState.NoStream, new []{ ExpectedEvent }); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs new file mode 100644 index 000000000..620720405 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position.cs @@ -0,0 +1,66 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_start_from_set_to_end_position + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "startfromend1"; + + + + public connect_to_existing_with_start_from_set_to_end_position( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_no_non_system_events() { + await Assert.ThrowsAsync(() => _fixture.FirstNonSystemEvent.WithTimeout()); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstNonSystemEventSource; + public Task FirstNonSystemEvent => _firstNonSystemEventSource.Task; + private PersistentSubscription _subscription; + + public Fixture() { + _firstNonSystemEventSource = new TaskCompletionSource(); + } + + protected override async Task Given() { + foreach (var @event in CreateTestEvents(10)) { + await StreamsClient.AppendToStreamAsync("non-system-stream-" + Guid.NewGuid(), + StreamState.Any, new[] {@event}); + } + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.End), TestCredentials.Root); + } + + protected override async Task When() { + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => { + if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { + return Task.CompletedTask; + } + _firstNonSystemEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstNonSystemEventSource.TrySetException(ex!); + } + }, TestCredentials.Root); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs new file mode 100644 index 000000000..db67ccb94 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_end_position_then_event_written.cs @@ -0,0 +1,74 @@ +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class + connect_to_existing_with_start_from_set_to_end_position_then_event_written + : IClassFixture< + connect_to_existing_with_start_from_set_to_end_position_then_event_written + .Fixture> { + private readonly Fixture _fixture; + private const string Group = "startfromnotset2"; + + + public + connect_to_existing_with_start_from_set_to_end_position_then_event_written( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_the_written_event_as_its_first_non_system_event() { + var resolvedEvent = await _fixture.FirstNonSystemEvent.WithTimeout(); + Assert.Equal(_fixture.ExpectedEvent.EventId, resolvedEvent.Event.EventId); + Assert.Equal(_fixture.ExpectedStreamId, resolvedEvent.Event.EventStreamId); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstNonSystemEventSource; + public Task FirstNonSystemEvent => _firstNonSystemEventSource.Task; + private PersistentSubscription _subscription; + public readonly EventData ExpectedEvent; + public readonly string ExpectedStreamId; + + public Fixture() { + _firstNonSystemEventSource = new TaskCompletionSource(); + ExpectedEvent = CreateTestEvents(1).First(); + ExpectedStreamId = Guid.NewGuid().ToString(); + } + + protected override async Task Given() { + foreach (var @event in CreateTestEvents(10)) { + await StreamsClient.AppendToStreamAsync("non-system-stream-" + Guid.NewGuid(), + StreamState.Any, new[] {@event}); + } + + await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(startFrom: Position.End), TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => { + if (SystemStreams.IsSystemStream(e.OriginalStreamId)) { + return Task.CompletedTask; + } + _firstNonSystemEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstNonSystemEventSource.TrySetException(ex!); + } + }, TestCredentials.Root); + } + + protected override async Task When() { + await StreamsClient.AppendToStreamAsync(ExpectedStreamId, StreamState.NoStream, new []{ ExpectedEvent }); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs new file mode 100644 index 000000000..2203646fa --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_invalid_middle_position.cs @@ -0,0 +1,57 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_start_from_set_to_invalid_middle_position + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "startfrominvalid1"; + + + + public connect_to_existing_with_start_from_set_to_invalid_middle_position( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_is_dropped() { + var (reason, exception) = await _fixture.Dropped.WithTimeout(); + Assert.Equal(SubscriptionDroppedReason.ServerError, reason); + Assert.IsType(exception); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _dropped; + public Task<(SubscriptionDroppedReason reason, Exception exception)> Dropped => _dropped.Task; + + private PersistentSubscription _subscription; + + public Fixture() { + _dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); + } + + protected override async Task Given() { + var invalidPosition = new Position(1L, 1L); + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: invalidPosition), TestCredentials.Root); + } + + protected override async Task When() { + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => Task.CompletedTask, + (subscription, reason, ex) => { + _dropped.TrySetResult((reason, ex)); + }, TestCredentials.Root); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs new file mode 100644 index 000000000..2ae49d7a0 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_with_start_from_set_to_valid_middle_position.cs @@ -0,0 +1,65 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_with_start_from_set_to_valid_middle_position + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "startfromvalid"; + + + + public connect_to_existing_with_start_from_set_to_valid_middle_position( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_the_event_at_the_specified_start_position_as_its_first_event() { + var resolvedEvent = await _fixture.FirstEvent.WithTimeout(); + Assert.Equal(_fixture.ExpectedEvent.OriginalPosition, resolvedEvent.Event.Position); + Assert.Equal(_fixture.ExpectedEvent.Event.EventId, resolvedEvent.Event.EventId); + Assert.Equal(_fixture.ExpectedEvent.Event.EventStreamId, resolvedEvent.Event.EventStreamId); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstEventSource; + public Task FirstEvent => _firstEventSource.Task; + private PersistentSubscription _subscription; + public ResolvedEvent ExpectedEvent { get; private set; } + + public Fixture() { + _firstEventSource = new TaskCompletionSource(); + } + + protected override async Task Given() { + var events = await StreamsClient.ReadAllAsync(Direction.Forwards, Position.Start, 10, + userCredentials: TestCredentials.Root).ToArrayAsync(); + ExpectedEvent = events[events.Length / 2]; //just a random event in the middle of the results + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: ExpectedEvent.OriginalPosition), TestCredentials.Root); + } + + protected override async Task When() { + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, r, ct) => { + _firstEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstEventSource.TrySetException(ex!); + } + }, TestCredentials.Root); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_without_permissions.cs new file mode 100644 index 000000000..145d8ab79 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_without_permissions.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_without_permissions + : IClassFixture { + + private readonly Fixture _fixture; + public connect_to_existing_without_permissions(Fixture fixture) { _fixture = fixture; } + + [Fact] + public Task throws_access_denied() => + Assert.ThrowsAsync(async () => { + using var _ = await _fixture.Client.SubscribeToAllAsync("agroupname55", + delegate { return Task.CompletedTask; }); + }).WithTimeout(); + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => + Client.CreateToAllAsync( + "agroupname55", + new PersistentSubscriptionSettings(), + TestCredentials.Root); + + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_without_read_all_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_without_read_all_permissions.cs new file mode 100644 index 000000000..19131370d --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_existing_without_read_all_permissions.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_existing_without_read_all_permissions + : IClassFixture { + + private readonly Fixture _fixture; + public connect_to_existing_without_read_all_permissions(Fixture fixture) { _fixture = fixture; } + + [Fact] + public Task throws_access_denied() => + Assert.ThrowsAsync(async () => { + using var _ = await _fixture.Client.SubscribeToAllAsync("agroupname55", + delegate { return Task.CompletedTask; }, userCredentials: TestCredentials.TestUser1); + }).WithTimeout(); + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => + Client.CreateToAllAsync( + "agroupname55", + new PersistentSubscriptionSettings(), + TestCredentials.Root); + + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_non_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_non_existing_with_permissions.cs new file mode 100644 index 000000000..26000b067 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_to_non_existing_with_permissions.cs @@ -0,0 +1,36 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_to_non_existing_with_permissions + : IClassFixture { + + private const string Group = "foo"; + + private readonly Fixture _fixture; + + public connect_to_non_existing_with_permissions(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task throws_persistent_subscription_not_found() { + var ex = await Assert.ThrowsAsync(async () => { + using var _ = await _fixture.Client.SubscribeToAllAsync( + Group, + delegate { + return Task.CompletedTask; + }, + userCredentials: TestCredentials.Root); + }).WithTimeout(); + + Assert.Equal(SystemStreams.AllStream, ex.StreamName); + Assert.Equal(Group, ex.GroupName); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs new file mode 100644 index 000000000..0c6fa949f --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/connect_with_retries.cs @@ -0,0 +1,57 @@ +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class connect_with_retries + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "retries"; + + + public connect_with_retries(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task events_are_retried_until_success() { + Assert.Equal(5, await _fixture.RetryCount.WithTimeout()); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _retryCountSource; + public Task RetryCount => _retryCountSource.Task; + private PersistentSubscription _subscription; + + public Fixture() { + _retryCountSource = new TaskCompletionSource(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start), TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + async (subscription, e, r, ct) => { + if (r > 4) { + _retryCountSource.TrySetResult(r.Value); + await subscription.Ack(e.Event.EventId); + } else { + await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, + "Not yet tried enough times", e); + } + }, autoAck: false, subscriptionDropped: (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _retryCountSource.TrySetException(ex!); + } + }, userCredentials:TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_after_deleting_the_same.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_after_deleting_the_same.cs new file mode 100644 index 000000000..25f0b080a --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_after_deleting_the_same.cs @@ -0,0 +1,30 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_after_deleting_the_same + : IClassFixture { + public create_after_deleting_the_same(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + + protected override async Task When() { + await Client.CreateToAllAsync("existing", + new PersistentSubscriptionSettings(), TestCredentials.Root); + await Client.DeleteToAllAsync("existing", + TestCredentials.Root); + } + } + + [Fact] + public async Task the_completion_succeeds() => + await _fixture.Client.CreateToAllAsync("existing", + new PersistentSubscriptionSettings(), TestCredentials.Root); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs new file mode 100644 index 000000000..573188416 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_duplicate + : IClassFixture { + public create_duplicate(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + + protected override Task When() => + Client.CreateToAllAsync("group32", + new PersistentSubscriptionSettings(), TestCredentials.Root); + } + + [Fact] + public Task the_completion_fails_with_invalid_operation_exception() => + Assert.ThrowsAsync( + () => _fixture.Client.CreateToAllAsync("group32", + new PersistentSubscriptionSettings(), + TestCredentials.Root)); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_on_all_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_on_all_stream.cs new file mode 100644 index 000000000..8774b86e8 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_on_all_stream.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_on_all_stream + : IClassFixture { + public create_on_all_stream(Fixture fixture) { + _fixture = fixture; + } + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public Task the_completion_succeeds() + => _fixture.Client.CreateToAllAsync( + "existing", new PersistentSubscriptionSettings(), TestCredentials.Root); + + [Fact] + public Task throws_argument_exception_if_wrong_start_from_type_passed() + => Assert.ThrowsAsync(() => _fixture.Client.CreateToAllAsync( + "existing", new PersistentSubscriptionSettings(startFrom: StreamPosition.End), TestCredentials.Root)); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_persistent_subscription_with_dont_timeout.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_persistent_subscription_with_dont_timeout.cs new file mode 100644 index 000000000..42ee4f76d --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_persistent_subscription_with_dont_timeout.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_with_dont_timeout + : IClassFixture { + public create_with_dont_timeout(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public Task the_subscription_is_created_without_error() => + _fixture.Client.CreateToAllAsync("dont-timeout", + new PersistentSubscriptionSettings(messageTimeout: TimeSpan.Zero), + TestCredentials.Root); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_equal_to_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_equal_to_last_indexed_position.cs new file mode 100644 index 000000000..91f1e9ff1 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_equal_to_last_indexed_position.cs @@ -0,0 +1,33 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_with_commit_position_equal_to_last_indexed_position + : IClassFixture { + public create_with_commit_position_equal_to_last_indexed_position(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + public ulong LastCommitPosition; + protected override async Task Given() { + var lastEvent = await StreamsClient.ReadAllAsync(Direction.Backwards, Position.End, 1, + userCredentials: TestCredentials.Root).FirstAsync(); + LastCommitPosition = lastEvent.OriginalPosition?.CommitPosition ?? throw new Exception(); + } + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public async Task the_completion_succeeds() => + await _fixture.Client.CreateToAllAsync("group57", + new PersistentSubscriptionSettings( + startFrom: new Position(_fixture.LastCommitPosition, _fixture.LastCommitPosition)), + TestCredentials.Root); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs new file mode 100644 index 000000000..95f5012cc --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs @@ -0,0 +1,34 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_with_commit_position_larger_than_last_indexed_position + : IClassFixture { + public create_with_commit_position_larger_than_last_indexed_position(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + public ulong LastCommitPosition; + protected override async Task Given() { + var lastEvent = await StreamsClient.ReadAllAsync(Direction.Backwards, Position.End, 1, + userCredentials: TestCredentials.Root).FirstAsync(); + LastCommitPosition = lastEvent.OriginalPosition?.CommitPosition ?? throw new Exception(); + } + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public Task fails_with_invalid_operation_exception() => + Assert.ThrowsAsync(() => + _fixture.Client.CreateToAllAsync("group57", + new PersistentSubscriptionSettings( + startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)), + TestCredentials.Root)); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_prepare_position_larger_than_commit_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_prepare_position_larger_than_commit_position.cs new file mode 100644 index 000000000..8bfefb59e --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_prepare_position_larger_than_commit_position.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_with_prepare_position_larger_than_commit_position + : IClassFixture { + public create_with_prepare_position_larger_than_commit_position(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public Task fails_with_argument_out_of_range_exception() => + Assert.ThrowsAsync(() => + _fixture.Client.CreateToAllAsync("group57", + new PersistentSubscriptionSettings( + startFrom: new Position(0, 1)), + TestCredentials.Root)); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_without_permissions.cs new file mode 100644 index 000000000..528ace63b --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_without_permissions.cs @@ -0,0 +1,25 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class create_without_permissions + : IClassFixture { + public create_without_permissions(Fixture fixture) { + _fixture = fixture; + } + + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public Task the_completion_fails_with_access_denied() => + Assert.ThrowsAsync(() => + _fixture.Client.CreateToAllAsync("group57", + new PersistentSubscriptionSettings())); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_permissions.cs new file mode 100644 index 000000000..372b377f6 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_permissions.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class deleting_existing_with_permissions + : IClassFixture { + + private readonly Fixture _fixture; + + public deleting_existing_with_permissions(Fixture fixture) { + _fixture = fixture; + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + + protected override Task When() => + Client.CreateToAllAsync("groupname123", + new PersistentSubscriptionSettings(), + TestCredentials.Root); + } + + [Fact] + public Task the_delete_of_group_succeeds() => + _fixture.Client.DeleteToAllAsync("groupname123", + TestCredentials.Root); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs new file mode 100644 index 000000000..71e09d31b --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_existing_with_subscriber.cs @@ -0,0 +1,61 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class deleting_existing_with_subscriber + : IClassFixture { + private readonly Fixture _fixture; + + + public deleting_existing_with_subscriber(Fixture fixture) { + _fixture = fixture; + } + + public class Fixture : EventStoreClientFixture { + public Task<(SubscriptionDroppedReason reason, Exception exception)> Dropped => _dropped.Task; + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _dropped; + private PersistentSubscription _subscription; + + public Fixture() { + _dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync("groupname123", + new PersistentSubscriptionSettings(), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync("groupname123", + (s, e, i, ct) => Task.CompletedTask, + (s, r, e) => _dropped.TrySetResult((r, e)), TestCredentials.Root); + } + + protected override Task When() => + Client.DeleteToAllAsync("groupname123", + TestCredentials.Root); + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + + [Fact] + public async Task the_subscription_is_dropped() { + var (reason, exception) = await _fixture.Dropped.WithTimeout(); + Assert.Equal(SubscriptionDroppedReason.ServerError, reason); + var ex = Assert.IsType(exception); + Assert.Equal(SystemStreams.AllStream, ex.StreamName); + Assert.Equal("groupname123", ex.GroupName); + } + + [Fact (Skip = "Isn't this how it should work?")] + public async Task the_subscription_is_dropped_with_not_found() { + var (reason, exception) = await _fixture.Dropped.WithTimeout(); + Assert.Equal(SubscriptionDroppedReason.ServerError, reason); + var ex = Assert.IsType(exception); + Assert.Equal(SystemStreams.AllStream, ex.StreamName); + Assert.Equal("groupname123", ex.GroupName); + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_nonexistent.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_nonexistent.cs new file mode 100644 index 000000000..c2d5c2dd0 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_nonexistent.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class deleting_nonexistent + : IClassFixture { + private readonly Fixture _fixture; + + + public deleting_nonexistent(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_delete_fails_with_argument_exception() { + await Assert.ThrowsAsync( + () => _fixture.Client.DeleteToAllAsync( + Guid.NewGuid().ToString(), TestCredentials.Root)); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_without_permissions.cs new file mode 100644 index 000000000..1c1667a74 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/deleting_without_permissions.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class deleting_without_permissions + : IClassFixture { + private readonly Fixture _fixture; + + + public deleting_without_permissions(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_delete_fails_with_access_denied() { + await Assert.ThrowsAsync( + () => _fixture.Client.DeleteToAllAsync( + Guid.NewGuid().ToString())); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_auto_ack.cs new file mode 100644 index 000000000..b76f870f2 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_auto_ack.cs @@ -0,0 +1,77 @@ +using System; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class happy_case_catching_up_to_link_to_events_auto_ack + : IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public happy_case_catching_up_to_link_to_events_auto_ack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount) + .Select((e, i) => new EventData(e.EventId, SystemEventTypes.LinkTo, + Encoding.UTF8.GetBytes($"{i}@test"), + contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream)) + .ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); + } + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, retryCount, ct) => { + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + + return Task.CompletedTask; + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs new file mode 100644 index 000000000..9323f1b79 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_link_to_events_manual_ack.cs @@ -0,0 +1,78 @@ +using System; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class happy_case_catching_up_to_link_to_events_manual_ack + : IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public happy_case_catching_up_to_link_to_events_manual_ack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount) + .Select((e, i) => new EventData(e.EventId, SystemEventTypes.LinkTo, + Encoding.UTF8.GetBytes($"{i}@test"), + contentType: Constants.Metadata.ContentTypes.ApplicationOctetStream)) + .ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); + } + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + async (subscription, e, retryCount, ct) => { + await subscription.Ack(e); + + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, + autoAck: false, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_auto_ack.cs new file mode 100644 index 000000000..0c67e6ade --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_auto_ack.cs @@ -0,0 +1,72 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class happy_case_catching_up_to_normal_events_auto_ack + : IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public happy_case_catching_up_to_normal_events_auto_ack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount).ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-"+Guid.NewGuid(), StreamState.Any, new[] {e}); + } + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, retryCount, ct) => { + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + + return Task.CompletedTask; + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs new file mode 100644 index 000000000..8e861dbec --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_catching_up_to_normal_events_manual_ack.cs @@ -0,0 +1,73 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class happy_case_catching_up_to_normal_events_manual_ack : + IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public happy_case_catching_up_to_normal_events_manual_ack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount).ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); + } + + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + async(subscription, e, retryCount, ct) => { + await subscription.Ack(e); + + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, + autoAck: false, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs new file mode 100644 index 000000000..b17c386a4 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs @@ -0,0 +1,72 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class happy_case_writing_and_subscribing_to_normal_events_auto_ack + : IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public happy_case_writing_and_subscribing_to_normal_events_auto_ack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount).ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.End, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + (subscription, e, retryCount, ct) => { + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + + return Task.CompletedTask; + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, autoAck: true, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override async Task When() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); + } + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs new file mode 100644 index 000000000..ed25ab4bc --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs @@ -0,0 +1,71 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class happy_case_writing_and_subscribing_to_normal_events_manual_ack + : IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public happy_case_writing_and_subscribing_to_normal_events_manual_ack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount).ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.End, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + async (subscription, e, retryCount, ct) => { + await subscription.Ack(e); + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, autoAck: false, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override async Task When() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); + } + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing.cs new file mode 100644 index 000000000..c3d5dc1f7 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing.cs @@ -0,0 +1,30 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_existing + : IClassFixture { + + private const string Group = "existing"; + private readonly Fixture _fixture; + + public update_existing(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_completion_succeeds() { + await _fixture.Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings(), TestCredentials.Root); + } + + public class Fixture : EventStoreClientFixture { + protected override async Task Given() { + await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(), + TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_equal_to_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_equal_to_last_indexed_position.cs new file mode 100644 index 000000000..749fcc77a --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_equal_to_last_indexed_position.cs @@ -0,0 +1,39 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_existing_with_commit_position_equal_to_last_indexed_position + : IClassFixture { + public update_existing_with_commit_position_equal_to_last_indexed_position(Fixture fixture) { + _fixture = fixture; + } + + + private const string Group = "existing"; + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + public ulong LastCommitPosition; + protected override async Task Given() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(), + TestCredentials.Root); + + var lastEvent = await StreamsClient.ReadAllAsync(Direction.Backwards, Position.End, 1, + userCredentials: TestCredentials.Root).FirstAsync(); + LastCommitPosition = lastEvent.OriginalPosition?.CommitPosition ?? throw new Exception(); + } + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public async Task the_completion_succeeds() => + await _fixture.Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings( + startFrom: new Position(_fixture.LastCommitPosition, _fixture.LastCommitPosition)), + TestCredentials.Root); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs new file mode 100644 index 000000000..e17f95498 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs @@ -0,0 +1,40 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_existing_with_commit_position_larger_than_last_indexed_position + : IClassFixture { + public update_existing_with_commit_position_larger_than_last_indexed_position(Fixture fixture) { + _fixture = fixture; + } + + + private const string Group = "existing"; + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + public ulong LastCommitPosition; + protected override async Task When() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(), + TestCredentials.Root); + + var lastEvent = await StreamsClient.ReadAllAsync(Direction.Backwards, Position.End, 1, + userCredentials: TestCredentials.Root).FirstAsync(); + LastCommitPosition = lastEvent.OriginalPosition?.CommitPosition ?? throw new Exception(); + } + protected override Task Given() => Task.CompletedTask; + } + + [Fact] + public Task fails_with_invalid_operation_exception() => + Assert.ThrowsAsync(() => + _fixture.Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings( + startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)), + TestCredentials.Root)); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_subscribers.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_subscribers.cs new file mode 100644 index 000000000..97afe225d --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_subscribers.cs @@ -0,0 +1,51 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_existing_with_subscribers + : IClassFixture { + + private const string Group = "existing"; + private readonly Fixture _fixture; + + public update_existing_with_subscribers(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task existing_subscriptions_are_dropped() { + var (reason, exception) = await _fixture.Dropped.WithTimeout(TimeSpan.FromSeconds(10)); + Assert.Equal(SubscriptionDroppedReason.ServerError, reason); + var ex = Assert.IsType(exception); + Assert.Equal(SystemStreams.AllStream, ex.StreamName); + Assert.Equal(Group, ex.GroupName); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _droppedSource; + public Task<(SubscriptionDroppedReason, Exception)> Dropped => _droppedSource.Task; + private PersistentSubscription _subscription; + + public Fixture() { + _droppedSource = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + delegate { return Task.CompletedTask; }, + (subscription, reason, ex) => _droppedSource.TrySetResult((reason, ex)), TestCredentials.Root); + } + + protected override Task When() => Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings(), TestCredentials.Root); + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_without_permissions.cs new file mode 100644 index 000000000..c9f2d5d2e --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_without_permissions.cs @@ -0,0 +1,31 @@ +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_existing_without_permissions + : IClassFixture { + + private const string Group = "existing"; + private readonly Fixture _fixture; + + public update_existing_without_permissions(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_completion_fails_with_access_denied() { + await Assert.ThrowsAsync( + () => _fixture.Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings())); + } + + public class Fixture : EventStoreClientFixture { + protected override async Task Given() { + await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings(), + TestCredentials.Root); + } + + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_non_existent.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_non_existent.cs new file mode 100644 index 000000000..e62b59dc0 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_non_existent.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_non_existent + : IClassFixture { + + private const string Group = "nonexistent"; + private readonly Fixture _fixture; + + public update_non_existent(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_completion_fails_with_not_found() { + await Assert.ThrowsAsync( + () => _fixture.Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings(), TestCredentials.Root)); + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_with_prepare_position_larger_than_commit_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_with_prepare_position_larger_than_commit_position.cs new file mode 100644 index 000000000..f06d1f77c --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_with_prepare_position_larger_than_commit_position.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class update_with_prepare_position_larger_than_commit_position + : IClassFixture { + public update_with_prepare_position_larger_than_commit_position(Fixture fixture) { + _fixture = fixture; + } + + + private const string Group = "existing"; + + private readonly Fixture _fixture; + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + + [Fact] + public Task fails_with_argument_out_of_range_exception() => + Assert.ThrowsAsync(() => + _fixture.Client.UpdateToAllAsync(Group, + new PersistentSubscriptionSettings( + startFrom: new Position(0, 1)), + TestCredentials.Root)); + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs new file mode 100644 index 000000000..e240910e7 --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_subscribing_to_normal_events_manual_nack.cs @@ -0,0 +1,74 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToAll { + public class when_writing_and_subscribing_to_normal_events_manual_nack + : IClassFixture { + + private const string Group = nameof(Group); + private const int BufferCount = 10; + private const int EventWriteCount = BufferCount * 2; + + private readonly Fixture _fixture; + + public when_writing_and_subscribing_to_normal_events_manual_nack(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task Test() { + await _fixture.EventsReceived.WithTimeout(); + } + + public class Fixture : EventStoreClientFixture { + private readonly EventData[] _events; + private readonly TaskCompletionSource _eventsReceived; + public Task EventsReceived => _eventsReceived.Task; + + private PersistentSubscription _subscription; + private int _eventReceivedCount; + + public Fixture() { + _events = CreateTestEvents(EventWriteCount) + .ToArray(); + _eventsReceived = new TaskCompletionSource(); + } + + protected override async Task Given() { + await Client.CreateToAllAsync(Group, + new PersistentSubscriptionSettings(startFrom: Position.Start, resolveLinkTos: true), + TestCredentials.Root); + _subscription = await Client.SubscribeToAllAsync(Group, + async (subscription, e, retryCount, ct) => { + await subscription.Nack(PersistentSubscriptionNakEventAction.Park, "fail", e); + + if (e.OriginalStreamId.StartsWith("test-") + && Interlocked.Increment(ref _eventReceivedCount) == _events.Length) { + _eventsReceived.TrySetResult(true); + } + }, (s, r, e) => { + if (e != null) { + _eventsReceived.TrySetException(e); + } + }, + autoAck: false, + bufferSize: BufferCount, + userCredentials: TestCredentials.Root); + } + + protected override async Task When() { + foreach (var e in _events) { + await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); + } + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/a_nak_in_autoack_mode_drops_the_subscription.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/a_nak_in_autoack_mode_drops_the_subscription.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/a_nak_in_autoack_mode_drops_the_subscription.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/a_nak_in_autoack_mode_drops_the_subscription.cs index 84d95e323..2c5f96197 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/a_nak_in_autoack_mode_drops_the_subscription.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/a_nak_in_autoack_mode_drops_the_subscription.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class a_nak_in_autoack_mode_drops_the_subscription : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/can_create_duplicate_name_on_different_streams.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/can_create_duplicate_name_on_different_streams.cs similarity index 94% rename from test/EventStore.Client.PersistentSubscriptions.Tests/can_create_duplicate_name_on_different_streams.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/can_create_duplicate_name_on_different_streams.cs index 8ace9bd66..8d8facc9f 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/can_create_duplicate_name_on_different_streams.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/can_create_duplicate_name_on_different_streams.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class can_create_duplicate_name_on_different_streams : IClassFixture { public can_create_duplicate_name_on_different_streams(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_max_one_client.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs similarity index 96% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_max_one_client.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs index 66c7dcd52..e4cc756f2 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_max_one_client.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_max_one_client.cs @@ -1,8 +1,7 @@ -using System; using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_max_one_client : IClassFixture { private const string Group = "startinbeginning1"; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs similarity index 95% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs index 4a386ccd8..865240edf 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_permissions.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_permissions : IClassFixture { private const string Stream = nameof(connect_to_existing_with_permissions); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_and_events_in_it.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs index 1437e3f28..0f219b2db 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_events_in_it.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_start_from_beginning_and_events_in_it : IClassFixture { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_and_no_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_and_no_stream.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs index 11c14605d..2821b3b33 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_and_no_stream.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_beginning_and_no_stream.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_start_from_beginning_and_no_stream : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs new file mode 100644 index 000000000..b9dbacf3c --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it.cs @@ -0,0 +1,61 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToStream { + public class connect_to_existing_with_start_from_not_set_and_events_in_it + : IClassFixture { + private readonly Fixture _fixture; + private const string Group = "startinbeginning1"; + + private const string Stream = + nameof(connect_to_existing_with_start_from_not_set_and_events_in_it); + + public connect_to_existing_with_start_from_not_set_and_events_in_it( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_no_events() { + await Assert.ThrowsAsync(() => _fixture.FirstEvent.WithTimeout()); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstEventSource; + public Task FirstEvent => _firstEventSource.Task; + public readonly EventData[] Events; + private PersistentSubscription _subscription; + + public Fixture() { + _firstEventSource = new TaskCompletionSource(); + Events = CreateTestEvents(10).ToArray(); + } + + protected override async Task Given() { + await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events); + await Client.CreateAsync(Stream, Group, + new PersistentSubscriptionSettings(), TestCredentials.Root); + } + + protected override async Task When() { + _subscription = await Client.SubscribeAsync(Stream, Group, + (subscription, e, r, ct) => { + _firstEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstEventSource.TrySetException(ex!); + } + }, TestCredentials.TestUser1); + } + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs new file mode 100644 index 000000000..188b97f8f --- /dev/null +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written.cs @@ -0,0 +1,67 @@ +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace EventStore.Client.SubscriptionToStream { + public class + connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written + : IClassFixture< + connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written + .Fixture> { + private readonly Fixture _fixture; + private const string Group = "startinbeginning1"; + + private const string Stream = + nameof( + connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written + ); + + public + connect_to_existing_with_start_from_not_set_and_events_in_it_then_event_written( + Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task the_subscription_gets_the_written_event_as_its_first_event() { + var resolvedEvent = await _fixture.FirstEvent.WithTimeout(); + Assert.Equal(new StreamPosition(10), resolvedEvent.Event.EventNumber); + Assert.Equal(_fixture.Events.Last().EventId, resolvedEvent.Event.EventId); + } + + public class Fixture : EventStoreClientFixture { + private readonly TaskCompletionSource _firstEventSource; + public Task FirstEvent => _firstEventSource.Task; + public readonly EventData[] Events; + private PersistentSubscription _subscription; + + public Fixture() { + _firstEventSource = new TaskCompletionSource(); + Events = CreateTestEvents(11).ToArray(); + } + + protected override async Task Given() { + await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events.Take(10)); + await Client.CreateAsync(Stream, Group, + new PersistentSubscriptionSettings(), TestCredentials.Root); + _subscription = await Client.SubscribeAsync(Stream, Group, + (subscription, e, r, ct) => { + _firstEventSource.TrySetResult(e); + return Task.CompletedTask; + }, (subscription, reason, ex) => { + if (reason != SubscriptionDroppedReason.Disposed) { + _firstEventSource.TrySetException(ex!); + } + }, TestCredentials.TestUser1); + } + + protected override Task When() => + StreamsClient.AppendToStreamAsync(Stream, new StreamRevision(9), Events.Skip(10)); + + public override Task DisposeAsync() { + _subscription?.Dispose(); + return base.DisposeAsync(); + } + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_not_set_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs similarity index 80% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_not_set_and_events_in_it.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs index 466e69e76..cdd0bc6cc 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_not_set_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it.cs @@ -3,17 +3,17 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { - public class connect_to_existing_with_start_from_beginning_not_set_and_events_in_it - : IClassFixture { private readonly Fixture _fixture; private const string Group = "startinbeginning1"; private const string Stream = - nameof(connect_to_existing_with_start_from_beginning_not_set_and_events_in_it); + nameof(connect_to_existing_with_start_from_set_to_end_position_and_events_in_it); - public connect_to_existing_with_start_from_beginning_not_set_and_events_in_it( + public connect_to_existing_with_start_from_set_to_end_position_and_events_in_it( Fixture fixture) { _fixture = fixture; } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs similarity index 82% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs index a17ebc461..ea26e24f1 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written.cs @@ -2,22 +2,22 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class - connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written + connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written : IClassFixture< - connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written + connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written .Fixture> { private readonly Fixture _fixture; private const string Group = "startinbeginning1"; private const string Stream = nameof( - connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written + connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written ); public - connect_to_existing_with_start_from_beginning_not_set_and_events_in_it_then_event_written( + connect_to_existing_with_start_from_set_to_end_position_and_events_in_it_then_event_written( Fixture fixture) { _fixture = fixture; } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_two_and_no_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_two_and_no_stream.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs index 0db5c0295..91bc7f1f4 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_two_and_no_stream.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_two_and_no_stream.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_start_from_two_and_no_stream : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_and_events_in_it.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_and_events_in_it.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs index 54bda713b..47fe75805 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_and_events_in_it.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_start_from_x_set_and_events_in_it : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs index 240013a4e..5796c8ad5 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written : IClassFixture< connect_to_existing_with_start_from_x_set_and_events_in_it_then_event_written. diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs index 812b607a0..7a2b03af9 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_with_start_from_x_set_higher_than_x_and_events_in_it_then_event_written : IClassFixture< diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs similarity index 94% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_without_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs index 4ed92f2a5..a606dfa95 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_existing_without_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_existing_without_permissions.cs @@ -1,8 +1,7 @@ -using System; using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_existing_without_permissions : IClassFixture { private const string Stream = "$" + nameof(connect_to_existing_without_permissions); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_non_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs similarity index 75% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_non_existing_with_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs index cdde826a4..0e4995ae7 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_to_non_existing_with_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_to_non_existing_with_permissions.cs @@ -1,10 +1,12 @@ -using System; using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_to_non_existing_with_permissions : IClassFixture { + private const string Stream = nameof(connect_to_non_existing_with_permissions); + private const string Group = "foo"; + private readonly Fixture _fixture; public connect_to_non_existing_with_permissions(Fixture fixture) { @@ -13,19 +15,18 @@ public connect_to_non_existing_with_permissions(Fixture fixture) { [Fact] public async Task throws_persistent_subscription_not_found() { - var streamName = _fixture.GetStreamName(); var ex = await Assert.ThrowsAsync(async () => { using var _ = await _fixture.Client.SubscribeAsync( - streamName, - "foo", + Stream, + Group, delegate { return Task.CompletedTask; }, userCredentials: TestCredentials.Root); }).WithTimeout(); - Assert.Equal(streamName, ex.StreamName); - Assert.Equal("foo", ex.GroupName); + Assert.Equal(Stream, ex.StreamName); + Assert.Equal(Group, ex.GroupName); } public class Fixture : EventStoreClientFixture { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_with_retries.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connect_with_retries.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs index fd56a15c7..236bc80c2 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connect_with_retries.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connect_with_retries.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connect_with_retries : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/connecting_to_a_persistent_subscription.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/connecting_to_a_persistent_subscription.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs index 0fc2810e2..4b91b39a1 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/connecting_to_a_persistent_subscription.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/connecting_to_a_persistent_subscription.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class connecting_to_a_persistent_subscription : IClassFixture< diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_after_deleting_the_same.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_after_deleting_the_same.cs similarity index 95% rename from test/EventStore.Client.PersistentSubscriptions.Tests/create_after_deleting_the_same.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_after_deleting_the_same.cs index 7eecfb76d..9a321244c 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_after_deleting_the_same.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_after_deleting_the_same.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class create_after_deleting_the_same : IClassFixture { public create_after_deleting_the_same(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_duplicate.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs similarity index 94% rename from test/EventStore.Client.PersistentSubscriptions.Tests/create_duplicate.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs index 8aef97145..5367f6762 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_duplicate.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class create_duplicate : IClassFixture { public create_duplicate(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_existing_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_on_existing_stream.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/create_on_existing_stream.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_on_existing_stream.cs index fc51fd129..5d7068ded 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_existing_stream.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_on_existing_stream.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class create_on_existing_stream : IClassFixture { public create_on_existing_stream(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_non_existing_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_on_non_existing_stream.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/create_on_non_existing_stream.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_on_non_existing_stream.cs index be48cad47..30f58ea99 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_non_existing_stream.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_on_non_existing_stream.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class create_on_non_existing_stream : IClassFixture { public create_on_non_existing_stream(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_persistent_subscription_with_dont_timeout.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_persistent_subscription_with_dont_timeout.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/create_persistent_subscription_with_dont_timeout.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_persistent_subscription_with_dont_timeout.cs index 0bc40ed7c..0a9698789 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_persistent_subscription_with_dont_timeout.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_persistent_subscription_with_dont_timeout.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class create_with_dont_timeout : IClassFixture { public create_with_dont_timeout(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_without_permissions.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/create_without_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_without_permissions.cs index 53f9ff065..1ebaebe68 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_without_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_without_permissions.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class create_without_permissions : IClassFixture { public create_without_permissions(Fixture fixture) { diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_existing_with_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_permissions.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/deleting_existing_with_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_permissions.cs index a58710d6c..249cc7abe 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_existing_with_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_permissions.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class deleting_existing_with_permissions : IClassFixture { private const string Stream = nameof(deleting_existing_with_permissions); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_existing_with_subscriber.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/deleting_existing_with_subscriber.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs index e2c0281c7..1be98bd1b 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_existing_with_subscriber.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_existing_with_subscriber.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class deleting_existing_with_subscriber : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_nonexistent.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_nonexistent.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/deleting_nonexistent.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_nonexistent.cs index c4e23b351..8b7a96fa6 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_nonexistent.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_nonexistent.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class deleting_nonexistent : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_without_permissions.cs similarity index 93% rename from test/EventStore.Client.PersistentSubscriptions.Tests/deleting_without_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_without_permissions.cs index 4cbbb5aa3..abd562f3a 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/deleting_without_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/deleting_without_permissions.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class deleting_without_permissions : IClassFixture { private readonly Fixture _fixture; diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_link_to_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_auto_ack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_link_to_events_auto_ack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_auto_ack.cs index e55bd079a..eb4c93638 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_link_to_events_auto_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_auto_ack.cs @@ -4,7 +4,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class happy_case_catching_up_to_link_to_events_auto_ack : IClassFixture { private const string Stream = nameof(happy_case_catching_up_to_link_to_events_auto_ack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_link_to_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_link_to_events_manual_ack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs index 33a4dcbd6..0dfee2c0c 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_link_to_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_link_to_events_manual_ack.cs @@ -4,7 +4,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class happy_case_catching_up_to_link_to_events_manual_ack : IClassFixture { private const string Stream = nameof(happy_case_catching_up_to_link_to_events_manual_ack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_auto_ack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_normal_events_auto_ack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_auto_ack.cs index 8b4ef170d..8c6cbfb0f 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_normal_events_auto_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_auto_ack.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class happy_case_catching_up_to_normal_events_auto_ack : IClassFixture { private const string Stream = nameof(happy_case_catching_up_to_normal_events_auto_ack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_normal_events_manual_ack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs index f09f2147a..f8af434c6 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_catching_up_to_normal_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_catching_up_to_normal_events_manual_ack.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class happy_case_catching_up_to_normal_events_manual_ack : IClassFixture { private const string Stream = nameof(happy_case_catching_up_to_normal_events_manual_ack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs index 883be0ab4..591ce24c8 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_auto_ack.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class happy_case_writing_and_subscribing_to_normal_events_auto_ack : IClassFixture { private const string Stream = nameof(happy_case_writing_and_subscribing_to_normal_events_auto_ack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs index ddbcfdbd3..56359c9e3 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/happy_case_writing_and_subscribing_to_normal_events_manual_ack.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class happy_case_writing_and_subscribing_to_normal_events_manual_ack : IClassFixture { private const string Stream = nameof(happy_case_writing_and_subscribing_to_normal_events_manual_ack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/update_existing.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing.cs similarity index 94% rename from test/EventStore.Client.PersistentSubscriptions.Tests/update_existing.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing.cs index 2ae854303..25500bce5 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/update_existing.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class update_existing : IClassFixture { private const string Stream = nameof(update_existing); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/update_existing_with_subscribers.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/update_existing_with_subscribers.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs index db53cbb52..b2d53079e 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/update_existing_with_subscribers.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_subscribers.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class update_existing_with_subscribers : IClassFixture { private const string Stream = nameof(update_existing_with_subscribers); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/update_existing_without_permissions.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_without_permissions.cs similarity index 95% rename from test/EventStore.Client.PersistentSubscriptions.Tests/update_existing_without_permissions.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_without_permissions.cs index a8dde6bca..f59640f11 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/update_existing_without_permissions.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_without_permissions.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class update_existing_without_permissions : IClassFixture { private const string Stream = nameof(update_existing_without_permissions); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/update_non_existient.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_non_existent.cs similarity index 80% rename from test/EventStore.Client.PersistentSubscriptions.Tests/update_non_existient.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_non_existent.cs index 519f251bb..8d6d14853 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/update_non_existient.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_non_existent.cs @@ -2,11 +2,11 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class update_non_existent : IClassFixture { private const string Stream = nameof(update_non_existent); - private const string Group = "existing"; + private const string Group = "nonexistent"; private readonly Fixture _fixture; public update_non_existent(Fixture fixture) { @@ -15,7 +15,7 @@ public update_non_existent(Fixture fixture) { [Fact] public async Task the_completion_fails_with_not_found() { - await Assert.ThrowsAsync( + await Assert.ThrowsAsync( () => _fixture.Client.UpdateAsync(Stream, Group, new PersistentSubscriptionSettings(), TestCredentials.Root)); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/when_writing_and_subscribing_to_normal_events_manual_nack.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs similarity index 97% rename from test/EventStore.Client.PersistentSubscriptions.Tests/when_writing_and_subscribing_to_normal_events_manual_nack.cs rename to test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs index da7b3932c..a06d8e613 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/when_writing_and_subscribing_to_normal_events_manual_nack.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/when_writing_and_subscribing_to_normal_events_manual_nack.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Xunit; -namespace EventStore.Client { +namespace EventStore.Client.SubscriptionToStream { public class when_writing_and_subscribing_to_normal_events_manual_nack : IClassFixture { private const string Stream = nameof(when_writing_and_subscribing_to_normal_events_manual_nack); diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_all_stream.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_all_stream.cs deleted file mode 100644 index d9f3c4e38..000000000 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/create_on_all_stream.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using System.Threading.Tasks; -using Xunit; - -namespace EventStore.Client { - public class create_on_all_stream - : IClassFixture { - public create_on_all_stream(Fixture fixture) { - _fixture = fixture; - } - - private readonly Fixture _fixture; - - public class Fixture : EventStoreClientFixture { - protected override Task Given() => Task.CompletedTask; - protected override Task When() => Task.CompletedTask; - } - - [Fact] - public Task the_completion_fails_with_invalid_stream() => - Assert.ThrowsAsync(() => - _fixture.Client.CreateAsync("$all", "shitbird", - new PersistentSubscriptionSettings(), TestCredentials.Root)); - } -} diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs index 08de97876..968d81344 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs @@ -121,7 +121,7 @@ public class Fixture : EventStoreClientFixture { }) { } - protected override Task Given() => Client.SetStreamMetadataAsync("$all", StreamState.Any, + protected override Task Given() => Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask; diff --git a/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs b/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs index 231b0077a..ec569a38a 100644 --- a/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs +++ b/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs @@ -11,7 +11,7 @@ public abstract class SecurityFixture : EventStoreClientFixture { public const string WriteStream = nameof(WriteStream); public const string MetaReadStream = nameof(MetaReadStream); public const string MetaWriteStream = nameof(MetaWriteStream); - public const string AllStream = "$all"; + public const string AllStream = SystemStreams.AllStream; public const string NormalAllStream = nameof(NormalAllStream); public const string SystemAllStream = "$" + nameof(SystemAllStream); public const string SystemAdminStream = "$" + nameof(SystemAdminStream); diff --git a/test/EventStore.Client.Streams.Tests/read_all_events_backward.cs b/test/EventStore.Client.Streams.Tests/read_all_events_backward.cs index d614941b8..91902a58c 100644 --- a/test/EventStore.Client.Streams.Tests/read_all_events_backward.cs +++ b/test/EventStore.Client.Streams.Tests/read_all_events_backward.cs @@ -70,9 +70,9 @@ public Fixture() { } protected override async Task Given() { var result = await Client.SetStreamMetadataAsync( - "$all", + SystemStreams.AllStream, StreamState.NoStream, - new StreamMetadata(acl: new StreamAcl(readRole: "$all")), + new StreamMetadata(acl: new StreamAcl(readRole: SystemRoles.All)), userCredentials: TestCredentials.Root); await Client.AppendToStreamAsync(Stream, StreamState.NoStream, Events); diff --git a/test/EventStore.Client.Streams.Tests/read_all_events_forward.cs b/test/EventStore.Client.Streams.Tests/read_all_events_forward.cs index db088b4f1..2d3a23dbc 100644 --- a/test/EventStore.Client.Streams.Tests/read_all_events_forward.cs +++ b/test/EventStore.Client.Streams.Tests/read_all_events_forward.cs @@ -80,9 +80,9 @@ public Fixture() { protected override async Task Given() { var result = await Client.SetStreamMetadataAsync( - "$all", + SystemStreams.AllStream, StreamState.NoStream, - new StreamMetadata(acl: new StreamAcl(readRole: "$all")), + new StreamMetadata(acl: new StreamAcl(readRole: SystemRoles.All)), userCredentials: TestCredentials.Root); await Client.AppendToStreamAsync(Stream, StreamState.NoStream, Events); } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs index d9b3a0aa3..acc98a746 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs @@ -158,7 +158,7 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, public class Fixture : EventStoreClientFixture { protected override Task Given() => - Client.SetStreamMetadataAsync("$all", StreamState.NoStream, + Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.NoStream, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask; diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs index f927ed810..6b977ea7b 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs @@ -168,7 +168,7 @@ Task CheckpointReached(StreamSubscription _, Position position, CancellationToke public class Fixture : EventStoreClientFixture { public const string FilteredOutStream = nameof(FilteredOutStream); - protected override Task Given() => Client.SetStreamMetadataAsync("$all", StreamState.Any, + protected override Task Given() => Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs index 73a6c43d6..7a3722887 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs @@ -86,7 +86,7 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, public class Fixture : EventStoreClientFixture { public const string FilteredOutStream = nameof(FilteredOutStream); - protected override Task Given() => Client.SetStreamMetadataAsync("$all", StreamState.Any, + protected override Task Given() => Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs index bae7066cb..7187744b6 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs @@ -98,7 +98,7 @@ Task CheckpointReached(StreamSubscription _, Position position, CancellationToke public class Fixture : EventStoreClientFixture { public const string FilteredOutStream = nameof(FilteredOutStream); - protected override Task Given() => Client.SetStreamMetadataAsync("$all", StreamState.Any, + protected override Task Given() => Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs index 9bae7ea75..51d5305e3 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs @@ -151,7 +151,7 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, public class Fixture : EventStoreClientFixture { protected override Task Given() => - Client.SetStreamMetadataAsync("$all", StreamState.NoStream, + Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.NoStream, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask; diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs index 335f56da9..9aa3b163c 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs @@ -184,7 +184,7 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, public class Fixture : EventStoreClientFixture { protected override Task Given() => - Client.SetStreamMetadataAsync("$all", StreamState.NoStream, + Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.NoStream, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask;