Skip to content

Commit

Permalink
Merge pull request EventStore#194 from thefringeninja/rfc-032
Browse files Browse the repository at this point in the history
Implement RFC 032 gRPC Client Deadlines
  • Loading branch information
timothycoleman authored and thefringeninja committed Feb 22, 2022
2 parents 24fa50c + cf2c284 commit 79ed1f6
Show file tree
Hide file tree
Showing 167 changed files with 906 additions and 988 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

All notable changes to this project will be documented in this file.

## [Unreleased]
## [22.0.0]

### Fixed
- Get Certifications Path More Reliably [EventStore-Client-DotNet#178](https://github.com/EventStore/EventStore-Client-Dotnet/pull/178)
- Make Client More Backwards Compatibility Friendly [EventStore-Client-DotNet#125](https://github.com/EventStore/EventStore-Client-Dotnet/pull/125)
- Send correct writeCheckpoint option when disabling/aborting a projection [EventStore-Client-DotNet#116](https://github.com/EventStore/EventStore-Client-Dotnet/pull/116)
- Force Rediscovery Only when Lost Connection [EventStore-Client-DotNet#195](https://github.com/EventStore/EventStore-Client-Dotnet/pull/195)
- Align Persistent Subscription Names [EventStore-Client-DotNet#198](https://github.com/EventStore/EventStore-Client-Dotnet/pull/198)

### Removed
- Remove autoAck from Persistent Subscriptions [EventStore-Client-DotNet#175](https://github.com/EventStore/EventStore-Client-Dotnet/pull/175)

### Added
- Introduce New Types For Subscription Positions [EventStore-Client-DotNet#188](https://github.com/EventStore/EventStore-Client-Dotnet/pull/188)
- Detect Server Capabilities [EventStore-Client-DotNet#172](https://github.com/EventStore/EventStore-Client-Dotnet/pull/172)
- Implement Last/Next StreamPosition/Position [EventStore-Client-DotNet#151](https://github.com/EventStore/EventStore-Client-Dotnet/pull/151)
- Add filtered persistent subscriptions [EventStore-Client-DotNet#122](https://github.com/EventStore/EventStore-Client-Dotnet/pull/122)
Expand All @@ -22,6 +25,8 @@ All notable changes to this project will be documented in this file.
### Changed
- Adjustments to Disposal [EventStore-Client-DotNet#189](https://github.com/EventStore/EventStore-Client-Dotnet/pull/189)
- send 'requires-leader' header based on NodePreference [EventStore-Client-DotNet#131](https://github.com/EventStore/EventStore-Client-Dotnet/pull/131)
- Rename SoftDeleteAsync to DeleteAsync [EventStore-Client-DotNet#197](https://github.com/EventStore/EventStore-Client-Dotnet/pull/197)
- Standardize gRPC Client Deadlines [EventStore-Client-DotNet#194](https://github.com/EventStore/EventStore-Client-Dotnet/pull/194)

## [21.2.0] - 2021-02-22

Expand Down
4 changes: 2 additions & 2 deletions samples/appending-events/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ private static async Task AppendWithConcurrencyCheck(EventStoreClient client) {
await client.AppendToStreamAsync("concurrency-stream", StreamRevision.None,
new[] {new EventData(Uuid.NewUuid(), "-", ReadOnlyMemory<byte>.Empty)});
#region append-with-concurrency-check

var clientOneRead = client.ReadStreamAsync(
Direction.Forwards,
"concurrency-stream",
StreamPosition.Start,
configureOperationOptions: options => options.ThrowOnAppendFailure = false);
StreamPosition.Start);
var clientOneRevision = (await clientOneRead.LastAsync()).Event.EventNumber.ToUInt64();

var clientTwoRead = client.ReadStreamAsync(Direction.Forwards, "concurrency-stream", StreamPosition.Start);
Expand Down
8 changes: 4 additions & 4 deletions samples/persistent-subscriptions/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ await client.CreateAsync(
"test-stream",
"subscription-group",
settings,
userCredentials);
userCredentials: userCredentials);
#endregion create-persistent-subscription-to-stream
}

Expand Down Expand Up @@ -56,7 +56,7 @@ await client.CreateToAllAsync(
"subscription-group",
filter,
settings,
userCredentials);
userCredentials: userCredentials);
#endregion create-persistent-subscription-to-all
}

Expand Down Expand Up @@ -101,7 +101,7 @@ await client.UpdateAsync(
"test-stream",
"subscription-group",
settings,
userCredentials);
userCredentials: userCredentials);
#endregion update-persistent-subscription
}

Expand All @@ -111,7 +111,7 @@ static async Task DeletePersistentSubscription(EventStorePersistentSubscriptions
await client.DeleteAsync(
"test-stream",
"subscription-group",
userCredentials);
userCredentials: userCredentials);
#endregion delete-persistent-subscription
}

Expand Down
19 changes: 15 additions & 4 deletions src/EventStore.Client.Common/EventStoreCallOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@
using System.Threading;
using Grpc.Core;
using Timeout_ = System.Threading.Timeout;

#nullable enable
namespace EventStore.Client {
internal static class EventStoreCallOptions {
public static CallOptions Create(EventStoreClientSettings settings,
EventStoreClientOperationOptions operationOptions, UserCredentials? userCredentials,
CancellationToken cancellationToken) => new(
// deadline falls back to infinity
public static CallOptions CreateStreaming(EventStoreClientSettings settings,
TimeSpan? deadline = null, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
Create(settings, deadline, userCredentials, cancellationToken);

// deadline falls back to connection DefaultDeadline
public static CallOptions CreateNonStreaming(EventStoreClientSettings settings, TimeSpan? deadline,
UserCredentials? userCredentials, CancellationToken cancellationToken) => Create(settings,
deadline ?? settings.DefaultDeadline, userCredentials, cancellationToken);

private static CallOptions Create(EventStoreClientSettings settings, TimeSpan? deadline,
UserCredentials? userCredentials, CancellationToken cancellationToken) => new(
cancellationToken: cancellationToken,
deadline: DeadlineAfter(operationOptions.TimeoutAfter),
deadline: DeadlineAfter(deadline),
headers: new Metadata {
{
Constants.Headers.RequiresLeader,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.Operations;
Expand All @@ -10,82 +11,93 @@ public partial class EventStoreOperationsClient {
/// <summary>
/// Shuts down the EventStoreDB node.
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task ShutdownAsync(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ShutdownAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Initiates an index merge operation.
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task MergeIndexesAsync(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).MergeIndexesAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Resigns a node.
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task ResignNodeAsync(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ResignNodeAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Sets the node priority.
/// </summary>
/// <param name="nodePriority"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task SetNodePriorityAsync(int nodePriority,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).SetNodePriorityAsync(
new SetNodePriorityReq {Priority = nodePriority},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Restart persistent subscriptions
/// </summary>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task RestartPersistentSubscriptions(UserCredentials? userCredentials = null,
public async Task RestartPersistentSubscriptions(
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).RestartPersistentSubscriptionsAsync(
EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ public partial class EventStoreOperationsClient {
/// </summary>
/// <param name="threadCount"></param>
/// <param name="startFromChunk"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public async Task<DatabaseScavengeResult> StartScavengeAsync(
int threadCount = 1,
int startFromChunk = 0,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
if (threadCount <= 0) {
Expand All @@ -37,8 +39,7 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
StartFromChunk = startFromChunk
}
},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return result.ScavengeResult switch {
Expand All @@ -53,11 +54,13 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
/// Stops a scavenge operation.
/// </summary>
/// <param name="scavengeId"></param>
/// <param name="deadline"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<DatabaseScavengeResult> StopScavengeAsync(
string scavengeId,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
Expand All @@ -66,7 +69,7 @@ public async Task<DatabaseScavengeResult> StopScavengeAsync(
Options = new StopScavengeReq.Types.Options {
ScavengeId = scavengeId
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}, EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System;
using System.Net.Http;
using EventStore.Client;
using EventStore.Client.Operations;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
Expand Down
Loading

0 comments on commit 79ed1f6

Please sign in to comment.