Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect Server Capabilities #172

Merged
merged 6 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
framework: [netcoreapp3.1, net5.0, net48]
framework: [netcoreapp3.1, net5.0]
os: [ubuntu-18.04, windows-latest]
runs-on: ${{ matrix.os }}
name: scan-vulnerabilities/${{ matrix.os }}/${{ matrix.framework }}
Expand All @@ -41,7 +41,7 @@ jobs:
strategy:
fail-fast: false
matrix:
framework: [netcoreapp3.1, net5.0, net48]
framework: [netcoreapp3.1, net5.0]
os: [ubuntu-18.04]
test: ["", .Streams, .PersistentSubscriptions, .Operations, .UserManagement, .ProjectionManagement]
configuration: [release]
Expand All @@ -60,7 +60,6 @@ jobs:
docker pull ghcr.io/eventstore/eventstore:${{ matrix.docker-tag }}
- name: Install netcoreapp3.1
uses: actions/setup-dotnet@v1
if: matrix.framework == 'netcoreapp3.1'
with:
dotnet-version: 3.1.x
- name: Install net5.0
Expand Down
12 changes: 2 additions & 10 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net48;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>netcoreapp3.1;net5.0</TargetFrameworks>
<Platform>x64</Platform>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<Nullable>disable</Nullable>
Expand All @@ -14,15 +14,7 @@
<RootNamespace>EventStore.Client</RootNamespace>
<UseLinkBase>true</UseLinkBase>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'net48' Or '$(TargetFramework)' == 'netcoreapp3.1'">
<PropertyGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
<DefineConstants>$(DefineConstants);GRPC_CORE</DefineConstants>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NETStandard.Library" Version="2.0.3"/>
<PackageReference Include="System.Net.Http" Version="4.3.4"/>
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net48'">
<PackageReference Include="IndexRange" Version="1.0.0" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
</ItemGroup>
</Project>
5 changes: 3 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
<Copyright>Copyright 2012-2020 Event Store Ltd</Copyright>
<MinVerTagPrefix>v</MinVerTagPrefix>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<GrpcPackageVersion>2.38.0</GrpcPackageVersion>
<GrpcPackageVersion>2.40.0</GrpcPackageVersion>
<GrpcCorePackageVersion>2.42.0</GrpcCorePackageVersion>
</PropertyGroup>
<ItemGroup>
<None Include="..\..\LICENSE.md" Pack="true" PackagePath="\"/>
Expand All @@ -36,7 +37,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Grpc.Tools" Version="$(GrpcPackageVersion)">
<PackageReference Include="Grpc.Tools" Version="$(GrpcCorePackageVersion)">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
Expand Down
13 changes: 0 additions & 13 deletions src/EventStore.Client.Common/DeconstructionExtensions.cs

This file was deleted.

