How to disable certain EventHandlers when restoring from EventStore #299
Hey, I am using
Example 1
On<BellRinged>(
    builder => builder
        .UpdateOne
        .DefaultId()
        .UpdateFromContext(
            (ctx, update) => update
                .Set(x => x.HasBellRinged, true)
        )
);
Example 2
On<BellRinged>(async ctx => await _notificationService.Handle(
        new NotifyAdminThatBellRinged(),
        CancellationToken.None
    )
    .ConfigureAwait(true)
);
It works fine until I want to rebuild my read models and delete the corresponding collections and the checkpoints. When I now emit a new event, the read models get rebuilt (1). That's fine. How can I disable these events, or whole event handlers such as (2), while restoring? Do I have to save the messageId somewhere and perform manual checks? Thanks for help and answers!
I did some research and came up with two possible solutions(?).
1. Attempt
I created a custom event handler base class:
public abstract class OrchestratorHandler : EventHandler
{
    private readonly DateTime _subscriptionsFrom;
    protected OrchestratorHandler(TypeMapper? typeMap = null)
        : base(typeMap) =>
        _subscriptionsFrom = DateTime.Now;
    public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context)
    {
        Guard.Against.Null(context);
        // Ignore every event that was created before this handler was started.
        if (context.Created.ToUniversalTime() < _subscriptionsFrom.ToUniversalTime())
        {
            return EventHandlingStatus.Ignored;
        }
        return await base.HandleEvent(context).ConfigureAwait(false);
    }
}
This works, but has the drawback that the event handler still receives all events from the beginning when the checkpoint is deleted.
2. Attempt
I thought about this problem again, had a look at the EventStoreDB docs, and realized that there are two types of subscriptions: catch-up and live subscriptions. Catch-up subscriptions can subscribe from the start or from a specific position. Live subscriptions subscribe from the end of the stream.
Subscribing from the start
In Eventuous, I use AllStreamSubscription:
/// <summary>
/// Catch-up subscription for EventStoreDB, using the $all global stream
/// </summary>
[PublicAPI]
public class AllStreamSubscription
: EventStoreCatchUpSubscriptionBase<AllStreamSubscriptionOptions>, IMeasuredSubscription {
/// <summary>
/// Creates EventStoreDB catch-up subscription service for $all
/// </summary>
/// <param name="eventStoreClient">EventStoreDB gRPC client instance</param>
/// <param name="subscriptionId">Subscription ID</param>
/// <param name="checkpointStore">Checkpoint store instance</param>
/// <param name="consumePipe"></param>
/// <param name="eventSerializer">Event serializer instance</param>
/// <param name="metaSerializer"></param>
/// <param name="eventFilter">Optional: server-side event filter</param>
/// <param name="loggerFactory"></param>
In there we have
protected override async ValueTask Subscribe(CancellationToken cancellationToken) {
    var filterOptions = new SubscriptionFilterOptions(
        Options.EventFilter ?? EventTypeFilter.ExcludeSystemEvents(),
        Options.CheckpointInterval
    );
    var (_, position) = await GetCheckpoint(cancellationToken).NoContext();
    var fromAll = position == null ? FromAll.Start : FromAll.After(new Position(position.Value, position.Value));
    Subscription = await EventStoreClient.SubscribeToAllAsync(
            fromAll,
            (_, @event, ct) => HandleEvent(@event, ct),
            Options.ResolveLinkTos,
            HandleDrop,
            filterOptions,
            Options.Credentials,
            cancellationToken
        )
        .NoContext();
There, the position is set to start either from the beginning or from the last known checkpoint. So I need to set the position to FromAll.End:
public sealed class AllStreamLiveSubscription : AllStreamSubscription
{
    public AllStreamLiveSubscription(EventStoreClient eventStoreClient, string subscriptionId, ICheckpointStore checkpointStore, ConsumePipe consumePipe, IEventSerializer? eventSerializer = null,
        IMetadataSerializer? metaSerializer = null,
        IEventFilter? eventFilter = null,
        ILoggerFactory? loggerFactory = null)
        : base(eventStoreClient, subscriptionId, checkpointStore, consumePipe, eventSerializer, metaSerializer, eventFilter, loggerFactory)
    { }
    public AllStreamLiveSubscription(EventStoreClient eventStoreClient, AllStreamSubscriptionOptions options, ICheckpointStore checkpointStore, ConsumePipe consumePipe, ILoggerFactory? loggerFactory)
        : base(eventStoreClient, options, checkpointStore, consumePipe, loggerFactory)
    { }
    protected override async ValueTask Subscribe(CancellationToken cancellationToken) {
        var filterOptions = new SubscriptionFilterOptions(
            Options.EventFilter ?? EventTypeFilter.ExcludeSystemEvents(),
            Options.CheckpointInterval
        );
        Subscription = await EventStoreClient.SubscribeToAllAsync(
                FromAll.End,
                (_, @event, ct) => HandleEvent(@event, ct),
                Options.ResolveLinkTos,
                HandleDrop,
                filterOptions,
                Options.Credentials,
                cancellationToken
            )
            .ConfigureAwait(false);
        async Task HandleEvent(ResolvedEvent re, CancellationToken ct)
            => await HandleInternal(CreateContext(re, ct)).ConfigureAwait(false);
        void HandleDrop(global::EventStore.Client.StreamSubscription _, SubscriptionDroppedReason reason, Exception? ex)
            => Dropped(EsdbMappings.AsDropReason(reason), ex);
    }
    IMessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancellationToken) {
        var evt = DeserializeData(
            re.Event.ContentType,
            re.Event.EventType,
            re.Event.Data,
            re.Event.EventStreamId,
            re.Event.EventNumber
        );
        return new MessageConsumeContext(
            re.Event.EventId.ToString(),
            re.Event.EventType,
            re.Event.ContentType,
            re.Event.EventStreamId,
            re.Event.EventNumber,
            re.OriginalEventNumber,
            re.Event.Position.CommitPosition,
            _sequence++,
            re.Event.Created,
            evt,
            Options.MetadataSerializer.DeserializeMeta(Options, re.Event.Metadata, re.Event.EventStreamId),
            SubscriptionId,
            cancellationToken
        );
    }
    ulong _sequence;
}
static class EsdbMappings {
    public static DropReason AsDropReason(SubscriptionDroppedReason reason)
        => reason switch {
            SubscriptionDroppedReason.Disposed => DropReason.Stopped,
            SubscriptionDroppedReason.ServerError => DropReason.ServerError,
            SubscriptionDroppedReason.SubscriberError => DropReason.SubscriptionError,
            _ => throw new ArgumentOutOfRangeException(nameof(reason), reason, null)
        };
}
In my Program.cs:
services.AddSubscription<AllStreamLiveSubscription, AllStreamSubscriptionOptions>(
    "MyEventHandlerStreamName",
    builder => builder
        .UseCheckpointStore<NoOpCheckpointStore>()
        .AddEventHandler<MyEventHandler>()
);
...
static SubscriptionBuilder<AllStreamLiveSubscription, AllStreamSubscriptionOptions> UseCheckpointStore<T>(
    this SubscriptionBuilder<AllStreamLiveSubscription, AllStreamSubscriptionOptions> builder
) where T : class, ICheckpointStore
    => builder.UseCheckpointStore<AllStreamLiveSubscription, AllStreamSubscriptionOptions, T>();
Two remarks:
I could not use MongoCheckpointStore anymore because I got an error, so I changed it to NoOpCheckpointStore.
What I also didn't like was being forced to copy a lot of code out of AllStreamSubscription. Maybe there is a better solution?
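The difference between the two subscription kinds in attempt 2 comes down to where reading starts. Here is a minimal, library-free sketch of that decision. The names StartKind, StartFrom and StartPositions are hypothetical and exist only for illustration; they are not Eventuous or EventStoreDB APIs.

```csharp
using System;

// Hypothetical model of the start-position decision; not an Eventuous or EventStoreDB API.
public enum StartKind { Start, AfterPosition, End }

public readonly record struct StartFrom(StartKind Kind, ulong? Position = null);

public static class StartPositions
{
    // Catch-up: resume after the stored checkpoint, or replay everything when none exists.
    public static StartFrom CatchUp(ulong? checkpoint) =>
        checkpoint is null
            ? new StartFrom(StartKind.Start)
            : new StartFrom(StartKind.AfterPosition, checkpoint);

    // Live: ignore any checkpoint and only see events appended from now on.
    public static StartFrom Live() => new StartFrom(StartKind.End);
}
```

With this model, deleting the checkpoint turns a catch-up subscription into a full replay, while a live subscription is unaffected because it never consults the checkpoint.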
There are two things I always suggest:
Handlers that cause side effects (such as NotifyAdminThatBellRinged) must be separated from the subscription that produces read models. This is unconditional, and applies to the next point as well.
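To illustrate why the separation helps, here is a small in-memory sketch. It assumes each subscription keeps its own checkpoint. The Subscription and Demo types are hypothetical and stand in for real subscriptions; they are not Eventuous APIs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model; Subscription and its members are illustrative names only.
public sealed class Subscription
{
    private readonly Action<string> _handler;

    // Index of the last processed event; null means no checkpoint is stored.
    public int? Checkpoint { get; private set; }

    public Subscription(Action<string> handler) => _handler = handler;

    // Process every event after the checkpoint, advancing the checkpoint as we go.
    public void CatchUp(IReadOnlyList<string> log)
    {
        for (var i = (Checkpoint ?? -1) + 1; i < log.Count; i++)
        {
            _handler(log[i]);
            Checkpoint = i;
        }
    }

    // Deleting the checkpoint forces a full replay on the next CatchUp.
    public void ResetCheckpoint() => Checkpoint = null;
}

public static class Demo
{
    public static (int projected, int notified) Run()
    {
        var log = new List<string> { "BellRinged", "BellRinged" };
        int projected = 0, notified = 0;

        var readModels = new Subscription(_ => projected++);
        var notifications = new Subscription(_ => notified++);

        readModels.CatchUp(log);
        notifications.CatchUp(log);

        // Rebuild read models: only the projection subscription loses its checkpoint.
        readModels.ResetCheckpoint();
        log.Add("BellRinged");
        readModels.CatchUp(log);
        notifications.CatchUp(log);

        return (projected, notified);
    }
}
```

In this model, resetting the read-model checkpoint replays all three events into the projection, while the notification subscription only handles the one new event, so no old notifications are sent again.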