Skip to content

Commit

Permalink
implement subscription position
Browse files Browse the repository at this point in the history
  • Loading branch information
thefringeninja committed Jan 28, 2022
1 parent 7f1ce66 commit e01ee93
Show file tree
Hide file tree
Showing 24 changed files with 500 additions and 290 deletions.
9 changes: 5 additions & 4 deletions samples/subscribing-to-streams/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ static async Task Main(string[] args) {
private static async Task SubscribeToStream(EventStoreClient client) {
#region subscribe-to-stream
await client.SubscribeToStreamAsync("some-stream",
SubscriptionStreamPosition.Start,
async (subscription, evnt, cancellationToken) => {
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
Expand All @@ -29,21 +30,21 @@ await client.SubscribeToStreamAsync("some-stream",
#region subscribe-to-stream-from-position
await client.SubscribeToStreamAsync(
"some-stream",
StreamPosition.FromInt64(20),
SubscriptionStreamPosition.After(StreamPosition.FromInt64(20)),
EventAppeared);
#endregion subscribe-to-stream-from-position

#region subscribe-to-stream-live
await client.SubscribeToStreamAsync(
"some-stream",
StreamPosition.End,
SubscriptionStreamPosition.Live,
EventAppeared);
#endregion subscribe-to-stream-live

#region subscribe-to-stream-resolving-linktos
await client.SubscribeToStreamAsync(
"$et-myEventType",
StreamPosition.Start,
SubscriptionStreamPosition.Start,
EventAppeared,
resolveLinkTos: true);
#endregion subscribe-to-stream-resolving-linktos
Expand All @@ -52,7 +53,7 @@ await client.SubscribeToStreamAsync(
var checkpoint = StreamPosition.Start;
await client.SubscribeToStreamAsync(
"some-stream",
checkpoint,
SubscriptionStreamPosition.After(checkpoint),
eventAppeared: async (subscription, evnt, cancellationToken) => {
await HandleEvent(evnt);
checkpoint = evnt.OriginalEventNumber;
Expand Down
151 changes: 10 additions & 141 deletions src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,10 @@
#nullable enable
namespace EventStore.Client {
public partial class EventStoreClient {
private Task<StreamSubscription> SubscribeToAllAsync(
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = new ReadReq.Types.Options.Types.AllOptions {
Start = new Empty()
},
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
}, operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
filterOptions?.CheckpointReached, cancellationToken);
}

/// <summary>
/// Subscribes to all events. Use this when you have no <see cref="Position">checkpoint</see>.
/// Subscribes to all events.
/// </summary>
/// <param name="start">A <see cref="SubscriptionPosition"/> (exclusive of) to start the subscription from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
Expand All @@ -42,6 +19,7 @@ private Task<StreamSubscription> SubscribeToAllAsync(
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToAllAsync(
SubscriptionPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Expand All @@ -52,25 +30,13 @@ public Task<StreamSubscription> SubscribeToAllAsync(
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToAllAsync(eventAppeared, operationOptions, resolveLinkTos, subscriptionDropped,
filterOptions, userCredentials, cancellationToken);
}

private Task<StreamSubscription> SubscribeToAllAsync(Position start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = ReadReq.Types.Options.Types.AllOptions.FromPosition(start),
All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
Expand All @@ -79,136 +45,39 @@ private Task<StreamSubscription> SubscribeToAllAsync(Position start,
}

/// <summary>
/// Subscribes to all events from a <see cref="Position">checkpoint</see>. This is exclusive of.
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
/// </summary>
/// <param name="start">A <see cref="Position"/> (exclusive of) to start the subscription from.</param>
/// <param name="start">A <see cref="SubscriptionStreamPosition"/> (exclusive of) to start the subscription from.</param>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="filterOptions">The optional <see cref="SubscriptionFilterOptions"/> to apply.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToAllAsync(Position start,
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
SubscriptionStreamPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToAllAsync(start, eventAppeared, operationOptions, resolveLinkTos, subscriptionDropped,
filterOptions, userCredentials, cancellationToken);
}

private Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = new ReadReq.Types.Options.Types.StreamOptions {
Start = new Empty(),
StreamIdentifier = streamName
},
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
}, operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);
}

/// <summary>
/// Subscribes to all events in a stream. Use this when you have no <see cref="StreamPosition">checkpoint</see>.
/// </summary>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToStreamAsync(streamName, eventAppeared, operationOptions, resolveLinkTos,
subscriptionDropped,
userCredentials, cancellationToken);
}

private Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
StreamPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = start == StreamPosition.End
? new ReadReq.Types.Options.Types.StreamOptions {
End = new Empty(),
StreamIdentifier = streamName
}
: new ReadReq.Types.Options.Types.StreamOptions {
Revision = start,
StreamIdentifier = streamName
},
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
},
operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);
}

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>. This is exclusive of.
/// </summary>
/// <param name="start">A <see cref="Position"/> (exclusive of) to start the subscription from.</param>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
StreamPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToStreamAsync(streamName, start, eventAppeared, operationOptions, resolveLinkTos,
subscriptionDropped, userCredentials, cancellationToken);
}
}
}
33 changes: 28 additions & 5 deletions src/EventStore.Client.Streams/Streams/ReadReq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,27 @@ partial class Types {
partial class Options {
partial class Types {
partial class StreamOptions {
public static StreamOptions FromSubscriptionPosition(string streamName,
SubscriptionStreamPosition subscriptionPosition) {
if (subscriptionPosition == SubscriptionStreamPosition.Live) {
return new StreamOptions {
StreamIdentifier = streamName,
End = new Empty()
};
}

if (subscriptionPosition == SubscriptionStreamPosition.Start) {
return new StreamOptions {
StreamIdentifier = streamName,
Start = new Empty()
};
}

return new StreamOptions {
StreamIdentifier = streamName,
Revision = subscriptionPosition.ToUInt64()
};
}
public static StreamOptions FromStreamNameAndRevision(
string streamName,
StreamPosition streamRevision) {
Expand Down Expand Up @@ -35,23 +56,25 @@ public static StreamOptions FromStreamNameAndRevision(
}

partial class AllOptions {
public static AllOptions FromPosition(Client.Position position) {
if (position == Client.Position.End) {
public static AllOptions FromSubscriptionPosition(SubscriptionPosition position) {
if (position == SubscriptionPosition.Live) {
return new AllOptions {
End = new Empty()
};
}

if (position == Client.Position.Start) {
if (position == SubscriptionPosition.Start) {
return new AllOptions {
Start = new Empty()
};
}

var (c, p) = position.ToUInt64();

return new AllOptions {
Position = new Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
CommitPosition = c,
PreparePosition = p
}
};
}
Expand Down
Loading

0 comments on commit e01ee93

Please sign in to comment.