Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Track updates, CategoryName #134

Merged
merged 14 commits into from
Nov 25, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Changed

- Target `Equinox` v `4.0.0-rc.13`, `Propulsion` v `3.0.0-rc.8.10`, `FsCodec` v `3.0.0-rc.11.1` [#131](https://github.com/jet/dotnet-templates/pull/131)
- Target `Equinox` v `4.0.0-rc.14.5`, `Propulsion` v `3.0.0-rc.9.11`, `FsCodec` v `3.0.0-rc.14.1` [#131](https://github.com/jet/dotnet-templates/pull/131)

### Removed

Expand Down
8 changes: 4 additions & 4 deletions equinox-patterns/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

<ItemGroup>
<!-- Equinox.Core.Batching -->
<PackageReference Include="Equinox.Core" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.11.1" />
<PackageReference Include="Equinox.Core" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.14.1" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion equinox-patterns/Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Codec =
module Memory =

let create name codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial)

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20
let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/ContainerTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ open Swensen.Unquote
let [<Property>] ``events roundtrip`` (x: Events.Event) =
let ee = Events.codec.Encode((), x)
let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
let des = Events.codec.TryDecode e
let des = Events.codec.Decode e
test <@ des = ValueSome x @>
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />

<PackageReference Include="FsCheck.Xunit" Version="3.0.0-beta2" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.8.10" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.9.11" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/FinalizationProcessTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Properties(testOutput) =
test <@ res1 && set eventTypes = set expectedEvents @>
let containerEvents =
buffer.Queue(Container.Reactions.streamName containerId1)
|> Seq.chooseV (FsCodec.Deflate.EncodeUncompressed Container.Events.codec).TryDecode
|> Seq.chooseV (FsCodec.Compression.EncodeUncompressed Container.Events.codec).Decode
|> List.ofSeq
test <@ match containerEvents with
| [ Container.Events.Finalized e ] -> e.shipmentIds = requestedShipmentIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ open Swensen.Unquote
let [<Property>] ``events roundtrip`` (x: Events.Event) =
let ee = Events.codec.Encode((), x)
let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
let des = Events.codec.TryDecode e
let des = Events.codec.Decode e
test <@ des = ValueSome x @>
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/ShipmentTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ open Swensen.Unquote
let [<Property>] ``events roundtrip`` (x: Events.Event) =
let ee = Events.codec.Encode((), x)
let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
let des = Events.codec.TryDecode e
let des = Events.codec.Decode e
test <@ des = ValueSome x @>
18 changes: 8 additions & 10 deletions equinox-shipping/Domain/Container.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
module Shipping.Domain.Container

module private Stream =
let [<Literal>] Category = "Container"
let id = FsCodec.StreamId.gen ContainerId.toString
let name = id >> FsCodec.StreamName.create Category
let [<Literal>] private CategoryName = "Container"
let private streamId = FsCodec.StreamId.gen ContainerId.toString

module Reactions =
let streamName = Stream.name
let streamName = streamId >> FsCodec.StreamName.create CategoryName

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -44,8 +42,8 @@ type Service internal (resolve: ContainerId -> Equinox.Decider<Events.Event, Fol
module Factory =

let private (|Category|) = function
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
10 changes: 5 additions & 5 deletions equinox-shipping/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.13" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.11.1" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.14.5" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.14.5" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.14.1" />
</ItemGroup>

</Project>
26 changes: 12 additions & 14 deletions equinox-shipping/Domain/FinalizationTransaction.fs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
module Shipping.Domain.FinalizationTransaction

module private Stream =
let [<Literal>] Category = "FinalizationTransaction"
let id = FsCodec.StreamId.gen TransactionId.toString
let decodeId = FsCodec.StreamId.dec TransactionId.parse
let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId
let [<Literal>] private CategoryName = "FinalizationTransaction"
let private streamId = FsCodec.StreamId.gen TransactionId.toString
let private catId = CategoryId(CategoryName, streamId, FsCodec.StreamId.dec TransactionId.parse)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -36,8 +34,8 @@ module Reactions =
/// Used by the Watchdog to infer whether a given event signifies that the processing has reached a terminal state
let isTerminalEvent (encoded: FsCodec.ITimelineEvent<_>) =
encoded.EventType = nameof(Events.Completed)
let [<Literal>] categoryName = Stream.Category
let [<return: Struct>] (|For|_|) = Stream.tryDecode
let [<Literal>] categoryName = CategoryName
let [<return: Struct>] (|For|_|) = catId.TryDecode

module Fold =

Expand Down Expand Up @@ -84,9 +82,9 @@ module Flow =
match state, event with
| Fold.State.Initial, Events.FinalizationRequested _
| Fold.State.Reserving _, Events.RevertCommenced _
| Fold.State.Reserving _, Events.ReservationCompleted _
| Fold.State.Reserving _, Events.ReservationCompleted
| Fold.State.Reverting _, Events.Completed
| Fold.State.Assigning _, Events.AssignmentCompleted _
| Fold.State.Assigning _, Events.AssignmentCompleted
| Fold.State.Assigned _, Events.Completed -> true
| _ -> false

Expand All @@ -107,8 +105,8 @@ type Service internal (resolve: TransactionId -> Equinox.Decider<Events.Event, F
module Factory =

let private (|Category|) = function
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
47 changes: 24 additions & 23 deletions equinox-shipping/Domain/Shipment.fs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
module Shipping.Domain.Shipment

module private Stream =
let [<Literal>] Category = "Shipment"
let id = FsCodec.StreamId.gen ShipmentId.toString
let [<Literal>] private CategoryName = "Shipment"
let private streamId = FsCodec.StreamId.gen ShipmentId.toString

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -30,40 +29,42 @@ module Fold =
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let toSnapshot (state: State) = Events.Snapshotted {| reservation = state.reservation; association = state.association |}

let decideReserve transactionId: Fold.State -> bool * Events.Event[] = function
| { reservation = Some r } when r = transactionId -> true, [||]
| { reservation = None } -> true, [| Events.Reserved {| transaction = transactionId |} |]
| _ -> false, [||]
module Decisions =

let reserve transactionId: Fold.State -> bool * Events.Event[] = function
| { reservation = Some r } when r = transactionId -> true, [||]
| { reservation = None } -> true, [| Events.Reserved {| transaction = transactionId |} |]
| _ -> false, [||]

let interpretRevoke transactionId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Revoked |]
| _ -> [||] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId
let revoke transactionId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Revoked |]
| _ -> [||] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId

let interpretAssign transactionId containerId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Assigned {| container = containerId |} |]
| _ -> [||] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned
let assign transactionId containerId: Fold.State -> Events.Event[] = function
| { reservation = Some r; association = None } when r = transactionId ->
[| Events.Assigned {| container = containerId |} |]
| _ -> [||] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned

type Service internal (resolve: ShipmentId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.TryReserve(shipmentId, transactionId): Async<bool> =
let decider = resolve shipmentId
decider.Transact(decideReserve transactionId)
decider.Transact(Decisions.reserve transactionId)

member _.Revoke(shipmentId, transactionId): Async<unit> =
let decider = resolve shipmentId
decider.Transact(interpretRevoke transactionId)
decider.Transact(Decisions.revoke transactionId)

member _.Assign(shipmentId, containerId, transactionId): Async<unit> =
let decider = resolve shipmentId
decider.Transact(interpretAssign transactionId containerId)
decider.Transact(Decisions.assign transactionId containerId)

module Factory =

let private (|Category|) = function
| Store.Config.Memory store -> Store.Memory.create Stream.Category Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(Stream.id >> Store.createDecider cat)
| Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
4 changes: 2 additions & 2 deletions equinox-shipping/Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module Codec =
module Memory =

let create name codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Deflate.EncodeUncompressed codec, fold, initial)
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial)

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20
let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
Expand All @@ -40,7 +40,7 @@ module Dynamo =
open Equinox.DynamoStore

let private createCached name codec initial fold accessStrategy (context, cache) =
DynamoStoreCategory(context, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, cacheStrategy cache)
DynamoStoreCategory(context, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, cacheStrategy cache)

let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) =
let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot)
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain/TransactionWatchdog.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ let fold: Events.Categorization seq -> Fold.State =

let (|TransactionStatus|) (codec: #FsCodec.IEventCodec<_, _, _>) events: Fold.State =
events
|> Seq.choose (codec.TryDecode >> function ValueSome x -> Some x | ValueNone -> None)
|> Seq.chooseV codec.Decode
|> fold

module Finalization =
Expand Down
9 changes: 9 additions & 0 deletions equinox-shipping/Domain/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,16 @@ module TransactionId =
let parse (x: string): TransactionId = %x
let (|Parse|) = parse

module Seq =

let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () }

module Guid =

let inline toStringN (x: System.Guid) = x.ToString "N"
let generateStringN () = let g = System.Guid.NewGuid() in toStringN g

/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers
type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) =
member _.StreamName = gen >> FsCodec.StreamName.create name
member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec
2 changes: 0 additions & 2 deletions equinox-shipping/Watchdog.Integration/CosmosConnector.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Shipping.Watchdog.Integration

open Shipping.Infrastructure

type CosmosConnector(connectionString, databaseId, containerId) =

let discovery = connectionString |> Equinox.CosmosStore.Discovery.ConnectionString
Expand Down
2 changes: 0 additions & 2 deletions equinox-shipping/Watchdog.Integration/DynamoConnector.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Shipping.Watchdog.Integration

open Shipping.Infrastructure

type DynamoConnector(connector: Equinox.DynamoStore.DynamoStoreConnector, table, indexTable) =

let client = connector.CreateClient()
Expand Down
1 change: 0 additions & 1 deletion equinox-shipping/Watchdog.Integration/EsdbConnector.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Shipping.Watchdog.Integration

open Shipping.Infrastructure
open System

type EsdbConnector(connection, credentials) =
Expand Down
8 changes: 2 additions & 6 deletions equinox-shipping/Watchdog.Integration/ReactorFixture.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ namespace Shipping.Watchdog.Integration

open Propulsion.Internal // IntervalTimer etc
open Shipping.Domain.Tests
open Shipping.Infrastructure
open Shipping.Watchdog
open System

Expand Down Expand Up @@ -38,10 +37,7 @@ type FixtureBase(messageSink, store, dumpStats, createSourceConfig) =
if stats.StatsInterval.RemainingMs > 3000 then
stats.StatsInterval.Trigger()
stats.StatsInterval.SleepUntilTriggerCleared()
member _.Await(propagationDelay) =
match awaitReactions with
| Some f -> f propagationDelay |> Async.ofTask
| None -> async { () }
member _.Await(propagationDelay) = awaitReactions propagationDelay |> Async.ofTask

interface IDisposable with

Expand Down Expand Up @@ -80,7 +76,7 @@ module CosmosReactor =
let store, monitored, leases = conn.Connect()
let createSourceConfig consumerGroupName =
let checkpointConfig = CosmosFeedConfig.Ephemeral consumerGroupName
SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval)
SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, TimeSpan.FromSeconds 60.)
new Fixture(messageSink, store, createSourceConfig)
member _.NullWait(_arguments) = async.Zero () // We could wire up a way to await all tranches having caught up, but not implemented yet
member val private Timeout = if System.Diagnostics.Debugger.IsAttached then TimeSpan.FromHours 1. else TimeSpan.FromMinutes 1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<!-- jsii Roslyn analyzers (un-comment to obtain compile-time checks for missing required props-->
<!-- <PackageReference Include="Amazon.Jsii.Analyzers" Version="*" PrivateAssets="all" />-->

<PackageReference Include="Propulsion.DynamoStore.Constructs" Version="3.0.0-rc.8.10" />
<PackageReference Include="Propulsion.DynamoStore.Constructs" Version="3.0.0-rc.9.11" />
</ItemGroup>

</Project>
1 change: 0 additions & 1 deletion equinox-shipping/Watchdog.Lambda/Function.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ open Amazon.Lambda.SQSEvents
open Equinox.DynamoStore
open Serilog
open Shipping.Domain
open Shipping.Infrastructure
open Shipping.Watchdog
open System

Expand Down
Loading