diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs index 25635f26a..8bc94aedf 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs @@ -1,10 +1,5 @@ -using System; -using System.Collections.Generic; -using System.Linq; using System.Runtime.CompilerServices; -using System.Threading; using System.Threading.Channels; -using System.Threading.Tasks; using EventStore.Client.Streams; using Grpc.Core; using static EventStore.Client.Streams.ReadResp; @@ -42,23 +37,76 @@ public ReadAllStreamResult ReadAllAsync( Options = new() { ReadDirection = direction switch { Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards, - Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards, - _ => throw InvalidOption(direction) + Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards, + _ => throw InvalidOption(direction) }, ResolveLinks = resolveLinkTos, All = new() { Position = new() { - CommitPosition = position.CommitPosition, + CommitPosition = position.CommitPosition, PreparePosition = position.PreparePosition } }, - Count = (ulong)maxCount, - UuidOption = new() {Structured = new()}, - NoFilter = new(), + Count = (ulong)maxCount, + UuidOption = new() {Structured = new()}, + NoFilter = new(), ControlOption = new() {Compatibility = 1} } }, Settings, deadline, userCredentials, cancellationToken); } + + /// + /// Asynchronously reads all events with filtering. + /// + /// The in which to read. + /// The to start reading from. + /// The to apply. + /// The maximum count to read. + /// Whether to resolve LinkTo events automatically. + /// + /// The optional to perform operation with. + /// The optional . + /// + public ReadAllStreamResult ReadAllAsync( + Direction direction, + Position position, + IEventFilter eventFilter, + long maxCount = long.MaxValue, + bool resolveLinkTos = false, + TimeSpan? deadline = null, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default + ) { + if (maxCount <= 0) { + throw new ArgumentOutOfRangeException(nameof(maxCount)); + } + + var readReq = new ReadReq { + Options = new() { + ReadDirection = direction switch { + Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards, + Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards, + _ => throw InvalidOption(direction) + }, + ResolveLinks = resolveLinkTos, + All = new() { + Position = new() { + CommitPosition = position.CommitPosition, + PreparePosition = position.PreparePosition + } + }, + Count = (ulong)maxCount, + UuidOption = new() { Structured = new() }, + ControlOption = new() { Compatibility = 1 }, + Filter = GetFilterOptions(eventFilter) + } + }; + + return new ReadAllStreamResult(async _ => { + var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); + return channelInfo.CallInvoker; + }, readReq, Settings, deadline, userCredentials, cancellationToken); + } /// /// A class that represents the result of a read operation on the $all stream. You may either enumerate this instance directly or . Do not enumerate more than once. diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs index 33041d303..e9d7fd85c 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.cs @@ -73,14 +73,11 @@ private StreamAppender CreateStreamAppender() { } } - private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions( - SubscriptionFilterOptions? filterOptions) { - if (filterOptions == null) { + private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) { + if (filter == null) { return null; } - var filter = filterOptions.Filter; - var options = filter switch { StreamFilter => new ReadReq.Types.Options.Types.FilterOptions { StreamIdentifier = (filter.Prefixes, filter.Regex) switch { @@ -127,11 +124,14 @@ private StreamAppender CreateStreamAppender() { options.Count = new Empty(); } - options.CheckpointIntervalMultiplier = filterOptions.CheckpointInterval; + options.CheckpointIntervalMultiplier = checkpointInterval; return options; } + private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions) + => filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval); + /// public override void Dispose() { if (_streamAppenderLazy.IsValueCreated) diff --git a/test/EventStore.Client.Streams.Tests/Read/ReadAllEventsFixture.cs b/test/EventStore.Client.Streams.Tests/Read/ReadAllEventsFixture.cs index 297a79b70..521f486ab 100644 --- a/test/EventStore.Client.Streams.Tests/Read/ReadAllEventsFixture.cs +++ b/test/EventStore.Client.Streams.Tests/Read/ReadAllEventsFixture.cs @@ -10,11 +10,9 @@ public ReadAllEventsFixture() { userCredentials: TestCredentials.Root ); - Events = Enumerable - .Concat( - CreateTestEvents(20), - CreateTestEvents(2, metadataSize: 1_000_000) - ) + Events = CreateTestEvents(20) + .Concat(CreateTestEvents(2, metadataSize: 1_000_000)) + .Concat(CreateTestEvents(2, AnotherTestEventType)) .ToArray(); ExpectedStreamName = GetStreamName(); @@ -38,4 +36,4 @@ public ReadAllEventsFixture() { public EventBinaryData ExpectedFirstEvent { get; private set; } public EventBinaryData ExpectedLastEvent { get; private set; } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Streams.Tests/Read/read_all_events_backward.cs b/test/EventStore.Client.Streams.Tests/Read/read_all_events_backward.cs index 63135ec82..b07f199ef 100644 --- a/test/EventStore.Client.Streams.Tests/Read/read_all_events_backward.cs +++ b/test/EventStore.Client.Streams.Tests/Read/read_all_events_backward.cs @@ -72,6 +72,15 @@ public async Task with_timeout_fails_when_operation_expired() { ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded); } + [Fact] + public async Task filter_events_by_type() { + var result = await Fixture.Streams + .ReadAllAsync(Direction.Backwards, Position.End, EventTypeFilter.Prefix(EventStoreFixture.AnotherTestEventTypePrefix)) + .ToListAsync(); + + result.ForEach(x => x.Event.EventType.ShouldStartWith(EventStoreFixture.AnotherTestEventTypePrefix)); + } + [Fact(Skip = "Not Implemented")] public Task be_able_to_read_all_one_by_one_until_end_of_stream() => throw new NotImplementedException(); @@ -80,4 +89,4 @@ public async Task with_timeout_fails_when_operation_expired() { [Fact(Skip = "Not Implemented")] public Task when_got_int_max_value_as_maxcount_should_throw() => throw new NotImplementedException(); -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Streams.Tests/Read/read_all_events_forward.cs b/test/EventStore.Client.Streams.Tests/Read/read_all_events_forward.cs index ab93da29a..fed9629ad 100644 --- a/test/EventStore.Client.Streams.Tests/Read/read_all_events_forward.cs +++ b/test/EventStore.Client.Streams.Tests/Read/read_all_events_forward.cs @@ -139,6 +139,15 @@ await result.Messages.ToArrayAsync() ); } + [Fact] + public async Task filter_events_by_type() { + var result = await Fixture.Streams + .ReadAllAsync(Direction.Forwards, Position.Start, EventTypeFilter.Prefix(EventStoreFixture.AnotherTestEventTypePrefix)) + .ToListAsync(); + + result.ForEach(x => x.Event.EventType.ShouldStartWith(EventStoreFixture.AnotherTestEventTypePrefix)); + } + [Fact(Skip = "Not Implemented")] public Task be_able_to_read_all_one_by_one_until_end_of_stream() => throw new NotImplementedException(); @@ -147,4 +156,4 @@ await result.Messages.ToArrayAsync() [Fact(Skip = "Not Implemented")] public Task when_got_int_max_value_as_maxcount_should_throw() => throw new NotImplementedException(); -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs index e9f06b512..d1b6740d2 100644 --- a/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs +++ b/test/EventStore.Client.Tests.Common/Fixtures/EventStoreFixture.Helpers.cs @@ -4,7 +4,9 @@ namespace EventStore.Client.Tests; public partial class EventStoreFixture { - public const string TestEventType = "tst"; + public const string TestEventType = "test-event-type"; + public const string AnotherTestEventTypePrefix = "another"; + public const string AnotherTestEventType = $"{AnotherTestEventTypePrefix}-test-event-type"; public T NewClient(Action configure) where T : EventStoreClientBase, new() => (T)Activator.CreateInstance(typeof(T), new object?[] { ClientSettings.With(configure) })!; @@ -50,4 +52,4 @@ public async Task RestartService(TimeSpan delay) { await Streams.WarmUp(); Log.Information("Service restarted."); } -} \ No newline at end of file +}