Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Couple fixes for subscriptions around configuration, error handling w… #3168

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/DaemonTests/SubscriptionAgentTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Marten.Events;
Expand All @@ -19,7 +20,7 @@ public class SubscriptionAgentTests

public SubscriptionAgentTests()
{
theAgent = new SubscriptionAgent(new ShardName("Projection1"), theOptions, theLoader, theExecution, new ShardStateTracker(NullLogger.Instance), NullLogger.Instance);
theAgent = new SubscriptionAgent(new ShardName("Projection1"), theOptions, TimeProvider.System, theLoader, theExecution, new ShardStateTracker(NullLogger.Instance), NullLogger.Instance);
}

[Fact]
Expand Down
122 changes: 122 additions & 0 deletions src/DaemonTests/Subscriptions/subscribe_from_present.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
using System;
using System.Threading.Tasks;
using DaemonTests.TestingSupport;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Time.Testing;
using Shouldly;
using Weasel.Postgresql;
using Xunit;
using Xunit.Abstractions;

namespace DaemonTests.Subscriptions;

[Collection("subscriptions")]
public class subscribe_from_present : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private readonly FakeSubscription theSubscription = new();
private readonly FakeTimeProvider theProvider = new();
private IHost _host;

public subscribe_from_present(ITestOutputHelper output)
{
_output = output;
}

public async Task InitializeAsync()
{
await SchemaUtils.DropSchema(ConnectionSource.ConnectionString, "subscriptions_start");

_host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMarten(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "subscriptions_start";

opts.DotNetLogger = new TestLogger<FakeSubscription>(_output);
opts.DisableNpgsqlLogging = true;

theSubscription.Options.SubscribeFromPresent();
opts.Events.Subscribe(theSubscription);

opts.Events.TimeProvider = theProvider;
});
}).StartAsync();

var store = _host.Services.GetRequiredService<IDocumentStore>();
await store.Advanced.Clean.DeleteAllEventDataAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

/*
* TODO
*
*
*
*/

[Fact]
public async Task start_from_scratch()
{
var theStore = _host.Services.GetRequiredService<IDocumentStore>();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await pumpInEvents();

await theStore.WaitForNonStaleProjectionDataAsync(20.Seconds());

theSubscription.EventsEncountered.Count.ShouldBe(16);
}

[Fact]
public async Task can_successfully_start_and_function()
{
var theStore = _host.Services.GetRequiredService<IDocumentStore>();

theProvider.SetUtcNow(DateTimeOffset.UtcNow.Subtract(1.Hours()));
await pumpInEvents();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

theProvider.Advance(1.Hours());

await pumpInEvents();

await theStore.WaitForNonStaleProjectionDataAsync(20.Seconds());

theSubscription.EventsEncountered.Count.ShouldBe(16);
}

