Skip to content

Commit

Permalink
fix(DynamoStore): Events missed from index
Browse files Browse the repository at this point in the history
  • Loading branch information
epNickColeman authored and bartelink committed Jul 26, 2023
1 parent f05639f commit 35dfe3f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/Propulsion.DynamoStore.Indexer/Function.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Configuration(?tryGet) =
member _.DynamoAccessKey = get Propulsion.DynamoStore.Lambda.Args.Dynamo.ACCESS_KEY
member _.DynamoSecretKey = get Propulsion.DynamoStore.Lambda.Args.Dynamo.SECRET_KEY
member _.DynamoIndexTable = get Propulsion.DynamoStore.Lambda.Args.Dynamo.INDEX_TABLE
member _.OnlyWarnGap = tryGet Propulsion.DynamoStore.Lambda.Args.Dynamo.ONLY_WARN_GAP |> Option.map bool.Parse

type Store(connector : DynamoStoreConnector, table, dynamoItemSizeCutoffBytes) =
let queryMaxItems = 100
Expand Down Expand Up @@ -50,7 +51,7 @@ type Function() =
.Enrich.With({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt, _) = removeMetrics evt })
.WriteTo.Console(outputTemplate = template)
.CreateLogger()
let ingester = DynamoStoreIngester(log, store.Context)
let ingester = DynamoStoreIngester(log, store.Context, ?onlyWarnOnGap = config.OnlyWarnGap)

member _.Handle(dynamoEvent : DynamoDBEvent, _context : ILambdaContext) : System.Threading.Tasks.Task =
Handler.handle log ingester.Service dynamoEvent
1 change: 1 addition & 0 deletions src/Propulsion.DynamoStore.Lambda/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ module Dynamo =
let [<Literal>] SECRET_KEY = "EQUINOX_DYNAMO_SECRET_ACCESS_KEY"
let [<Literal>] TABLE = "EQUINOX_DYNAMO_TABLE"
let [<Literal>] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX"
let [<Literal>] ONLY_WARN_GAP = "EQUINOX_ONLY_WARN_GAP"
28 changes: 19 additions & 9 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,29 @@ module Fold =

module Ingest =

let (|Start|Append|Discard|) ({ versions = cur } : Fold.State, eventSpan : Events.StreamSpan) =
let (|Start|Append|Discard|Gap|) ({ versions = cur } : Fold.State, eventSpan : Events.StreamSpan) =
match cur.TryGetValue eventSpan.p with
| false, _ -> Start eventSpan
| true, curNext ->
match next eventSpan - curNext with
| appendLen when appendLen > eventSpan.c.Length -> Gap (appendLen - eventSpan.c.Length)
| appendLen when appendLen > 0 -> Append ({ p = eventSpan.p; i = curNext; c = Array.skip (eventSpan.c.Length - appendLen) eventSpan.c } : Events.StreamSpan)
| _ -> Discard

/// Takes a set of spans, flattens them and trims them relative to the currently established per-stream high-watermarks
let tryToIngested state (inputs : Events.StreamSpan seq) : Events.Ingested option =
let tryToIngested onlyWarnOnGap state (inputs: Events.StreamSpan seq): Events.Ingested option =
let started, appended = ResizeArray<Events.StreamSpan>(), ResizeArray<Events.StreamSpan>()
for eventSpan in flatten inputs do
match state, eventSpan with
| Start es -> started.Add es
| Append es -> appended.Add es
| Gap g ->
match onlyWarnOnGap with
| Some (log: Serilog.ILogger) ->
log.Warning("Gap of {gap} at {pos} in {stream}, expect stale state.", g, eventSpan.i, eventSpan.p)
appended.Add eventSpan
| None ->
invalidOp $"Invalid gap of %d{g} at %d{eventSpan.i} in '{eventSpan.p}'"
| Discard -> ()
match started.ToArray(), appended.ToArray() with
| [||], [||] -> None
Expand All @@ -97,11 +105,12 @@ module Ingest =
match state, eventSpan with
| Start es
| Append es -> es
| Gap _ -> eventSpan
| Discard -> () |]
let decide shouldClose (inputs : Events.StreamSpan seq) : _ -> _ * _ = function
let decide onlyWarnOnGap shouldClose (inputs : Events.StreamSpan seq) : _ -> _ * _ = function
| ({ closed = false; versions = cur } as state : Fold.State) ->
let closed, ingested, events =
match tryToIngested state inputs with
match tryToIngested onlyWarnOnGap state inputs with
| None -> false, Array.empty, [||]
| Some diff ->
let closing = shouldClose (diff.app.Length + diff.add.Length + cur.Count)
Expand All @@ -113,27 +122,28 @@ module Ingest =
| { closed = true } as state ->
{ accepted = [||]; closed = true; residual = removeDuplicates state inputs }, [||]

