Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port to Propulsion.CosmosStore 2.11.0-rc1/Equinox.CosmosStore 3.0.1 #91

Merged
merged 25 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- Target `Propulsion` v `2.11.0-rc1`, `Equinox` v `3.0.1`

### Removed
### Fixed

Expand Down
203 changes: 107 additions & 96 deletions dotnet-templates.sln

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions equinox-patterns/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.MemoryStore" Version="3.0.0" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.0" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />

<PackageReference Include="Equinox.MemoryStore" Version="2.6.0" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="FsCheck.Xunit" Version="2.14.2" />
<PackageReference Include="unquote" Version="5.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
12 changes: 6 additions & 6 deletions equinox-shipping/Domain.Tests/Fixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ module FinalizationTransaction =
module MemoryStore =
open Equinox.MemoryStore
let create store =
let resolver = Resolver(store, Events.codec, Fold.fold, Fold.initial)
create resolver.Resolve
let cat = MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial)
create cat.Resolve
module Container =
open Container
module MemoryStore =
open Equinox.MemoryStore
let create store =
let resolver = Resolver(store, Events.codec, Fold.fold, Fold.initial)
create resolver.Resolve
let cat = MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial)
create cat.Resolve
module Shipment =
open Shipment
module MemoryStore =
open Equinox.MemoryStore
let create store =
let resolver = Resolver(store, Events.codec, Fold.fold, Fold.initial)
create resolver.Resolve
let cat = MemoryStoreCategory(store, Events.codec, Fold.fold, Fold.initial)
create cat.Resolve

let createProcessManager maxDop store =
let transactions = FinalizationTransaction.MemoryStore.create store
Expand Down
8 changes: 4 additions & 4 deletions equinox-shipping/Domain/Container.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@ module Fold =
let interpretFinalize shipmentIds (state : Fold.State): Events.Event list =
[ if Array.isEmpty state.shipmentIds then yield Events.Finalized {| shipmentIds = shipmentIds |} ]

type Service internal (resolve : ContainerId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : ContainerId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Finalize(containerId, shipmentIds) : Async<unit> =
let decider = resolve containerId
decider.Transact(interpretFinalize shipmentIds)

let create resolveStream =
let resolve id = Equinox.Stream(Serilog.Log.ForContext<Service>(), resolveStream (streamName id), maxAttempts=3)
let resolve = streamName >> resolveStream >> Equinox.createDecider
Service(resolve)

module Cosmos =

open Equinox.Cosmos
open Equinox.CosmosStore

let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot)
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolver = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve
4 changes: 2 additions & 2 deletions equinox-shipping/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
<WarningLevel>5</WarningLevel>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
Expand All @@ -16,7 +16,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.Cosmos" Version="2.6.0" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
</ItemGroup>

</Project>
10 changes: 5 additions & 5 deletions equinox-shipping/Domain/FinalizationTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,22 @@ let decide (update : Events.Event option) (state : Fold.State) : Action * Events
let state' = Fold.fold state events
nextAction state', events

type Service internal (resolve : TransactionId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : TransactionId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Record(transactionId, update) : Async<Action> =
let decider = resolve transactionId
decider.Transact(decide update)

let create resolveStream =
let resolve id = Equinox.Stream(Serilog.Log.ForContext<Service>(), resolveStream (streamName id), maxAttempts=3)
let resolve = streamName >> resolveStream >> Equinox.createDecider
Service(resolve)

module Cosmos =

open Equinox.Cosmos
open Equinox.CosmosStore

let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot)
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve
let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create cat.Resolve
10 changes: 5 additions & 5 deletions equinox-shipping/Domain/Shipment.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let interpretAssign transactionId containerId : Fold.State -> Events.Event list
[ Events.Assigned {| container = containerId |} ]
| _ -> [] // Ignore if a) this transaction was not the one reserving it or b) it's already been assigned

