Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fscodec-stj
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 12, 2020
2 parents b0a761c + ebeb34d commit b374271
Show file tree
Hide file tree
Showing 32 changed files with 485 additions and 315 deletions.
247 changes: 181 additions & 66 deletions DOCUMENTATION.md

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,52 @@ I expand (too much!) on some more of the considerations in https://github.com/je

The other thing that should be pointed out is the caching can typically cover a lot of perf stuff as long as stream lengths stay sane - Snapshotting (esp polluting the stream with snapshot events should definitely be toward the bottom of your list of tactics for managing a stream efficiently given long streams are typically a design smell)

<a name="changing-access-strategy"></a>
### Changing Access / Representation strategies in `Equinox.Cosmos` - what happens?

> Does Equinox adapt the stream if we start writing with `Equinox.Cosmos.AccessStrategy.RollingState` and change to `Snapshotted` for instance? It could take the last RollingState writing and make the first snapshot ?
> what about the opposite? It deletes all events and start writing `RollingState` ?
TL;DR yes and no respectively

#### Some context

Firstly, it's recommended to read the [documentation section on Access Strategies](DOCUMENTATION.md#access-strategies)

General rules:
- Events are the atoms from which state is built, they live forever in immutable Batch documents with id <> -1.
- Snapshots/unfolds live in the `.u` array in the Tip doc (id: -1)
loading/build of state is composed of
- regardless of what happens, Events are _never_ destroyed, updated or touched in any way, ever. Having said that, if your Event DU does not match them, they're also as good as not there from the point of view of how State is established.
- Reads always get the `Tip` first (one exception: `Unoptimized` mode skips reading the `Tip` as, by definition, you're not using snapshots/unfolds/any tricks), Writes always touch the `Tip` (yes, even in `Unoptimized` mode; there's no such thing as a stream that has ever been written to that does not have a `Tip`).
- In the current implementation, the calling code in the server figures out everything that's going to go in the ~~snapshots~~ unfolds list if this sync is successful.

The high level skeleton of the loading in a given access strategy is:
a) load and decode unfolds from tip (followed by events, if and only if necessary)
b) offer the events to an `isOrigin` function to allow us to stop when we've got a start point (a Reset Event, a relevant snapshot, or, failing that, the start of the stream)

It may be helpful to look at [how an `AccessStrategy` is mapped to `isOrigin`, `toSnapshot` and `transmute` lambdas internally](https://github.com/jet/equinox/blob/74129903e85e01ce584b4449f629bf3e525515ea/src/Equinox.Cosmos/Cosmos.fs#L1029)

#### Aaand answering the question

Whenever a State is being built, it always loads `Tip` first and shows any ~~events~~ ~~snapshots~~ _unfolds_ in there...

If `isOrigin` says no to those and/or the `EventType`s of those unfolds are not in the union / event type to which the codec is mapping, the next thing is a query backwards of the Batches of events, in order.

All those get pushed onto a stack until we either hit the start, or `isOrigin` says - yes, we can start from here (at which point all the decoded events are then passed (in forward order) to the `fold` to make the `'state`).

So, if you are doing `RollingState` or any other mode, there are still events and unfolds; and they all have `EventType`s - there are just some standard combos of steps that happen.

If the `EventType` of the Event or Unfold matches, the `fold`/`evolve` will see them and build `'state` from that.

Then, whenever you emit events from a `decide` or `interpret`, the `AccessStrategy` will define what happens next; a mix of:
- write actual events (not if `RollingState`)
- write updated unfolds/snapshots
- remove or adjust events before they get passed down to the `sync` stored procedure (`Custom`, `RollingState`, `LatestKnownEvent` modes)

Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's really not that :scream:](https://github.com/jet/equinox/blob/74129903e85e01ce584b4449f629bf3e525515ea/src/Equinox.Cosmos/Cosmos.fs#L870).

### OK, but you didn't answer my question, you just talked about stuff you wanted to talk about!

😲Please raise a question-Issue, and we'll be delighted to either answer directly, or incorporate the question and answer here
Expand Down
19 changes: 11 additions & 8 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,30 @@ type ServiceBuilder(storageConfig, handlerLog) =

member __.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot

match storageConfig with
| Storage.StorageConfig.Cosmos _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithJsonElementCodec(Favorites.Events.codecStj, fold, initial, snapshot))
| _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(Favorites.Events.codecNewtonsoft, fold, initial, snapshot))
| Storage.StorageConfig.Cosmos _ -> resolver.ResolveWithJsonElementCodec(Favorites.Events.codecStj, fold, initial, snapshot)
| _ -> resolver.ResolveWithUtf8ArrayCodec(Favorites.Events.codecNewtonsoft, fold, initial, snapshot)
|> Backend.Favorites.create handlerLog

