Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port to Equinox 4, Propulsion 3 #122

Merged
merged 43 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
208685a
Add DynamoStore support
bartelink Aug 9, 2022
e71b753
Port eqxPatterns to v4
bartelink Sep 9, 2022
e758151
Port equinox-web, adding Dynamo
bartelink Sep 9, 2022
3900ddd
Port eqxweb-cs
bartelink Sep 12, 2022
5e7e40e
Remove dead helpers
bartelink Sep 12, 2022
cec830b
Port feedSource
bartelink Sep 12, 2022
28e14d6
if eqx patterns
bartelink Sep 12, 2022
54ad370
Port periodicIngester
bartelink Sep 12, 2022
b729343
feedConsumer
bartelink Sep 12, 2022
337ef2c
summaryConsumer
bartelink Sep 12, 2022
98f6daa
trackingConsumer
bartelink Sep 12, 2022
aac71bc
proProjector
bartelink Sep 13, 2022
77baf6b
proArchiver
bartelink Sep 13, 2022
9f55c24
proReactorCosmos
bartelink Sep 13, 2022
20f4b34
f periodicIngester
bartelink Sep 13, 2022
d66f546
f feedApi
bartelink Sep 13, 2022
fe5a01c
f feedConsumer
bartelink Sep 13, 2022
0c80d62
f summaryConsumer
bartelink Sep 13, 2022
a36df1b
f trackingConsumer
bartelink Sep 13, 2022
20a910e
proPruner
bartelink Sep 13, 2022
4a9e4eb
proConsumer
bartelink Sep 13, 2022
d5b250f
eqxTestbed
bartelink Sep 13, 2022
247bc29
Eqx4rc1, Prp3b4.1
bartelink Sep 15, 2022
41d43b8
f
bartelink Sep 16, 2022
1cba2f7
f
bartelink Sep 16, 2022
498a9c8
f
bartelink Sep 16, 2022
d9347a5
f
bartelink Sep 16, 2022
5cd7155
f
bartelink Sep 16, 2022
4c07d4c
f sync a marveleqx
bartelink Sep 16, 2022
2fd4d7d
f reactor handler
bartelink Sep 16, 2022
2fa63b3
f
bartelink Sep 16, 2022
e0dc95e
f sync kafka
bartelink Sep 16, 2022
9644053
f sourceKafka
bartelink Sep 16, 2022
e05a690
f reactor refs
bartelink Sep 16, 2022
a41cc09
f proReactor
bartelink Sep 16, 2022
246a45a
f many
bartelink Sep 16, 2022
e42b26a
f eqxweb, sync
bartelink Sep 16, 2022
7bb209f
f proProjector
bartelink Sep 16, 2022
bd22078
f proReactor
bartelink Sep 16, 2022
fd2c519
Undo delete, fixes
bartelink Sep 16, 2022
fa73620
Fix parallelOnly cosmos
bartelink Sep 16, 2022
06a6c38
Fix?
bartelink Sep 16, 2022
4eb6658
another?
bartelink Sep 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `eqxShipping`: Use `Propulsion.DynamoStore`+`EventStoreDb`'s `AwaitCompletion` [#121](https://github.com/jet/dotnet-templates/pull/121)

### Changed