type Service internal (resolve : ShipmentId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : ShipmentId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.TryReserve(shipmentId, transactionId) : Async<bool> =
let decider = resolve shipmentId
Expand All @@ -60,15 +60,15 @@ type Service internal (resolve : ShipmentId -> Equinox.Stream<Events.Event, Fold
decider.Transact(interpretAssign transactionId containerId)

let create resolveStream =
let resolve id = Equinox.Stream(Serilog.Log.ForContext<Service>(), resolveStream (streamName id), maxAttempts=3)
let resolve = streamName >> resolveStream >> Equinox.createDecider
Service(resolve)

module Cosmos =

open Equinox.Cosmos
open Equinox.CosmosStore

let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot)
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve
let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create cat.Resolve
15 changes: 14 additions & 1 deletion equinox-shipping/Domain/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,17 @@ type TransactionId = string<transactionId>
module TransactionId =
let toString (x : TransactionId) : string = %x
let parse (x : string) = %x
let (|Parse|) = parse
let (|Parse|) = parse

namespace global

module Log =

let forMetrics () =
Serilog.Log.ForContext("isMetric", true)

module Equinox =

let createDecider stream =
Equinox.Decider(Log.forMetrics (), stream, maxAttempts = 3)

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />

<PackageReference Include="Equinox.MemoryStore" Version="2.6.0" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="FsCheck.Xunit" Version="2.14.2" />
<PackageReference Include="unquote" Version="5.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
4 changes: 2 additions & 2 deletions equinox-shipping/Watchdog/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ let isRelevant = function
| FinalizationTransaction.Match _ -> true
| _ -> false

let transformOrFilter (changeFeedDocument: Microsoft.Azure.Documents.Document) : Propulsion.Streams.StreamEvent<_> seq = seq {
for batch in Propulsion.Cosmos.EquinoxCosmosParser.enumStreamEvents changeFeedDocument do
let transformOrFilter changeFeedDocument : Propulsion.Streams.StreamEvent<_> seq = seq {
for batch in Propulsion.CosmosStore.EquinoxNewtonsoftParser.enumStreamEvents changeFeedDocument do
if isRelevant batch.stream then
yield batch
}
Expand Down
43 changes: 32 additions & 11 deletions equinox-shipping/Watchdog/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,12 @@ module EnvVar =
[<System.Runtime.CompilerServices.Extension>]
type LoggerConfigurationExtensions() =

[<System.Runtime.CompilerServices.Extension>]
static member inline ExcludeChangeFeedProcessorV2InternalDiagnostics(c : LoggerConfiguration) =
let isCfp429a = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.LeaseManagement.DocumentServiceLeaseUpdater").Invoke
let isCfp429b = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement.LeaseRenewer").Invoke
let isCfp429c = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement.PartitionLoadBalancer").Invoke
let isCfp429d = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.PartitionProcessor").Invoke
let isCfp x = isCfp429a x || isCfp429b x || isCfp429c x || isCfp429d x
c.Filter.ByExcluding(fun x -> isCfp x)

[<System.Runtime.CompilerServices.Extension>]
static member inline ConfigureChangeFeedProcessorLogging(c : LoggerConfiguration, verbose : bool) =
// LibLog writes to the global logger, so we need to control the emission
let cfpl = if verbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
// TODO figure out what CFP v3 requires
c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)
|> fun c -> if verbose then c else c.ExcludeChangeFeedProcessorV2InternalDiagnostics()

[<System.Runtime.CompilerServices.Extension>]
type Logging() =
Expand All @@ -37,5 +28,35 @@ type Logging() =
.Enrich.FromLogContext()
|> fun c -> if verbose = Some true then c.MinimumLevel.Debug() else c
|> fun c -> c.ConfigureChangeFeedProcessorLogging((changeFeedProcessorVerbose = Some true))
|> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId,2} {Message:lj} {NewLine}{Exception}"
|> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj} {NewLine}{Exception}"
c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)

type Equinox.CosmosStore.CosmosStoreConnector with

member private x.LogConfiguration(connectionName, databaseId, containerId) =
let o = x.Options
let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests
Log.Information("CosmosDb {name} {mode} {endpointUri} timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
connectionName, o.ConnectionMode, x.Endpoint, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds)
Log.Information("CosmosDb {name} Database {database} Container {container}",
connectionName, databaseId, containerId)

/// Use sparingly; in general one wants to use CreateAndInitialize to avoid slow first requests
member x.CreateUninitialized(databaseId, containerId) =
x.CreateUninitialized().GetDatabase(databaseId).GetContainer(containerId)

/// Connect a CosmosStoreClient, including warming up
member x.ConnectStore(connectionName, databaseId, containerId) =
x.LogConfiguration(connectionName, databaseId, containerId)
Equinox.CosmosStore.CosmosStoreClient.Connect(x.CreateAndInitialize, databaseId, containerId)

/// Creates a CosmosClient suitable for running a CFP via CosmosStoreSource
member x.ConnectMonitored(databaseId, containerId, ?connectionName) =
x.LogConfiguration(defaultArg connectionName "Source", databaseId, containerId)
x.CreateUninitialized(databaseId, containerId)

/// Connects to a Store as both a ChangeFeedProcessor Monitored Container and a CosmosStoreClient
member x.ConnectStoreAndMonitored(databaseId, containerId) =
let monitored = x.ConnectMonitored(databaseId, containerId, "Main")
let storeClient = Equinox.CosmosStore.CosmosStoreClient(monitored.Database.Client, databaseId, containerId)
storeClient, monitored
Loading