member __.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
let snapshot = SavedForLater.Fold.isOrigin, SavedForLater.Fold.compact

match storageConfig with
| Storage.StorageConfig.Cosmos _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithJsonElementCodec(SavedForLater.Events.codecStj,fold,initial,snapshot), maxSavedItems=50)
| _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(SavedForLater.Events.codecNewtonsoft,fold,initial,snapshot), maxSavedItems=50)
| Storage.StorageConfig.Cosmos _ -> resolver.ResolveWithJsonElementCodec(SavedForLater.Events.codecStj, fold, initial, snapshot)
| _ -> resolver.ResolveWithUtf8ArrayCodec(SavedForLater.Events.codecNewtonsoft, fold, initial, snapshot)
|> Backend.SavedForLater.create 50 handlerLog

member __.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot

match storageConfig with
| Storage.StorageConfig.Cosmos _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithJsonElementCodec(TodoBackend.Events.codecStj,fold,initial,snapshot))
| _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(TodoBackend.Events.codecNewtonsoft,fold,initial,snapshot))
| Storage.StorageConfig.Cosmos _ -> resolver.ResolveWithJsonElementCodec(TodoBackend.Events.codecStj, fold, initial, snapshot)
| _ -> resolver.ResolveWithUtf8ArrayCodec(TodoBackend.Events.codecNewtonsoft, fold, initial, snapshot)
|> TodoBackend.create handlerLog

let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
Expand Down
13 changes: 9 additions & 4 deletions samples/Store/Backend/Cart.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Backend.Cart

open Domain
open Domain.Cart

#if ACCUMULATOR
Expand Down Expand Up @@ -44,9 +45,7 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
state', acc @ events)
#endif

type Service(log, resolve) =

let resolve (Events.ForCartId streamId, opt) = Equinox.Stream(log, resolve (streamId,opt), maxAttempts = 3)
type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equinox.Stream<Events.Event, Fold.State>) =

member __.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let stream = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
Expand All @@ -72,4 +71,10 @@ type Service(log, resolve) =
stream.Query id
member __.ReadStale cartId =
let stream = resolve (cartId,Some Equinox.ResolveOption.AllowStale)
stream.Query id
stream.Query id

let create log resolve =
let resolve (id, opt) =
let stream = resolve (streamName id, opt)
Equinox.Stream(log, stream, maxAttempts = 3)
Service(resolve)
14 changes: 8 additions & 6 deletions samples/Store/Backend/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

open Domain.ContactPreferences

type Service(log, resolve, ?maxAttempts) =

let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3)
type Service internal (resolve : Id -> Equinox.Stream<Events.Event, Fold.State>) =

let update email value : Async<unit> =
let stream = resolve email
let command = Update { email = email; preferences = value }
let command = let (Id email) = email in Update { email = email; preferences = value }
stream.Transact(Commands.interpret command)

member __.Update email value =
member __.Update(email, value) =
update email value

member __.Read(email) =
let stream = resolve email
stream.Query id
stream.Query id