private async Task pumpInEvents()
{
var theStore = _host.Services.GetRequiredService<IDocumentStore>();
await using var theSession = theStore.LightweightSession();

var events1 = new object[] { new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events2 = new object[] { new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events3 = new object[] { new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.AEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };
var events4 = new object[] { new EventSourcingTests.Aggregation.EEvent(), new EventSourcingTests.Aggregation.BEvent(), new EventSourcingTests.Aggregation.DEvent(), new EventSourcingTests.Aggregation.CEvent() };

theSession.Events.StartStream(Guid.NewGuid(), events1);
theSession.Events.StartStream(Guid.NewGuid(), events2);
theSession.Events.StartStream(Guid.NewGuid(), events3);
theSession.Events.StartStream(Guid.NewGuid(), events4);

await theSession.SaveChangesAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ProjectionCoordinator : IProjectionCoordinator
private readonly ResiliencePipeline _resilience;
private readonly CancellationTokenSource _cancellation = new();
private Task _runner;
private readonly TimeProvider _timeProvider;

public ProjectionCoordinator(IDocumentStore documentStore, ILogger<ProjectionCoordinator> logger)
{
Expand All @@ -53,7 +54,7 @@ public ProjectionCoordinator(IDocumentStore documentStore, ILogger<ProjectionCoo
_options = store.Options;
_logger = logger;
_resilience = store.Options.ResiliencePipeline;

_timeProvider = _options.Events.TimeProvider;
Store = store;
}

Expand Down Expand Up @@ -231,7 +232,7 @@ private async Task startAgentsIfNecessaryAsync(IProjectionSet set,
{
await tryStartAgent(stoppingToken, daemon, name, set).ConfigureAwait(false);
}
else if (agent.Status == AgentStatus.Paused && agent.PausedTime.HasValue && DateTimeOffset.UtcNow.Subtract(agent.PausedTime.Value) > _options.Projections.HealthCheckPollingTime)
else if (agent.Status == AgentStatus.Paused && agent.PausedTime.HasValue && _timeProvider.GetUtcNow().Subtract(agent.PausedTime.Value) > _options.Projections.HealthCheckPollingTime)
{
await tryStartAgent(stoppingToken, daemon, name, set).ConfigureAwait(false);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Marten/Events/Daemon/Internals/IAgentFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private SubscriptionAgent buildAgentForShard(MartenDatabase database, AsyncProje
var loader = new EventLoader(_store, database, shard, options);
var wrapped = new ResilientEventLoader(_store.Options.ResiliencePipeline, loader);

return new SubscriptionAgent(shard.Name, options, wrapped, execution, database.Tracker, logger);
return new SubscriptionAgent(shard.Name, options, _store.Events.TimeProvider, wrapped, execution, database.Tracker, logger);
}

if (shard.SubscriptionSource != null)
Expand All @@ -68,7 +68,7 @@ private SubscriptionAgent buildAgentForShard(MartenDatabase database, AsyncProje
var loader = new EventLoader(_store, database, shard, options);
var wrapped = new ResilientEventLoader(_store.Options.ResiliencePipeline, loader);

return new SubscriptionAgent(shard.Name, options, wrapped, execution, database.Tracker, logger);
return new SubscriptionAgent(shard.Name, options, _store.Options.Events.TimeProvider, wrapped, execution, database.Tracker, logger);
}

throw new ArgumentOutOfRangeException(nameof(shard), "This shard has neither a subscription nor projection");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Services;
Expand All @@ -24,6 +26,12 @@ SessionOptions sessionOptions
protected internal override IDocumentStorage<T> selectStorage<T>(DocumentProvider<T> provider) =>
TrackingMode == DocumentTracking.IdentityOnly ? provider.IdentityMap : provider.Lightweight;

// Do nothing here! See GH-3167
protected override Task tryApplyTombstoneEventsAsync(CancellationToken token)
{
return Task.CompletedTask;
}

protected internal override void ejectById<T>(long id)
{
// nothing
Expand Down
4 changes: 3 additions & 1 deletion src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public Task WaitForCompletion()
// TODO -- make this private
public ActionBlock<IStorageOperation> Queue { get; }



IEnumerable<IDeletion> IUnitOfWork.Deletions()
{
throw new NotSupportedException();
Expand Down Expand Up @@ -165,7 +167,7 @@ void ISessionWorkTracker.Sort(StoreOptions options)
throw new NotSupportedException();
}

List<StreamAction> ISessionWorkTracker.Streams => throw new NotSupportedException();
List<StreamAction> ISessionWorkTracker.Streams => new();


IReadOnlyList<IStorageOperation> ISessionWorkTracker.AllOperations => throw new NotSupportedException();
Expand Down
6 changes: 4 additions & 2 deletions src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Marten.Events.Daemon.Internals;

public class SubscriptionAgent: ISubscriptionAgent, IAsyncDisposable
{
private readonly TimeProvider _timeProvider;
private readonly IEventLoader _loader;
private readonly ISubscriptionExecution _execution;
private readonly ShardStateTracker _tracker;
Expand All @@ -20,10 +21,11 @@ public class SubscriptionAgent: ISubscriptionAgent, IAsyncDisposable
private readonly ActionBlock<Command> _commandBlock;
private IDaemonRuntime _runtime = new NulloDaemonRuntime();

public SubscriptionAgent(ShardName name, AsyncOptions options, IEventLoader loader,
public SubscriptionAgent(ShardName name, AsyncOptions options, TimeProvider timeProvider, IEventLoader loader,
ISubscriptionExecution execution, ShardStateTracker tracker, ILogger logger)
{
Options = options;
_timeProvider = timeProvider;
_loader = loader;
_execution = execution;
_tracker = tracker;
Expand Down Expand Up @@ -72,7 +74,7 @@ public async Task ReportCriticalFailureAsync(Exception ex)
_cancellation.Cancel();
#endif
await _execution.HardStopAsync().ConfigureAwait(false);
PausedTime = DateTimeOffset.UtcNow;
PausedTime = _timeProvider.GetUtcNow();
Status = AgentStatus.Paused;
_tracker.Publish(new ShardState(Name, LastCommitted) { Action = ShardAction.Paused, Exception = ex});

Expand Down
3 changes: 1 addition & 2 deletions src/Marten/Events/Daemon/ProjectionDaemon.Rebuilding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,9 @@ private async Task rewindAgentProgress(string shardName, CancellationToken token
sessionOptions.AllowAnyTenant = true;
await using var session = _store.LightweightSession(sessionOptions);

session.QueueSqlCommand($"delete from {_store.Options.EventGraph.ProgressionTable} where name = ?", shardName);
if (sequenceFloor > 0)
{
session.QueueSqlCommand($"update {_store.Options.EventGraph.ProgressionTable} set last_seq_id = ? where name = ?", sequenceFloor, shardName);
session.QueueSqlCommand($"insert into {_store.Options.EventGraph.ProgressionTable} (name, last_seq_id) values (?, ?) on conflict (name) do update set last_seq_id = ?", shardName, sequenceFloor, sequenceFloor);
}

await session.SaveChangesAsync(token).ConfigureAwait(false);
Expand Down
14 changes: 11 additions & 3 deletions src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,18 @@ public void Subscribe(ISubscriptionSource subscription)
/// <param name="configure"></param>
public void Subscribe(ISubscription subscription, Action<ISubscriptionOptions>? configure = null)
{
var wrapper = new SubscriptionWrapper(subscription);
configure?.Invoke(wrapper);
var source = subscription as ISubscriptionSource ?? new SubscriptionWrapper(subscription);

_subscriptions.Add(wrapper);
if (source is ISubscriptionOptions options)
{
configure?.Invoke(options);
}
else if (configure != null)
{
throw new InvalidOperationException("Unable to apply subscription options to " + subscription);
}

_subscriptions.Add(source);
}

internal bool Any()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private async Task executeBeforeCommitListeners(IUpdateBatch batch)
}
}

private Task tryApplyTombstoneEventsAsync(CancellationToken token)
protected virtual Task tryApplyTombstoneEventsAsync(CancellationToken token)
{
if (Options.EventGraph.TryCreateTombstoneBatch(this, out var tombstoneBatch))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Storage/MartenDatabase.DocumentCleaner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ IF EXISTS(SELECT * FROM information_schema.tables
THEN TRUNCATE TABLE {Options.Events.DatabaseSchemaName}.mt_streams CASCADE; END IF;
IF EXISTS(SELECT * FROM information_schema.tables
WHERE table_name = 'mt_mark_event_progression' AND table_schema = '{Options.Events.DatabaseSchemaName}')
THEN TRUNCATE TABLE {Options.Events.DatabaseSchemaName}.mt_mark_event_progression CASCADE; END IF;
THEN delete from {Options.Events.DatabaseSchemaName}.mt_mark_event_progression; END IF;
END; $$;
";
}
Expand Down
Loading