Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement RFC 574 #195

Merged
merged 2 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions samples/projection-management/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text.Json;
using System.Threading.Tasks;
using EventStore.Client;
using Grpc.Core;

namespace projection_management {
public static class Program {
Expand Down Expand Up @@ -100,7 +101,9 @@ private static async Task DisableNotFound(EventStoreProjectionManagementClient m
#region DisableNotFound
try {
await managementClient.DisableAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
} catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
#endregion DisableNotFound
Expand All @@ -116,7 +119,9 @@ private static async Task EnableNotFound(EventStoreProjectionManagementClient ma
#region EnableNotFound
try {
await managementClient.EnableAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
} catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
#endregion EnableNotFound
Expand All @@ -135,7 +140,9 @@ private static async Task Abort(EventStoreProjectionManagementClient managementC
var js =
"fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();";
await managementClient.CreateContinuousAsync("countEvents_Abort", js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.Aborted) {
// ignore was already created in a previous run
} catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release
// ignore was already created in a previous run
}

Expand All @@ -149,7 +156,9 @@ private static async Task Abort_NotFound(EventStoreProjectionManagementClient ma
#region Abort_NotFound
try {
await managementClient.AbortAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
} catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
#endregion Abort_NotFound
Expand All @@ -160,7 +169,9 @@ private static async Task Reset(EventStoreProjectionManagementClient managementC
var js =
"fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();";
await managementClient.CreateContinuousAsync("countEvents_Reset", js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.Internal) {
// ignore was already created in a previous run
} catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release
// ignore was already created in a previous run
}

Expand All @@ -175,7 +186,9 @@ private static async Task Reset_NotFound(EventStoreProjectionManagementClient ma
#region Reset_NotFound
try {
await managementClient.ResetAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
} catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine(e.Message);
}
#endregion Reset_NotFound
Expand Down Expand Up @@ -227,7 +240,9 @@ private static async Task CreateContinuous_Conflict(EventStoreProjectionManageme
try {

await managementClient.CreateContinuousAsync(name, js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.AlreadyExists) {
Console.WriteLine(e.Message);
} catch (RpcException e) when (e.Message.Contains("Conflict")) { // will be removed in a future release
var format = $"{name} already exists";
Console.WriteLine(format);
}
Expand Down Expand Up @@ -259,7 +274,9 @@ private static async Task Update_NotFound(EventStoreProjectionManagementClient m
#region Update_NotFound
try {
await managementClient.UpdateAsync("Update Not existing projection", "fromAll().when()");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (RpcException e) when (e.StatusCode is StatusCode.NotFound) {
Console.WriteLine(e.Message);
} catch (RpcException e) when (e.Message.Contains("NotFound")) { // will be removed in a future release
Console.WriteLine("'Update Not existing projection' does not exists and can not be updated");
}
#endregion Update_NotFound
Expand Down
7 changes: 6 additions & 1 deletion src/EventStore.Client/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#nullable enable
namespace EventStore.Client {
internal static class ChannelFactory {
private const int MaxReceiveMessageLength = 17 * 1024 * 1024;

public static TChannel CreateChannel(EventStoreClientSettings settings, EndPoint endPoint, bool https) =>
CreateChannel(settings, endPoint.ToUri(https));

Expand All @@ -32,7 +34,8 @@ public static TChannel CreateChannel(EventStoreClientSettings settings, Uri? add
},
LoggerFactory = settings.LoggerFactory,
Credentials = settings.ChannelCredentials,
DisposeHttpClient = true
DisposeHttpClient = true,
MaxReceiveMessageSize = MaxReceiveMessageLength
});

HttpMessageHandler CreateHandler() {
Expand All @@ -56,6 +59,8 @@ IEnumerable<ChannelOption> GetChannelOptions() {

yield return new ChannelOption("grpc.keepalive_timeout_ms",
GetValue((int)settings.ConnectivitySettings.KeepAliveTimeout.TotalMilliseconds));

yield return new ChannelOption("grpc.max_receive_message_length", MaxReceiveMessageLength);
}

static int GetValue(int value) => value switch {
Expand Down
25 changes: 15 additions & 10 deletions src/EventStore.Client/EventStoreClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public abstract class EventStoreClientBase :
private readonly IDictionary<string, Func<RpcException, Exception>> _exceptionMap;
private readonly CancellationTokenSource _cts;
private readonly ChannelCache _channelCache;
private readonly SharingProvider<DnsEndPoint?, ChannelInfo> _channelInfoProvider;
private readonly SharingProvider<ReconnectionRequired, ChannelInfo> _channelInfoProvider;

/// <summary>
/// The name of the connection.
Expand All @@ -48,27 +48,31 @@ protected EventStoreClientBase(EventStoreClientSettings? settings,
ConnectionName = Settings.ConnectionName ?? $"ES-{Guid.NewGuid()}";

var channelSelector = new ChannelSelector(Settings, _channelCache);
_channelInfoProvider = new SharingProvider<DnsEndPoint?, ChannelInfo>(
factory: (endPoint, onBroken) => GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token),
initialInput: null);
_channelInfoProvider = new SharingProvider<ReconnectionRequired, ChannelInfo>(
factory: (endPoint, onBroken) =>
GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token),
initialInput: ReconnectionRequired.Rediscover.Instance);
}

// Select a channel and query its capabilities. This is an expensive call that
// we don't want to do often.
private async Task<ChannelInfo> GetChannelInfoExpensive(
DnsEndPoint? endPoint,
Action<DnsEndPoint?> onBroken,
ReconnectionRequired reconnectionRequired,
Action<ReconnectionRequired> onReconnectionRequired,
IChannelSelector channelSelector,
CancellationToken cancellationToken) {

var channel = endPoint is null
? await channelSelector.SelectChannelAsync(cancellationToken).ConfigureAwait(false)
: channelSelector.SelectChannel(endPoint);
var channel = reconnectionRequired switch {
ReconnectionRequired.Rediscover => await channelSelector.SelectChannelAsync(cancellationToken)
.ConfigureAwait(false),
ReconnectionRequired.NewLeader (var endPoint) => channelSelector.SelectChannel(endPoint),
_ => throw new ArgumentException(null, nameof(reconnectionRequired))
};

var invoker = channel.CreateCallInvoker()
.Intercept(new TypedExceptionInterceptor(_exceptionMap))
.Intercept(new ConnectionNameInterceptor(ConnectionName))
.Intercept(new ReportLeaderInterceptor(onBroken));
.Intercept(new ReportLeaderInterceptor(onReconnectionRequired));

if (Settings.Interceptors is not null) {
foreach (var interceptor in Settings.Interceptors) {
Expand All @@ -92,6 +96,7 @@ protected async ValueTask<ChannelInfo> GetChannelInfo(CancellationToken cancella
// in cases where the server doesn't yet let the client know that it needs to.
// see EventStoreClientExtensions.WarmUpWith.
// note if rediscovery is already in progress it will continue, not restart.
// ReSharper disable once UnusedMember.Local
private void Rediscover() {
_channelInfoProvider.Reset();
}
Expand Down
36 changes: 21 additions & 15 deletions src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
Expand All @@ -10,21 +9,21 @@ namespace EventStore.Client.Interceptors {
// this has become more general than just detecting leader changes.
// triggers the action on any rpc exception with StatusCode.Unavailable
internal class ReportLeaderInterceptor : Interceptor {
private readonly Action<DnsEndPoint?> _onError;
private readonly Action<ReconnectionRequired> _onReconnectionRequired;

private const TaskContinuationOptions ContinuationOptions =
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted;

internal ReportLeaderInterceptor(Action<DnsEndPoint?> onError) {
_onError = onError;
internal ReportLeaderInterceptor(Action<ReconnectionRequired> onReconnectionRequired) {
_onReconnectionRequired = onReconnectionRequired;
}

public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation) {
var response = continuation(request, context);

response.ResponseAsync.ContinueWith(ReportNewLeader, ContinuationOptions);
response.ResponseAsync.ContinueWith(OnReconnectionRequired, ContinuationOptions);

return new AsyncUnaryCall<TResponse>(response.ResponseAsync, response.ResponseHeadersAsync,
response.GetStatus, response.GetTrailers, response.Dispose);
Expand All @@ -35,7 +34,7 @@ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreami
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) {
var response = continuation(context);

response.ResponseAsync.ContinueWith(ReportNewLeader, ContinuationOptions);
response.ResponseAsync.ContinueWith(OnReconnectionRequired, ContinuationOptions);

return new AsyncClientStreamingCall<TRequest, TResponse>(response.RequestStream, response.ResponseAsync,
response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose);
Expand All @@ -47,7 +46,8 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami
var response = continuation(context);

return new AsyncDuplexStreamingCall<TRequest, TResponse>(response.RequestStream,
new StreamReader<TResponse>(response.ResponseStream, ReportNewLeader), response.ResponseHeadersAsync,
new StreamReader<TResponse>(response.ResponseStream, OnReconnectionRequired),
response.ResponseHeadersAsync,
response.GetStatus, response.GetTrailers, response.Dispose);
}

Expand All @@ -57,17 +57,23 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
var response = continuation(request, context);

return new AsyncServerStreamingCall<TResponse>(
new StreamReader<TResponse>(response.ResponseStream, ReportNewLeader), response.ResponseHeadersAsync,
new StreamReader<TResponse>(response.ResponseStream, OnReconnectionRequired),
response.ResponseHeadersAsync,
response.GetStatus, response.GetTrailers, response.Dispose);
}

private void ReportNewLeader<TResponse>(Task<TResponse> task) {
if (task.Exception?.InnerException is NotLeaderException ex) {
_onError(ex.LeaderEndpoint);
} else if (task.Exception?.InnerException?.InnerException is RpcException rpcException &&
rpcException.StatusCode == StatusCode.Unavailable) {
_onError(null);
}
private void OnReconnectionRequired<TResponse>(Task<TResponse> task) {
ReconnectionRequired reconnectionRequired = task.Exception?.InnerException switch {
NotLeaderException ex => new ReconnectionRequired.NewLeader(ex.LeaderEndpoint),
RpcException {
StatusCode: StatusCode.Unavailable
// or StatusCode.Unknown or TODO: use RPC exceptions on server
} => ReconnectionRequired.Rediscover.Instance,
_ => ReconnectionRequired.None.Instance
};

if (reconnectionRequired is not ReconnectionRequired.None)
_onReconnectionRequired(reconnectionRequired);
}

private class StreamReader<T> : IAsyncStreamReader<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private static Exception ConvertRpcException(RpcException ex,
StatusCode.DeadlineExceeded, ex.Status.Detail, ex.Status.DebugException)),
(StatusCode.DeadlineExceeded, _) => ex,
(StatusCode.Unauthenticated, _) => new NotAuthenticatedException(ex.Message, ex),
_ => new InvalidOperationException(ex.Message, ex)
_ => ex
}
};
}
Expand Down
15 changes: 15 additions & 0 deletions src/EventStore.Client/ReconnectionRequired.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Net;

namespace EventStore.Client {
internal abstract record ReconnectionRequired {
public record None : ReconnectionRequired {
public static None Instance = new();
}

public record Rediscover : ReconnectionRequired {
public static Rediscover Instance = new();
}

public record NewLeader(DnsEndPoint EndPoint) : ReconnectionRequired;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToAll {
Expand All @@ -21,10 +22,12 @@ protected override Task When() =>
}

[Fact]
public Task the_completion_fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(
public async Task the_completion_fails() {
var ex = await Assert.ThrowsAsync<RpcException>(
() => _fixture.Client.CreateToAllAsync("group32",
new PersistentSubscriptionSettings(),
TestCredentials.Root));
Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToAll {
Expand All @@ -24,11 +26,13 @@ protected override async Task Given() {
}

[Fact]
public Task fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(() =>
public async Task fails() {
var ex = await Assert.ThrowsAsync<RpcException>(() =>
_fixture.Client.CreateToAllAsync("group57",
new PersistentSubscriptionSettings(
startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)),
TestCredentials.Root));
startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)),
TestCredentials.Root));
Assert.Equal(StatusCode.Internal, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToAll {
Expand Down Expand Up @@ -28,13 +29,15 @@ await Client.CreateToAllAsync(Group,
}
protected override Task Given() => Task.CompletedTask;
}

[Fact]
public Task fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(() =>
public async Task fails() {
var ex = await Assert.ThrowsAsync<RpcException>(() =>
_fixture.Client.UpdateToAllAsync(Group,
new PersistentSubscriptionSettings(
startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)),
TestCredentials.Root));
startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)),
TestCredentials.Root));
Assert.Equal(StatusCode.Internal, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToStream {
Expand All @@ -21,10 +22,12 @@ protected override Task When() =>
}

[Fact]
public Task the_completion_fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(
public async Task the_completion_fails() {
var ex = await Assert.ThrowsAsync<RpcException>(
() => _fixture.Client.CreateAsync(Stream, "group32",
new PersistentSubscriptionSettings(),
TestCredentials.Root));
Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace EventStore.Client {
public abstract class EventStoreClientFixture : EventStoreClientFixtureBase {
public EventStoreClient Client { get; }
protected EventStoreClientFixture(EventStoreClientSettings? settings = null,
IDictionary<string, string>? env = null) : base(settings, env) {
Dictionary<string, string>? env = null) : base(settings, env) {
Client = new EventStoreClient(Settings);
}

Expand Down
Loading