let create log resolve =
let resolve id = Equinox.Stream(log, resolve (streamName id), maxAttempts = 3)
Service(resolve)
11 changes: 7 additions & 4 deletions samples/Store/Backend/Favorites.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
module Backend.Favorites

open Domain
open Domain.Favorites
open System

type Service(log, resolve, ?maxAttempts) =

let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 2)
type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.State>) =

let execute clientId command : Async<unit> =
let stream = resolve clientId
Expand All @@ -24,4 +23,8 @@ type Service(log, resolve, ?maxAttempts) =
execute clientId (Command.Unfavorite skus)

member __.List clientId : Async<Events.Favorited []> =
read clientId
read clientId

let create log resolve =
let resolve id = Equinox.Stream(log, resolve (streamName id), maxAttempts = 3)
Service(resolve)
12 changes: 8 additions & 4 deletions samples/Store/Backend/InventoryItem.fs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
module Backend.InventoryItem

open Domain
open Domain.InventoryItem

type Service(log, resolve, ?maxAttempts) =

let resolve (Events.ForInventoryItemId id) = Equinox.Stream(log, resolve id, defaultArg maxAttempts 3)
type Service internal (resolve : InventoryItemId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.Execute(itemId, command) =
let stream = resolve itemId
stream.Transact(Commands.interpret command)

member __.Read(itemId) =
let stream = resolve itemId
stream.Query id
stream.Query id

let create resolve =
let resolve id =
Equinox.Stream(Serilog.Log.ForContext<Service>(), resolve (streamName id), maxAttempts = 3)
Service(resolve)
10 changes: 6 additions & 4 deletions samples/Store/Backend/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ open Domain
open Domain.SavedForLater
open System

type Service(handlerLog, resolve, maxSavedItems : int, ?maxAttempts) =
type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.State>, maxSavedItems) =

do if maxSavedItems < 0 then invalidArg "maxSavedItems" "must be non-negative value."

let resolve (Events.ForClientId streamId) = Equinox.Stream(handlerLog, resolve streamId, defaultArg maxAttempts 3)

let execute clientId command : Async<bool> =
let stream = resolve clientId
stream.Transact(Commands.decide maxSavedItems command)
Expand Down Expand Up @@ -39,4 +37,8 @@ type Service(handlerLog, resolve, maxSavedItems : int, ?maxAttempts) =

member __.Merge(clientId, targetId) : Async<bool> = async {
let! state = read clientId
return! execute targetId (Merge state) }
return! execute targetId (Merge state) }

let create maxSavedItems log resolve =
let resolve id = Equinox.Stream(log, resolve (streamName id), maxAttempts = 3)
Service(resolve, maxSavedItems)
5 changes: 3 additions & 2 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module Domain.Cart

let streamName (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

let (|ForCartId|) (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id)

type ContextInfo = { time: System.DateTime; requestId: RequestId }

type ItemInfo = { context: ContextInfo; item: ItemInfo }
Expand All @@ -29,6 +29,7 @@ module Events =
let codecStj = FsCodec.SystemTextJson.Codec.Create<Event>()

module Fold =

type ItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool }
type State = { items: ItemInfo list }
module State =
Expand Down
3 changes: 1 addition & 2 deletions samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
module Domain.ContactPreferences

type Id = Id of email: string
let streamName (Id email) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

let (|ForClientId|) (email: string) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64

type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool }
type Value = { email : string; preferences : Preferences }

Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module Domain.Favorites

let streamName (id: ClientId) = FsCodec.StreamName.create "Favorites" (ClientId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

let (|ForClientId|) (id: ClientId) = FsCodec.StreamName.create "Favorites" (ClientId.toString id)

type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }
type Snapshotted = { net: Favorited[] }
Expand Down
6 changes: 3 additions & 3 deletions samples/Store/Domain/InventoryItem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module Domain.InventoryItem

open System

