diff --git a/CHANGELOG.md b/CHANGELOG.md index 44f97cb08..84e252e03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - Stores: `*Category.Resolve`: Replace `Resolve(sn, ?ResolveOption)` with `?load = LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308) - Stores: `*Category` ctor: Add mandatory `name` argument, and `Name` property [#410](https://github.com/jet/equinox/pull/410) - Stores: `*Category` ctor: Change `caching` to be last argument, to reflect that it is applied over the top [#410](https://github.com/jet/equinox/pull/410) +- Stores: `*Category` ctor: Change `caching` and `access` to be mandatory, adding `NoCaching` and `Unoptimized` modes to represent the former defaults [#417](https://github.com/jet/equinox/pull/417) - `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310) - `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) - `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index 4184c6909..6a0572baf 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -15,21 +15,21 @@ type Store(store) = snapshot: ('event -> bool) * ('state -> 'event)): Category<'event, 'state, unit> = match store with | Store.Context.Memory store -> - Equinox.MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial) + MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial) | Store.Context.Cosmos (store, caching, unfolds) -> - let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized - Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching) + let accessStrategy = if unfolds then CosmosStore.AccessStrategy.Snapshot snapshot else CosmosStore.AccessStrategy.Unoptimized + CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching) | Store.Context.Dynamo (store, caching, unfolds) -> - let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized - Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching) + let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized + DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching) | Store.Context.Es (context, caching, unfolds) -> - let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None - Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, ?access = accessStrategy, ?caching = caching) + let accessStrategy = if unfolds then EventStoreDb.AccessStrategy.RollingSnapshots snapshot else EventStoreDb.AccessStrategy.Unoptimized + EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching) | Store.Context.Sql (context, caching, unfolds) -> - let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None - Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, name, codec, fold, initial, ?access = accessStrategy, ?caching = caching) + let accessStrategy = if unfolds then SqlStreamStore.AccessStrategy.RollingSnapshots snapshot else SqlStreamStore.AccessStrategy.Unoptimized + SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching) | Store.Context.Mdb (context, caching) -> - Equinox.MessageDb.MessageDbCategory<'event,'state,_>(context, name, codec, fold, initial, ?caching = caching) + MessageDb.MessageDbCategory<'event,'state,_>(context, name, codec, fold, initial, MessageDb.AccessStrategy.Unoptimized, caching) type ServiceBuilder(storageConfig, handlerLog) = let store = Store storageConfig diff --git a/samples/Infrastructure/Store.fs b/samples/Infrastructure/Store.fs index c97e56877..cb3445126 100644 --- a/samples/Infrastructure/Store.fs +++ b/samples/Infrastructure/Store.fs @@ -1,18 +1,19 @@ module Samples.Infrastructure.Store open Argu +open Equinox open Serilog open System [] type Context = // For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems - | Memory of Equinox.MemoryStore.VolatileStore> - | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool - | Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.DynamoStore.CachingStrategy * unfolds: bool - | Es of Equinox.EventStoreDb.EventStoreContext * Equinox.CachingStrategy option * unfolds: bool - | Mdb of Equinox.MessageDb.MessageDbContext * Equinox.CachingStrategy option - | Sql of Equinox.SqlStreamStore.SqlStreamStoreContext * Equinox.CachingStrategy option * unfolds: bool + | Memory of MemoryStore.VolatileStore> + | Cosmos of CosmosStore.CosmosStoreContext * CachingStrategy * unfolds: bool + | Dynamo of DynamoStore.DynamoStoreContext * CachingStrategy * unfolds: bool + | Es of EventStoreDb.EventStoreContext * CachingStrategy * unfolds: bool + | Mdb of MessageDb.MessageDbContext * CachingStrategy + | Sql of SqlStreamStore.SqlStreamStoreContext * CachingStrategy * unfolds: bool module MemoryStore = type [] Parameters = @@ -258,7 +259,7 @@ module EventStore = let timeout = a.Timeout log.Information("EventStoreDB {connectionString} {timeout}s", a.ConnectionString, timeout.TotalSeconds) let connection = connect a.ConnectionString a.Credentials timeout - let cacheStrategy = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)) + let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching Context.Es (EventStoreContext(connection, batchSize = a.BatchSize), cacheStrategy, unfolds) // see https://github.com/jet/equinox#provisioning-mssql @@ -266,7 +267,7 @@ module Sql = open Equinox.SqlStreamStore - let cacheStrategy cache = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)) + let cacheStrategy = function Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching module Ms = type [] Parameters = | [] ConnectionString of string @@ -366,5 +367,5 @@ module MessageDb = let config (log : ILogger) cache (p : ParseResults) = let a = Arguments(p) let connection = connect log a.ConnectionString - let cache = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow(c, TimeSpan.FromMinutes 20.)) + let cache = match cache with Some c -> CachingStrategy.SlidingWindow(c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching Context.Mdb (MessageDbContext(connection, batchSize = a.BatchSize), cache) diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 3d48822bf..943b158ee 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -19,14 +19,14 @@ let codec = Cart.Events.codec let codecJe = Cart.Events.codecJe let categoryGesStreamWithRollingSnapshots context = - EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot) + EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, EventStoreDb.AccessStrategy.RollingSnapshots snapshot, CachingStrategy.NoCaching) let categoryGesStreamWithoutCustomAccessStrategy context = - EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial) + EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, EventStoreDb.AccessStrategy.Unoptimized, CachingStrategy.NoCaching) let categoryCosmosStreamWithSnapshotStrategy context = - CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CachingStrategy.NoCaching) let categoryCosmosStreamWithoutCustomAccessStrategy context = - CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CachingStrategy.NoCaching) let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count = service.ExecuteManyAsync(cartId, false, seq { diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index 5ed00de4a..88f82d79d 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -17,17 +17,17 @@ let Category = ContactPreferences.Category let codec = ContactPreferences.Events.codec let codecJe = ContactPreferences.Events.codecJe let categoryGesWithOptimizedStorageSemantics context = - EventStoreDb.EventStoreCategory(context 1, Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.LatestKnownEvent) + EventStoreDb.EventStoreCategory(context 1, Category, codec, fold, initial, EventStoreDb.AccessStrategy.LatestKnownEvent, CachingStrategy.NoCaching) let categoryGesWithoutAccessStrategy context = - EventStoreDb.EventStoreCategory(context defaultBatchSize, Category, codec, fold, initial) + EventStoreDb.EventStoreCategory(context defaultBatchSize, Category, codec, fold, initial, EventStoreDb.AccessStrategy.Unoptimized, CachingStrategy.NoCaching) let categoryCosmosWithLatestKnownEventSemantics context = - CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.LatestKnownEvent, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.LatestKnownEvent, CachingStrategy.NoCaching) let categoryCosmosUnoptimized context = - CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CachingStrategy.NoCaching) let categoryCosmosRollingUnfolds context = let access = CosmosStore.AccessStrategy.Custom(ContactPreferences.Fold.isOrigin, ContactPreferences.Fold.transmute) - CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CachingStrategy.NoCaching) type Tests(testOutputHelper) = let testOutput = TestOutput testOutputHelper diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 6fb5749de..1e4bc8c36 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -18,18 +18,18 @@ let createServiceMemory log store = let codec = Favorites.Events.codec let codecJe = Favorites.Events.codecJe let createServiceGes log context = - EventStoreDb.EventStoreCategory(context, Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot) + EventStoreDb.EventStoreCategory(context, Category, codec, fold, initial, EventStoreDb.AccessStrategy.RollingSnapshots snapshot, CachingStrategy.NoCaching) |> Decider.forStream log |> Favorites.create let createServiceCosmosSnapshotsUncached log context = - CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CachingStrategy.NoCaching) |> Decider.forStream log |> Favorites.create let createServiceCosmosRollingStateUncached log context = let access = CosmosStore.AccessStrategy.RollingState Favorites.Fold.snapshot - CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CosmosStore.CachingStrategy.NoCaching) + CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CachingStrategy.NoCaching) |> Decider.forStream log |> Favorites.create @@ -37,7 +37,7 @@ let createServiceCosmosUnoptimizedButCached log context = let access = CosmosStore.AccessStrategy.Unoptimized let caching = let cache = Cache ("name", 10) - CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, caching) |> Decider.forStream log |> Favorites.create diff --git a/samples/Tutorial/AsAt.fsx b/samples/Tutorial/AsAt.fsx index 925ac6693..3c01cd709 100644 --- a/samples/Tutorial/AsAt.fsx +++ b/samples/Tutorial/AsAt.fsx @@ -168,7 +168,7 @@ module Cosmos = let connector = CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., Microsoft.Azure.Cosmos.ConnectionMode.Gateway) let storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") |> Async.RunSynchronously let context = CosmosStoreContext(storeClient, tipMaxEvents = 10) - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching let accessStrategy = AccessStrategy.Snapshot (Fold.isValid,Fold.snapshot) let cat = CosmosStoreCategory(context, Category, Events.codecJe, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) let resolve = Equinox.Decider.forStream Log.log cat diff --git a/samples/Tutorial/Cosmos.fsx b/samples/Tutorial/Cosmos.fsx index 2a017ca96..ecab85245 100644 --- a/samples/Tutorial/Cosmos.fsx +++ b/samples/Tutorial/Cosmos.fsx @@ -87,7 +87,7 @@ module Favorites = open Equinox.CosmosStore // Everything outside of this module is completely storage agnostic so can be unit tested simply and/or bound to any store let accessStrategy = AccessStrategy.Unoptimized // Or Snapshot etc https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#access-strategies let category (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) let [] appName = "equinox-tutorial" diff --git a/samples/Tutorial/FulfilmentCenter.fsx b/samples/Tutorial/FulfilmentCenter.fsx index 3796b4383..906723d7a 100644 --- a/samples/Tutorial/FulfilmentCenter.fsx +++ b/samples/Tutorial/FulfilmentCenter.fsx @@ -140,7 +140,7 @@ module Store = let storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") |> Async.RunSynchronously let context = CosmosStoreContext(storeClient, tipMaxEvents = 256) let cache = Equinox.Cache(appName, 20) - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching open FulfilmentCenter diff --git a/samples/Tutorial/Gapless.fs b/samples/Tutorial/Gapless.fs index 7ed66c5f9..a467c105b 100644 --- a/samples/Tutorial/Gapless.fs +++ b/samples/Tutorial/Gapless.fs @@ -80,7 +80,7 @@ module Cosmos = open Equinox.CosmosStore let private category (context, cache, accessStrategy) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) module Snapshot = diff --git a/samples/Tutorial/Index.fs b/samples/Tutorial/Index.fs index 982c2e007..ad7b93fb5 100644 --- a/samples/Tutorial/Index.fs +++ b/samples/Tutorial/Index.fs @@ -52,7 +52,7 @@ module Cosmos = open Equinox.CosmosStore let category (context,cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let accessStrategy = AccessStrategy.RollingState Fold.snapshot CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) diff --git a/samples/Tutorial/Sequence.fs b/samples/Tutorial/Sequence.fs index a8f206d9e..769db083c 100644 --- a/samples/Tutorial/Sequence.fs +++ b/samples/Tutorial/Sequence.fs @@ -42,7 +42,7 @@ module Cosmos = open Equinox.CosmosStore let private create (context, cache, accessStrategy) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) module LatestKnownEvent = diff --git a/samples/Tutorial/Set.fs b/samples/Tutorial/Set.fs index ab379c60e..44aa5ad41 100644 --- a/samples/Tutorial/Set.fs +++ b/samples/Tutorial/Set.fs @@ -53,7 +53,7 @@ module Cosmos = open Equinox.CosmosStore let category (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let accessStrategy = AccessStrategy.RollingState Fold.snapshot CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) diff --git a/samples/Tutorial/Todo.fsx b/samples/Tutorial/Todo.fsx index b0f9153eb..c4f5c61fc 100644 --- a/samples/Tutorial/Todo.fsx +++ b/samples/Tutorial/Todo.fsx @@ -130,7 +130,7 @@ module Store = let connector = CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5.) let storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") |> Async.RunSynchronously let context = CosmosStoreContext(storeClient, tipMaxEvents = 100) // Keep up to 100 events in tip before moving events to a new document - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) let access = AccessStrategy.Snapshot (isOrigin,snapshot) let category = CosmosStoreCategory(context, Category, codec, fold, initial, access, cacheStrategy) diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index 8dd422ea8..227dd4941 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -58,10 +58,10 @@ module Cosmos = open Equinox.CosmosStore let category (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching CosmosStoreCategory(context, Category, Events.codecJe, Fold.fold, Fold.initial, AccessStrategy.LatestKnownEvent, cacheStrategy) module EventStore = open Equinox.EventStoreDb let category context = - EventStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, access = AccessStrategy.LatestKnownEvent) + EventStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, AccessStrategy.LatestKnownEvent, Equinox.CachingStrategy.NoCaching) diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index 234687e61..38576f2db 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -101,6 +101,8 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) = member val Inner = inner type [] CachingStrategy = + /// Do not apply any caching strategy for this Category. + | NoCaching /// Retain a single 'state per streamName. /// Each cache hit for a stream renews the retention period for the defined window. /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. @@ -112,4 +114,6 @@ type [] CachingStrategy = | FixedTimeSpan of Cache * period: TimeSpan /// Prefix is used to segregate multiple folded states per stream when they are stored in the cache. /// Semantics are otherwise identical to SlidingWindow. + /// NOTE In general, its much preferred to ensure that the snapshot includes all relevant state + /// or use an AccessStrategy such as MultiSnapshot where available | SlidingWindowPrefixed of Cache * window: TimeSpan * prefix: string diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs index 43a436f87..7f85019a0 100644 --- a/src/Equinox.Core/Caching.fs +++ b/src/Equinox.Core/Caching.fs @@ -35,15 +35,13 @@ let private mkKey prefix streamName = let internal policySlidingExpiration (slidingExpiration: System.TimeSpan) () = System.Runtime.Caching.CacheItemPolicy(SlidingExpiration = slidingExpiration) -let internal policyFixedTimeSpan (period: System.TimeSpan) () = +let private policyFixedTimeSpan (period: System.TimeSpan) () = let expirationPoint = let creationDate = System.DateTimeOffset.UtcNow in creationDate.Add period System.Runtime.Caching.CacheItemPolicy(AbsoluteExpiration = expirationPoint) -let private mapStrategy = function - | Equinox.CachingStrategy.FixedTimeSpan (cache, period) -> struct ( cache, mkKey null, policyFixedTimeSpan period) - | Equinox.CachingStrategy.SlidingWindow (cache, window) -> cache, mkKey null, policySlidingExpiration window - | Equinox.CachingStrategy.SlidingWindowPrefixed (cache, window, prefix) -> cache, mkKey prefix, policySlidingExpiration window -let apply isStale x (cat: 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state>): ICategory<_, _, _> = +let apply isStale x (cat: 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state>) = match x with - | None -> cat - | Some x -> mapStrategy x |> fun struct (cache, createKey, createOptions) -> Decorator(cat, cache, isStale, createKey, createOptions) + | Equinox.CachingStrategy.NoCaching -> (cat : ICategory<_, _, _>) + | Equinox.CachingStrategy.FixedTimeSpan (cache, period) -> Decorator(cat, cache, isStale, mkKey null, policyFixedTimeSpan period) + | Equinox.CachingStrategy.SlidingWindow (cache, window) -> Decorator(cat, cache, isStale, mkKey null, policySlidingExpiration window) + | Equinox.CachingStrategy.SlidingWindowPrefixed (cache, window, prefix) -> Decorator(cat, cache, isStale, mkKey prefix, policySlidingExpiration window) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index d6037b6f5..8dec482bd 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1310,31 +1310,6 @@ type CosmosStoreContext(storeClient: CosmosStoreClient, tipOptions, queryOptions let store = StoreClient(cg.Container, cg.Fallback, x.QueryOptions, x.TipOptions) struct (store, streamName, cg.Initialize) -/// For CosmosDB, caching is typically a central aspect of managing RU consumption to maintain performance and capacity. -/// The cache holds the Tip document's etag, which enables use of etag-contingent Reads (which cost only 1RU in the case where the document is unchanged) -/// Omitting can make sense in specific cases; if cache hit rates are low, or there's always a usable snapshot in a relatively small Tip document -[] -type CachingStrategy = - /// Do not apply any caching strategy for this Stream. - /// NB opting not to leverage caching when using CosmosDB can have significant implications for the scalability - /// of your application, both in terms of latency and running costs. - /// While the cost of a cache miss can be ameliorated to varying degrees by employing an appropriate `AccessStrategy` - /// [that works well and has been validated for your scenario with real data], even a cache with a low Hit Rate provides - /// a direct benefit in terms of the number of Request Unit (RU)s that need to be provisioned to your CosmosDB instances. - | NoCaching - /// Retain a single 'state per streamName, together with the associated etag. - /// Each cache hit for a stream renews the retention period for the defined window. - /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. - /// Unless LoadOption.AnyCachedValue or AllowStale are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). - // NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to - // track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip - | SlidingWindow of Equinox.Cache * window: TimeSpan - /// Retain a single 'state per streamName, together with the associated etag. - /// Upon expiration of the defined period, a full reload is triggered. - /// Typically combined with an `Equinox.LoadOption` to minimize loads. - /// Unless LoadOption.AnyCachedValue or AllowStale are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). - | FixedTimeSpan of Equinox.Cache * period: TimeSpan - [] type AccessStrategy<'event, 'state> = /// Don't apply any optimized reading logic. Note this can be extremely RU cost prohibitive @@ -1369,7 +1344,17 @@ type AccessStrategy<'event, 'state> = type CosmosStoreCategory<'event, 'state, 'context> internal (name, resolveStream) = inherit Equinox.Category<'event, 'state, 'context>(name, resolveStream = resolveStream) - new(context: CosmosStoreContext, name, codec, fold, initial, access, caching, + new(context: CosmosStoreContext, name, codec, fold, initial, access, + // For CosmosDB, caching is typically a central aspect of managing RU consumption to maintain performance and capacity. + // The cache holds the Tip document's etag, which enables use of etag-contingent Reads (which cost only 1RU in the case where the document is unchanged) + // Omitting can make sense in specific cases; if cache hit rates are low, or there's always a usable snapshot in a relatively small Tip document + // NOTE Using NoCaching with CosmosDB can have significant implications for the scalability of your application, both in terms of latency and running costs. + // While the cost of a cache miss can be ameliorated to varying degrees by employing an appropriate `AccessStrategy` + // [that works well and has been validated for your scenario with real data], even a cache with a low Hit Rate provides + // a direct benefit in terms of the number of Request Unit (RU)s that need to be provisioned to your CosmosDB instances. + // NOTE Unless LoadOption.AnyCachedValue or AllowStale are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified). + // NOTE re SlidingWindowPrefixed: the recommended approach is to track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip + caching, // Compress Unfolds in Tip. Default: true. // NOTE when set to false, requires Equinox.CosmosStore or Equinox.Cosmos Version >= 2.3.0 to be able to read [] ?compressUnfolds) = @@ -1382,10 +1367,6 @@ type CosmosStoreCategory<'event, 'state, 'context> internal (name, resolveStream | AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ state -> unfold state) | AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton) | AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute - let caching = caching |> function - | CachingStrategy.NoCaching -> None - | CachingStrategy.SlidingWindow (cache, window) -> Some (Equinox.CachingStrategy.SlidingWindow (cache, window)) - | CachingStrategy.FixedTimeSpan (cache, period) -> Some (Equinox.CachingStrategy.FixedTimeSpan (cache, period)) let categories = System.Collections.Concurrent.ConcurrentDictionary>() let resolveInner struct (container, categoryName, init) = let createCategory _name: ICategory<_, _, 'context> = diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 979e76458..dc11bbc68 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -1261,30 +1261,6 @@ type DynamoStoreContext(storeClient: DynamoStoreClient, tipOptions, queryOptions let container, fallback, streamName = storeClient.ResolveContainerFallbackAndStreamName(categoryName, streamId) struct (StoreClient(container, fallback, x.QueryOptions, x.TipOptions), streamName) -/// For DynamoDB, caching is typically a central aspect of managing RU consumption to maintain performance and capacity. -/// Omitting can make sense in specific cases; if streams are short, or there's always a usable snapshot in the Tip -[] -type CachingStrategy = - /// Do not apply any caching strategy for this Stream. - /// NB opting not to leverage caching when using DynamoDB can have significant implications for the scalability - /// of your application, both in terms of latency and running costs. - /// While the cost of a cache miss can be ameliorated to varying degrees by employing an appropriate `AccessStrategy` - /// [that works well and has been validated for your scenario with real data], even a cache with a low Hit Rate provides - /// a direct benefit in terms of the number of Read and/or Write Request Charge Units (RCU)s that need to be provisioned for your Tables. - | NoCaching - /// Retain a single 'state per streamName, together with the associated etag. - /// Each cache hit for a stream renews the retention period for the defined window. - /// Upon expiration of the defined window from the point at which the cache was entry was last used, a full reload is triggered. - /// Unless a LoadOption is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match - // NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to - // track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the unfolds in Tip - | SlidingWindow of Equinox.Cache * window: TimeSpan - /// Retain a single 'state per streamName, together with the associated etag. - /// Upon expiration of the defined period, a full reload is triggered. - /// Typically combined with an `Equinox.LoadOption` to minimize loads. - /// Unless a LoadOption is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match - | FixedTimeSpan of Equinox.Cache * period: TimeSpan - [] type AccessStrategy<'event, 'state> = /// Don't apply any optimized reading logic. Note this can be extremely RU cost prohibitive @@ -1319,7 +1295,16 @@ type AccessStrategy<'event, 'state> = type DynamoStoreCategory<'event, 'state, 'context>(name, resolveStream) = inherit Equinox.Category<'event, 'state, 'context>(name, resolveStream = resolveStream) - new(context: DynamoStoreContext, name, codec, fold, initial, access, caching) = + new(context: DynamoStoreContext, name, codec, fold, initial, access, + // For DynamoDB, caching is typically a central aspect of managing RU consumption to maintain performance and capacity. + // Omitting can make sense in specific cases; if streams are short, or there's always a usable snapshot in the Tip + // NOTE Using NoCaching with DynamoDB can have significant implications for the scalability of your application, both in terms of latency and running costs. + // While the cost of a cache miss can be ameliorated to varying degrees by employing an appropriate `AccessStrategy` + // [that works well and has been validated for your scenario with real data], even a cache with a low Hit Rate provides + // a direct benefit in terms of the number of Read and/or Write Request Charge Units (RCU)s that need to be provisioned for your Tables. + // NOTE Unless a LoadOption is used, each cache hit still involves a read roundtrip (RU charges incurred, transport latency) though deserialization is skipped due to etag match + // NOTE re SlidingWindowPrefixed: the recommended approach is to track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip + caching) = let isOrigin, checkUnfolds, mapUnfolds = match access with | AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 () @@ -1328,10 +1313,6 @@ type DynamoStoreCategory<'event, 'state, 'context>(name, resolveStream) = | AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ (state: 'state) -> unfold state) | AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton) | AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute - let caching = caching |> function - | CachingStrategy.NoCaching -> None - | CachingStrategy.SlidingWindow (cache, window) -> Some (Equinox.CachingStrategy.SlidingWindow (cache, window)) - | CachingStrategy.FixedTimeSpan (cache, period) -> Some (Equinox.CachingStrategy.FixedTimeSpan (cache, period)) let categories = System.Collections.Concurrent.ConcurrentDictionary>() let resolveInner (categoryName, container) = let createCategory _name: ICategory<_, _, 'context> = diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 7915f2583..9f426b771 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -422,6 +422,8 @@ type EventStoreContext(connection: EventStoreConnection, batchOptions: BatchOpti [] type AccessStrategy<'event, 'state> = + /// Read events forward, in batches. + | Unoptimized /// Load only the single most recent event defined in 'event and trust that doing a fold from any such event /// will yield a correct and complete state /// In other words, the fold function should not need to consider either the preceding 'state or 'events. @@ -440,18 +442,18 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code let tryDecode (e: ResolvedEvent) = e |> UnionEncoderAdapters.encodedEventOfResolvedEvent |> codec.TryDecode let isOrigin = match access with - | None | Some AccessStrategy.LatestKnownEvent -> fun _ -> true - | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> isValid + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> fun _ -> true + | AccessStrategy.RollingSnapshots (isValid, _) -> isValid let loadAlgorithm log streamName requireLeader = match access with - | None -> context.LoadBatched(log, streamName, requireLeader, tryDecode, None) - | Some AccessStrategy.LatestKnownEvent - | Some (AccessStrategy.RollingSnapshots _) -> context.LoadBackwardsStoppingAtCompactionEvent(log, streamName, requireLeader, tryDecode, isOrigin) + | AccessStrategy.Unoptimized -> context.LoadBatched(log, streamName, requireLeader, tryDecode, None) + | AccessStrategy.LatestKnownEvent + | AccessStrategy.RollingSnapshots _ -> context.LoadBackwardsStoppingAtCompactionEvent(log, streamName, requireLeader, tryDecode, isOrigin) let compactionPredicate = match access with - | None -> None - | Some AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> Some isValid + | AccessStrategy.Unoptimized -> None + | AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) + | AccessStrategy.RollingSnapshots (isValid, _) -> Some isValid let fetch state f = task { let! token', events = f in return struct (token', fold state events) } let reload (log, sn, leader, token, state) = fetch state (context.Reload(log, sn, leader, token, tryDecode, compactionPredicate)) interface ICategory<'event, 'state, 'context> with @@ -461,8 +463,8 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, _ct) = task { let events = match access with - | None | Some AccessStrategy.LatestKnownEvent -> events - | Some (AccessStrategy.RollingSnapshots (_, compact)) -> + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> events + | AccessStrategy.RollingSnapshots (_, compact) -> let cc = CompactionContext(Array.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then Array.append events (fold state events |> compact |> Array.singleton) else events let encode e = codec.Encode(ctx, e) @@ -474,16 +476,15 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code type EventStoreCategory<'event, 'state, 'context> internal (name, inner) = inherit Equinox.Category<'event, 'state, 'context>(name, inner = inner) - new(context: EventStoreContext, name, codec: FsCodec.IEventCodec<_, _, 'context>, fold, initial, [] ?access, + new(context: EventStoreContext, name, codec: FsCodec.IEventCodec<_, _, 'context>, fold, initial, access, // Caching can be overkill for EventStore esp considering the degree to which its intrinsic caching is a first class feature - // e.g., A key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load - [] ?caching) = - do match access with - | Some AccessStrategy.LatestKnownEvent when Option.isSome caching -> - "Equinox.EventStore does not support (and it would make things _less_ efficient even if it did)" - + "mixing AccessStrategy.LatestKnownEvent with Caching at present." - |> invalidOp - | _ -> () + // e.g., a key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load + caching) = + match access, caching with + | AccessStrategy.LatestKnownEvent, Equinox.CachingStrategy.NoCaching -> () + | AccessStrategy.LatestKnownEvent, _ -> + invalidOp "Equinox.EventStore does not support mixing AccessStrategy.LatestKnownEvent with Caching at present." + | _ -> () let cat = Category<'event, 'state, 'context>(context, codec, fold, initial, access) |> Caching.apply Token.isStale caching EventStoreCategory(name, inner = cat) diff --git a/src/Equinox.EventStoreDb/EventStoreDb.fs b/src/Equinox.EventStoreDb/EventStoreDb.fs index df02e1d3d..264773beb 100644 --- a/src/Equinox.EventStoreDb/EventStoreDb.fs +++ b/src/Equinox.EventStoreDb/EventStoreDb.fs @@ -364,6 +364,8 @@ type EventStoreContext(connection: EventStoreConnection, batchOptions: BatchOpti [] type AccessStrategy<'event, 'state> = + /// Read events forward, in batches. + | Unoptimized /// Load only the single most recent event defined in 'event and trust that doing a fold from any such event /// will yield a correct and complete state /// In other words, the fold function should not need to consider either the preceding 'state or 'events. @@ -382,19 +384,19 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code let tryDecode (e: ResolvedEvent) = e.Event |> ClientCodec.timelineEvent |> codec.TryDecode let isOrigin = match access with - | None | Some AccessStrategy.LatestKnownEvent -> fun _ -> true - | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> isValid + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> fun _ -> true + | AccessStrategy.RollingSnapshots (isValid, _) -> isValid let loadAlgorithm log streamName requireLeader ct = let compacted limit = context.LoadBackwardsStoppingAtCompactionEvent(log, streamName, requireLeader, limit, tryDecode, isOrigin, ct) match access with - | None -> context.LoadBatched(log, streamName, requireLeader, tryDecode, None, ct) - | Some AccessStrategy.LatestKnownEvent -> compacted (Some 1) - | Some (AccessStrategy.RollingSnapshots _) -> compacted None + | AccessStrategy.Unoptimized -> context.LoadBatched(log, streamName, requireLeader, tryDecode, None, ct) + | AccessStrategy.LatestKnownEvent -> compacted (Some 1) + | AccessStrategy.RollingSnapshots _ -> compacted None let compactionPredicate = match access with - | None -> None - | Some AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> Some isValid + | AccessStrategy.Unoptimized -> None + | AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) + | AccessStrategy.RollingSnapshots (isValid, _) -> Some isValid let fetch state f = task { let! struct (token', events) = f in return struct (token', fold state events) } let reload (log, sn, leader, token, state) ct = fetch state (context.Reload(log, sn, leader, token, tryDecode, compactionPredicate, ct)) interface Caching.IReloadable<'state> with member _.Reload(log, sn, leader, token, state, ct) = reload (log, sn, leader, token, state) ct @@ -405,8 +407,8 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, ct) = task { let events = match access with - | None | Some AccessStrategy.LatestKnownEvent -> events - | Some (AccessStrategy.RollingSnapshots (_, compact)) -> + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> events + | AccessStrategy.RollingSnapshots (_, compact) -> let cc = CompactionContext(Array.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then Array.append events (fold state events |> compact |> Array.singleton) else events let encode e = codec.Encode(ctx, e) @@ -417,16 +419,15 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code type EventStoreCategory<'event, 'state, 'context> internal (name, inner) = inherit Equinox.Category<'event, 'state, 'context>(name, inner = inner) - new(context: EventStoreContext, name, codec: FsCodec.IEventCodec<_, _, 'context>, fold, initial, [] ?access, + new(context: EventStoreContext, name, codec: FsCodec.IEventCodec<_, _, 'context>, fold, initial, access, // Caching can be overkill for EventStore esp considering the degree to which its intrinsic caching is a first class feature - // e.g., A key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load - [] ?caching) = - do match access with - | Some AccessStrategy.LatestKnownEvent when Option.isSome caching -> - "Equinox.EventStoreDb does not support (and it would make things _less_ efficient even if it did)" - + "mixing AccessStrategy.LatestKnownEvent with Caching at present." - |> invalidOp - | _ -> () + // e.g., a key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load + caching) = + match access, caching with + | AccessStrategy.LatestKnownEvent, Equinox.CachingStrategy.NoCaching -> () + | AccessStrategy.LatestKnownEvent, _ -> + invalidOp "Equinox.EventStoreDb does not support mixing AccessStrategy.LatestKnownEvent with Caching at present." + | _ -> () let cat = Category<'event, 'state, 'context>(context, codec, fold, initial, access) |> Caching.apply Token.isStale caching EventStoreCategory(name, inner = cat) diff --git a/src/Equinox.MessageDb/MessageDb.fs b/src/Equinox.MessageDb/MessageDb.fs index fc89429b8..627693c34 100644 --- a/src/Equinox.MessageDb/MessageDb.fs +++ b/src/Equinox.MessageDb/MessageDb.fs @@ -356,6 +356,8 @@ type MessageDbContext(client: MessageDbClient, batchOptions: BatchOptions) = [] type AccessStrategy<'event, 'state> = + /// Read events forward, in batches. + | Unoptimized /// Load only the single most recent event defined in in a stream and trust that it'll be decoded and /// doing a fold from any such event will yield a correct and complete state /// In other words, the fold function should not need to consider either the preceding 'state or 'events. @@ -374,9 +376,9 @@ type AccessStrategy<'event, 'state> = type private Category<'event, 'state, 'context>(context: MessageDbContext, codec: IEventCodec<_, _, 'context>, fold, initial, access) = let loadAlgorithm log category streamId streamName requireLeader ct = match access with - | None -> context.LoadBatched(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) - | Some AccessStrategy.LatestKnownEvent -> context.LoadLast(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) - | Some (AccessStrategy.AdjacentSnapshots (snapshotType, _)) -> task { + | AccessStrategy.Unoptimized -> context.LoadBatched(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) + | AccessStrategy.LatestKnownEvent -> context.LoadLast(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) + | AccessStrategy.AdjacentSnapshots (snapshotType, _) -> task { match! context.LoadSnapshot(log, category, streamId, requireLeader, codec.TryDecode, snapshotType, ct) with | ValueSome (pos, snapshotEvent) -> let state = fold initial [| snapshotEvent |] @@ -395,8 +397,8 @@ type private Category<'event, 'state, 'context>(context: MessageDbContext, codec | GatewaySyncResult.Written token' -> let state' = fold state events match access with - | None | Some AccessStrategy.LatestKnownEvent -> () - | Some (AccessStrategy.AdjacentSnapshots(_, toSnap)) -> + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> () + | AccessStrategy.AdjacentSnapshots(_, toSnap) -> if Token.shouldSnapshot context.BatchOptions.BatchSize token token' then do! x.StoreSnapshot(log, categoryName, streamId, ctx, token', toSnap state', ct) return SyncResult.Written (token', state') @@ -410,8 +412,11 @@ type private Category<'event, 'state, 'context>(context: MessageDbContext, codec FsCodec.Core.EventData.Create(rawEvent.EventType, rawEvent.Data, meta = Snapshot.meta token) context.StoreSnapshot(log, category, streamId, encodedWithMeta, ct) -type MessageDbCategory<'event, 'state, 'context>(context: MessageDbContext, name, codec, fold, initial, []?access, - // For MessageDb, caching is less critical than it is for e.g. CosmosDB - // As such, it can often be omitted, particularly if streams are short, or events are small and/or database latency aligns with request latency requirements - []?caching) = +type MessageDbCategory<'event, 'state, 'context>(context: MessageDbContext, name, codec, fold, initial, access, + // For MessageDb, caching is less critical than it is for e.g. CosmosDB. + // However, while not necessary to control costs, caching can improve the throughput of your application a few times over, + // as such you should only skip it if you know what you're doing + // e.g. if streams are always short, events are always small, you are absolutely certain there will be no cache hits + // (and you have a cheerful but bored DBA) + caching) = inherit Equinox.Category<'event, 'state, 'context>(name, Category(context, codec, fold, initial, access) |> Caching.apply Token.isStale caching) diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 2af6d2363..be95c42ea 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -398,6 +398,8 @@ type SqlStreamStoreContext(connection: SqlStreamStoreConnection, batchOptions: B [] type AccessStrategy<'event, 'state> = + /// Read events forward, in batches. + | Unoptimized /// Load only the single most recent event defined in 'event and trust that doing a fold from any such event /// will yield a correct and complete state /// In other words, the fold function should not need to consider either the preceding 'state or 'events. @@ -416,18 +418,18 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext, let tryDecode (e: ResolvedEvent) = e |> UnionEncoderAdapters.encodedEventOfResolvedEvent |> codec.TryDecode let isOrigin = match access with - | None | Some AccessStrategy.LatestKnownEvent -> fun _ -> true - | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> isValid + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> fun _ -> true + | AccessStrategy.RollingSnapshots (isValid, _) -> isValid let loadAlgorithm log streamName requireLeader ct = match access with - | None -> context.LoadBatched(log, streamName, requireLeader, tryDecode, None, ct) - | Some AccessStrategy.LatestKnownEvent - | Some (AccessStrategy.RollingSnapshots _) -> context.LoadBackwardsStoppingAtCompactionEvent(log, streamName, requireLeader, tryDecode, isOrigin, ct) + | AccessStrategy.Unoptimized -> context.LoadBatched(log, streamName, requireLeader, tryDecode, None, ct) + | AccessStrategy.LatestKnownEvent + | AccessStrategy.RollingSnapshots _ -> context.LoadBackwardsStoppingAtCompactionEvent(log, streamName, requireLeader, tryDecode, isOrigin, ct) let compactionPredicate = match access with - | None -> None - | Some AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> Some isValid + | AccessStrategy.Unoptimized -> None + | AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) + | AccessStrategy.RollingSnapshots (isValid, _) -> Some isValid let fetch state f = task { let! token', events = f in return struct (token', fold state events) } let reload (log, sn, leader, token, state) ct = fetch state (context.Reload(log, sn, leader, token, tryDecode, compactionPredicate, ct)) interface ICategory<'event, 'state, 'context> with @@ -437,8 +439,8 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext, member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, ct) = task { let events = match access with - | None | Some AccessStrategy.LatestKnownEvent -> events - | Some (AccessStrategy.RollingSnapshots (_, compact)) -> + | AccessStrategy.Unoptimized | AccessStrategy.LatestKnownEvent -> events + | AccessStrategy.RollingSnapshots (_, compact) -> let cc = CompactionContext(Array.length events, token.batchCapacityLimit.Value) if cc.IsCompactionDue then Array.append events (fold state events |> compact |> Array.singleton) else events let encode e = codec.Encode(ctx, e) @@ -450,16 +452,15 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext, type SqlStreamStoreCategory<'event, 'state, 'context> internal (name, inner) = inherit Equinox.Category<'event, 'state, 'context>(name, inner = inner) - new(context: SqlStreamStoreContext, name, codec: FsCodec.IEventCodec<_, _, 'context>, fold, initial, []?access, + new(context: SqlStreamStoreContext, name, codec: FsCodec.IEventCodec<_, _, 'context>, fold, initial, access, // For SqlStreamStore, caching is less critical than it is for e.g. CosmosDB - // As such, it can often be omitted, particularly if streams are short, or events are small and/or database latency aligns with request latency requirements - []?caching) = - do match access with - | Some AccessStrategy.LatestKnownEvent when Option.isSome caching -> - "Equinox.SqlStreamStore does not support (and it would make things _less_ efficient even if it did)" - + "mixing AccessStrategy.LatestKnownEvent with Caching at present." - |> invalidOp - | _ -> () + // As such, it can sometimes be omitted, particularly if streams are short, or events are small and/or database latency aligns with request latency requirements + caching) = + match access, caching with + | AccessStrategy.LatestKnownEvent, Equinox.CachingStrategy.NoCaching -> () + | AccessStrategy.LatestKnownEvent, _ -> + invalidOp "Equinox.SqlStreamStore does not support mixing AccessStrategy.LatestKnownEvent with Caching at present." + | _ -> () let cat = Category<'event, 'state, 'context>(context, codec, fold, initial, access) |> Caching.apply Token.isStale caching SqlStreamStoreCategory(name, cat) diff --git a/tests/Equinox.Core.Tests/CachingTests.fs b/tests/Equinox.Core.Tests/CachingTests.fs index aa3d7d577..75783c67f 100644 --- a/tests/Equinox.Core.Tests/CachingTests.fs +++ b/tests/Equinox.Core.Tests/CachingTests.fs @@ -59,9 +59,9 @@ let [] ``AsyncLazy.Empty is a true singleton, does not allocate`` () = let i2 = Equinox.Core.AsyncLazy.Empty test <@ obj.ReferenceEquals(i1, i2) @> -let [] ``No strategy, no wrapping`` () = +let [] ``NoCaching strategy does no wrapping`` () = let cat = SpyCategory() - let sut = Equinox.Core.Caching.apply isStale None cat + let sut = Equinox.Core.Caching.apply isStale Equinox.CachingStrategy.NoCaching cat test <@ obj.ReferenceEquals(cat, sut) @> type Tests() = @@ -69,7 +69,7 @@ type Tests() = let cache = Equinox.Cache("tests", 1) let strategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 10) let cat = SpyCategory() - let sut = Equinox.Core.Caching.apply isStale (Some strategy) cat + let sut = Equinox.Core.Caching.apply isStale strategy cat let sn = Guid.NewGuid |> string let write () = write sn sut diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs index 40ff2010c..5c6e0bfa9 100644 --- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs +++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs @@ -15,10 +15,10 @@ open Xunit module WiringHelpers = let private createCategoryUncached name codec initial fold accessStrategy context = - let noCachingCacheStrategy = CachingStrategy.NoCaching + let noCachingCacheStrategy = Equinox.CachingStrategy.NoCaching StoreCategory(context, name, codec, fold, initial, accessStrategy, noCachingCacheStrategy) let private createCategory name codec initial fold accessStrategy (context, cache) = - let sliding20mCacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + let sliding20mCacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) StoreCategory(context, name, codec, fold, initial, accessStrategy, sliding20mCacheStrategy) let createCategoryUnoptimizedUncached name codec initial fold context = diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs index c52125abf..145296f43 100644 --- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs @@ -23,27 +23,27 @@ module Cart = let codec = Cart.Events.codecJe #endif let createServiceWithoutOptimization log context = - StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.Unoptimized, CachingStrategy.NoCaching) + StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.Unoptimized, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create /// Trigger looking in Tip (we want those calls to occur, but without leaning on snapshots, which would reduce the paths covered) let createServiceWithEmptyUnfolds log context = let unfArgs = Cart.Fold.isOrigin, fun _ -> Array.empty - StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.MultiSnapshot unfArgs, CachingStrategy.NoCaching) + StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.MultiSnapshot unfArgs, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create let createServiceWithSnapshotStrategy log context = - StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.Snapshot snapshot, CachingStrategy.NoCaching) + StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.Snapshot snapshot, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create let createServiceWithSnapshotStrategyAndCaching log context cache = - let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) StoreCategory(context, Cart.Category, codec, fold, initial, AccessStrategy.Snapshot snapshot, sliding20m) |> Equinox.Decider.forStream log |> Cart.create let createServiceWithRollingState log context = let access = AccessStrategy.RollingState Cart.Fold.snapshot - StoreCategory(context, Cart.Category, codec, fold, initial, access, CachingStrategy.NoCaching) + StoreCategory(context, Cart.Category, codec, fold, initial, access, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create let streamName = Cart.streamId >> StreamName.render Cart.Category @@ -60,9 +60,9 @@ module ContactPreferences = |> Equinox.Decider.forStream log |> ContactPreferences.create let createServiceWithoutCaching log context = - createServiceWithLatestKnownEvent context log CachingStrategy.NoCaching + createServiceWithLatestKnownEvent context log Equinox.CachingStrategy.NoCaching let createServiceWithCaching log context cache = - let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) createServiceWithLatestKnownEvent context log sliding20m let streamName = ContactPreferences.streamId >> StreamName.render ContactPreferences.Category diff --git a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs index 7a96d37a6..33dd0cfe7 100644 --- a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs @@ -103,34 +103,34 @@ module SimplestThing = let [] CategoryName = "SimplestThing" let streamId = Equinox.StreamId.gen Guid.toStringN let decider log context id = - let cat = Category(context, CategoryName, codec, fold, initial) + let cat = Category(context, CategoryName, codec, fold, initial, AccessStrategy.Unoptimized, Equinox.CachingStrategy.NoCaching) Equinox.Decider.forStream log cat (streamId id) module Cart = let fold, initial = Cart.Fold.fold, Cart.Fold.initial let codec = Cart.Events.codec let createServiceWithoutOptimization log context = - Category(context, Cart.Category, codec, fold, initial) + Category(context, Cart.Category, codec, fold, initial, AccessStrategy.Unoptimized, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create #if STORE_MESSAGEDB let snapshot = Cart.Fold.snapshotEventCaseName, Cart.Fold.snapshot let createServiceWithAdjacentSnapshotting log context = - Category(context, Cart.Category, codec, fold, initial, access = AccessStrategy.AdjacentSnapshots snapshot) + Category(context, Cart.Category, codec, fold, initial, AccessStrategy.AdjacentSnapshots snapshot, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create #else let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot let createServiceWithCompaction log context = - Category(context, Cart.Category, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot) + Category(context, Cart.Category, codec, fold, initial, AccessStrategy.RollingSnapshots snapshot, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> Cart.create #endif let createServiceWithCaching log context cache = let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Category(context, Cart.Category, codec, fold, initial, caching = sliding20m) + Category(context, Cart.Category, codec, fold, initial, AccessStrategy.Unoptimized, caching = sliding20m) |> Equinox.Decider.forStream log |> Cart.create @@ -153,12 +153,12 @@ module ContactPreferences = let codec = ContactPreferences.Events.codec let createServiceWithoutOptimization log connection = let context = createContext connection defaultBatchSize - Category(context, ContactPreferences.Category, codec, fold, initial) + Category(context, ContactPreferences.Category, codec, fold, initial, AccessStrategy.Unoptimized, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> ContactPreferences.create let createService log connection = - Category(createContext connection 1, ContactPreferences.Category, codec, fold, initial, access = AccessStrategy.LatestKnownEvent) + Category(createContext connection 1, ContactPreferences.Category, codec, fold, initial, AccessStrategy.LatestKnownEvent, Equinox.CachingStrategy.NoCaching) |> Equinox.Decider.forStream log |> ContactPreferences.create