8 changes: 1 addition & 7 deletions src/EventStore.Client.Common/EpochExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
#nullable enable
namespace EventStore.Client {
internal static class EpochExtensions {
private static readonly DateTime UnixEpoch =
#if NETFRAMEWORK
new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc)
#else
DateTime.UnixEpoch
#endif
;
private static readonly DateTime UnixEpoch = DateTime.UnixEpoch;

public static DateTime FromTicksSinceEpoch(this long value) =>
new DateTime(UnixEpoch.Ticks + value, DateTimeKind.Utc);
Expand Down
19 changes: 19 additions & 0 deletions src/EventStore.Client.Common/protos/serverfeatures.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";
package event_store.client.server_features;
option java_package = "com.eventstore.dbclient.proto.serverfeatures";
import "shared.proto";

service ServerFeatures {
rpc GetSupportedMethods (event_store.client.Empty) returns (SupportedMethods);
}

message SupportedMethods {
repeated SupportedMethod methods = 1;
string event_store_server_version = 2;
}

message SupportedMethod {
string method_name = 1;
string service_name = 2;
repeated string features = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ public partial class EventStoreOperationsClient {
public async Task ShutdownAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ShutdownAsync(EmptyResult,
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ShutdownAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -30,9 +32,11 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ShutdownAsync(
public async Task MergeIndexesAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).MergeIndexesAsync(EmptyResult,
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).MergeIndexesAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -44,9 +48,11 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).MergeIndexesAs
public async Task ResignNodeAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ResignNodeAsync(EmptyResult,
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ResignNodeAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -59,10 +65,12 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ResignNodeAsyn
public async Task SetNodePriorityAsync(int nodePriority,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).SetNodePriorityAsync(
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).SetNodePriorityAsync(
new SetNodePriorityReq {Priority = nodePriority},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -73,10 +81,12 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).SetNodePriorit
/// <returns></returns>
public async Task RestartPersistentSubscriptions(UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).RestartPersistentSubscriptionsAsync(
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).RestartPersistentSubscriptionsAsync(
EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
throw new ArgumentOutOfRangeException(nameof(startFromChunk));
}

var result = await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).StartScavengeAsync(
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).StartScavengeAsync(
new StartScavengeReq {
Options = new StartScavengeReq.Types.Options {
ThreadCount = threadCount,
Expand All @@ -38,6 +39,7 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).StartScavengeA
},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand All @@ -58,8 +60,9 @@ public async Task<DatabaseScavengeResult> StopScavengeAsync(
string scavengeId,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
var result = await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).StopScavengeAsync(new StopScavengeReq {
channelInfo.CallInvoker).StopScavengeAsync(new StopScavengeReq {
Options = new StopScavengeReq.Types.Options {
ScavengeId = scavengeId
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ partial class EventStorePersistentSubscriptionsClient {
[SystemConsumerStrategies.Pinned] = CreateReq.Types.ConsumerStrategy.Pinned,
};

private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string streamName, StreamPosition position) {
private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string streamName,
StreamPosition position) {
if (position == StreamPosition.Start) {
return new CreateReq.Types.StreamOptions {
StreamIdentifier = streamName,
Expand Down Expand Up @@ -42,8 +43,7 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi
allOptions = new CreateReq.Types.AllOptions {
Start = new Empty(),
};
}
else if (position == Position.End) {
} else if (position == Position.End) {
allOptions = new CreateReq.Types.AllOptions {
End = new Empty()
};
Expand Down Expand Up @@ -153,33 +153,53 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve
throw new ArgumentNullException(nameof(settings));
}

if (streamName != SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is StreamPosition)) {
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream");
if (streamName != SystemStreams.AllStream && settings.StartFrom != null &&
!(settings.StartFrom is StreamPosition)) {
throw new ArgumentException(
$"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream");
}

if (streamName == SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is Position)) {
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
if (streamName == SystemStreams.AllStream && settings.StartFrom != null &&
!(settings.StartFrom is Position)) {
throw new ArgumentException(
$"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}

if (eventFilter != null && streamName != SystemStreams.AllStream) {
throw new ArgumentException($"Filters are only supported when subscribing to {SystemStreams.AllStream}");
throw new ArgumentException(
$"Filters are only supported when subscribing to {SystemStreams.AllStream}");
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);

if (streamName == SystemStreams.AllStream &&
!channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
throw new NotSupportedException("The server does not support persistent subscriptions to $all.");
}

using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
channelInfo.CallInvoker).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
Stream = streamName != SystemStreams.AllStream ?
StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null,
All = streamName == SystemStreams.AllStream ?
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter) : null,
#pragma warning disable 612
StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
Stream = streamName != SystemStreams.AllStream
? StreamOptionsForCreateProto(streamName,
(StreamPosition)(settings.StartFrom ?? StreamPosition.End))
: null,
All = streamName == SystemStreams.AllStream
? AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter)
: null,
#pragma warning disable 612
StreamIdentifier =
streamName != SystemStreams.AllStream
? streamName
: string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
GroupName = groupName,
Settings = new CreateReq.Types.Settings {
#pragma warning disable 612
Revision = streamName != SystemStreams.AllStream ? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64() : default, /*for backwards compatibility*/
#pragma warning restore 612
#pragma warning disable 612
Revision = streamName != SystemStreams.AllStream
? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64()
: default, /*for backwards compatibility*/
#pragma warning restore 612
CheckpointAfterMs = (int)settings.CheckPointAfter.TotalMilliseconds,
ExtraStatistics = settings.ExtraStatistics,
MessageTimeoutMs = (int)settings.MessageTimeout.TotalMilliseconds,
Expand All @@ -195,6 +215,7 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne
}
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -210,12 +231,12 @@ public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.PersistentSubscriptions;
Expand All @@ -15,6 +16,13 @@ partial class EventStorePersistentSubscriptionsClient {
/// <returns></returns>
public async Task DeleteAsync(string streamName, string groupName, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);

if (streamName == SystemStreams.AllStream &&
!channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
throw new NotSupportedException("The server does not support persistent subscriptions to $all.");
}

var deleteOptions = new DeleteReq.Types.Options {
GroupName = groupName
};
Expand All @@ -25,10 +33,12 @@ public async Task DeleteAsync(string streamName, string groupName, UserCredentia
deleteOptions.StreamIdentifier = streamName;
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).DeleteAsync(new DeleteReq {
Options = deleteOptions
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
using var call =
new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(channelInfo.CallInvoker)
.DeleteAsync(new DeleteReq {Options = deleteOptions},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand Down
Loading