let streamName (id : InventoryItemId) = FsCodec.StreamName.create "InventoryItem" (InventoryItemId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

let (|ForInventoryItemId|) (id : InventoryItemId) = FsCodec.StreamName.create "InventoryItem" (InventoryItemId.toString id)

type Event =
| Created of name: string
| Deactivated
Expand Down Expand Up @@ -55,4 +55,4 @@ module Commands =
[ Events.CheckedIn count ]
| Deactivate ->
if not state.active then invalidOp "Already deactivated"
[ Events.Deactivated ]
[ Events.Deactivated ]
4 changes: 2 additions & 2 deletions samples/Store/Domain/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
open System
open System.Collections.Generic

let streamName (id: ClientId) = FsCodec.StreamName.create "SavedForLater" (ClientId.toString id)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

let (|ForClientId|) (id: ClientId) = FsCodec.StreamName.create "SavedForLater" (ClientId.toString id)

type Item = { skuId : SkuId; dateSaved : DateTimeOffset }

type Added = { skus : SkuId []; dateSaved : DateTimeOffset }
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let createMemoryStore () =
// we want to validate that the JSON UTF8 is working happily
VolatileStore<byte[]>()
let createServiceMemory log store =
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codecNewtonsoft, fold, initial).Resolve(id,?option=opt))
Backend.Cart.create log (fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codecNewtonsoft, fold, initial).Resolve(id,?option=opt))

let eventStoreCodec = Domain.Cart.Events.codecNewtonsoft
let resolveGesStreamWithRollingSnapshots gateway =
Expand Down Expand Up @@ -58,7 +58,7 @@ type Tests(testOutputHelper) =
let log = createLog ()
let! conn = connect log
let gateway = choose conn defaultBatchSize
return Backend.Cart.Service(log, resolve gateway) }
return Backend.Cart.create log (resolve gateway) }

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
Expand Down
9 changes: 4 additions & 5 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferenc
let createMemoryStore () =
MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)
Backend.ContactPreferences.create log (MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let eventStoreCodec = Domain.ContactPreferences.Events.codecNewtonsoft
let resolveStreamGesWithOptimizedStorageSemantics gateway =
Expand All @@ -33,10 +33,9 @@ type Tests(testOutputHelper) =
let createLog () = createLogger testOutput

let act (service : Backend.ContactPreferences.Service) (id,value) = async {
let (Domain.ContactPreferences.Id email) = id
do! service.Update email value
do! service.Update(id, value)

let! actual = service.Read email
let! actual = service.Read id
test <@ value = actual @> }

[<AutoData>]
Expand All @@ -49,7 +48,7 @@ type Tests(testOutputHelper) =
let log = createLog ()
let! conn = connect log
let gateway = choose conn
return Backend.ContactPreferences.Service(log, resolve gateway) }
return Backend.ContactPreferences.create log (resolve gateway) }

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
Expand Down
14 changes: 7 additions & 7 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ let snapshot = Domain.Favorites.Fold.isOrigin, Domain.Favorites.Fold.snapshot
let createMemoryStore () =
new MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)
Backend.Favorites.create log (MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let eventStoreCodec = Domain.Favorites.Events.codecNewtonsoft
let createServiceGes gateway log =
let resolve = EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve
Backend.Favorites.Service(log, resolve)
let resolver = EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot)
Backend.Favorites.create log resolver.Resolve

let cosmosCodec = Domain.Favorites.Events.codecStj
let createServiceCosmos gateway log =
let resolve = Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve
Backend.Favorites.Service(log, resolve)
let resolver = Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot)
Backend.Favorites.create log resolver.Resolve

let createServiceCosmosRollingState gateway log =
let access = Cosmos.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot
let resolve = Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve
Backend.Favorites.Service(log, resolve)
let resolver = Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, access)
Backend.Favorites.create log resolver.Resolve

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down
Loading

0 comments on commit b374271

Please sign in to comment.