Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 26, 2023
1 parent 584336b commit 232150f
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 25 deletions.
2 changes: 1 addition & 1 deletion equinox-shipping/Watchdog/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ module Dynamo =
| Choice2Of2 (serviceUrl, accessKey, secretKey) ->
Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries)
let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
member val Verbose = p.Contains Verbose
// member val Verbose = p.Contains Verbose
member _.Connect() = connector.CreateClient().CreateContext("Main", table)

type [<RequireQualifiedAccess; NoComparison; NoEquality>]
Expand Down
14 changes: 6 additions & 8 deletions propulsion-projector/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Configuration(tryGet: string -> string option) =
member x.DynamoTable = x.get TABLE
member x.DynamoRegion = x.tryGet REGION

#if esdb
// #if esdb
open Argu

module Cosmos =
Expand Down Expand Up @@ -63,8 +63,8 @@ module Cosmos =
let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode)
let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase)
let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer)
member val Verbose = p.Contains Verbose
member _.Connect() = connector.ConnectStore("Target", database, container)
// member val Verbose = p.Contains Verbose
member _.Connect() = connector.ConnectContext("Target", database, container)

module Dynamo =

Expand Down Expand Up @@ -108,8 +108,6 @@ module Dynamo =
| Choice2Of2 (serviceUrl, accessKey, secretKey) ->
Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries)
let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
member val Verbose = p.Contains Verbose
member _.Connect() = connector.LogConfiguration()
let client = connector.CreateClient()
client.ConnectStore("Main", table)
#endif
// member val Verbose = p.Contains Verbose
member _.CreateContext() = connector.CreateClient().CreateContext("Main", table)
// #endif
22 changes: 11 additions & 11 deletions propulsion-projector/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Logging() =
|> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u1}] {Message:lj} {Properties:j}{NewLine}{Exception}"
c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)

// #if (cosmos || esdb || sss)
//#if (cosmos || esdb || sss)
module CosmosStoreConnector =

let private get (role: string) (client: Microsoft.Azure.Cosmos.CosmosClient) databaseId containerId =
Expand All @@ -40,12 +40,17 @@ type Equinox.CosmosStore.CosmosStoreConnector with
member private x.CreateAndInitialize(role, databaseId, containers) =
x.LogConfiguration(role, databaseId, containers)
x.CreateAndInitialize(databaseId, containers)
member private x.Connect(role, databaseId, containers) =
x.LogConfiguration(role, databaseId, containers)
x.Connect(databaseId, containers)
member x.ConnectFeed(databaseId, containerId, auxContainerId, ?role) = async {
let! cosmosClient = x.CreateAndInitialize(defaultArg role "Source", databaseId, [| containerId; auxContainerId|])
return CosmosStoreConnector.getSourceAndLeases cosmosClient databaseId containerId auxContainerId }
member x.ConnectContext(role, databaseId, containerId) =
x.Connect(role, databaseId, [| containerId |])

// #endif
// #if (dynamo || esdb || sss)
//#endif
//#if (dynamo || esdb || sss)
module Dynamo =

open Equinox.DynamoStore
Expand All @@ -61,14 +66,9 @@ module Dynamo =

type Equinox.DynamoStore.DynamoStoreConnector with

member private x.LogConfiguration() =
Log.Information("DynamoDB {endpoint} Timeout {timeoutS}s Retries {retries}",
x.Endpoint, (let t = x.Timeout in t.TotalSeconds), x.Retries)

member x.CreateClient() =
x.LogConfiguration()
x.CreateDynamoDbClient()
|> Equinox.DynamoStore.DynamoStoreClient
Log.Information("DynamoDB {endpoint} Timeout {timeoutS}s Retries {retries}", x.Endpoint, (let t = x.Timeout in t.TotalSeconds), x.Retries)
Equinox.DynamoStore.DynamoStoreClient <| x.CreateDynamoDbClient()

type Equinox.DynamoStore.DynamoStoreClient with

Expand All @@ -84,4 +84,4 @@ type Equinox.DynamoStore.DynamoStoreContext with
let checkpointInterval = defaultArg checkpointInterval (TimeSpan.FromHours 1.)
Propulsion.Feed.ReaderCheckpoint.DynamoStore.create log (consumerGroupName, checkpointInterval) (context, cache)

// #endif
//#endif
1 change: 0 additions & 1 deletion propulsion-projector/Projector.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<WarningLevel>5</WarningLevel>
<DefineConstants>dynamo</DefineConstants>
</PropertyGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions propulsion-projector/SourceArgs.fs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ module Esdb =
member x.ConnectTarget(cache): Store.Config =
match p.GetSubCommand() with
| Cosmos a ->
let context = Args.Cosmos.Arguments(c, a).Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
let context = Args.Cosmos.Arguments(c, a).Connect() |> Async.RunSynchronously
Store.Config.Cosmos (context, cache)
| Dynamo a ->
let context = Args.Dynamo.Arguments(c, a).Connect() |> DynamoStoreContext.create
let context = Args.Dynamo.Arguments(c, a).CreateContext()
Store.Config.Dynamo (context, cache)
| _ -> Args.missingArg "Must specify `cosmos` or `dynamo` checkpoint store when source is `esdb`"

Expand Down
4 changes: 2 additions & 2 deletions propulsion-reactor/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ module Cosmos =
let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode)
let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase)
let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer)
member val Verbose = p.Contains Verbose
// member val Verbose = p.Contains Verbose
member _.Connect() = connector.ConnectContext("Target", database, container)

module Dynamo =
Expand Down Expand Up @@ -119,6 +119,6 @@ module Dynamo =
| Choice2Of2 (serviceUrl, accessKey, secretKey) ->
Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries)
let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
member val Verbose = p.Contains Verbose
// member val Verbose = p.Contains Verbose
member _.Connect() = connector.CreateClient().CreateContext("Main", table)
#endif

0 comments on commit 232150f

Please sign in to comment.