type Service internal (shouldClose, resolve : AppendsPartitionId * AppendsEpochId -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (onlyWarnOnGap, shouldClose, resolve: AppendsPartitionId * AppendsEpochId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.Ingest(partitionId, epochId, spans : Events.StreamSpan[], ?assumeEmpty) : Async<ExactlyOnceIngester.IngestResult<_, _>> =
member _.Ingest(partitionId, epochId, spans: Events.StreamSpan[]) : Async<ExactlyOnceIngester.IngestResult<_, _>> =
let decider = resolve (partitionId, epochId)
if Array.isEmpty spans then async { return { accepted = [||]; closed = false; residual = [||] } } else // special-case null round-trips

let isSelf p = match IndexStreamId.toStreamName p with FsCodec.StreamName.Category c -> c = Category
if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing"
decider.TransactEx((fun c -> (Ingest.decide (shouldClose (c.StreamEventBytes, c.Version))) spans c.State), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AnyCachedValue)
let decide (c: Equinox.ISyncContext<_>) = Ingest.decide onlyWarnOnGap (shouldClose (c.StreamEventBytes, c.Version)) spans c.State
decider.TransactEx(decide, Equinox.AnyCachedValue)

module Config =

let private createCategory (context, cache) = Config.createUnoptimized Category Events.codec Fold.initial Fold.fold (context, Some cache)
let create log (maxBytes : int, maxVersion : int64, maxStreams : int) store =
let create log (maxBytes: int, maxVersion: int64, maxStreams: int, onlyWarnOnGap) store =
let resolve = createCategory store |> Equinox.Decider.forStream log
let shouldClose (totalBytes : int64 voption, version) totalStreams =
let closing = totalBytes.Value > maxBytes || version >= maxVersion || totalStreams >= maxStreams
if closing then log.Information("Epoch Closing v{version}/{maxVersion} {streams}/{maxStreams} streams {kib:f0}/{maxKib:f0} KiB",
version, maxVersion, totalStreams, maxStreams, float totalBytes.Value / 1024., float maxBytes / 1024.)
closing
Service(shouldClose, streamId >> resolve)
Service((if onlyWarnOnGap then Some log else None), shouldClose, streamId >> resolve)

/// Manages the loading of Ingested Span Batches in a given Epoch from a given position forward
/// In the case where we are polling the tail, this should mean we typically do a single round-trip for a point read of the Tip
Expand Down
9 changes: 5 additions & 4 deletions src/Propulsion.DynamoStore/DynamoStoreIndexer.fs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
namespace Propulsion.DynamoStore

type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff, ?maxItemsPerEpoch, ?maxVersion, ?storeLog) =
type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff, ?maxItemsPerEpoch, ?maxVersion, ?storeLog, ?onlyWarnOnGap) =
let maxVersion = defaultArg maxVersion 5_000
let maxStreams = defaultArg maxItemsPerEpoch 100_000
do if maxStreams > AppendsEpoch.MaxItemsPerEpoch then invalidArg (nameof maxStreams) "Cannot exceed AppendsEpoch.MaxItemsPerEpoch"
let storeLog = defaultArg storeLog log
let log = log.ForContext<DynamoStoreIndexer>()
let onlyWarnOnGap = defaultArg onlyWarnOnGap false

let ingester =
let epochs = AppendsEpoch.Config.create storeLog (epochBytesCutoff, maxVersion, maxStreams) (context, cache)
let epochs = AppendsEpoch.Config.create storeLog (epochBytesCutoff, maxVersion, maxStreams, onlyWarnOnGap) (context, cache)
let index = AppendsIndex.Config.create storeLog (context, cache)
let createIngester partitionId =
let log = log.ForContext("partition", partitionId)
Expand All @@ -30,7 +31,7 @@ type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff,
let! originEpoch = ingester.ActiveIngestionEpochId()
return! ingester.IngestMany(originEpoch, spans) |> Async.Ignore }

type DynamoStoreIngester(log, context, ?storeLog) =
type DynamoStoreIngester(log, context, ?storeLog, ?onlyWarnOnGap : bool) =

