diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index 0c5c7001..a6d40400 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -20,7 +20,7 @@ - + diff --git a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs index ab454c16..6a4876f8 100644 --- a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs +++ b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs @@ -70,11 +70,17 @@ module Fold = /// We only want to generate a first class event every N minutes, while efficiently writing contingent on the current etag value /// So, we post-process the events to remove `Updated` events (as opposed to `Checkpointed` ones), /// knowing that the state already has that Updated event folded into it when we snapshot +#if COSMOSV2 || COSMOSV3 let transmute events state : Events.Event list * Events.Event list = match events, state with | [Events.Updated _], state -> [], [toSnapshot state] | xs, state -> xs, [toSnapshot state] - +#else + let transmute events state : Events.Event array * Events.Event array = + match events, state with + | [| Events.Updated _ |], state -> [||], [|toSnapshot state|] + | xs, state -> xs, [|toSnapshot state|] +#endif let private mkCheckpoint at next pos = { at = at; nextCheckpointDue = next; pos = pos } : Events.Checkpoint let private mk (at : DateTimeOffset) (interval : TimeSpan) pos : Events.Config * Events.Checkpoint = let next = at.Add interval diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index d54121fb..d241c7de 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -25,7 +25,7 @@ - + diff --git a/src/Propulsion.EventStoreDb/EventStoreSource.fs b/src/Propulsion.EventStoreDb/EventStoreSource.fs index 0eb92c7f..f1926c37 100644 --- a/src/Propulsion.EventStoreDb/EventStoreSource.fs +++ b/src/Propulsion.EventStoreDb/EventStoreSource.fs @@ -14,7 +14,7 @@ module private Impl = for e in events do let sn = Propulsion.Streams.StreamName.internalParseSafe e.EventStreamId if categoryFilter (FsCodec.StreamName.category sn) then - yield sn, toTimelineEvent e |] + yield sn, Equinox.EventStoreDb.ClientCodec.timelineEvent e |] let private checkpointPos (xs : EventRecord array) = match Array.tryLast xs with Some e -> int64 e.Position.CommitPosition | None -> -1L |> Propulsion.Feed.Position.parse diff --git a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj index 335365d0..2eb48b9f 100644 --- a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj +++ b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj @@ -18,7 +18,7 @@ - + diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index 8c4dd876..8bf75bd7 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -135,7 +135,7 @@ type FeedReader let streamsCount = batch.items |> Seq.distinctBy ValueTuple.fst |> Seq.length log.Debug("Page {latency:f0}ms Checkpoint {checkpoint} {eventCount}e {streamCount}s", readLatency.TotalMilliseconds, batch.checkpoint, c, streamsCount) - let epoch, streamEvents : int64 * Propulsion.Streams.StreamEvent<_> seq = int64 batch.checkpoint, Seq.ofArray batch.items + let epoch, streamEvents : int64 * Propulsion.Streams.Default.StreamEvent seq = int64 batch.checkpoint, Seq.ofArray batch.items let ingestTimer = Stopwatch.start () let! struct (cur, max) = submitBatch { epoch = epoch; checkpoint = commit batch.checkpoint; items = streamEvents; onCompletion = ignore } stats.UpdateCurMax(ingestTimer.Elapsed, cur, max) } diff --git a/src/Propulsion.Feed/PeriodicSource.fs b/src/Propulsion.Feed/PeriodicSource.fs index e764a5c3..e77d7efc 100644 --- a/src/Propulsion.Feed/PeriodicSource.fs +++ b/src/Propulsion.Feed/PeriodicSource.fs @@ -72,7 +72,7 @@ type PeriodicSource let mutable elapsed = TimeSpan.Zero for ts, xs in crawl trancheId do elapsed <- elapsed + ts - let streamEvents : Propulsion.Streams.StreamEvent<_> seq = seq { + let streamEvents : Propulsion.Streams.Default.StreamEvent seq = seq { for si in xs -> let i = index index <- index + 1L diff --git a/src/Propulsion.Kafka/Codec.fs b/src/Propulsion.Kafka/Codec.fs index 1c0a136b..3c404075 100644 --- a/src/Propulsion.Kafka/Codec.fs +++ b/src/Propulsion.Kafka/Codec.fs @@ -64,12 +64,13 @@ module RenderedSpan = i = span[0].Index e = span |> Array.map (fun x -> { c = x.EventType; t = x.Timestamp; d = ta x.Data; m = ta x.Meta }) } - let enum (span: RenderedSpan) : StreamEvent seq = + let enum (span: RenderedSpan) : Default.StreamEvent seq = let streamName = StreamName.internalParseSafe span.s - let inline mkEvent offset (e : RenderedEvent) = FsCodec.Core.TimelineEvent.Create(span.i+int64 offset, e.c, e.d, e.m, timestamp=e.t) + let td (x : byte array) : Default.EventBody = System.ReadOnlyMemory x + let inline mkEvent offset (e : RenderedEvent) = FsCodec.Core.TimelineEvent.Create(span.i+int64 offset, e.c, td e.d, td e.m, timestamp = e.t) span.e |> Seq.mapi (fun i e -> streamName, mkEvent i e) - let parse (spanJson: string) : StreamEvent<_> seq = + let parse (spanJson: string) : Default.StreamEvent seq = spanJson |> RenderedSpan.Parse |> enum // Rendition of Summary Events representing the aggregated state of a Stream at a known point / version @@ -89,17 +90,18 @@ type [] RenderedSummary = /// Helpers for mapping to/from `Propulsion.Streams` canonical event contract module RenderedSummary = - let ofStreamEvents (streamName : FsCodec.StreamName) (index : int64) (events : FsCodec.IEventData seq) : RenderedSummary = + let ofStreamEvents (streamName : FsCodec.StreamName) (index : int64) (events : FsCodec.IEventData seq) : RenderedSummary = + let ta (x : Default.EventBody) : byte array = x.ToArray() { s = FsCodec.StreamName.toString streamName i = index - u = [| for x in events -> { c = x.EventType; t = x.Timestamp; d = x.Data; m = x.Meta } |] } + u = [| for x in events -> { c = x.EventType; t = x.Timestamp; d = ta x.Data; m = ta x.Meta } |] } - let ofStreamEvent (streamName : FsCodec.StreamName) (index : int64) (event : FsCodec.IEventData) : RenderedSummary = + let ofStreamEvent (streamName : FsCodec.StreamName) (index : int64) (event : FsCodec.IEventData) : RenderedSummary = ofStreamEvents streamName index (Seq.singleton event) - let enum (span: RenderedSummary) : StreamEvent<_> seq = + let enum (span: RenderedSummary) : Default.StreamEvent seq = let streamName = StreamName.internalParseSafe span.s seq { for e in span.u -> streamName, FsCodec.Core.TimelineEvent.Create(span.i, e.c, e.d, e.m, timestamp=e.t, isUnfold=true) } - let parse (spanJson: string) : StreamEvent<_> seq = + let parse (spanJson: string) : Default.StreamEvent seq = spanJson |> RenderedSummary.Parse |> enum diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index 5cad2f92..586bb8c3 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -287,7 +287,7 @@ module Core = static member Start<'Outcome> ( log : ILogger, config : KafkaConsumerConfig, // often implemented via StreamNameSequenceGenerator.KeyValueToStreamEvent - keyValueToStreamEvents : KeyValuePair -> StreamEvent<_> seq, + keyValueToStreamEvents : KeyValuePair -> Default.StreamEvent seq, handle : struct (StreamName * StreamSpan<_>) -> Async, maxDop, stats : Scheduling.Stats, statsInterval, ?maxSubmissionsPerPartition, ?logExternalState, @@ -379,12 +379,12 @@ type StreamNameSequenceGenerator() = /// - Stores the topic, partition and offset as a ConsumeResultContext in the ITimelineEvent.Context member x.ConsumeResultToStreamEvent( // Placeholder category to use for StreamName where key is null and/or does not adhere to standard {category}-{streamId} form - ?defaultCategory) : ConsumeResult -> StreamEvent seq = + ?defaultCategory) : ConsumeResult -> Default.StreamEvent seq = let defaultCategory = defaultArg defaultCategory "" x.ConsumeResultToStreamEvent(Core.toStreamName defaultCategory) /// Takes the key and value as extracted from the ConsumeResult, mapping them respectively to the StreamName and ITimelineEvent.Data - member x.KeyValueToStreamEvent(KeyValue (k, v : string), ?eventType, ?defaultCategory) : StreamEvent seq = + member x.KeyValueToStreamEvent(KeyValue (k, v : string), ?eventType, ?defaultCategory) : Default.StreamEvent seq = let sn = Core.parseMessageKey (defaultArg defaultCategory String.Empty) k let e = FsCodec.Core.TimelineEvent.Create(x.GenerateIndex sn, defaultArg eventType String.Empty, System.Text.Encoding.UTF8.GetBytes v |> ReadOnlyMemory) Seq.singleton (sn, e) @@ -401,7 +401,7 @@ type StreamsConsumer = static member Start<'Outcome> ( log : ILogger, config : KafkaConsumerConfig, // often implemented via StreamNameSequenceGenerator.ConsumeResultToStreamEvent where the incoming message does not have an embedded sequence number - consumeResultToStreamEvents : ConsumeResult<_, _> -> StreamEvent<_> seq, + consumeResultToStreamEvents : ConsumeResult<_, _> -> Default.StreamEvent seq, // Handler responses: // - first component: Index at which next processing will proceed (which can trigger discarding of earlier items on that stream) // - second component: Outcome (can be simply unit), to pass to the stats processor diff --git a/src/Propulsion.MemoryStore/MemoryStoreSource.fs b/src/Propulsion.MemoryStore/MemoryStoreSource.fs index 6e2ecd29..edb17524 100644 --- a/src/Propulsion.MemoryStore/MemoryStoreSource.fs +++ b/src/Propulsion.MemoryStore/MemoryStoreSource.fs @@ -19,7 +19,7 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, c let mutable prepared = -1L let enqueueSubmission, awaitSubmissions, tryDequeueSubmission = - let c = Channel.unboundedSr seq>> in let r, w = c.Reader, c.Writer + let c = Channel.unboundedSr> in let r, w = c.Reader, c.Writer Channel.write w, Channel.awaitRead r, Channel.tryRead r let handleStoreCommitted struct (categoryName, aggregateId, events : FsCodec.ITimelineEvent<_> []) = diff --git a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj index c3ae0165..b5ec5730 100644 --- a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj +++ b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj @@ -19,7 +19,7 @@ - + diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index fc744f5f..05f055ab 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -996,13 +996,13 @@ module Projector = type StreamsIngester = static member Start(log, partitionId, maxRead, submit, statsInterval) = - let submitBatch (items : StreamEvent<_> seq, onCompletion) = + let submitBatch (items : StreamEvent<'F> seq, onCompletion) = let items = Array.ofSeq items let streams = items |> Seq.map ValueTuple.fst |> HashSet let batch : Submission.Batch<_, _> = { source = partitionId; onCompletion = onCompletion; messages = items } submit batch struct (streams.Count, items.Length) - Ingestion.Ingester seq>.Start(log, partitionId, maxRead, submitBatch, statsInterval) + Ingestion.Ingester seq>.Start(log, partitionId, maxRead, submitBatch, statsInterval) type StreamsSubmitter =