- Target `Equinox` v `4.0.0`, `Propulsion` v `3.0.0`, `FsCodec` v `3.0.0`, `net6.0` [#122](https://github.com/jet/dotnet-templates/pull/122)

### Removed

- `eqxProjector --source cosmos --kafka --synthesizeSequence`: Removed custom mode [#122](https://github.com/jet/dotnet-templates/pull/122)
- `proReactor`: remove `--filter` (see `proSync`) and `--changeFeedOnly` (see `proReactorCosmos`) [#122](https://github.com/jet/dotnet-templates/pull/122)

### Fixed

<a name="5.3.0"></a>
Expand Down
10 changes: 2 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ The following templates focus specifically on the usage of `Propulsion` componen

* `-k --parallelOnly` schedule kafka emission to operate in parallel at document (rather than accumulated span of events for a stream) level

* `-k --synthesizeSequence` parse documents, preserving input order as items are produced to Kafka

2. `--source eventStore`: EventStoreDB's `$all` feed

3. `--source sqlStreamStore`: [`SqlStreamStore`](https://github.com/SQLStreamStore/SQLStreamStore)'s `$all` feed
Expand All @@ -51,8 +49,7 @@ The specific behaviors carried out in reaction to incoming events often use `Equ

Input options are:

0. (default) dual mode CosmosDB ChangeFeed Processor and/or EventStore `$all` stream projector/reactor using `Propulsion.Cosmos`/`Propulsion.EventStore` depending on whether the program is run with `cosmos` or `es` arguments
1. `--source changeFeedOnly`: removes `EventStore` wiring from commandline processing
0. (default) `Propulsion.Cosmos`/`Propulsion.DynamoStore`/`Propulsion.EventStore` depending on whether the program is run with `cosmos`, `dynamo`, `es` arguments
2. `--source kafkaEventSpans`: changes source to be Kafka Event Spans, as emitted from `dotnet new proProjector --kafka`

The reactive behavior template has the following options:
Expand All @@ -62,9 +59,6 @@ The specific behaviors carried out in reaction to incoming events often use `Equ
2. `--kafka` (without `--blank`): adds Optional projection to Apache Kafka using [`Propulsion.Kafka`](https://github.com/jet/propulsion) (instead of ingesting into a local `Cosmos` store). Produces versioned [Summary Event](http://verraes.net/2019/05/patterns-for-decoupling-distsys-summary-event/) feed.
3. `--kafka --blank`: provides wiring for producing to Kafka, without summary reading logic etc

Miscellaneous options:
- `--filter` - include category filtering boilerplate

**NOTE At present, checkpoint storage when projecting from EventStore uses Azure CosmosDB - help wanted ;)**

- [`feedSource`](feed-source/) - Boilerplate for an ASP.NET Core Web Api serving a feed of items stashed in an `Equinox.CosmosStore`. See `dotnet new feedConsumer` for the associated consumption logic
Expand Down Expand Up @@ -192,7 +186,7 @@ There's [integration tests in the repo](https://github.com/jet/dotnet-templates/

dotnet build build.proj # build Equinox.Templates package, run tests \/
dotnet pack build.proj # build Equinox.Templates package only
dotnet test build.proj # Test aphabetically newest file in bin/nupkgs only
dotnet test build.proj -c Release # Test aphabetically newest file in bin/nupkgs only (-c Release to run full tests)

One can also do it manually:

Expand Down
11 changes: 5 additions & 6 deletions equinox-patterns/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<WarningLevel>5</WarningLevel>
<IsPackable>false</IsPackable>
<OutputType>Library</OutputType>
</PropertyGroup>

<ItemGroup>
Expand All @@ -13,12 +12,12 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />

<PackageReference Include="FsCheck.Xunit" Version="3.0.0-beta1" />
<PackageReference Include="FsCheck.Xunit" Version="3.0.0-beta2" />
<PackageReference Include="unquote" Version="5.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5" />
</ItemGroup>

<ItemGroup>
Expand Down
12 changes: 7 additions & 5 deletions equinox-patterns/Domain/Config.fs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
module Patterns.Domain.Config

let log = Serilog.Log.ForContext("isMetric", true)
let createDecider stream = Equinox.Decider(log, stream, maxAttempts = 3)
let resolveDecider cat = Equinox.Decider.resolve log cat

module EventCodec =

open FsCodec.SystemTextJson

let private defaultOptions = Options.Create()
let create<'t when 't :> TypeShape.UnionContract.IUnionContract> () =
Codec.Create<'t>(options = defaultOptions).ToByteArrayCodec()
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
Codec.Create<'t>(options = defaultOptions)
let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
CodecJsonElement.Create<'t>(options = defaultOptions)

module Memory =

let create codec initial fold store =
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
let create codec initial fold store : Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)

module Cosmos =

Expand Down
6 changes: 3 additions & 3 deletions equinox-patterns/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.MemoryStore" Version="3.0.7" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.7" />
<PackageReference Include="FsCodec.SystemTextJson" Version="2.3.2" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.1" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.1" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.7.1" />
</ItemGroup>

</Project>
35 changes: 13 additions & 22 deletions equinox-patterns/Domain/ListEpoch.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Patterns.Domain.ListEpoch

let [<Literal>] Category = "ListEpoch"
let streamName = ListEpochId.toString >> FsCodec.StreamName.create Category
let streamName id = struct (Category, ListEpochId.toString id)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand All @@ -12,7 +12,7 @@ module Events =
| Closed
| Snapshotted of {| ids : ItemId[]; closed : bool |}
interface TypeShape.UnionContract.IUnionContract
let codec = Config.EventCodec.create<Event>()
let codec, codecJe = Config.EventCodec.gen<Event>, Config.EventCodec.genJsonElement<Event>

module Fold =

Expand Down Expand Up @@ -49,18 +49,16 @@ let decide shouldClose candidateIds = function
// NOTE see feedSource for example of separating Service logic into Ingestion and Read Services in order to vary the folding and/or state held
type Service internal
( shouldClose : ItemId[] -> ItemId[] -> bool, // let outer layers decide whether ingestion should trigger closing of the batch
resolve_ : Equinox.ResolveOption option -> ListEpochId -> Equinox.Decider<Events.Event, Fold.State>) =
let resolve = resolve_ None
let resolveStale = resolve_ (Some Equinox.AllowStale)
resolve : ListEpochId -> Equinox.Decider<Events.Event, Fold.State>) =

/// Ingest the supplied items. Yields relevant elements of the post-state to enable generation of stats
/// and facilitate deduplication of incoming items in order to avoid null store round-trips where possible
member _.Ingest(epochId, items) : Async<ExactlyOnceIngester.IngestResult<_, _>> =
let decider = resolveStale epochId
/// NOTE decider which will initially transact against potentially stale cached state, which will trigger a
/// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk
/// of writes are presumed to be coming from within this same process
decider.Transact(decide shouldClose items)
let decider = resolve epochId
// NOTE decider which will initially transact against potentially stale cached state, which will trigger a
// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk
// of writes are presumed to be coming from within this same process
decider.Transact(decide shouldClose items, load = Equinox.AllowStale)

/// Returns all the items currently held in the stream (Not using AllowStale on the assumption this needs to see updates from other apps)
member _.Read epochId : Async<Fold.State> =
Expand All @@ -69,16 +67,9 @@ type Service internal

module Config =

let private create_ shouldClose resolve = Service(shouldClose, resolve)
let private resolveStream opt = function
| Config.Store.Memory store ->
let cat = Config.Memory.create Events.codec Fold.initial Fold.fold store
fun sn -> cat.Resolve(sn, ?option = opt)
| Config.Store.Cosmos (context, cache) ->
let cat = Config.Cosmos.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
fun sn -> cat.Resolve(sn, ?option = opt)
let private resolveDecider store opt = streamName >> resolveStream opt store >> Config.createDecider
let private create__ shouldClose = resolveDecider >> create_ shouldClose
let create maxItemsPerEpoch =
let private (|Category|) = function
| Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store
| Config.Store.Cosmos (context, cache) -> Config.Cosmos.createSnapshotted Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
let create maxItemsPerEpoch (Category cat) =
let shouldClose candidateItems currentItems = Array.length currentItems + Array.length candidateItems >= maxItemsPerEpoch
create__ shouldClose
Service(shouldClose, streamName >> Config.resolveDecider cat)
25 changes: 9 additions & 16 deletions equinox-patterns/Domain/ListSeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Patterns.Domain.ListSeries
let [<Literal>] Category = "ListSeries"
// TOCONSIDER: if you need multiple lists series/epochs in a single system, the Series and Epoch streams should have a SeriesId in the stream name
// See also the implementation in the feedSource template, where the Series aggregate also functions as an index of series held in the system
let streamName () = ListSeriesId.wellKnownId |> ListSeriesId.toString |> FsCodec.StreamName.create Category
let streamName () = struct (Category, ListSeriesId.toString ListSeriesId.wellKnownId)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand All @@ -16,7 +16,7 @@ module Events =
| Started of {| epochId : ListEpochId |}
| Snapshotted of {| active : ListEpochId |}
interface TypeShape.UnionContract.IUnionContract
let codec = Config.EventCodec.create<Event>()
let codec, codecJe = Config.EventCodec.gen<Event>, Config.EventCodec.genJsonElement<Event>

module Fold =

Expand All @@ -34,10 +34,7 @@ let interpret epochId (state : Fold.State) =
[if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ListEpochId.initial then
yield Events.Started {| epochId = epochId |}]

type Service internal (resolve_ : Equinox.ResolveOption option -> unit -> Equinox.Decider<Events.Event, Fold.State>) =

let resolve = resolve_ None
let resolveStale = resolve_ (Some Equinox.AllowStale)
type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.State>) =

/// Determines the current active epoch
/// Uses cached values as epoch transitions are rare, and caller needs to deal with the inherent race condition in any case
Expand All @@ -48,17 +45,13 @@ type Service internal (resolve_ : Equinox.ResolveOption option -> unit -> Equino
/// Mark specified `epochId` as live for the purposes of ingesting
/// Writers are expected to react to having writes to an epoch denied (due to it being Closed) by anointing a successor via this
member _.MarkIngestionEpochId epochId : Async<unit> =
let decider = resolveStale ()
decider.Transact(interpret epochId)
let decider = resolve ()
decider.Transact(interpret epochId, load = Equinox.AllowStale)

module Config =

let private resolveStream opt = function
| Config.Store.Memory store ->
let cat = Config.Memory.create Events.codec Fold.initial Fold.fold store
fun sn -> cat.Resolve(sn, ?option = opt)
let private (|Category|) = function
| Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store
| Config.Store.Cosmos (context, cache) ->
let cat = Config.Cosmos.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
fun sn -> cat.Resolve(sn, ?option = opt)
let private resolveDecider store opt = streamName >> resolveStream opt store >> Config.createDecider
let create = resolveDecider >> Service
Config.Cosmos.createSnapshotted Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
let create (Category cat) = Service(streamName >> Config.resolveDecider cat)
28 changes: 12 additions & 16 deletions equinox-patterns/Domain/Period.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
module Patterns.Domain.Period

let [<Literal>] Category = "Period"
let streamName periodId = FsCodec.StreamName.create Category (PeriodId.toString periodId)
let streamName periodId = struct (Category, PeriodId.toString periodId)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand All @@ -18,7 +18,7 @@ module Events =
| Added of ItemIds
| CarriedForward of Balance
interface TypeShape.UnionContract.IUnionContract
let codec = Config.EventCodec.create<Event>()
let codec, codecJe = Config.EventCodec.gen<Event>, Config.EventCodec.genJsonElement<Event>

module Fold =

Expand Down Expand Up @@ -89,7 +89,7 @@ let decideIngestWithCarryForward rules req s : Async<Result<'req, 'result> * Eve
}

/// Manages Application of Requests to the Period's stream, including closing preceding periods as appropriate
type Service internal (resolve : Equinox.ResolveOption option -> PeriodId -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve : PeriodId -> Equinox.Decider<Events.Event, Fold.State>) =

let calcBalance state =
let createEventsBalance items : Events.Balance = { items = items }
Expand All @@ -103,20 +103,20 @@ type Service internal (resolve : Equinox.ResolveOption option -> PeriodId -> Equ
{ getIncomingBalance = fun () -> close periodId
decideIngestion = fun () _state -> (), (), []
decideCarryForward = fun () -> genBalance } // always close
let decider = resolve (Some Equinox.AllowStale) periodId
let decider = resolve periodId
let decide' s = async {
let! r, es = decideIngestWithCarryForward rules () s
return Option.get r.carryForward, es }
decider.Transact(decide')
decider.TransactAsync(decide', load = Equinox.AllowStale)

/// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary
let tryTransact periodId getIncoming (decide : 'request -> Fold.State -> 'request * 'result * Events.Event list) request shouldClose : Async<Result<'request, 'result>> =
let rules : Rules<'request, 'result> =
{ getIncomingBalance = getIncoming
decideIngestion = fun request state -> let residual, result, events = decide request state in residual, result, events
decideCarryForward = fun res state -> async { if shouldClose res then return! genBalance state else return None } } // also close, if we should
let decider = resolve (Some Equinox.AllowStale) periodId
decider.Transact(decideIngestWithCarryForward rules request)
let decider = resolve periodId
decider.TransactAsync(decideIngestWithCarryForward rules request, load = Equinox.AllowStale)

/// Runs the decision function on the specified Period, closing and bringing forward balances from preceding Periods if necessary
/// Processing completes when `decide` yields None for the residual of the 'request
Expand All @@ -135,19 +135,15 @@ type Service internal (resolve : Equinox.ResolveOption option -> PeriodId -> Equ
/// Exposes the full state to a reader (which is appropriate for a demo but is an anti-pattern in the general case)
/// NOTE unlike for the Transact method, we do not supply ResolveOption.AllowStale, which means we'll see updates from other instances
member _.Read periodId =
let decider = resolve None periodId
let decider = resolve periodId
decider.Query id

module Config =

let private resolveStream opt = function
| Config.Store.Memory store ->
let cat = Config.Memory.create Events.codec Fold.initial Fold.fold store
fun sn -> cat.Resolve(sn, ?option = opt)
let private (|Category|) = function
| Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store
| Config.Store.Cosmos (context, cache) ->
// Not using snapshots, on the basis that the writes are all coming from this process, so the cache will be sufficient
// to make reads cheap enough, with the benefit of writes being cheaper as you're not paying to maintain the snapshot
let cat = Config.Cosmos.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
fun sn -> cat.Resolve(sn, ?option = opt)
let private resolveDecider store opt = streamName >> resolveStream opt store >> Config.createDecider
let create = resolveDecider >> Service
Config.Cosmos.createUnoptimized Events.codecJe Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamName >> Config.resolveDecider cat)
20 changes: 20 additions & 0 deletions equinox-shipping/Watchdog.Integration/DynamoConnections.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Shipping.Watchdog.Integration

open Equinox.DynamoStore
open Shipping.Watchdog.Infrastructure

type DynamoConnections(serviceUrl, accessKey, secretKey, table, indexTable) =
let requestTimeout, retries = System.TimeSpan.FromSeconds 5., 5
let connector = DynamoStoreConnector(serviceUrl, accessKey, secretKey, requestTimeout, retries)
let client = connector.CreateClient()
let storeClient = DynamoStoreClient(client, table)
let storeContext = storeClient |> DynamoStoreContext.create
let cache = Equinox.Cache ("Tests", sizeMb = 10)

new (c : Shipping.Watchdog.Program.Configuration) = DynamoConnections(c.DynamoServiceUrl, c.DynamoAccessKey, c.DynamoSecretKey, c.DynamoTable, c.DynamoIndexTable)
new () = DynamoConnections(Shipping.Watchdog.Program.Configuration EnvVar.tryGet)

member val IndexClient = DynamoStoreClient(client, match indexTable with Some x -> x | None -> table + "-index")
member val StoreContext = storeContext
member _.DynamoStore = (storeContext, cache)
member _.Store = Shipping.Domain.Config.Store<Core.EncodedBody>.Dynamo (storeContext, cache)
Loading