// Values up to 5 work reasonably, but side effects are:
// - read usage is more 'lumpy'
Expand All @@ -42,4 +43,4 @@ type DynamoStoreIngester(log, context, ?storeLog) =
// (Overusage will hasten the Lambda being killed due to excess memory usage)
let maxCacheMiB = 5
let cache = Equinox.Cache(nameof DynamoStoreIngester, sizeMb = maxCacheMiB)
member val Service = DynamoStoreIndexer(log, context, cache, epochBytesCutoff = epochCutoffMiB * 1024 * 1024, ?storeLog = storeLog)
member val Service = DynamoStoreIndexer(log, context, cache, epochBytesCutoff = epochCutoffMiB * 1024 * 1024, ?storeLog = storeLog, ?onlyWarnOnGap = onlyWarnOnGap)
74 changes: 74 additions & 0 deletions tests/Propulsion.Tests/AppendsEpochTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
module Propulsion.Tests.AppendsEpochTests

open Propulsion.DynamoStore
open Propulsion.DynamoStore.AppendsEpoch
open Serilog
open Swensen.Unquote
open System
open Xunit

let mkSpan sid index cases: Events.StreamSpan = { p = IndexStreamId.ofP sid; i = index; c = cases }
let mkSpanA sid index cases = mkSpan sid index cases |> Array.singleton
let decideIngest' shouldClose spans inputs =
let ({ accepted = accepted; residual = residual }: ExactlyOnceIngester.IngestResult<_ ,_>, events) =
Ingest.decide None shouldClose spans inputs
(accepted, residual), events
let decideIngest = decideIngest' ((<) 10)

[<Fact>]
let ``residual span shouldn't be affected by earlier events in closed spans`` () =
let spans1 = mkSpanA "Cat-Id" 0L [| "a" |]
let spans2 = mkSpanA "Cat-Id" 1L [| "b" |]
let spans3 = mkSpanA "Cat-Id" 2L [| "c" |]

let _, events1 = decideIngest spans1 Fold.initial
let epoch1Closed = (Fold.fold Fold.initial events1).WithClosed()

let _, events2 = decideIngest spans2 Fold.initial
let epoch2Open = Fold.fold Fold.initial events2

let (_, residual1), _ = decideIngest spans3 epoch1Closed

let (accepted2, residual2), _ = decideIngest residual1 epoch2Open

test <@ residual1 = spans3
&& accepted2 = (spans3 |> Array.map (fun {p = p} -> p))
&& residual2 = [||] @>

[<Fact>]
let ``Already ingested events should be removed by ingestion on closed epoch`` () =
let spans1 = mkSpanA "Cat-Id" 0L [| "a"; "a" |]
let spans2 = mkSpanA "Cat-Id" 1L [| "a"; "b" |]

let _, events1 = decideIngest spans1 Fold.initial
let epoch1Closed = (Fold.fold Fold.initial events1).WithClosed()

let (accepted, residual), _ = decideIngest spans2 epoch1Closed

test <@ accepted = [||]
&& residual = mkSpanA "Cat-Id" 2L [| "b" |] @>

[<Fact>]
let ``Already ingested events are not ingested on open epoch`` () =
let streamId = "Cat-Id"
let spans1 = mkSpanA streamId 0L [| "a"; "a" |]
let spans2 = mkSpanA streamId 1L [| "a"; "b" |]

let _, events1 = decideIngest spans1 Fold.initial
let epoch1Closed = (Fold.fold Fold.initial events1)

let (accepted, _), events = decideIngest spans2 epoch1Closed

test <@ accepted = [| IndexStreamId.ofP streamId |]
&& events = [| Events.Ingested { add = [||]; app = mkSpanA "Cat-Id" 2L [| "b" |] } |] @>

[<Fact>]
let ``Gap within epoch, throw?`` () =
let streamId = "Cat-Id"
let spans1 = mkSpanA streamId 0L [| "a" |]
let spans2 = mkSpanA streamId 2L [| "b" |]

let _, events1 = decideIngest spans1 Fold.initial
let epoch1Closed = Fold.fold Fold.initial events1
let f () = decideIngest spans2 epoch1Closed |> ignore
raises<InvalidOperationException> <@ f () @>
1 change: 1 addition & 0 deletions tests/Propulsion.Tests/Propulsion.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<Compile Include="TestOutputLogger.fs" />
<Compile Include="SourceTests.fs" />
<Compile Include="AppendsIndexTests.fs" />
<Compile Include="AppendsEpochTests.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions tools/Propulsion.Tool/Propulsion.Tool.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
<PackageReference Include="Argu" Version="6.1.1" />
<!-- Required or there'll be an exception at runtime re missing support DLLs when using RBAC -->
<PackageReference Include="AWSSDK.SecurityToken" Version="3.7.1.177" />
<!-- Required for AWS SSO resolution -->
<PackageReference Include="AWSSDK.SSO" Version="3.7.100.69" />
<PackageReference Include="AWSSDK.SSOOIDC" Version="3.7.100.69" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
</ItemGroup>
Expand Down

0 comments on commit 35dfe3f

Please sign in to comment.