Skip to content

Commit

Permalink
feat(changes): Add IDocumentChanges + RavenDB implementation
Browse files Browse the repository at this point in the history
- [Changes API: How to Subscribe to Document Changes | RavenDB 5.3 Documentation](https://ravendb.net/docs/article-page/5.3/csharp/client-api/changes/how-to-subscribe-to-document-changes#fordocumentsincollection)
- [Async / Await with IObserver<T> · Issue #459 · dotnet/reactive](dotnet/reactive#459)
- [Change Streams](https://mongodb.github.io/mongo-csharp-driver/2.9/reference/driver/change_streams/)
  • Loading branch information
jfoshee committed Sep 4, 2022
1 parent 3195df0 commit 9f77d47
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
namespace IntBasis.DocumentOriented.RavenDB.Tests;

public class RavenDbDocumentChangesTest
{
// Separate entity type for change tests so we don't pick up changes from other collections
class TestBook : IDocumentEntity
{
public string? Id {get;set;}
public string? Title { get; set; }
public int PageCount { get; set; }
}

[Theory(DisplayName = "Subscribe nothing changed"), Integration]
public async Task NoChanges(IDocumentChanges subject)
{
var invoked = false;
var subscription = subject.Subscribe<TestBook>(() =>
{
invoked = true;
return Task.CompletedTask;
});
await Task.Delay(100);
subscription.Dispose();
invoked.Should().BeFalse();
}

[Theory(DisplayName = "Subscribe 1 Change"), Integration]
public async Task OneChange(IDocumentChanges subject, IDocumentStorage documentStorage)
{
var entity = new TestBook();
await documentStorage.Store(entity);
var invoked = false;
using var subscription = subject.Subscribe<TestBook>(() =>
{
invoked = true;
return Task.CompletedTask;
});

entity.Title = "New title";
await documentStorage.Store(entity);

await Task.Delay(100);
invoked.Should().BeTrue();
}

[Theory(DisplayName = "Subscribe 5 Changes"), Integration]
public async Task FiveChanges(IDocumentChanges subject, IDocumentStorage documentStorage)
{
var entity = new TestBook();
await documentStorage.Store(entity);
var invoked = 0;
using var subscription = subject.Subscribe<TestBook>(() =>
{
invoked++;
return Task.CompletedTask;
});

for (int i = 0; i < 5; i++)
{
entity.PageCount = i;
await documentStorage.Store(entity);
}

await Task.Delay(500);
invoked.Should().BeInRange(5, 6); // TODO: Figure out why sometimes 6
}

[Theory(DisplayName = "Disposal ends subscription"), Integration]
public async Task Disposal(IDocumentChanges subject, IDocumentStorage documentStorage)
{
var invoked = false;
var subscription = subject.Subscribe<TestBook>(() =>
{
invoked = true;
return Task.CompletedTask;
});
subscription.Dispose();

await documentStorage.Store(new TestBook());

await Task.Delay(200);
invoked.Should().BeFalse();
}
}

//- [Change Streams] (https://mongodb.github.io/mongo-csharp-driver/2.9/reference/driver/change_streams/)
4 changes: 2 additions & 2 deletions IntBasis.DocumentOriented.RavenDB.Tests/RavenDbExampleTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class RavenDbExampleTest
// store.Maintenance.Server.Send<DatabasePutResult>(createOperation);
//}

RavenDbConfiguration TestConfig => new("Test", "http://127.0.0.1:8080");
IDocumentStore DocumentStore() => RavenDbInitialization.InitializeDocumentStore(TestConfig);
static RavenDbConfiguration TestConfig => new("Test", "http://127.0.0.1:8080");
static IDocumentStore DocumentStore() => RavenDbInitialization.InitializeDocumentStore(TestConfig);

[Theory(DisplayName = "RavenDB Store"), Integration]
public void Storage(IDocumentSession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="RavenDB.Client" Version="5.3.103" />
<PackageReference Include="System.Reactive.Core" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
28 changes: 28 additions & 0 deletions IntBasis.DocumentOriented.RavenDB/RavenDbDocumentChanges.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Raven.Client.Documents;
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace IntBasis.DocumentOriented.RavenDB;

public class RavenDbDocumentChanges : IDocumentChanges
{
private readonly IDocumentStore documentStore;

public RavenDbDocumentChanges(IDocumentStore documentStore)
{
this.documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
}

/// <inheritdoc/>
public IDisposable Subscribe<T>(Func<Task> observer) where T : IDocumentEntity
{
// [Changes API: How to Subscribe to Document Changes](https://ravendb.net/docs/article-page/5.3/csharp/client-api/changes/how-to-subscribe-to-entity-changes#fordocumentsincollection)
// [Async / Await with IObserver](https://github.com/dotnet/reactive/issues/459)
var observable = documentStore.Changes()
.ForDocumentsInCollection<T>();
return observable.Select(document => Observable.FromAsync(observer))
.Concat()
.Subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public static class ServiceCollectionExtensions
/// External:
/// <list type="bullet">
/// <item> <see cref="IDocumentStorage"/> </item>
/// <item> <see cref="IDocumentChanges"/> </item>
/// </list>
/// <para/>
/// Internal:
Expand All @@ -27,6 +28,7 @@ public static IServiceCollection AddDocumentOrientedRavenDb(this IServiceCollect
RavenDbConfiguration configuration)
{
services.AddTransient<IDocumentStorage, RavenDbDocumentStorage>();
services.AddTransient<IDocumentChanges, RavenDbDocumentChanges>();
// A single instance of the Document Store (Singleton Pattern)
// should be created per cluster per the lifetime of your application.
// See https://ravendb.net/docs/article-page/5.3/csharp/client-api/what-is-a-document-store
Expand Down
18 changes: 18 additions & 0 deletions IntBasis.DocumentOriented/IDocumentChanges.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace IntBasis.DocumentOriented;

/// <summary>
/// Provides a way to subscribe to notifications of changes to Document Entities
/// </summary>
public interface IDocumentChanges
{
/// <summary>
/// Subscribes to changes to Document Entities of type <typeparamref name="T"/>
/// and invokes <paramref name="observer"/> for each change.
/// <para/>
/// The subscription is closed when the returned object is disposed.
/// </summary>
/// <typeparam name="T">The Document Entity type</typeparam>
/// <param name="observer">The delegate that is called for each change</param>
/// <returns>A subscription which can be closed by calling <see cref="IDisposable.Dispose"/></returns>
IDisposable Subscribe<T>(Func<Task> observer) where T : IDocumentEntity;
}

0 comments on commit 9f77d47

Please sign in to comment.