From 14a1749c86dcf0a2ff72f24acf555b0360bbe065 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 10 Mar 2020 21:42:43 +0000 Subject: [PATCH 1/2] Add FAQ and detailed doc re Cosmos AccessStrategy (#201) --- DOCUMENTATION.md | 75 ++++++++++++++++++++++++++++++++++++++++++++++++ README.md | 46 +++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index bff57604a..5c55eed30 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -958,6 +958,81 @@ match res with | AppendResult.Ok -> () | c -> failwithf "conflict %A" c ``` +## Access Strategies + +An Access Strategy defines any optimizations regarding how one arrives at a State of an Aggregate based on the Events stored in a Stream in a Store. + +The specifics of an Access Strategy depend on what makes sense for a given Store, i.e. `Equinox.Cosmos` necessarily has a significantly different set of strategies than `Equinox.EventStore` (although there is an intersection). + +Access Strategies only affect performance; you should still be able to infer the state of the aggregate based on the `fold` of all the `events` ever written on top of an `initial` state + +NOTE: its not important to select a strategy until you've actually actually modelled your aggregate, see [what if I change my access strategy](#changing-access-strategy) + +### `Equinox.Cosmos.AccessStrategy` + +TL;DR `Equinox.Cosmos`: (see also: [the storage model](DOCUMENTATION.md#Cosmos-Storage-Model) for a deep dive, and [glossary, below the table](#access-strategy-glossary) for definition of terms) +- keeps all the Events for a Stream in a single [CosmosDB _logical partition_](https://docs.microsoft.com/en-gb/azure/cosmos-db/partition-data) +- Transaction guarantees are provided at the logical partition level only. As for most typical Event Stores, the mechanism is based on [Optimistic Concurrency Control](https://en.wikipedia.org/wiki/Optimistic_concurrency_control). _There's no holding of a lock involved - it's based on conveying your premise alongside with the proposed change_; In terms of what we are doing, you observe a `state`, propose `events`, and the store is responsible for applying the change, or rejecting it if the `state` turns out to longer be the case when you get around to `sync`ing the change. The change is rejected if your premise (things have not changed since I saw them) is invalidated (whereupon you loop, working from the updated state). +- always has a special 'index' document (we term it the `Tip` document), per logical partition/stream that is accessible via an efficient _point read_ (it always has a CosmosDB `id` value of `"-1"`) +- Events are stored in Batches in immutable documents in the logical partition, the `Tip` document is the only document that's ever updated (it's _always_ updated, as it happens...). +- As all writes touch the `Tip`, we have a natural way to invalidate any cached State for a given Stream; we retain the `_etag` of the `Tip` document, and updates (or consistent reads) are contingent on it not having changed. +- The (optimistic) concurrency control of updates is by virtue of the fact that every update to the `Tip` touches the document _and thus alters (invalidates) the `_etag` value_. This means that, in contrast to how many SQL based stores (and most CosmosDB based ones) implement concurrency control, we don't rely on primary key constraint to prevent two writers writing conflicting events to the same stream. + - A secondary benefit of not basing consistency control on a primary key constraint or equivalent, is that we no longer having to insert an Event every time we are updating something. (This fact is crucial for the `RollingState` and `Custom` strategies). +- The `interpret`/`decide` function is expected to deduplicate writes by not producing `events` if the `state` implies such updates would be redundant. Equinox does not have any internal mechanism to deduplicate events, thus having correct deduplication is the key to reducing round-trips and hence minimizing RU consumption (and the adverse effects that the retry cycles due to contention have, which will most likely arise when load is at its highest). +- The `unfolds` maintained in `Tip` have the bodies (the `d` and `m` fields) 1) deflated 2) base64 encoded (as everyone is reading the Tip, its worthwhile having the writer take on the burden of compressing, with the payback being that write amplification effects are reduced by readers paying less RUs to read them). The snapshots can be inspected securely via the `eqx` tool's `dump` facility, or _unsecurely_ online via the [**decode** button on this tool, _if the data is not sensitive_](https://jgraph.github.io/drawio-tools/tools/convert.html). +- The `Tip` document, (and the fact we hold its `_etag` in our cache alongside the State we have derived from the Events), is at the heart of why consistent reads are guaranteed to be efficient (Equinox does the read of the `Tip` document contingent on the `_etag` not having changed; a read of any size costs only 1 RU if the result is `304 NOT Modified`) +- Specific Access Strategies: + - define what we put in the `Tip` + - control how we short circuit the process of loading all the Events and `fold`ing them from the start, if we encounter a Snapshot or Reset Event + - allow one to post-process the events we are writing as required for reasons of optimization + +#### `Cosmos` Access Strategy overviews + +| Strategy | TL;DR | `Tip` document maintains | Best suited for | +| :--- | :--- | :--- | :--- | +| `Unoptimized` | Keep Events only (but there's still an (empty) `Tip` document) | Count of event (and ability to have any insertion invalidate the cache for any reader) | No load, event counts or event sizes worth talking about.
initial implementations. | +| `LatestKnownEvent` | Special mode for when every event is completely independent, so we completely short-circuit loading the events and folding them and instead only use the latest event (if any) | A copy of the most recent event, together with the count | 1) Maintaining a local copy of a Summary Event representing information obtained from a partner service that is authoritative for that data
2) Tracking changes to a document where we're not really modelling events as such, but with optimal efficiency (every read is a point read of a single document) | +| `Snapshot` | Keep a (single) snapshot in `Tip` at all times, guaranteed to include _all_ events | A single unfold produced by `toSnapshot` based on the `state'` (i.e., including any events being written) every time, together with the event count | Typical event-sourced usage patterns.
Good default approach.
Very applicable where you have lots of small 'delta' events that you only consider collectively. | +| `MultiSnapshot` | As per `Snapshot`, but `toSnapshots` can return arbitrary (0, one or many) `unfold`s | Multiple (consistent) snapshots (and `_etag`/event count for concurrency control as per other strategies) | Blue-green deployments where an old version and a new version of the app cannot share a single snapshot schema | +| `RollingState` | "No event sourcing" mode - no events, just `state`. Perf as good as `Snapshot`, but don't even store the events so we will never hit CosmosDB stream size limits | `toSnapshot state'`: the squashed `state` + `events` which replace the incoming `state` | Maintaining state of an Aggregate with lots of changes
a) that you don't need a record of the individual changes of yet
b) you would like to model, test and develop as if one did
DO NOT use if
a) you want to be able to debug state transitions by looking at individual change events
b) you need to react to and/or project events relating to individual updates (CosmosDB does not provide a way to provide a notification of every single update, even if it does have a guarantee of always showing the final state on the Change Feed eventually)| +| `Custom` | General form that all the preceding strategies are implemented in terms of | Anything `transmute` yields as the `fst` of its result (but, typically the equivalent of what Snapshot writes) | Limited by your imagination, e.g. [emitting events once per hour but otherwise like `RollingState`](https://github.com/jet/propulsion/search?q=transmute&unscoped_q=transmute) | + + +#### Glossary +- `decide`/`interpret`: Application function that inspects a `state` to propose `events`, under control of a `Transact` loop. +- `Transact` loop: Equinox Core function that runs your `decide`/`interpret`, and then `sync`s any generated `events` to the Store. +- `state`: The (potentially cached) version of the State of the Aggregate that `Transact` supplied to your `decide`/ `interpret`. +- `events`: The changes the `decide`/`interpret` generated (that `Transact` is trying to `sync` to the Store). +- `fold`: standard function, supplied per Aggregate, which is used to apply Events to a given State in order to [`evolve`](http://thinkbeforecoding.github.io/FsUno.Prod/Dynamical%20Systems.html) the State per implications of each Event that has occurred +- `state'`: The State of an Aggregate (post-state in FP terms), derived from the current `state` + the proposed `events` being `sync`ed. +- `Tip`: Special document stored alongside the Events (in the same logical partition) which holds the `unfolds` associated with the current State of the _stream_. +- `sync`: [Stored procedure we use to manage consistent update of the Tip alongside insertion of Event Batch Documents](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#sync-stored-procedure) (contingent on the Tip's `_etag` not having changed) +- `unfold`: JSON objects maintained in the `Tip`, which represent Snapshots taken at a given point in the event timeline for this stream. + - the `fold`/`evolve` function is presented Snapshots as if it was yet-another-Event + - the only differences are + - a) they are not stored as in (immutable) Event documents as other Events are + - b) every write replaces all the `unfold`s in `Tip` with the result of the `toSnapshot`(s) function as defined in the given Access Strategy. +- `isOrigin`: A predicate function supplied to an Access Strategy that defines the starting point from which we'll build a `state`. + - Must yield `true` for relevant Snapshots or Reset Events. +- `initial`: The (Application-defined) _state_ value all loaded events `fold` into, if an `isOrigin` event is not encountered while walking back through th `unfolds` and Events and instead hit the start of the stream. +- Snapshot: a single serializable representation of the `state'` + - Facilitates optimal retrieval patterns when a stream contains a significant number of events + - NOTE: Snapshots should not ever yield an observable difference in the `state` when compared to building it from the timeline of events; it should be solely a behavior-preserving optimization. +- Reset Event: an event (i.e. a permanent event, not a Snapshot) for which an `isOrigin` predicate yields `true` + - e.g., for a Cart aggregate, a CartCleared event means there is no point in looking at _any_ preceding events in order to determine what's in the cart; we can start `fold`ing from that point.) + - Multiple Reset Event Types are possible per Category, and a stream can often have multiple reset points (e.g., each time a Cart is `Cleared`, we enter a known state) + - A _Tombstone Event_ can also be viewed as a Reset Event, e.g. if you have a (long running) bank account represented as a Stream per year, one might annually write a `TrancheCarriedForwardAndClosed` event which a) bears everything we care about (the final balance) b) signifies the fact that this tranche has now transitioned to read-only mode. Conversely, a `Closed` event is not by itself a _Tombstone Event_ - while you can infer the Open/Closed mode aspect of the Stream's State, you would still need to look further back through its history to be able to determine the balance that applied at the point the period was marked `Closed`. + +#### `Cosmos` Read and Write policies + +| Strategy | Reads involve | Writes involve | +| :--- | :--- | :--- | +| `Unoptimized` | Querying for, and `fold`ing all events (although the cache means it only reads events it has not seen)
the `Tip` is never read, even e.g. if someone previously put a snapshot in there | 1) Insert a document with the events
2) Update `Tip` to reflect updated event count (as a transaction, as with all updates) | +| `LatestKnownEvent` | Reading the `Tip` (never the events) | 1) Inserting a document with the new event.
2) Updating the Tip to a) up count/invalidate the `_etag` b) CC the event for efficient access | +| `Snapshot` | 1) read Tip; stop if `isOrigin` accepts a snapshot from within
2) read backwards until the provided `isOrigin` function returns `true` for an Event, or we hit start of stream | 1) Produce proposed `state'`
2) write events to new document + `toSnapshot state'` result into Tip with new event count | +| `MultiSnapshot` | As per `Snapshot`, stop if `isOrigin` yields `true` for any `unfold` (then fall back to folding from base event or a reset event) | 1) Produce `state'`
2) Write events to new document + `toSnapshots state'` to `Tip` _(could be 0 or many, vs exactly one)_ | +| `RollingState` | Read `Tip`
(can fall back to building from events as per `Snapshot` mode if nothing in Tip, but normally there are no events) | 1) produce `state'`
2) update `Tip` with `toSnapshot state'`
3) **no events are written**
4) Concurrency Control is based on the `_etag` of the Tip, not an expectedVersion / event count | +| `Custom` | As per `Snapshot` or `MultiSnapshot`
1) see if any `unfold`s pass the `isOrigin` test
2) Otherwise, work backward until a _Reset Event_ or start of stream | 1) produce `state'`
2) use `transmute events state` to determine a) the `unfold`s (if any) to write b) the `events` _(if any)_ to emit
3) execute the insert and/or upsert operations, contingent on the `_etag` of the opening `state` | # Ideas diff --git a/README.md b/README.md index 4aa2d6b94..311fac735 100644 --- a/README.md +++ b/README.md @@ -582,6 +582,52 @@ I expand (too much!) on some more of the considerations in https://github.com/je The other thing that should be pointed out is the caching can typically cover a lot of perf stuff as long as stream lengths stay sane - Snapshotting (esp polluting the stream with snapshot events should definitely be toward the bottom of your list of tactics for managing a stream efficiently given long streams are typically a design smell) + +### Changing Access / Representation strategies in `Equinox.Cosmos` - what happens? + +> Does Equinox adapt the stream if we start writing with `Equinox.Cosmos.AccessStrategy.RollingState` and change to `Snapshotted` for instance? It could take the last RollingState writing and make the first snapshot ? + +> what about the opposite? It deletes all events and start writing `RollingState` ? + +TL;DR yes and no respectively + +#### Some context + +Firstly, it's recommended to read the [documentation section on Access Strategies](DOCUMENTATION.md#access-strategies) + +General rules: +- Events are the atoms from which state is built, they live forever in immutable Batch documents with id <> -1. +- Snapshots/unfolds live in the `.u` array in the Tip doc (id: -1) + loading/build of state is composed of +- regardless of what happens, Events are _never_ destroyed, updated or touched in any way, ever. Having said that, if your Event DU does not match them, they're also as good as not there from the point of view of how State is established. +- Reads always get the `Tip` first (one exception: `Unoptimized` mode skips reading the `Tip` as, by definition, you're not using snapshots/unfolds/any tricks), Writes always touch the `Tip` (yes, even in `Unoptimized` mode; there's no such thing as a stream that has ever been written to that does not have a `Tip`). +- In the current implementation, the calling code in the server figures out everything that's going to go in the ~~snapshots~~ unfolds list if this sync is successful. + + The high level skeleton of the loading in a given access strategy is: + a) load and decode unfolds from tip (followed by events, if and only if necessary) + b) offer the events to an `isOrigin` function to allow us to stop when we've got a start point (a Reset Event, a relevant snapshot, or, failing that, the start of the stream) + +It may be helpful to look at [how an `AccessStrategy` is mapped to `isOrigin`, `toSnapshot` and `transmute` lambdas internally](https://github.com/jet/equinox/blob/74129903e85e01ce584b4449f629bf3e525515ea/src/Equinox.Cosmos/Cosmos.fs#L1029) + +#### Aaand answering the question + +Whenever a State is being built, it always loads `Tip` first and shows any ~~events~~ ~~snapshots~~ _unfolds_ in there... + +If `isOrigin` says no to those and/or the `EventType`s of those unfolds are not in the union / event type to which the codec is mapping, the next thing is a query backwards of the Batches of events, in order. + +All those get pushed onto a stack until we either hit the start, or `isOrigin` says - yes, we can start from here (at which point all the decoded events are then passed (in forward order) to the `fold` to make the `'state`). + +So, if you are doing `RollingState` or any other mode, there are still events and unfolds; and they all have `EventType`s - there are just some standard combos of steps that happen. + +If the `EventType` of the Event or Unfold matches, the `fold`/`evolve` will see them and build `'state` from that. + +Then, whenever you emit events from a `decide` or `interpret`, the `AccessStrategy` will define what happens next; a mix of: +- write actual events (not if `RollingState`) +- write updated unfolds/snapshots +- remove or adjust events before they get passed down to the `sync` stored procedure (`Custom`, `RollingState`, `LatestKnownEvent` modes) + +Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's really not that :scream:](https://github.com/jet/equinox/blob/74129903e85e01ce584b4449f629bf3e525515ea/src/Equinox.Cosmos/Cosmos.fs#L870). + ### OK, but you didn't answer my question, you just talked about stuff you wanted to talk about! 😲Please raise a question-Issue, and we'll be delighted to either answer directly, or incorporate the question and answer here From ebeb34d4a8cd30872f3d366ff7cc2f70257341f1 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 12 Mar 2020 12:27:11 +0000 Subject: [PATCH 2/2] Apply updated resolve idiom (#204) --- DOCUMENTATION.md | 172 +++++++++++------- samples/Infrastructure/Services.fs | 8 +- samples/Store/Backend/Cart.fs | 13 +- samples/Store/Backend/ContactPreferences.fs | 14 +- samples/Store/Backend/Favorites.fs | 11 +- samples/Store/Backend/InventoryItem.fs | 12 +- samples/Store/Backend/SavedForLater.fs | 10 +- samples/Store/Domain/Cart.fs | 7 +- samples/Store/Domain/ContactPreferences.fs | 5 +- samples/Store/Domain/Favorites.fs | 6 +- samples/Store/Domain/InventoryItem.fs | 6 +- samples/Store/Domain/SavedForLater.fs | 6 +- samples/Store/Integration/CartIntegration.fs | 6 +- .../ContactPreferencesIntegration.fs | 13 +- .../Store/Integration/FavoritesIntegration.fs | 18 +- samples/Store/Integration/LogIntegration.fs | 6 +- samples/TodoBackend/Todo.fs | 19 +- samples/Tutorial/AsAt.fsx | 16 +- samples/Tutorial/Cosmos.fsx | 18 +- samples/Tutorial/Counter.fsx | 10 +- samples/Tutorial/Favorites.fsx | 29 ++- samples/Tutorial/FulfilmentCenter.fsx | 18 +- samples/Tutorial/Gapless.fs | 27 +-- samples/Tutorial/Index.fs | 30 +-- samples/Tutorial/Sequence.fs | 30 +-- samples/Tutorial/Set.fs | 30 +-- samples/Tutorial/Todo.fsx | 15 +- samples/Tutorial/Upload.fs | 50 ++--- .../CosmosIntegration.fs | 44 ++--- .../StoreIntegration.fs | 26 +-- .../MemoryStoreIntegration.fs | 9 +- 31 files changed, 367 insertions(+), 317 deletions(-) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 5c55eed30..fba3e7a09 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -82,17 +82,65 @@ Term | Description -----|------------ Cache | `System.Net.MemoryCache` or equivalent holding _State_ and/or `etag` information for a Stream with a view to reducing roundtrips, latency and/or Request Charges Rolling Snapshot | Event written to an EventStore stream in order to ensure minimal roundtrips to EventStore when there is a Cache miss -Unfolds | Snapshot information, represented as Events that are stored in an appropriate storage location (outside of a Stream's actual events) to minimize Queries and the attendant Request Charges when there is a Cache miss +Unfolds | Snapshot information, stored in an appropriate storage location (outside of a Stream's actual events), _but represented as Events_, to minimize Queries and the attendant Request Charges when there is a Cache miss # Programming Model NB this has lots of room for improvement, having started as a placeholder in [#50](https://github.com/jet/equinox/issues/50); **improvements are absolutely welcome, as this is intended for an audience with diverse levels of familiarity with event sourcing in general, and Equinox in particular**. +## Aggregate module + +In code handling a given Aggregate’s Commands and Synchronous Queries, the code you write divides into the following canonical organization: + +```fsharp +module Aggregate + +(* StreamName section *) + +let [] Category = "category" +let streamName id = FsCodec.StreamName.create Category (Id.toString id) + +(* Optionally, Helpers/Types *) + +module Events = + + type Event = + | ... + // optionally: `encode`, `tryDecode` (only if you're doing manual decoding) + let codec = FsCodec ... Codec.Create(...) + +module Fold = + + type State = + let initial : State = ... + let evolve state = function + | Events.X -> (state update) + | Events.Y -> (state update) + let fold events = Seq.fold evolve events + (* Storage Model helpers, e.g. isOrigin, toSnapshot etc *) + +let interpretX ... (state : Fold.State) : Events list = ... +let decideY ... (state : Fold.State) : 'result * Events list = ... + +type Service internal (resolve : Id -> Equinox.Stream 'event seq -> 'state`: function used to fold one or more loaded (or proposed) events (real ones and/or unfolded ones) into a given running [persistent data structure](https://en.wikipedia.org/wiki/Persistent_data_structure) of type `'state` @@ -132,7 +180,8 @@ The following example is a minimal version of [the Favorites model](samples/Stor ```fsharp (* Event stream naming + schemas *) -let (|ForClientId|) (id: ClientId) = FsCodec.StreamName.create "Favorites" (ClientId.toString id) +let [] Category = "Favorites" +let streamName (id : ClientId) = FsCodec.StreamName.create Category (ClientId.toString id) type Item = { id: int; name: string; added: DateTimeOffset } type Event = @@ -170,12 +219,11 @@ let interpret command state = let toSnapshot state = [Event.Snapshotted (Array.ofList state)] -(* The Service defines operations in business terms with minimal reference to Equinox terms - or need to talk in terms of infrastructure; typically the service is stateless and can be a Singleton *) - -type Service(log, resolve, ?maxAttempts) = +(* The Service defines operations in business terms, neutral to any concrete store selection or implementation + supplied only a `resolve` function that can be used to map from ids (as supplied to the `streamName` function) to an Equinox Stream + typically the service should be a stateless Singleton *) - let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : ClientId -> Equinox.Stream) = let execute clientId command : Async = let stream = resolve clientId @@ -192,6 +240,10 @@ type Service(log, resolve, ?maxAttempts) = execute clientId (Command.Unfavorite skus) member __.List clientId : Async = read clientId + +let create resolve : Service = + let resolve id = Equinox.Stream(Serilog.Log.ForContext(), resolve (streamName id), maxAttempts = 3) + Service(resolve) ``` @@ -219,16 +271,6 @@ At a high level we have: ## Programming Model walkthrough -### Core elements - -In the code handling a given Aggregate’s Commands and Synchronous Queries, the code you write divide into: - -- Events (`codec`, `encode`, `tryDecode`, `categoryId`, `(|For...|)` etc.) -- State/Fold (`evolve`, `fold`, `initial`) -- Storage Model helpers (`isOrigin`,`unfold`,`toSnapshot` etc) - -while these are not omnipresent, for the purposes of this discussion we’ll treat them as that. See the [Programming Model](#programming-model) for a drilldown into these elements and their roles. - ### Flows and Streams Equinox’s Command Handling consists of < 200 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch in a normal application are: @@ -242,8 +284,8 @@ Its recommended to read the examples in conjunction with perusing the code in or #### Stream Members ```fsharp -type Equinox.Stream(stream, log, maxAttempts) = - +type Equinox.Stream(stream : IStream<'event, 'state>, log, maxAttempts) = +StoreIntegration // Run interpret function with present state, retrying with Optimistic Concurrency member __.Transact(interpret : State -> Event list) : Async @@ -313,20 +355,24 @@ A final consideration to mention is that, even when correct idempotent handling #### `Stream` usage ```fsharp -type Service(log, resolve, ?maxAttempts) = - - let streamFor (clientId: string) = - let streamName = FsCodec.StreamName.create "Favorites" clientId - let stream = resolve streamName - Equinox.Stream(log, stream, defaultArg maxAttempts 3) +let [] Category = Favorites +let streamName (clientId : String) = FsCodec.StreamName.create Category clientId + +type Service internal (resolve : string -> Equinox.Stream) = let execute clientId command : Async = - let stream = streamFor clientId + let stream = resolve clientId stream.Transact(interpret command) let read clientId : Async = - let stream = streamFor clientId + let stream = resolve clientId inner.Query id + +let create resolve = + let resolve clientId = + let streamName = streamName clientId + Equinox.Stream(log, resolve streamName, maxAttempts = 3) + Service(resolve) ``` The `Stream`-related functions in a given Aggregate establish the access patterns used across when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to a `Equinox.Stream` equivalent (see [`src/Equinox/Equinox.fs`](src/Equinox/Equinox.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs). @@ -372,15 +418,14 @@ See [the TodoBackend.com sample](README.md#TodoBackend) for reference info regar #### `Event`s ```fsharp -let (|ForClientId|) (id : string) = FsCodec.StreamName.create "Todos" id - -type Todo = { id: int; order: int; title: string; completed: bool } -type Event = - | Added of Todo - | Updated of Todo - | Deleted of int - | Cleared - | Compacted of Todo[] +module Events = + type Todo = { id: int; order: int; title: string; completed: bool } + type Event = + | Added of Todo + | Updated of Todo + | Deleted of int + | Cleared + | Compacted of Todo[] ``` The fact that we have a `Cleared` Event stems from the fact that the spec defines such an operation. While one could implement this by emitting a `Deleted` event per currently existing item, there many reasons to do model this as a first class event:- @@ -433,9 +478,7 @@ let interpret c (state : State) = #### `Service` ```fsharp -type Service(log, resolve, ?maxAttempts) = - - let resolve (ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : ClientId -> Equinox.Stream) = let execute clientId command : Async = let stream = resolve clientId @@ -493,7 +536,7 @@ member __.Read(clientId) = # Command+Decision Handling functions -Commands or Decisions are handled via `Equinox.Stream`'s `Transact' method +Commands or Decisions are handled via `Equinox.Stream`'s `Transact` method ## Commands (`interpret` signature) @@ -520,22 +563,20 @@ let interpret (context, command) state : Events.Event list = | Some eventDetails -> // accepted, mapped to event details record [Event.HandledCommand eventDetails] -type Service(...) - -/// ... +type Service internal (resolve : ClientId -> Equinox.Stream) -// Given the supplied context, apply the command for the specified clientId -member __.Execute(clientId, context, command) : Async = - let stream = resolve clientId - stream.Transact(fun state -> interpretCommand (context, command) state) + // Given the supplied context, apply the command for the specified clientId + member __.Execute(clientId, context, command) : Async = + let stream = resolve clientId + stream.Transact(fun state -> interpretCommand (context, command) state) -// Given the supplied context, apply the command for the specified clientId -// Throws if this client's data is marked Read Only -member __.Execute(clientId, context, command) : Async = - let stream = resolve clientId - stream.Transact(fun state -> - if state.isReadOnly then raise AccessDeniedException() // Mapped to 403 externally - interpretCommand (context, command) state) + // Given the supplied context, apply the command for the specified clientId + // Throws if this client's data is marked Read Only + member __.Execute(clientId, context, command) : Async = + let stream = resolve clientId + stream.Transact(fun state -> + if state.isReadOnly then raise AccessDeniedException() // Mapped to 403 externally + interpretCommand (context, command) state) ``` ## Decisions (`Transact`ing Commands that also emit a result using the `decide` signature) @@ -555,17 +596,15 @@ let decide (context, command) state : int * Events.Event list = // ... if `snd` contains event, they are written // `fst` (an `int` in this instance) is returned as the outcome to the caller -type Service(...) - -/// ... +type Service internal (resolve : ClientId -> Equinox.Stream) = -// Given the supplied context, attempt to apply the command for the specified clientId -// NOTE Try will return the `fst` of the tuple that `decide` returned -// If >1 attempt was necessary (e.g., due to conflicting events), the `fst` from the last attempt is the outcome -member __.Try(clientId, context, command) : Async = - let stream = resolve clientId - stream.Transact(fun state -> - decide (context, command) state) + // Given the supplied context, attempt to apply the command for the specified clientId + // NOTE Try will return the `fst` of the tuple that `decide` returned + // If >1 attempt was necessary (e.g., due to conflicting events), the `fst` from the last attempt is the outcome + member __.Try(clientId, context, command) : Async = + let stream = resolve clientId + stream.Transact(fun state -> + decide (context, command) state) ``` ## DOs @@ -646,7 +685,8 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list = let state' = fold state events state', acc @ events) -type Service ... = +type Service internal (resolve : CartId -> Equinox.Stream) = + member __.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async = let stream = resolve (cartId,if optimistic then Some Equinox.AllowStale else None) stream.TransactAsync(fun state -> async { diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index 30bcaabd5..2ba19ba9b 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -30,17 +30,17 @@ type ServiceBuilder(storageConfig, handlerLog) = member __.CreateFavoritesService() = let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot - Backend.Favorites.Service(handlerLog, resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot)) + Backend.Favorites.create handlerLog (resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot)) member __.CreateSaveForLaterService() = let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact - Backend.SavedForLater.Service(handlerLog, resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot), maxSavedItems=50) + Backend.SavedForLater.create 50 handlerLog (resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot)) member __.CreateTodosService() = let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot - TodoBackend.Service(handlerLog, resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot)) + TodoBackend.create handlerLog (resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot)) let register (services : IServiceCollection, storageConfig, handlerLog) = let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore @@ -49,4 +49,4 @@ let register (services : IServiceCollection, storageConfig, handlerLog) = regF <| fun sp -> sp.GetService().CreateFavoritesService() regF <| fun sp -> sp.GetService().CreateSaveForLaterService() - regF <| fun sp -> sp.GetService().CreateTodosService() \ No newline at end of file + regF <| fun sp -> sp.GetService().CreateTodosService() diff --git a/samples/Store/Backend/Cart.fs b/samples/Store/Backend/Cart.fs index fecb0e2fe..5e9a68eac 100644 --- a/samples/Store/Backend/Cart.fs +++ b/samples/Store/Backend/Cart.fs @@ -1,5 +1,6 @@ module Backend.Cart +open Domain open Domain.Cart #if ACCUMULATOR @@ -44,9 +45,7 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list = state', acc @ events) #endif -type Service(log, resolve) = - - let resolve (Events.ForCartId streamId, opt) = Equinox.Stream(log, resolve (streamId,opt), maxAttempts = 3) +type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equinox.Stream) = member __.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async = let stream = resolve (cartId,if optimistic then Some Equinox.AllowStale else None) @@ -72,4 +71,10 @@ type Service(log, resolve) = stream.Query id member __.ReadStale cartId = let stream = resolve (cartId,Some Equinox.ResolveOption.AllowStale) - stream.Query id \ No newline at end of file + stream.Query id + +let create log resolve = + let resolve (id, opt) = + let stream = resolve (streamName id, opt) + Equinox.Stream(log, stream, maxAttempts = 3) + Service(resolve) diff --git a/samples/Store/Backend/ContactPreferences.fs b/samples/Store/Backend/ContactPreferences.fs index 79eb1b8f8..3f1431b3a 100644 --- a/samples/Store/Backend/ContactPreferences.fs +++ b/samples/Store/Backend/ContactPreferences.fs @@ -2,18 +2,20 @@ open Domain.ContactPreferences -type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : Id -> Equinox.Stream) = let update email value : Async = let stream = resolve email - let command = Update { email = email; preferences = value } + let command = let (Id email) = email in Update { email = email; preferences = value } stream.Transact(Commands.interpret command) - member __.Update email value = + member __.Update(email, value) = update email value member __.Read(email) = let stream = resolve email - stream.Query id \ No newline at end of file + stream.Query id + +let create log resolve = + let resolve id = Equinox.Stream(log, resolve (streamName id), maxAttempts = 3) + Service(resolve) diff --git a/samples/Store/Backend/Favorites.fs b/samples/Store/Backend/Favorites.fs index 209bc0f61..783264375 100644 --- a/samples/Store/Backend/Favorites.fs +++ b/samples/Store/Backend/Favorites.fs @@ -1,11 +1,10 @@ module Backend.Favorites +open Domain open Domain.Favorites open System -type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 2) +type Service internal (resolve : ClientId -> Equinox.Stream) = let execute clientId command : Async = let stream = resolve clientId @@ -24,4 +23,8 @@ type Service(log, resolve, ?maxAttempts) = execute clientId (Command.Unfavorite skus) member __.List clientId : Async = - read clientId \ No newline at end of file + read clientId + +let create log resolve = + let resolve id = Equinox.Stream(log, resolve (streamName id), maxAttempts = 3) + Service(resolve) diff --git a/samples/Store/Backend/InventoryItem.fs b/samples/Store/Backend/InventoryItem.fs index 348e7ad88..b95890055 100644 --- a/samples/Store/Backend/InventoryItem.fs +++ b/samples/Store/Backend/InventoryItem.fs @@ -1,10 +1,9 @@ module Backend.InventoryItem +open Domain open Domain.InventoryItem -type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForInventoryItemId id) = Equinox.Stream(log, resolve id, defaultArg maxAttempts 3) +type Service internal (resolve : InventoryItemId -> Equinox.Stream) = member __.Execute(itemId, command) = let stream = resolve itemId @@ -12,4 +11,9 @@ type Service(log, resolve, ?maxAttempts) = member __.Read(itemId) = let stream = resolve itemId - stream.Query id \ No newline at end of file + stream.Query id + +let create resolve = + let resolve id = + Equinox.Stream(Serilog.Log.ForContext(), resolve (streamName id), maxAttempts = 3) + Service(resolve) diff --git a/samples/Store/Backend/SavedForLater.fs b/samples/Store/Backend/SavedForLater.fs index f4ade0809..d938694f1 100644 --- a/samples/Store/Backend/SavedForLater.fs +++ b/samples/Store/Backend/SavedForLater.fs @@ -4,12 +4,10 @@ open Domain open Domain.SavedForLater open System -type Service(handlerLog, resolve, maxSavedItems : int, ?maxAttempts) = +type Service internal (resolve : ClientId -> Equinox.Stream, maxSavedItems) = do if maxSavedItems < 0 then invalidArg "maxSavedItems" "must be non-negative value." - let resolve (Events.ForClientId streamId) = Equinox.Stream(handlerLog, resolve streamId, defaultArg maxAttempts 3) - let execute clientId command : Async = let stream = resolve clientId stream.Transact(Commands.decide maxSavedItems command) @@ -39,4 +37,8 @@ type Service(handlerLog, resolve, maxSavedItems : int, ?maxAttempts) = member __.Merge(clientId, targetId) : Async = async { let! state = read clientId - return! execute targetId (Merge state) } \ No newline at end of file + return! execute targetId (Merge state) } + +let create maxSavedItems log resolve = + let resolve id = Equinox.Stream(log, resolve (streamName id), maxAttempts = 3) + Service(resolve, maxSavedItems) diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index 348515519..d2575d878 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -1,10 +1,10 @@ module Domain.Cart +let streamName (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let (|ForCartId|) (id: CartId) = FsCodec.StreamName.create "Cart" (CartId.toString id) - type ContextInfo = { time: System.DateTime; requestId: RequestId } type ItemInfo = { context: ContextInfo; item: ItemInfo } @@ -27,6 +27,7 @@ module Events = let codec = FsCodec.NewtonsoftJson.Codec.Create() module Fold = + type ItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool } type State = { items: ItemInfo list } module State = @@ -79,4 +80,4 @@ module Commands = match waived with | Some waived when itemExistsWithDifferentWaiveStatus skuId waived -> yield Events.ItemWaiveReturnsChanged { context = c; skuId = skuId; waived = waived } - | _ -> () ] \ No newline at end of file + | _ -> () ] diff --git a/samples/Store/Domain/ContactPreferences.fs b/samples/Store/Domain/ContactPreferences.fs index 263efebef..ad635114a 100644 --- a/samples/Store/Domain/ContactPreferences.fs +++ b/samples/Store/Domain/ContactPreferences.fs @@ -1,12 +1,11 @@ module Domain.ContactPreferences type Id = Id of email: string +let streamName (Id email) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64 // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let (|ForClientId|) (email: string) = FsCodec.StreamName.create "ContactPreferences" email // TODO hash >> base64 - type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool } type Value = { email : string; preferences : Preferences } @@ -37,4 +36,4 @@ module Commands = match command with | Update ({ preferences = preferences } as value) -> if state = preferences then [] else - [ Events.Updated value ] \ No newline at end of file + [ Events.Updated value ] diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index d9e0bef13..f9a19bc4e 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -1,10 +1,10 @@ module Domain.Favorites +let streamName (id: ClientId) = FsCodec.StreamName.create "Favorites" (ClientId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let (|ForClientId|) (id: ClientId) = FsCodec.StreamName.create "Favorites" (ClientId.toString id) - type Favorited = { date: System.DateTimeOffset; skuId: SkuId } type Unfavorited = { skuId: SkuId } type Snapshotted = { net: Favorited[] } @@ -56,4 +56,4 @@ module Commands = yield Events.Favorited { date = date; skuId = skuId } ] | Unfavorite skuId -> if doesntHave skuId then [] else - [ Events.Unfavorited { skuId = skuId } ] \ No newline at end of file + [ Events.Unfavorited { skuId = skuId } ] diff --git a/samples/Store/Domain/InventoryItem.fs b/samples/Store/Domain/InventoryItem.fs index c2cf01dc0..b2c2b6f37 100644 --- a/samples/Store/Domain/InventoryItem.fs +++ b/samples/Store/Domain/InventoryItem.fs @@ -3,11 +3,11 @@ module Domain.InventoryItem open System +let streamName (id : InventoryItemId) = FsCodec.StreamName.create "InventoryItem" (InventoryItemId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let (|ForInventoryItemId|) (id : InventoryItemId) = FsCodec.StreamName.create "InventoryItem" (InventoryItemId.toString id) - type Event = | Created of name: string | Deactivated @@ -55,4 +55,4 @@ module Commands = [ Events.CheckedIn count ] | Deactivate -> if not state.active then invalidOp "Already deactivated" - [ Events.Deactivated ] \ No newline at end of file + [ Events.Deactivated ] diff --git a/samples/Store/Domain/SavedForLater.fs b/samples/Store/Domain/SavedForLater.fs index 994ccc1f0..866e32aed 100644 --- a/samples/Store/Domain/SavedForLater.fs +++ b/samples/Store/Domain/SavedForLater.fs @@ -3,11 +3,11 @@ open System open System.Collections.Generic +let streamName (id: ClientId) = FsCodec.StreamName.create "SavedForLater" (ClientId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let (|ForClientId|) (id: ClientId) = FsCodec.StreamName.create "SavedForLater" (ClientId.toString id) - type Item = { skuId : SkuId; dateSaved : DateTimeOffset } type Added = { skus : SkuId []; dateSaved : DateTimeOffset } @@ -104,4 +104,4 @@ module Commands = let index = Index state let net = skus |> Array.filter (index.DoesNotAlreadyContainSameOrMoreRecent dateSaved) if Array.isEmpty net then true, [] - else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ] \ No newline at end of file + else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ] diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index cbd377a58..bc3e49129 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -15,7 +15,7 @@ let createMemoryStore () = // we want to validate that the JSON UTF8 is working happily VolatileStore() let createServiceMemory log store = - Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) + Backend.Cart.create log (fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) let codec = Domain.Cart.Events.codec @@ -58,7 +58,7 @@ type Tests(testOutputHelper) = let log = createLog () let! conn = connect log let gateway = choose conn defaultBatchSize - return Backend.Cart.Service(log, resolve gateway) } + return Backend.Cart.create log (resolve gateway) } [] let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async { @@ -82,4 +82,4 @@ type Tests(testOutputHelper) = let ``Can roundtrip against Cosmos, correctly folding the events with With Snapshotting`` args = Async.RunSynchronously <| async { let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveCosmosStreamWithSnapshotStrategy do! act service args - } \ No newline at end of file + } diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index b592a9d90..c44677215 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -12,7 +12,7 @@ let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferenc let createMemoryStore () = new MemoryStore.VolatileStore<_>() let createServiceMemory log store = - Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) + Backend.ContactPreferences.create log (MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) let codec = Domain.ContactPreferences.Events.codec let resolveStreamGesWithOptimizedStorageSemantics gateway = @@ -33,10 +33,9 @@ type Tests(testOutputHelper) = let createLog () = createLogger testOutput let act (service : Backend.ContactPreferences.Service) (id,value) = async { - let (Domain.ContactPreferences.Id email) = id - do! service.Update email value + do! service.Update(id, value) - let! actual = service.Read email + let! actual = service.Read id test <@ value = actual @> } [] @@ -49,7 +48,7 @@ type Tests(testOutputHelper) = let log = createLog () let! conn = connect log let gateway = choose conn - return Backend.ContactPreferences.Service(log, resolve gateway) } + return Backend.ContactPreferences.create log (resolve gateway) } [] let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async { @@ -74,9 +73,9 @@ type Tests(testOutputHelper) = let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosWithLatestKnownEventSemantics do! act service args } - + [] let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async { let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds do! act service args - } \ No newline at end of file + } diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 424e4c42d..dce299a4d 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -12,21 +12,21 @@ let snapshot = Domain.Favorites.Fold.isOrigin, Domain.Favorites.Fold.snapshot let createMemoryStore () = new MemoryStore.VolatileStore<_>() let createServiceMemory log store = - Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) + Backend.Favorites.create log (MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) let codec = Domain.Favorites.Events.codec let createServiceGes gateway log = - let resolve = EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve - Backend.Favorites.Service(log, resolve) + let resolver = EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot) + Backend.Favorites.create log resolver.Resolve let createServiceCosmos gateway log = - let resolve = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve - Backend.Favorites.Service(log, resolve) + let resolver = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot) + Backend.Favorites.create log resolver.Resolve let createServiceCosmosRollingState gateway log = let access = Cosmos.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot - let resolve = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve - Backend.Favorites.Service(log, resolve) + let resolver = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access) + Backend.Favorites.create log resolver.Resolve type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper @@ -66,7 +66,7 @@ type Tests(testOutputHelper) = let service = createServiceCosmos gateway log do! act service args } - + [] let ``Can roundtrip against Cosmos, correctly folding the events with rolling unfolds`` args = Async.RunSynchronously <| async { let log = createLog () @@ -74,4 +74,4 @@ type Tests(testOutputHelper) = let gateway = createCosmosContext conn defaultBatchSize let service = createServiceCosmosRollingState gateway log do! act service args - } \ No newline at end of file + } diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index 053d0786d..9e43d0f5b 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -111,7 +111,7 @@ type Tests() = let log = createLoggerWithMetricsExtraction buffer.Enqueue let! conn = connectToLocalEventStoreNode log let gateway = createGesGateway conn batchSize - let service = Backend.Cart.Service(log, CartIntegration.resolveGesStreamWithRollingSnapshots gateway) + let service = Backend.Cart.create log (CartIntegration.resolveGesStreamWithRollingSnapshots gateway) let itemCount = batchSize / 2 + 1 let cartId = % Guid.NewGuid() do! act buffer service itemCount context cartId skuId "ReadStreamEventsBackwardAsync-Duration" @@ -124,8 +124,8 @@ type Tests() = let log = createLoggerWithMetricsExtraction buffer.Enqueue let! conn = connectToSpecifiedCosmosOrSimulator log let gateway = createCosmosContext conn batchSize - let service = Backend.Cart.Service(log, CartIntegration.resolveCosmosStreamWithSnapshotStrategy gateway) + let service = Backend.Cart.create log (CartIntegration.resolveCosmosStreamWithSnapshotStrategy gateway) let itemCount = batchSize / 2 + 1 let cartId = % Guid.NewGuid() do! act buffer service itemCount context cartId skuId "EqxCosmos Tip " // one is a 404, one is a 200 - } \ No newline at end of file + } diff --git a/samples/TodoBackend/Todo.fs b/samples/TodoBackend/Todo.fs index 0349f1ada..6da0ca813 100644 --- a/samples/TodoBackend/Todo.fs +++ b/samples/TodoBackend/Todo.fs @@ -1,14 +1,15 @@ -namespace TodoBackend +module TodoBackend open Domain +// The TodoBackend spec does not dictate having multiple lists, tenants or clients +// Here, we implement such a discriminator in order to allow each virtual client to maintain independent state +let Category = "Todos" +let streamName (id : ClientId) = FsCodec.StreamName.create Category (ClientId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - // The TodoBackend spec does not dictate having multiple lists, tenants or clients - // Here, we implement such a discriminator in order to allow each virtual client to maintain independent state - let (|ForClientId|) (id : ClientId) = FsCodec.StreamName.create "Todos" (ClientId.toString id) - type Todo = { id: int; order: int; title: string; completed: bool } type Deleted = { id: int } type Snapshotted = { items: Todo[] } @@ -49,9 +50,7 @@ module Commands = | Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [Events.Deleted {id=id}] else [] | Clear -> if state.items |> List.isEmpty then [] else [Events.Cleared] -type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, maxAttempts = defaultArg maxAttempts 2) +type Service internal (resolve : ClientId -> Equinox.Stream) = let execute clientId command = let stream = resolve clientId @@ -81,4 +80,6 @@ type Service(log, resolve, ?maxAttempts) = member __.Patch(clientId, item: Events.Todo) : Async = async { let! state' = handle clientId (Command.Update item) - return List.find (fun x -> x.id = item.id) state' } \ No newline at end of file + return List.find (fun x -> x.id = item.id) state' } + +let create log resolve = Service(fun id -> Equinox.Stream(log, resolve (streamName id), maxAttempts = 3)) diff --git a/samples/Tutorial/AsAt.fsx b/samples/Tutorial/AsAt.fsx index 492a51334..7effbf8b0 100644 --- a/samples/Tutorial/AsAt.fsx +++ b/samples/Tutorial/AsAt.fsx @@ -36,9 +36,9 @@ open System -module Events = +let streamName clientId = FsCodec.StreamName.create "Account" clientId - let (|ForClientId|) clientId = FsCodec.StreamName.create "Account" clientId +module Events = type Delta = { count : int } type SnapshotInfo = { balanceLog : int[] } @@ -103,9 +103,7 @@ module Commands = if bal < delta then invalidArg "delta" (sprintf "delta %d exceeds balance %d" delta bal) else [-1L,Events.Removed {count = delta}] -type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : ClientId -> Equinox.Stream) = let execute clientId command : Async = let stream = resolve clientId @@ -152,7 +150,7 @@ module EventStore = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching // rig snapshots to be injected as events into the stream every `snapshotWindow` events let accessStrategy = AccessStrategy.RollingSnapshots (Fold.isValid,Fold.snapshot) - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) module Cosmos = open Equinox.Cosmos @@ -163,10 +161,10 @@ module Cosmos = let context = Context(conn, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching let accessStrategy = AccessStrategy.Snapshot (Fold.isValid,Fold.snapshot) - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) -let serviceES = Service(Log.log, EventStore.resolve) -let serviceCosmos = Service(Log.log, Cosmos.resolve) +let serviceES = Service(Log.log, EventStore.resolver.Resolve) +let serviceCosmos = Service(Log.log, Cosmos.resolver.Resolve) let client = "ClientA" serviceES.Add(client, 1) |> Async.RunSynchronously diff --git a/samples/Tutorial/Cosmos.fsx b/samples/Tutorial/Cosmos.fsx index 792b08a69..a18c8b89d 100644 --- a/samples/Tutorial/Cosmos.fsx +++ b/samples/Tutorial/Cosmos.fsx @@ -19,13 +19,11 @@ #r "Serilog.Sinks.Seq.dll" #r "Equinox.Cosmos.dll" -open Equinox.Cosmos -open System +let Category = "Favorites" +let streamName clientId = FsCodec.StreamName.create Category clientId module Favorites = - let (|ForClientId|) clientId = FsCodec.StreamName.create "Favorites" clientId - type Item = { sku : string } type Event = | Added of Item @@ -46,9 +44,7 @@ module Favorites = | Add sku -> if state |> List.contains sku then [] else [Added {sku = sku}] | Remove sku -> if state |> List.contains sku then [Removed {sku = sku}] else [] - type Service(log, resolve, maxAttempts) = - - let resolve (ForClientId streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) + type Service internal (resolve : ClientId -> Equinox.Stream) = let execute clientId command : Async = let stream = resolve clientId @@ -87,9 +83,13 @@ module Store = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching module FavoritesCategory = - let resolve = Resolver(Store.context, Favorites.codec, Favorites.fold, Favorites.initial, Store.cacheStrategy, AccessStrategy.Unoptimized).Resolve + let resolver = Resolver(Store.context, Favorites.codec, Favorites.fold, Favorites.initial, Store.cacheStrategy, AccessStrategy.Unoptimized) -let service = Favorites.Service(Log.log, FavoritesCategory.resolve, maxAttempts=3) +let service resolve = + let resolve clientId = + let streamName = streamName clientId + Equinox.Stream(Log.log, FavoritesCategory.resolver.Resolve streamName, maxAttempts = 3) + Favorites.Service resolve let client = "ClientJ" service.Favorite(client, "a") |> Async.RunSynchronously diff --git a/samples/Tutorial/Counter.fsx b/samples/Tutorial/Counter.fsx index 6490871b0..f7ea193ef 100644 --- a/samples/Tutorial/Counter.fsx +++ b/samples/Tutorial/Counter.fsx @@ -24,7 +24,7 @@ type Event = | Cleared of Cleared interface TypeShape.UnionContract.IUnionContract (* Kind of DDD aggregate ID *) -let (|ForCounterId|) (id : string) = FsCodec.StreamName.create "Counter" id +let streamName (id : string) = FsCodec.StreamName.create "Counter" id type State = State of int let initial : State = State 0 @@ -56,9 +56,7 @@ let decide command (State state) = | Clear i -> if state = i then [] else [Cleared {value = i}] -type Service(log, resolve, ?maxAttempts) = - - let resolve (ForCounterId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : string -> Equinox.Stream) = let execute counterId command : Async = let stream = resolve counterId @@ -77,10 +75,10 @@ type Service(log, resolve, ?maxAttempts) = let store = Equinox.MemoryStore.VolatileStore() let codec = FsCodec.Box.Codec.Create() -let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve +let resolver = Equinox.MemoryStore.Resolver(store, codec, fold, initial) open Serilog let log = LoggerConfiguration().WriteTo.Console().CreateLogger() -let service = Service(log, resolve, maxAttempts=3) +let service = Service(log, resolver.Resolve, maxAttempts = 3) let clientId = "ClientA" service.Read(clientId) |> Async.RunSynchronously service.Execute(clientId, Increment) |> Async.RunSynchronously diff --git a/samples/Tutorial/Favorites.fsx b/samples/Tutorial/Favorites.fsx index 790024b31..617edd156 100644 --- a/samples/Tutorial/Favorites.fsx +++ b/samples/Tutorial/Favorites.fsx @@ -72,8 +72,7 @@ let _removeBAgainEffect = interpret (Remove "b") favesCa b) a maximum number of attempts to make if we clash with a conflicting write *) // Example of wrapping Stream to encapsulate stream access patterns (see DOCUMENTATION.md for reasons why this is not advised in real apps) -type Handler(log, stream, ?maxAttempts) = - let inner = Equinox.Stream(log, stream, maxAttempts = defaultArg maxAttempts 2) +type Handler(stream : Equinox.Stream) = member __.Execute command : Async = inner.Transact(interpret command) member __.Read : Async = @@ -87,8 +86,8 @@ open Serilog let log = LoggerConfiguration().WriteTo.Console().CreateLogger() // related streams are termed a Category; Each client will have it's own Stream. -let categoryId = "Favorites" -let clientAFavoritesStreamId = FsCodec.StreamName.create categoryId "ClientA" +let Category = "Favorites" +let clientAFavoritesStreamName = FsCodec.StreamName.create Category "ClientA" // For test purposes, we use the in-memory store let store = Equinox.MemoryStore.VolatileStore() @@ -103,12 +102,13 @@ let codec = | "Remove", (:? string as x) -> Removed x |> Some | _ -> None FsCodec.Codec.Create(encode,tryDecode) -// Each store has a Resolver which provides an IStream instance which binds to a specific stream in a specific store +// Each store has a Resolver that generates IStream instances binding to a specific stream in a specific store // ... because the nature of the contract with the handler is such that the store hands over State, we also pass the `initial` and `fold` as we used above -let stream streamName = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve(streamName) +let resolver = Equinox.MemoryStore.Resolver(store, codec, fold, initial) +let stream streamName = Equinox.Stream(log, resolver.Resolve streamName, maxAttempts = 2) // We hand the streamId to the resolver -let clientAStream = stream clientAFavoritesStreamId +let clientAStream = stream clientAFavoritesStreamName // ... and pass the stream to the Handler let handler = Handler(log, clientAStream) @@ -131,12 +131,7 @@ handler.Read |> Async.RunSynchronously (* Building a service to package Command Handling and related functions No, this is not doing CQRS! *) -type Service(log, resolve) = - (* See Counter.fsx and Cosmos.fsx for a more compact representation which makes the Handler wiring less obtrusive *) - let streamFor (clientId: string) = - let streamName = FsCodec.StreamName.create "Favorites" clientId - let stream = resolve streamName - Handler(log, stream) +type Service(streamFor : string -> Equinox.Stream) = member __.Favorite(clientId, sku) = let stream = streamFor clientId @@ -150,9 +145,13 @@ type Service(log, resolve) = let stream = streamFor clientId stream.Read -let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve +(* See Counter.fsx and Cosmos.fsx for a more compact representation which makes the Handler wiring less obtrusive *) +let streamFor (clientId: string) = + let streamName = FsCodec.StreamName.create "Favorites" clientId + let stream = resolver.Resolve streamName + Handler(log, stream) -let service = Service(log, resolve) +let service = Service(streamFor) let client = "ClientB" service.Favorite(client, "a") |> Async.RunSynchronously diff --git a/samples/Tutorial/FulfilmentCenter.fsx b/samples/Tutorial/FulfilmentCenter.fsx index aa213749b..212486243 100644 --- a/samples/Tutorial/FulfilmentCenter.fsx +++ b/samples/Tutorial/FulfilmentCenter.fsx @@ -40,9 +40,9 @@ module Types = module FulfilmentCenter = - module Events = + let streamName id = FsCodec.StreamName.create "FulfilmentCenter" id - let (|ForFcId|) id = FsCodec.StreamName.create "FulfilmentCenter" id + module Events = type AddressData = { address : Address } type ContactInformationData = { contact : ContactInformation } @@ -84,9 +84,7 @@ module FulfilmentCenter = | UpdateDetails c when state.details = Some c -> [] | UpdateDetails c -> [Events.FcDetailsChanged { details = c }] - type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForFcId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) + type Service internal (resolve : string -> Equinox.Stream) = let execute fc command : Async = let stream = resolve fc @@ -135,8 +133,8 @@ module Store = open FulfilmentCenter -let resolve = Resolver(Store.context, Events.codec, Fold.fold, Fold.initial, Store.cacheStrategy, AccessStrategy.Unoptimized).Resolve -let service = Service(Log.log, resolve) +let resolver = Resolver(Store.context, Events.codec, Fold.fold, Fold.initial, Store.cacheStrategy, AccessStrategy.Unoptimized) +let service = Service(Log.log, resolver.Resolve) let fc = "fc0" service.UpdateName(fc, { code="FC000"; name="Head" }) |> Async.RunSynchronously @@ -147,7 +145,7 @@ Log.dumpMetrics () /// Manages ingestion of summary events tagged with the version emitted from FulmentCenter.Service.QueryWithVersion module FulfilmentCenterSummary = - let (|ForFcId|) id = FsCodec.StreamName.create "FulfilmentCenterSummary" id + let streamName id = FsCodec.StreamName.create "FulfilmentCenterSummary" id module Events = type UpdatedData = { version : int64; state : Summary } @@ -169,9 +167,7 @@ module FulfilmentCenterSummary = | Update (uv,_us) when state |> Option.exists (fun s -> s.version > uv) -> [] | Update (uv,us) -> [Events.Updated { version = uv; state = us }] - type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForFcId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) + type Service internal (resolve : string -> Equinox.Stream) = let execute fc command : Async = let stream = resolve fc diff --git a/samples/Tutorial/Gapless.fs b/samples/Tutorial/Gapless.fs index b16991b90..7042b4e10 100644 --- a/samples/Tutorial/Gapless.fs +++ b/samples/Tutorial/Gapless.fs @@ -4,12 +4,12 @@ module Gapless open System +let [] Category = "Gapless" +let streamName id = FsCodec.StreamName.create Category (SequenceId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let [] categoryId = "Gapless" - let (|ForSequenceId|) id = FsCodec.StreamName.create categoryId (SequenceId.toString id) - type Item = { id : int64 } type Snapshotted = { reservations : int64[]; nextId : int64 } type Event = @@ -54,9 +54,7 @@ let decideConfirm item (state : Fold.State) : Events.Event list = let decideRelease item (state : Fold.State) : Events.Event list = failwith "TODO" -type Service(log, resolve, ?maxAttempts) = - - let resolve (Events.ForSequenceId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : SequenceId -> Equinox.Stream) = member __.ReserveMany(series,count) : Async = let stream = resolve series @@ -79,19 +77,22 @@ let [] appName = "equinox-tutorial-gapless" module Cosmos = open Equinox.Cosmos - let private createService (context,cache,accessStrategy) = + let private create (context,cache,accessStrategy) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve - Service(Serilog.Log.Logger, resolve) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let resolve sequenceId = + let streamName = streamName sequenceId + Equinox.Stream(Serilog.Log.Logger, resolver.Resolve streamName, maxAttempts = 3) + Service(resolve) module Snapshot = - let createService (context,cache) = + let create (context,cache) = let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin,Fold.snapshot) - createService(context,cache,accessStrategy) + create(context,cache,accessStrategy) module RollingUnfolds = - let createService (context,cache) = + let create (context,cache) = let accessStrategy = AccessStrategy.RollingState Fold.snapshot - createService(context,cache,accessStrategy) \ No newline at end of file + create(context,cache,accessStrategy) diff --git a/samples/Tutorial/Index.fs b/samples/Tutorial/Index.fs index c45cd944c..92bd1c06b 100644 --- a/samples/Tutorial/Index.fs +++ b/samples/Tutorial/Index.fs @@ -1,11 +1,11 @@ module Index +let [] Category = "Index" +let streamName indexId = FsCodec.StreamName.create Category (IndexId.toString indexId) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let [] categoryId = "Index" - let (|ForIndexId|) indexId = FsCodec.StreamName.create categoryId (IndexId.toString indexId) - type ItemIds = { items : string[] } type Items<'v> = { items : Map } type Event<'v> = @@ -38,30 +38,30 @@ let interpret add remove (state : Fold.State<'v>) = [ if adds.Length <> 0 then yield Events.Added { items = Map.ofSeq adds } if removes.Length <> 0 then yield Events.Deleted { items = removes } ] -type Service<'t> internal (indexId, resolve, maxAttempts) = - - let log = Serilog.Log.ForContext>() - let resolve (Events.ForIndexId streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) - let stream = resolve indexId +type Service<'t> internal (stream : Equinox.Stream, Fold.State<'t>>) = member __.Ingest(adds : seq, removes : string seq) : Async = stream.Transact(interpret adds removes) member __.Read() : Async> = stream.Query id -let create resolve indexId = Service(indexId, resolve, maxAttempts = 3) +let create<'t> resolve indexId = + let log = Serilog.Log.ForContext>() + let streamName = streamName indexId + let stream = Equinox.Stream(log, resolve streamName, maxAttempts = 3) + Service(stream) module Cosmos = open Equinox.Cosmos - let createService<'v> (context,cache) = + let create<'v> (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let accessStrategy = AccessStrategy.RollingState Fold.snapshot - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve - create resolve + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create resolver.Resolve module MemoryStore = - let createService store = - let resolve = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial).Resolve - create resolve \ No newline at end of file + let create store = + let resolver = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial) + create resolver.Resolve diff --git a/samples/Tutorial/Sequence.fs b/samples/Tutorial/Sequence.fs index 36e6aa633..c69acb510 100644 --- a/samples/Tutorial/Sequence.fs +++ b/samples/Tutorial/Sequence.fs @@ -4,6 +4,9 @@ module Sequence open System +let [] Category = "Sequence" +let streamName id = FsCodec.StreamName.create Category (SequenceId.toString id) + // shim for net461 module Seq = let tryLast (source : seq<_>) = @@ -18,9 +21,6 @@ module Seq = // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let [] categoryId = "Sequence" - let (|ForSequenceId|) id = FsCodec.StreamName.create categoryId (SequenceId.toString id) - type Reserved = { next : int64 } type Event = | Reserved of Reserved @@ -40,32 +40,34 @@ module Fold = let decideReserve (count : int) (state : Fold.State) : int64 * Events.Event list = state.next,[Events.Reserved { next = state.next + int64 count }] -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.ForSequenceId streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) +type Service internal (resolve : SequenceId -> Equinox.Stream) = /// Reserves an id, yielding the reserved value. Optional count enables reserving more than the default count of 1 in a single transaction member __.Reserve(series,?count) : Async = let stream = resolve series stream.Transact(decideReserve (defaultArg count 1)) -let create resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 3) +let create resolve = + let resolve sequenceId = + let streamName = streamName sequenceId + Equinox.Stream(Serilog.Log.ForContext(), resolve streamName, maxAttempts = 3) + Service(resolve) module Cosmos = open Equinox.Cosmos - let private createService (context,cache,accessStrategy) = + let private create (context,cache,accessStrategy) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve - create resolve + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create resolver.Resolve module LatestKnownEvent = - let createService (context,cache) = + let create (context,cache) = let accessStrategy = AccessStrategy.LatestKnownEvent - createService (context,cache,accessStrategy) + create (context,cache,accessStrategy) module RollingUnfolds = - let createService (context,cache) = - createService (context,cache,AccessStrategy.RollingState Fold.snapshot) + let create (context,cache) = + create (context,cache,AccessStrategy.RollingState Fold.snapshot) diff --git a/samples/Tutorial/Set.fs b/samples/Tutorial/Set.fs index 4e7437b60..b9b5a3ae7 100644 --- a/samples/Tutorial/Set.fs +++ b/samples/Tutorial/Set.fs @@ -1,11 +1,11 @@ module Set +let [] Category = "Set" +let streamName id = FsCodec.StreamName.create Category (SetId.toString id) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let [] categoryId = "Set" - let (|ForSetId|) id = FsCodec.StreamName.create categoryId (SetId.toString id) - type Items = { items : string[] } type Event = | Added of Items @@ -38,30 +38,30 @@ let interpret add remove (state : Fold.State) = [ if adds.Length <> 0 then yield Events.Added { items = adds } if removes.Length <> 0 then yield Events.Deleted { items = removes } ] -type Service internal (log, setId, resolve, maxAttempts) = - - let resolve (Events.ForSetId streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) - let stream = resolve setId +type Service internal (stream : Equinox.Stream) = - member __.Add(add : string seq,remove : string seq) : Async = + member __.Add(add : string seq, remove : string seq) : Async = stream.Transact(interpret add remove) member __.Read() : Async> = stream.Query id -let create resolve setId = Service(Serilog.Log.ForContext(), setId, resolve, maxAttempts = 3) +let create resolve setId = + let streamName = streamName setId + let stream = Equinox.Stream(Serilog.Log.ForContext(), resolve streamName, maxAttempts = 3) + Service(stream) module Cosmos = open Equinox.Cosmos - let createService (context,cache) = + let create (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let accessStrategy = AccessStrategy.RollingState Fold.snapshot - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve - create resolve + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create resolver.Resolve module MemoryStore = - let createService store = - let resolve = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial).Resolve - create resolve \ No newline at end of file + let create store = + let resolver = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial) + create resolver.Resolve diff --git a/samples/Tutorial/Todo.fsx b/samples/Tutorial/Todo.fsx index d75d0a42c..c2212ed29 100644 --- a/samples/Tutorial/Todo.fsx +++ b/samples/Tutorial/Todo.fsx @@ -17,13 +17,11 @@ #r "Microsoft.Azure.DocumentDb.Core.dll" #r "Equinox.Cosmos.dll" -open Equinox.Cosmos -open System - (* NB It's recommended to look at Favorites.fsx first as it establishes the groundwork This tutorial stresses different aspects *) -let (|ForClientId|) (id : string) = FsCodec.StreamName.create "Todos" id +let Category = "Todos" +let streamName (id : string) = FsCodec.StreamName.create Category id type Todo = { id: int; order: int; title: string; completed: bool } type DeletedInfo = { id: int } @@ -61,9 +59,7 @@ let interpret c (state : State) = | Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [Deleted { id=id }] else [] | Clear -> if state.items |> List.isEmpty then [] else [Cleared] -type Service(log, resolve, ?maxAttempts) = - - let resolve (ForClientId streamId) = Equinox.Stream(log, resolve streamId, defaultArg maxAttempts 3) +type Service internal (resolve : string -> Equinox.Stream) = let execute clientId command : Async = let stream = resolve clientId @@ -118,6 +114,7 @@ let log = LoggerConfiguration().WriteTo.Console().CreateLogger() let [] appName = "equinox-tutorial" let cache = Equinox.Cache(appName, 20) +open Equinox.Cosmos module Store = let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get @@ -130,9 +127,9 @@ module Store = module TodosCategory = let access = AccessStrategy.Snapshot (isOrigin,snapshot) - let resolve = Resolver(Store.store, codec, fold, initial, Store.cacheStrategy, access=access).Resolve + let resolver = Resolver(Store.store, codec, fold, initial, Store.cacheStrategy, access=access) -let service = Service(log, TodosCategory.resolve) +let service = Service(log, TodosCategory.resolver.Resolve) let client = "ClientJ" let item = { id = 0; order = 0; title = "Feed cat"; completed = false } diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index 5de5e7a6b..f7b5b742c 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -1,8 +1,21 @@ /// Simple example of how one might have multiple uploaders agree/share a common UploadId for a given OrderId module Upload -open System open FSharp.UMX +open System + +type PurchaseOrderId = int +and [] purchaseOrderId +module PurchaseOrderId = + let toString (value : PurchaseOrderId) : string = string %value + +type CompanyId = string +and [] companyId +module CompanyId = + let toString (value : CompanyId) : string = %value + +let [] Category = "Upload" +let streamName (companyId, purchaseOrderId) = FsCodec.StreamName.compose Category [PurchaseOrderId.toString purchaseOrderId; CompanyId.toString companyId] // shim for net461 module Seq = @@ -15,16 +28,6 @@ module Seq = else None -type PurchaseOrderId = int -and [] purchaseOrderId -module PurchaseOrderId = - let toString (value : PurchaseOrderId) : string = string %value - -type CompanyId = string -and [] companyId -module CompanyId = - let toString (value : CompanyId) : string = %value - type UploadId = string and [] uploadId module UploadId = @@ -33,9 +36,6 @@ module UploadId = // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = - let [] categoryId = "Upload" - let (|ForCompanyAndPurchaseOrder|) (companyId, purchaseOrderId) = FsCodec.StreamName.compose categoryId [PurchaseOrderId.toString purchaseOrderId; CompanyId.toString companyId] - type IdAssigned = { value : UploadId } type Event = | IdAssigned of IdAssigned @@ -56,26 +56,28 @@ let decide (value : UploadId) (state : Fold.State) : Choice * | None -> Choice1Of2 value, [Events.IdAssigned { value = value}] | Some value -> Choice2Of2 value, [] -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.ForCompanyAndPurchaseOrder streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) +type Service internal (resolve : CompanyId * PurchaseOrderId -> Equinox.Stream) = member __.Sync(companyId, purchaseOrderId, value) : Async> = let stream = resolve (companyId, purchaseOrderId) stream.Transact(decide value) -let create resolve = Service(Serilog.Log.ForContext(), resolve, 3) +let create resolve = + let resolve ids = + let streamName = streamName ids + Equinox.Stream(Serilog.Log.ForContext(), resolve streamName, maxAttempts = 3) + Service(resolve) module Cosmos = open Equinox.Cosmos - let createService (context,cache) = + let create (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve - create resolve + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent) + create resolver.Resolve module EventStore = open Equinox.EventStore - let createService context = - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent).Resolve - create resolve + let create context = + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent) + create resolver.Resolve diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 0b254b5f0..5c60ad020 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -15,42 +15,42 @@ module Cart = let createServiceWithoutOptimization connection batchSize log = let store = createCosmosContext connection batchSize let resolve (id,opt) = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + Backend.Cart.create log resolve let projection = "Compacted",snd snapshot /// Trigger looking in Tip (we want those calls to occur, but without leaning on snapshots, which would reduce the paths covered) let createServiceWithEmptyUnfolds connection batchSize log = let store = createCosmosContext connection batchSize let unfArgs = Domain.Cart.Fold.isOrigin, fun _ -> Seq.empty let resolve (id,opt) = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.MultiSnapshot unfArgs).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + Backend.Cart.create log resolve let createServiceWithSnapshotStrategy connection batchSize log = let store = createCosmosContext connection batchSize let resolve (id,opt) = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + Backend.Cart.create log resolve let createServiceWithSnapshotStrategyAndCaching connection batchSize log cache = let store = createCosmosContext connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) let resolve (id,opt) = Resolver(store, codec, fold, initial, sliding20m, AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + Backend.Cart.create log resolve let createServiceWithRollingState connection log = let store = createCosmosContext connection 1 let access = AccessStrategy.RollingState Domain.Cart.Fold.snapshot let resolve (id,opt) = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, access).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + Backend.Cart.create log resolve module ContactPreferences = let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferences.Fold.initial let codec = Domain.ContactPreferences.Events.codec let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate = let gateway = createGateway defaultBatchSize - let resolve = Resolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve - Backend.ContactPreferences.Service(log, resolve) + let resolver = Resolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized) + Backend.ContactPreferences.create log resolver.Resolve let createService log createGateway = - let resolve = Resolver(createGateway 1, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.LatestKnownEvent).Resolve - Backend.ContactPreferences.Service(log, resolve) + let resolver = Resolver(createGateway 1, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.LatestKnownEvent) + Backend.ContactPreferences.create log resolver.Resolve let createServiceWithLatestKnownEvent createGateway log cachingStrategy = - let resolve = Resolver(createGateway 1, codec, fold, initial, cachingStrategy, AccessStrategy.LatestKnownEvent).Resolve - Backend.ContactPreferences.Service(log, resolve) + let resolver = Resolver(createGateway 1, codec, fold, initial, cachingStrategy, AccessStrategy.LatestKnownEvent) + Backend.ContactPreferences.create log resolver.Resolve #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -197,19 +197,19 @@ type Tests(testOutputHelper) = let! conn = connectToSpecifiedCosmosOrSimulator log let service = ContactPreferences.createService log (createCosmosContext conn) - let email = let g = System.Guid.NewGuid() in g.ToString "N" + let id = ContactPreferences.Id (let g = System.Guid.NewGuid() in g.ToString "N") //let (Domain.ContactPreferences.Id email) = id () // Feed some junk into the stream for i in 0..11 do let quickSurveysValue = i % 2 = 0 - do! service.Update email { value with quickSurveys = quickSurveysValue } + do! service.Update(id, { value with quickSurveys = quickSurveysValue }) // Ensure there will be something to be changed by the Update below - do! service.Update email { value with quickSurveys = not value.quickSurveys } + do! service.Update(id, { value with quickSurveys = not value.quickSurveys }) capture.Clear() - do! service.Update email value + do! service.Update(id, value) - let! result = service.Read email + let! result = service.Read id test <@ value = result @> test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> @@ -220,18 +220,18 @@ type Tests(testOutputHelper) = let! conn = connectToSpecifiedCosmosOrSimulator log let service = ContactPreferences.createServiceWithLatestKnownEvent (createCosmosContext conn) log CachingStrategy.NoCaching - let email = let g = System.Guid.NewGuid() in g.ToString "N" + let id = ContactPreferences.Id (let g = System.Guid.NewGuid() in g.ToString "N") // Feed some junk into the stream for i in 0..11 do let quickSurveysValue = i % 2 = 0 - do! service.Update email { value with quickSurveys = quickSurveysValue } + do! service.Update(id, { value with quickSurveys = quickSurveysValue }) // Ensure there will be something to be changed by the Update below - do! service.Update email { value with quickSurveys = not value.quickSurveys } + do! service.Update(id, { value with quickSurveys = not value.quickSurveys }) capture.Clear() - do! service.Update email value + do! service.Update(id, value) - let! result = service.Read email + let! result = service.Read id test <@ value = result @> test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> @@ -375,4 +375,4 @@ type Tests(testOutputHelper) = capture.Clear() do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1 test <@ [EqxAct.Append] = capture.ExternalCalls @> - } \ No newline at end of file + } diff --git a/tests/Equinox.EventStore.Integration/StoreIntegration.fs b/tests/Equinox.EventStore.Integration/StoreIntegration.fs index 16370df83..3d6ff231a 100644 --- a/tests/Equinox.EventStore.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStore.Integration/StoreIntegration.fs @@ -51,26 +51,27 @@ module Cart = let codec = Domain.Cart.Events.codec let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot let createServiceWithoutOptimization log gateway = - Backend.Cart.Service(log, fun (id,opt) -> Resolver(gateway, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) + let resolve (id,opt) = Resolver(gateway, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt) + Backend.Cart.create log resolve let createServiceWithCompaction log gateway = let resolve (id,opt) = Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + Backend.Cart.create log resolve let createServiceWithCaching log gateway cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Backend.Cart.Service(log, fun (id,opt) -> Resolver(gateway, codec, fold, initial, sliding20m).Resolve(id,?option=opt)) + Backend.Cart.create log (fun (id,opt) -> Resolver(gateway, codec, fold, initial, sliding20m).Resolve(id,?option=opt)) let createServiceWithCompactionAndCaching log gateway cache = let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Backend.Cart.Service(log, fun (id,opt) -> Resolver(gateway, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)) + Backend.Cart.create log (fun (id,opt) -> Resolver(gateway, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)) module ContactPreferences = let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferences.Fold.initial let codec = Domain.ContactPreferences.Events.codec let createServiceWithoutOptimization log connection = let gateway = createGesGateway connection defaultBatchSize - Backend.ContactPreferences.Service(log, Resolver(gateway, codec, fold, initial).Resolve) + Backend.ContactPreferences.create log (Resolver(gateway, codec, fold, initial).Resolve) let createService log connection = - let resolve = Resolver(createGesGateway connection 1, codec, fold, initial, access = AccessStrategy.LatestKnownEvent).Resolve - Backend.ContactPreferences.Service(log, resolve) + let resolver = Resolver(createGesGateway connection 1, codec, fold, initial, access = AccessStrategy.LatestKnownEvent) + Backend.ContactPreferences.create log resolver.Resolve #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -259,18 +260,17 @@ type Tests(testOutputHelper) = let! conn = connectToLocalStore log let service = ContactPreferences.createService log conn - let (Domain.ContactPreferences.Id email) = id // Feed some junk into the stream for i in 0..11 do let quickSurveysValue = i % 2 = 0 - do! service.Update email { value with quickSurveys = quickSurveysValue } + do! service.Update(id, { value with quickSurveys = quickSurveysValue }) // Ensure there will be something to be changed by the Update below - do! service.Update email { value with quickSurveys = not value.quickSurveys } + do! service.Update(id, { value with quickSurveys = not value.quickSurveys }) capture.Clear() - do! service.Update email value + do! service.Update(id, value) - let! result = service.Read email + let! result = service.Read id test <@ value = result @> test <@ batchBackwardsAndAppend @ singleBatchBackwards = capture.ExternalCalls @> @@ -382,4 +382,4 @@ type Tests(testOutputHelper) = let! _ = service2.Read cartId let suboptimalExtraSlice = [singleSliceForward] test <@ singleBatchBackwards @ batchBackwardsAndAppend @ suboptimalExtraSlice @ singleBatchForward = capture.ExternalCalls @> - } \ No newline at end of file + } diff --git a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs index 6cc73bfc1..e367101d1 100644 --- a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs +++ b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs @@ -4,11 +4,12 @@ open Swensen.Unquote open Equinox.MemoryStore let createMemoryStore () = - new VolatileStore<_>() + VolatileStore<_>() let createServiceMemory log store = - let resolve (id,opt) = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Cart.Fold.fold, Domain.Cart.Fold.initial).Resolve(id,?option=opt) - Backend.Cart.Service(log, resolve) + let resolver = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Cart.Fold.fold, Domain.Cart.Fold.initial) + let resolve (id,opt) = resolver.Resolve(id,?option=opt) + Backend.Cart.create log resolve #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -44,4 +45,4 @@ type Tests(testOutputHelper) = | x -> x |> failwithf "Expected to find item, got %A" verifyFoldedStateReflectsCommand expected verifyFoldedStateReflectsCommand actual - } \ No newline at end of file + }