Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testing helpers for async projections #2624

Closed
agross opened this issue Jun 30, 2023 · 9 comments
Closed

Add testing helpers for async projections #2624

agross opened this issue Jun 30, 2023 · 9 comments

Comments

@agross
Copy link
Contributor

agross commented Jun 30, 2023

Some of my tests require async projections to be up-to-date in order to be successful.

Let's say I build a test targeting code that queries models created by an async projection:

  1. Create stream a predefined set of events
  2. Wait for projection to catch up with the events
  3. Execute the SUT
  4. Assert the right thing™ happened

The second step today is a bit cumbersome.

I tried a suggestion by @oskardudycz using an IDocumentSessionListener that writes projection update information to a System.Threading.Channels.Channel. Later I could read from the channel, but every event can only be read once. So I needed to cache all information a second time in a ConcurrentBag. When asserting whether a (>=1) projection was updated I first inspected the bag and if nothing was found attempted to read from the channel. It kind of worked, but also was unreliable (flaky tests, even with an additional Thread.Sleep for good measure) and tedious to define all necessary projections for every integration scenario.

The solution I currently use disables the Async Daemon for testing scenarios and explicitly runs the daemon to update all projections w.r.t. the current event count. It's reliable and works so far.

Since I think that I am not the only person to work with async projections I believe this "wait for everything" to be up-to-date should be generalized and easy to access. Especially it should not require disabling the async daemon for testing scenarios, but rather asking it to be up to date within some timeout. I tried that, but I could not find a way to access the running agent from the IDocumentStore.

The APIs that would be helpful:

  • store.WaitForAllAsyncProjections(timeout)
  • store.WaitForAsyncProjection<TModel>(id, timeout)
  • store.WaitForAsyncProjection<TModel>(predicate, timeout)
@jeremydmiller
Copy link
Member

What you're asking for here is syntactic sugar on top of the existing functionality we already use today in testing the async daemon. See this:

https://github.com/JasperFx/marten/blob/master/src/Marten/Events/Daemon/ShardStateTracker.cs#L92

It's using IObservables, and the daemon already publishes events for its progress. We could easily just expose that on IProjectionDaemon and add your suggested API's if you want.

@jeremydmiller jeremydmiller added this to the 6.1.0 milestone Jul 5, 2023
@agross
Copy link
Contributor Author

agross commented Jul 5, 2023

I found this tracking helper but I didn't have much luck with it.

https://discord.com/channels/1074998995086225460/1074999076896112661/1123886607695618079

@oskardudycz
Copy link
Collaborator

I think that it’d be good to expose such methods to make it more accessible plus document them.

@oskardudycz oskardudycz modified the milestones: 6.1.0, 6.2.0 Sep 8, 2023
@jeremydmiller
Copy link
Member

This is a little bigger than it might have sounded because of the potential for multi-tenancy. Will have to be done database by database.

@jeremydmiller
Copy link
Member

IProjectionDaemon.WaitForNonStaleData(timeout) already does this. We might push this to being a documentation effort, because it's not obvious. You can't hang anything off of DocumentStore unless it's going to crudely poll the event status table.

I'm pushing this back since there are usable workarounds right now

@jeremydmiller jeremydmiller modified the milestones: 6.2.0, 7.0.0 Sep 12, 2023
@a-shtifanov-laya
Copy link
Contributor

a-shtifanov-laya commented Oct 27, 2023

hi guys, found this issue while dealing with integration tests as well 😄

inspired by your suggestions, this gist helped me to await all projections:

// when you start with a cold DB (like a clean db in docker), AllProjectionProgress will return 0 projections
// so we need to await that as well
var asyncProjectionsCount = 
  _store.Options.Events.Projections().Count(x => x.Lifecycle == ProjectionLifecycle.Async)
  + 1 /* one additional for HighWaterMark projection */;

var awaitTask = Task.Run(async () =>
{
    do
    {
        await Task.Delay(TimeSpan.FromSeconds(1), ct);
        var statistics = await _store.Storage.Database.FetchEventStoreStatistics(ct);
        var projections = await _store.Storage.Database.AllProjectionProgress(ct);
        if (projections.Count == asyncProjectionsCount && projections.All(x => x.Sequence >= statistics.EventSequenceNumber)) return;
    } while (true);
}, ct);
await Task.WhenAny(awaitTask, Task.Delay(TimeSpan.FromSeconds(15), ct));

i placed it into a test endpoint and i'm calling this endpoint from tests before other code that requires projections

@jeremydmiller
Copy link
Member

I'm going to take this today, and pretty well do a polling approach like @a-shtifanov-laya was doing so there's no issue about what or where the daemon is already running.

@baltie
Copy link
Contributor

baltie commented Mar 22, 2024

@jeremydmiller It seems that WaitForNonStaleProjectionDataAsync only partially solved the issue, at least in our case. In a cold start scenario where the database is empty (which is the case for our integration tests), we first need to wait for the async daemon to create all the expected rows that track projection state for all async projections. WaitForNonStaleProjectionDataAsync happily proceeds if only HighWaterMark is there and does not consider which async projections should have been present.

This is what we do right now in our WebApplicationFactory subclass to support our tests which relies on async projections (heavily based on the comment of @a-shtifanov-laya above). We do not use database per tenant yet, so that would complicate this a bit.

public class MyWebApplicationFactory : WebApplicationFactory<Program>, IAsyncLifetime, ICollectionFixture<MyWebApplicationFactory>
{
    // ....

    public async Task WaitForAsyncProjections()
    {
        CancellationTokenSource cts = new(10.Seconds());

        // Since WaitForNonStaleProjectionDataAsync does not wait until the projections exists in the tables,
        // we need to wait for the async daemon to prepare first. We do that by waiting for the projection count
        // is the same in both configuration and actual database data.
        IDocumentStore store = Services.GetRequiredService<IDocumentStore>();
        IReadOnlyProjectionData[] asyncProjections = store.Options.Events.Projections()
            .Where(x => x.Lifecycle == ProjectionLifecycle.Async)
            .ToArray();

        while (!cts.IsCancellationRequested)
        {
            IReadOnlyList<ShardState> projectionProgress = await store.Storage.Database.AllProjectionProgress(cts.Token);
            if (projectionProgress.Count == asyncProjections.Length + 1) // +1 to include HighWaterMark
                break;
            await Task.Delay(100.Milliseconds(), cts.Token);
        }

        await store.WaitForNonStaleProjectionDataAsync(10.Seconds());
    }
}

@baltie
Copy link
Contributor

baltie commented Mar 22, 2024

And now that I got a bit further, it seems WaitForNonStaleProjectionDataAsync doesn't work. That is, it works for the first test case since I wait for projections to appear, but it fails for the rest. Reading the source code, you get the projection progress only once, and then keep checking that same result over and over until timeout. See

var projections = await database.AllProjectionProgress(cancellationSource.Token).ConfigureAwait(false);
while (!projections.All(x => x.Sequence >= initial.EventSequenceNumber) &&
!cancellationSource.IsCancellationRequested)
{
await Task.Delay(100.Milliseconds(), cancellationSource.Token).ConfigureAwait(false);
}
. AllProjectionProgress returns an IReadOnlyList<ShardState>.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants