Skip to content

Commit

Permalink
Straggler SourceConfig changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 25, 2023
1 parent 2d00823 commit ccd6d32
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 24 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Changed

- Target `Equinox` v `4.0.0-rc.14.1`, `Propulsion` v `3.0.0-rc.9.1`, `FsCodec` v `3.0.0-rc.13` [#131](https://github.com/jet/dotnet-templates/pull/131)
- Target `Equinox` v `4.0.0-rc.14.5`, `Propulsion` v `3.0.0-rc.9.11`, `FsCodec` v `3.0.0-rc.14.1` [#131](https://github.com/jet/dotnet-templates/pull/131)

### Removed

Expand Down Expand Up @@ -57,7 +57,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Changed

- Target `Equinox` v `4.0.0-rc.7`, `Propulsion` v `3.0.0-rc.1`, `FsCodec` v `3.0.0-rc.9.1`, `net6.0` [#126](https://github.com/jet/dotnet-templates/pull/126) [#127](https://github.com/jet/dotnet-templates/pull/127)
- Target `Equinox` v `4.0.0-rc.7`, `Propulsion` v `3.0.0-rc.1`, `FsCodec` v `3.0.0-rc.9`, `net6.0` [#126](https://github.com/jet/dotnet-templates/pull/126) [#127](https://github.com/jet/dotnet-templates/pull/127)

### Removed

Expand Down
4 changes: 2 additions & 2 deletions equinox-shipping/Domain/FinalizationTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ module Flow =
match state, event with
| Fold.State.Initial, Events.FinalizationRequested _
| Fold.State.Reserving _, Events.RevertCommenced _
| Fold.State.Reserving _, Events.ReservationCompleted _
| Fold.State.Reserving _, Events.ReservationCompleted
| Fold.State.Reverting _, Events.Completed
| Fold.State.Assigning _, Events.AssignmentCompleted _
| Fold.State.Assigning _, Events.AssignmentCompleted
| Fold.State.Assigned _, Events.Completed -> true
| _ -> false

Expand Down
5 changes: 3 additions & 2 deletions propulsion-consumer/Examples.fs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ module MultiStreams =
| SavedForLater.Reactions.Decode (id, events) ->
let s = match saves.TryGetValue id with true, value -> value | false, _ -> []
SavedForLaterEvents (id, s, events)
| FsCodec.StreamName.Split (categoryName, _), events -> OtherCategory struct (categoryName, Array.length events)
| FsCodec.StreamName.Split (categoryName, _), events ->
OtherCategory struct (categoryName, Array.length events)

// each event is guaranteed to only be supplied once by virtue of having been passed through the Streams Scheduler
member _.Handle(streamName: FsCodec.StreamName, events: Propulsion.Sinks.Event[]) = async {
Expand Down Expand Up @@ -182,7 +183,7 @@ module MultiMessages =
match struct (streamName, Array.ofSeq raw) with
| Favorites.Reactions.Decode (_, events) -> yield! events |> Seq.map Fave
| SavedForLater.Reactions.Decode (_, events) -> yield! events |> Seq.map Save
| FsCodec.StreamName.Split (otherCategoryName, _), events -> yield OtherCat (otherCategoryName, events.Length) }
| FsCodec.StreamName.Split (otherCategoryName, _), events -> OtherCat (otherCategoryName, events.Length) }

// NB can be called in parallel, so must be thread-safe
member x.Handle(streamName: FsCodec.StreamName, spanJson: string) =
Expand Down
8 changes: 4 additions & 4 deletions propulsion-hotel/Domain/GuestStay.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module Fold =
| Events.Paid e -> Active { bal with balance = bal.balance - e.amount; payments = [| yield! bal.payments; e.paymentId |] }
| Events.CheckedOut _ -> Closed
| Events.TransferredToGroup e -> TransferredToGroup {| groupId = e.groupId; amount = e.residualBalance |}
| Closed _ | TransferredToGroup _ -> invalidOp "No events allowed after CheckedOut/TransferredToGroup"
| Closed | TransferredToGroup _ -> invalidOp "No events allowed after CheckedOut/TransferredToGroup"
let fold: State -> Events.Event seq -> State = Seq.fold evolve

module Decide =
Expand All @@ -48,16 +48,16 @@ module Decide =
let checkin at = function
| Active { checkedInAt = None } -> [ Events.CheckedIn {| at = at |} ]
| Active { checkedInAt = Some t } when t = at -> []
| Active _ | Closed _ | TransferredToGroup _ -> invalidOp "Invalid checkin"
| Active _ | Closed | TransferredToGroup _ -> invalidOp "Invalid checkin"

let charge at chargeId amount = function
| Closed _ | TransferredToGroup _ -> invalidOp "Cannot record charge for Closed account"
| Closed | TransferredToGroup _ -> invalidOp "Cannot record charge for Closed account"
| Active bal ->
if bal.charges |> Array.contains chargeId then [||]
else [| Events.Charged {| at = at; chargeId = chargeId; amount = amount |} |]

let payment at paymentId amount = function
| Closed _ | TransferredToGroup _ -> invalidOp "Cannot record payment for not opened account" // TODO fix message at source
| Closed | TransferredToGroup _ -> invalidOp "Cannot record payment for not opened account" // TODO fix message at source
| Active bal ->
if bal.payments |> Array.contains paymentId then [||]
else [| Events.Paid {| at = at; paymentId = paymentId; amount = amount |} |]
Expand Down
2 changes: 1 addition & 1 deletion propulsion-indexer/Indexer/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ let build (args: Args.Arguments) = async {
let source =
let startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams
Propulsion.CosmosStore.CosmosStoreSource(Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink,
startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start()
startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start()
return sink, source }

open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException
Expand Down
10 changes: 4 additions & 6 deletions propulsion-projector/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Args =
let buildSourceConfig log groupName =
let startFromTail, maxItems, tailSleepInterval, lagFrequency = a.MonitoringParams
let checkpointConfig = CosmosFeedConfig.Persistent (groupName, startFromTail, maxItems, lagFrequency)
SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval)
SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, x.StatsInterval)
buildSourceConfig, x.Sink, ignore
#endif
// #if dynamo
Expand Down Expand Up @@ -165,15 +165,13 @@ let build (args: Args.Arguments) =
#if (cosmos && parallelOnly)
// Custom logic for establishing the source, as we're not projecting StreamEvents - TODO could probably be generalized
let source =
let mapToStreamItems (x: System.Collections.Generic.IReadOnlyCollection<'a>): seq<'a> = upcast x
let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
match buildSourceConfig Log.Logger consumerGroupName with SourceConfig.Cosmos (monitoredContainer, leasesContainer, checkpoints, tailSleepInterval: TimeSpan) ->
match buildSourceConfig Log.Logger consumerGroupName with SourceConfig.Cosmos (monitoredContainer, leasesContainer, checkpoints, tailSleepInterval: TimeSpan, statsInterval) ->
match checkpoints with
| Ephemeral _ -> failwith "Unexpected"
| Persistent (processorName, startFromTail, maxItems, lagFrequency) ->

Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitoredContainer, leasesContainer, consumerGroupName, observer,
startFromTail = startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
Propulsion.CosmosStore.CosmosStoreSource(Log.Logger, statsInterval, monitoredContainer, leasesContainer, consumerGroupName, Handler.mapToStreamItems, sink,
startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start()
#else
let source, _awaitReactions =
let sourceConfig = buildSourceConfig Log.Logger consumerGroupName
Expand Down
2 changes: 1 addition & 1 deletion propulsion-projector/SourceArgs.fs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ module Cosmos =
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 30."

| LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`."
| FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxItems _ -> "maximum item count to supply for the Change Feed query. Default: use response size limit"
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1"
type Arguments(c: Args.Configuration, p: ParseResults<Parameters>) =
Expand Down
9 changes: 5 additions & 4 deletions propulsion-projector/SourceConfig.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type SourceConfig =
* leasesContainer: Microsoft.Azure.Cosmos.Container
* checkpoints: CosmosFeedConfig
* tailSleepInterval: TimeSpan
* statsInterval: TimeSpan
#endif
// #if dynamo
| Dynamo of indexContext: Equinox.DynamoStore.DynamoStoreContext
Expand Down Expand Up @@ -83,7 +84,7 @@ module SourceConfig =
#if esdb
module Esdb =
open Propulsion.EventStoreDb
let start log (sink: Propulsion.Sinks.Sink) (categories: string[])
let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[])
(client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task<unit>) =
let source =
EventStoreSource(
Expand All @@ -96,7 +97,7 @@ module SourceConfig =
#if sss
module Sss =
open Propulsion.SqlStreamStore
let start log (sink: Propulsion.Sinks.Sink) (categories: string[])
let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[])
(client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task<unit>) =
let source =
SqlStreamStoreSource(
Expand All @@ -108,8 +109,8 @@ module SourceConfig =
#endif
let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task<unit>) = function
#if cosmos
| SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) ->
Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval)
| SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) ->
Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval)
#endif
// #if dynamo
| SourceConfig.Dynamo (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) ->
Expand Down
4 changes: 2 additions & 2 deletions propulsion-reactor/Contract.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ let ofState (state: Todo.Fold.State): SummaryInfo =
#if blank
module Input =

let [<Literal>] Category = "CategoryName"
let [<Literal>] CategoryName = "CategoryName"
let decodeId = FsCodec.StreamId.dec ClientId.parse
let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId
let tryDecode = FsCodec.StreamName.tryFind CategoryName >> ValueOption.map decodeId

type Value = { field: int }
type Event =
Expand Down

0 comments on commit ccd6d32

Please sign in to comment.