diff --git a/samples/projection-management/Program.cs b/samples/projection-management/Program.cs index a8db65bbc..04f142253 100644 --- a/samples/projection-management/Program.cs +++ b/samples/projection-management/Program.cs @@ -5,6 +5,7 @@ using System.Text.Json; using System.Threading.Tasks; using EventStore.Client; +using Grpc.Core; namespace projection_management { public static class Program { @@ -100,7 +101,9 @@ private static async Task DisableNotFound(EventStoreProjectionManagementClient m #region DisableNotFound try { await managementClient.DisableAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) { + Console.WriteLine(e.Message); + } catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release Console.WriteLine(e.Message); } #endregion DisableNotFound @@ -116,7 +119,9 @@ private static async Task EnableNotFound(EventStoreProjectionManagementClient ma #region EnableNotFound try { await managementClient.EnableAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) { + Console.WriteLine(e.Message); + } catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release Console.WriteLine(e.Message); } #endregion EnableNotFound @@ -135,7 +140,9 @@ private static async Task Abort(EventStoreProjectionManagementClient managementC var js = "fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();"; await managementClient.CreateContinuousAsync("countEvents_Abort", js); - } catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.Aborted) { + // ignore was already created in a previous run + } catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release // ignore was already created in a previous run } @@ -149,7 +156,9 @@ private static async Task Abort_NotFound(EventStoreProjectionManagementClient ma #region Abort_NotFound try { await managementClient.AbortAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) { + Console.WriteLine(e.Message); + } catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release Console.WriteLine(e.Message); } #endregion Abort_NotFound @@ -160,7 +169,9 @@ private static async Task Reset(EventStoreProjectionManagementClient managementC var js = "fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();"; await managementClient.CreateContinuousAsync("countEvents_Reset", js); - } catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.Internal) { + // ignore was already created in a previous run + } catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release // ignore was already created in a previous run } @@ -175,7 +186,9 @@ private static async Task Reset_NotFound(EventStoreProjectionManagementClient ma #region Reset_NotFound try { await managementClient.ResetAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) { + Console.WriteLine(e.Message); + } catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release Console.WriteLine(e.Message); } #endregion Reset_NotFound @@ -227,7 +240,9 @@ private static async Task CreateContinuous_Conflict(EventStoreProjectionManageme try { await managementClient.CreateContinuousAsync(name, js); - } catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.AlreadyExists) { + Console.WriteLine(e.Message); + } catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release var format = $"{name} already exists"; Console.WriteLine(format); } @@ -259,7 +274,9 @@ private static async Task Update_NotFound(EventStoreProjectionManagementClient m #region Update_NotFound try { await managementClient.UpdateAsync("Update Not existing projection", "fromAll().when()"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) { + Console.WriteLine(e.Message); + } catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release Console.WriteLine("'Update Not existing projection' does not exists and can not be updated"); } #endregion Update_NotFound diff --git a/src/EventStore.Client/ChannelFactory.cs b/src/EventStore.Client/ChannelFactory.cs index 6030f0d51..10b9cb36c 100644 --- a/src/EventStore.Client/ChannelFactory.cs +++ b/src/EventStore.Client/ChannelFactory.cs @@ -13,6 +13,8 @@ #nullable enable namespace EventStore.Client { internal static class ChannelFactory { + private const int MaxReceiveMessageLength = 17 * 1024 * 1024; + public static TChannel CreateChannel(EventStoreClientSettings settings, EndPoint endPoint, bool https) => CreateChannel(settings, endPoint.ToUri(https)); @@ -32,7 +34,8 @@ public static TChannel CreateChannel(EventStoreClientSettings settings, Uri? add }, LoggerFactory = settings.LoggerFactory, Credentials = settings.ChannelCredentials, - DisposeHttpClient = true + DisposeHttpClient = true, + MaxReceiveMessageSize = MaxReceiveMessageLength }); HttpMessageHandler CreateHandler() { @@ -56,6 +59,8 @@ IEnumerable GetChannelOptions() { yield return new ChannelOption("grpc.keepalive_timeout_ms", GetValue((int)settings.ConnectivitySettings.KeepAliveTimeout.TotalMilliseconds)); + + yield return new ChannelOption("grpc.max_receive_message_length", MaxReceiveMessageLength); } static int GetValue(int value) => value switch { diff --git a/src/EventStore.Client/EventStoreClientBase.cs b/src/EventStore.Client/EventStoreClientBase.cs index a091d826d..ba01d112a 100644 --- a/src/EventStore.Client/EventStoreClientBase.cs +++ b/src/EventStore.Client/EventStoreClientBase.cs @@ -21,7 +21,7 @@ public abstract class EventStoreClientBase : private readonly IDictionary> _exceptionMap; private readonly CancellationTokenSource _cts; private readonly ChannelCache _channelCache; - private readonly SharingProvider _channelInfoProvider; + private readonly SharingProvider _channelInfoProvider; /// /// The name of the connection. @@ -48,27 +48,31 @@ protected EventStoreClientBase(EventStoreClientSettings? settings, ConnectionName = Settings.ConnectionName ?? $"ES-{Guid.NewGuid()}"; var channelSelector = new ChannelSelector(Settings, _channelCache); - _channelInfoProvider = new SharingProvider( - factory: (endPoint, onBroken) => GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token), - initialInput: null); + _channelInfoProvider = new SharingProvider( + factory: (endPoint, onBroken) => + GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token), + initialInput: ReconnectionRequired.Rediscover.Instance); } // Select a channel and query its capabilities. This is an expensive call that // we don't want to do often. private async Task GetChannelInfoExpensive( - DnsEndPoint? endPoint, - Action onBroken, + ReconnectionRequired reconnectionRequired, + Action onReconnectionRequired, IChannelSelector channelSelector, CancellationToken cancellationToken) { - var channel = endPoint is null - ? await channelSelector.SelectChannelAsync(cancellationToken).ConfigureAwait(false) - : channelSelector.SelectChannel(endPoint); + var channel = reconnectionRequired switch { + ReconnectionRequired.Rediscover => await channelSelector.SelectChannelAsync(cancellationToken) + .ConfigureAwait(false), + ReconnectionRequired.NewLeader (var endPoint) => channelSelector.SelectChannel(endPoint), + _ => throw new ArgumentException(null, nameof(reconnectionRequired)) + }; var invoker = channel.CreateCallInvoker() .Intercept(new TypedExceptionInterceptor(_exceptionMap)) .Intercept(new ConnectionNameInterceptor(ConnectionName)) - .Intercept(new ReportLeaderInterceptor(onBroken)); + .Intercept(new ReportLeaderInterceptor(onReconnectionRequired)); if (Settings.Interceptors is not null) { foreach (var interceptor in Settings.Interceptors) { @@ -92,6 +96,7 @@ protected async ValueTask GetChannelInfo(CancellationToken cancella // in cases where the server doesn't yet let the client know that it needs to. // see EventStoreClientExtensions.WarmUpWith. // note if rediscovery is already in progress it will continue, not restart. + // ReSharper disable once UnusedMember.Local private void Rediscover() { _channelInfoProvider.Reset(); } diff --git a/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs b/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs index e30c697f1..45841c61e 100644 --- a/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs +++ b/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs @@ -1,5 +1,4 @@ using System; -using System.Net; using System.Threading; using System.Threading.Tasks; using Grpc.Core; @@ -10,13 +9,13 @@ namespace EventStore.Client.Interceptors { // this has become more general than just detecting leader changes. // triggers the action on any rpc exception with StatusCode.Unavailable internal class ReportLeaderInterceptor : Interceptor { - private readonly Action _onError; + private readonly Action _onReconnectionRequired; private const TaskContinuationOptions ContinuationOptions = TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted; - internal ReportLeaderInterceptor(Action onError) { - _onError = onError; + internal ReportLeaderInterceptor(Action onReconnectionRequired) { + _onReconnectionRequired = onReconnectionRequired; } public override AsyncUnaryCall AsyncUnaryCall(TRequest request, @@ -24,7 +23,7 @@ public override AsyncUnaryCall AsyncUnaryCall(TR AsyncUnaryCallContinuation continuation) { var response = continuation(request, context); - response.ResponseAsync.ContinueWith(ReportNewLeader, ContinuationOptions); + response.ResponseAsync.ContinueWith(OnReconnectionRequired, ContinuationOptions); return new AsyncUnaryCall(response.ResponseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); @@ -35,7 +34,7 @@ public override AsyncClientStreamingCall AsyncClientStreami AsyncClientStreamingCallContinuation continuation) { var response = continuation(context); - response.ResponseAsync.ContinueWith(ReportNewLeader, ContinuationOptions); + response.ResponseAsync.ContinueWith(OnReconnectionRequired, ContinuationOptions); return new AsyncClientStreamingCall(response.RequestStream, response.ResponseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); @@ -47,7 +46,8 @@ public override AsyncDuplexStreamingCall AsyncDuplexStreami var response = continuation(context); return new AsyncDuplexStreamingCall(response.RequestStream, - new StreamReader(response.ResponseStream, ReportNewLeader), response.ResponseHeadersAsync, + new StreamReader(response.ResponseStream, OnReconnectionRequired), + response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } @@ -57,17 +57,23 @@ public override AsyncServerStreamingCall AsyncServerStreamingCall( - new StreamReader(response.ResponseStream, ReportNewLeader), response.ResponseHeadersAsync, + new StreamReader(response.ResponseStream, OnReconnectionRequired), + response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } - private void ReportNewLeader(Task task) { - if (task.Exception?.InnerException is NotLeaderException ex) { - _onError(ex.LeaderEndpoint); - } else if (task.Exception?.InnerException?.InnerException is RpcException rpcException && - rpcException.StatusCode == StatusCode.Unavailable) { - _onError(null); - } + private void OnReconnectionRequired(Task task) { + ReconnectionRequired reconnectionRequired = task.Exception?.InnerException switch { + NotLeaderException ex => new ReconnectionRequired.NewLeader(ex.LeaderEndpoint), + RpcException { + StatusCode: StatusCode.Unavailable + // or StatusCode.Unknown or TODO: use RPC exceptions on server + } => ReconnectionRequired.Rediscover.Instance, + _ => ReconnectionRequired.None.Instance + }; + + if (reconnectionRequired is not ReconnectionRequired.None) + _onReconnectionRequired(reconnectionRequired); } private class StreamReader : IAsyncStreamReader { diff --git a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs index e9327813a..ece3eaf8b 100644 --- a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs +++ b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs @@ -91,7 +91,7 @@ private static Exception ConvertRpcException(RpcException ex, StatusCode.DeadlineExceeded, ex.Status.Detail, ex.Status.DebugException)), (StatusCode.DeadlineExceeded, _) => ex, (StatusCode.Unauthenticated, _) => new NotAuthenticatedException(ex.Message, ex), - _ => new InvalidOperationException(ex.Message, ex) + _ => ex } }; } diff --git a/src/EventStore.Client/ReconnectionRequired.cs b/src/EventStore.Client/ReconnectionRequired.cs new file mode 100644 index 000000000..bf448971d --- /dev/null +++ b/src/EventStore.Client/ReconnectionRequired.cs @@ -0,0 +1,15 @@ +using System.Net; + +namespace EventStore.Client { + internal abstract record ReconnectionRequired { + public record None : ReconnectionRequired { + public static None Instance = new(); + } + + public record Rediscover : ReconnectionRequired { + public static Rediscover Instance = new(); + } + + public record NewLeader(DnsEndPoint EndPoint) : ReconnectionRequired; + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs index 573188416..bde91ae87 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToAll { @@ -21,10 +22,12 @@ protected override Task When() => } [Fact] - public Task the_completion_fails_with_invalid_operation_exception() => - Assert.ThrowsAsync( + public async Task the_completion_fails() { + var ex = await Assert.ThrowsAsync( () => _fixture.Client.CreateToAllAsync("group32", new PersistentSubscriptionSettings(), TestCredentials.Root)); + Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs index 95f5012cc..4e4558878 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs @@ -1,6 +1,8 @@ using System; +using System.IO; using System.Linq; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToAll { @@ -24,11 +26,13 @@ protected override async Task Given() { } [Fact] - public Task fails_with_invalid_operation_exception() => - Assert.ThrowsAsync(() => + public async Task fails() { + var ex = await Assert.ThrowsAsync(() => _fixture.Client.CreateToAllAsync("group57", new PersistentSubscriptionSettings( - startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)), - TestCredentials.Root)); + startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)), + TestCredentials.Root)); + Assert.Equal(StatusCode.Internal, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs index e17f95498..c3a395a78 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToAll { @@ -28,13 +29,15 @@ await Client.CreateToAllAsync(Group, } protected override Task Given() => Task.CompletedTask; } - + [Fact] - public Task fails_with_invalid_operation_exception() => - Assert.ThrowsAsync(() => + public async Task fails() { + var ex = await Assert.ThrowsAsync(() => _fixture.Client.UpdateToAllAsync(Group, new PersistentSubscriptionSettings( - startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)), - TestCredentials.Root)); + startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)), + TestCredentials.Root)); + Assert.Equal(StatusCode.Internal, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs index 5367f6762..00ad95d1b 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToStream { @@ -21,10 +22,12 @@ protected override Task When() => } [Fact] - public Task the_completion_fails_with_invalid_operation_exception() => - Assert.ThrowsAsync( + public async Task the_completion_fails() { + var ex = await Assert.ThrowsAsync( () => _fixture.Client.CreateAsync(Stream, "group32", new PersistentSubscriptionSettings(), TestCredentials.Root)); + Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs b/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs index 14209bf70..04b11f4d3 100644 --- a/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs +++ b/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs @@ -6,7 +6,7 @@ namespace EventStore.Client { public abstract class EventStoreClientFixture : EventStoreClientFixtureBase { public EventStoreClient Client { get; } protected EventStoreClientFixture(EventStoreClientSettings? settings = null, - IDictionary? env = null) : base(settings, env) { + Dictionary? env = null) : base(settings, env) { Client = new EventStoreClient(Settings); } diff --git a/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs b/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs index f0e3d8011..3273dde58 100644 --- a/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs +++ b/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Grpc.Core; using Polly; using Xunit; @@ -25,7 +26,11 @@ public async Task can_retry() { // writeTask cannot complete because ES is stopped var ex = await Assert.ThrowsAnyAsync(() => WriteAnEventAsync(new StreamRevision(0))); - Assert.True(ex is InvalidOperationException or DiscoveryException); + Assert.True(ex is RpcException { + Status: { + StatusCode: StatusCode.Unavailable + } + } or DiscoveryException); await _fixture.TestServer.StartAsync().WithTimeout(); diff --git a/test/EventStore.Client.Streams.Tests/reconnection.cs b/test/EventStore.Client.Streams.Tests/reconnection.cs new file mode 100644 index 000000000..1d4b6a2bb --- /dev/null +++ b/test/EventStore.Client.Streams.Tests/reconnection.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Xunit; + +namespace EventStore.Client { + public class reconnection : IClassFixture { + private readonly Fixture _fixture; + + public reconnection(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task when_the_connection_is_lost() { + var streamName = _fixture.GetStreamName(); + var eventCount = 512; + var tcs = new TaskCompletionSource(); + var signal = new TaskCompletionSource(); + var events = new List(); + var resubscribe = new TaskCompletionSource(); + + using var _ = await _fixture.Client.SubscribeToStreamAsync(streamName, FromStream.Start, + EventAppeared, subscriptionDropped: SubscriptionDropped) + .WithTimeout(); + + await _fixture.Client + .AppendToStreamAsync(streamName, StreamState.NoStream, _fixture.CreateTestEvents(eventCount)) + .WithTimeout(); // ensure we get backpressure + + _fixture.TestServer.Stop(); + await Task.Delay(TimeSpan.FromSeconds(2)); + + await _fixture.TestServer.StartAsync().WithTimeout(); + signal.SetResult(null); + + await resubscribe.Task.WithTimeout(TimeSpan.FromSeconds(10)); + + await tcs.Task.WithTimeout(TimeSpan.FromSeconds(10)); + + async Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { + await signal.Task; + events.Add(e); + if (events.Count == eventCount) { + tcs.TrySetResult(null); + } + } + + void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception ex) { + if (reason == SubscriptionDroppedReason.Disposed) { + return; + } + + if (ex is not RpcException { + Status: { + StatusCode: StatusCode.Unavailable + } + }) { + tcs.TrySetException(ex); + } else { + var task = _fixture.Client.SubscribeToStreamAsync(streamName, + FromStream.After(events[^1].OriginalEventNumber), + EventAppeared, subscriptionDropped: SubscriptionDropped); + task.ContinueWith(_ => resubscribe.SetResult(_.Result), TaskContinuationOptions.NotOnFaulted); + task.ContinueWith(_ => resubscribe.SetException(_.Exception!.GetBaseException()), + TaskContinuationOptions.OnlyOnFaulted); + } + } + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + + protected override Task When() => Task.CompletedTask; + + public Fixture() : base(env: new() { + ["EVENTSTORE_MEM_DB"] = "false" + }) { + Settings.ConnectivitySettings.DiscoveryInterval = TimeSpan.FromMilliseconds(100); + Settings.ConnectivitySettings.GossipTimeout = TimeSpan.FromMilliseconds(100); + } + } + } +} diff --git a/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs b/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs index 8aa8b2d8a..49b7e6961 100644 --- a/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs +++ b/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs @@ -18,11 +18,10 @@ public sending_and_receiving_large_messages(Fixture fixture, ITestOutputHelper o [Fact] public async Task over_the_hard_limit() { var streamName = _fixture.GetStreamName(); - var ex = await Assert.ThrowsAsync(() => _fixture.Client.AppendToStreamAsync( + var ex = await Assert.ThrowsAsync(() => _fixture.Client.AppendToStreamAsync( streamName, StreamState.NoStream, _fixture.LargeEvent)); - var rpcEx = Assert.IsType(ex.InnerException); - Assert.Equal(StatusCode.ResourceExhausted, rpcEx.StatusCode); + Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode); } public class Fixture : EventStoreClientFixture { diff --git a/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs b/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs index ab27174ae..c69237a3d 100644 --- a/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs +++ b/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs @@ -10,11 +10,16 @@ namespace EventStore.Client.Interceptors { public class ReportLeaderInterceptorTests { - private static readonly Marshaller _marshaller = - new Marshaller(_ => Array.Empty(), _ => new object()); - public delegate Task GrpcCall(Interceptor interceptor, Task response = null); + private static readonly Marshaller _marshaller = new(_ => Array.Empty(), _ => new object()); + + private static readonly StatusCode[] ForcesRediscoveryStatusCodes = { + //StatusCode.Unknown, TODO: use RPC exceptions on server + StatusCode.Unavailable + }; + + private static IEnumerable GrpcCalls() { yield return MakeUnaryCall; yield return MakeClientStreamingCall; @@ -22,17 +27,48 @@ private static IEnumerable GrpcCalls() { yield return MakeServerStreamingCall; } - public static IEnumerable TestCases() => GrpcCalls().Select(call => new object[] {call}); + public static IEnumerable ReportsNewLeaderCases() => GrpcCalls().Select(call => new object[] {call}); - [Theory, MemberData(nameof(TestCases))] + [Theory, MemberData(nameof(ReportsNewLeaderCases))] public async Task ReportsNewLeader(GrpcCall call) { - EndPoint actual = default; - var sut = new ReportLeaderInterceptor(ep => actual = ep); + ReconnectionRequired actual = default; + var sut = new ReportLeaderInterceptor(result => actual = result); var result = await Assert.ThrowsAsync(() => call(sut, Task.FromException(new NotLeaderException("a.host", 2112)))); - Assert.Equal(result.LeaderEndpoint, actual); + Assert.Equal(new ReconnectionRequired.NewLeader(result.LeaderEndpoint), actual); + } + + public static IEnumerable ForcesRediscoveryCases() => from call in GrpcCalls() + from statusCode in ForcesRediscoveryStatusCodes + select new object[] {call, statusCode}; + + [Theory, MemberData(nameof(ForcesRediscoveryCases))] + public async Task ForcesRediscovery(GrpcCall call, StatusCode statusCode) { + ReconnectionRequired actual = default; + var sut = new ReportLeaderInterceptor(result => actual = result); + + await Assert.ThrowsAsync(() => call(sut, + Task.FromException(new RpcException(new Status(statusCode, "oops"))))); + Assert.Equal(ReconnectionRequired.Rediscover.Instance, actual); + } + + public static IEnumerable DoesNotForceRediscoveryCases() => from call in GrpcCalls() + from statusCode in Enum.GetValues(typeof(StatusCode)) + .OfType() + .Except(ForcesRediscoveryStatusCodes) + select new object[] {call, statusCode}; + + [Theory, MemberData(nameof(DoesNotForceRediscoveryCases))] + public async Task DoesNotForceRediscovery(GrpcCall call, StatusCode statusCode) { + ReconnectionRequired actual = ReconnectionRequired.None.Instance; + var sut = new ReportLeaderInterceptor(result => actual = result); + + await Assert.ThrowsAsync(() => call(sut, + Task.FromException(new RpcException(new Status(statusCode, "oops"))))); + Assert.Equal(ReconnectionRequired.None.Instance, actual); } + private static async Task MakeUnaryCall(Interceptor interceptor, Task response = null) { using var call = interceptor.AsyncUnaryCall(new object(), @@ -73,8 +109,7 @@ private static async Task MakeDuplexStreamingCall(Interceptor interceptor, Task< private static void OnDispose() { } private static ClientInterceptorContext CreateClientInterceptorContext(MethodType methodType) => - new ClientInterceptorContext( - new Method(methodType, string.Empty, string.Empty, _marshaller, _marshaller), + new(new Method(methodType, string.Empty, string.Empty, _marshaller, _marshaller), null, new CallOptions(new Metadata())); private class TestAsyncStreamReader : IAsyncStreamReader {