Skip to content

Commit

Permalink
Move Propulsion.Feed.Batch up a level
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 2, 2024
1 parent 13b02e2 commit b28f360
Show file tree
Hide file tree
Showing 15 changed files with 33 additions and 39 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226)
- `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match`Propulsion.SqlStreamStore` API rather than`Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Feed`: Moved implementations into main `Propulsion` library. While this adds a `FSharp.Control.TaskSeq` dependency, it makes maintenance and navigation easier
- `Propulsion.Feed`: Moved implementations into main `Propulsion` library. While this adds a `FSharp.Control.TaskSeq` dependency, it makes maintenance and navigation easier [#265](https://github.com/jet/propulsion/pull/265)
- `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159)
- `Propulsion.Kafka`: Target `FsCodec.NewtonsoftJson` v `3.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Prometheus`: Extracted `Propulsion.Prometheus` and `Propulsion.Feed.Prometheus` in order to remove `Prometheus` dependency from core package
- `Propulsion.Prometheus`: Extracted `Propulsion.Prometheus` and `Propulsion.Feed.Prometheus` in order to remove `Prometheus` dependency from core package [#265](https://github.com/jet/propulsion/pull/265)
- `Propulsion.Tool`: `project` renamed to `sync`; sources now have a `from` prefix [#252](https://github.com/jet/propulsion/pull/252)

### Removed
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module private Impl =
| Exceptions.ProvisionedThroughputExceeded when not force -> ()
| e -> storeLog.Warning(e, "DynamoStoreSource commit failure")

let mkBatch position isTail items: Propulsion.Feed.Core.Batch<Propulsion.Sinks.EventBody> =
let mkBatch position isTail items: Propulsion.Feed.Batch<Propulsion.Sinks.EventBody> =
{ items = items; checkpoint = position; isTail = isTail }
let sliceBatch epochId offset items =
mkBatch (Checkpoint.positionOfEpochAndOffset epochId offset) false items
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module private Impl =
let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p)
let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct)
let! batch = res |> TaskSeq.map _.Event |> TaskSeq.toArrayAsync
return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Core.Batch<_>) }
return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Batch<_>) }

// @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
let private chunk (pos: Position) = uint64 pos.CommitPosition >>> 28
Expand Down
6 changes: 3 additions & 3 deletions src/Propulsion.MessageDb/MessageDbSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ module Internal =
let sn = reader.GetString(6) |> FsCodec.StreamName.parse
struct (sn, event)

member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct): Task<Core.Batch<_>> = task {
member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct): Task<Batch<_>> = task {
use! conn = connect ct
use command = GetCategoryMessages.prepareCommand conn category fromPositionInclusive batchSize

use! reader = command.ExecuteReaderAsync(ct)
let events = [| while reader.Read() do parseRow reader |]

let checkpoint = match Array.tryLast events with Some (_, ev) -> unbox<int64> ev.Context | None -> fromPositionInclusive
return ({ checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 }: Core.Batch<_>) }
return ({ checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 }: Batch<_>) }

member _.TryReadCategoryLastVersion(category: TrancheId, ct): Task<int64 voption> = task {
use! conn = connect ct
Expand All @@ -70,7 +70,7 @@ module Internal =
use! reader = command.ExecuteReaderAsync(ct)
return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone }

let internal readBatch batchSize (store: MessageDbCategoryClient) struct (category, pos, ct): Task<Core.Batch<_>> =
let internal readBatch batchSize (store: MessageDbCategoryClient) struct (category, pos, ct): Task<Batch<_>> =
let positionInclusive = Position.toInt64 pos
store.ReadCategoryMessages(category, positionInclusive, batchSize, ct)

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module private Impl =
if streamFilter sn then Some struct (sn, msg) else None)
let! items = if not withData then task { return filtered |> Seq.map (toStreamEvent null) |> Array.ofSeq }
else filtered |> Seq.map readWithDataAsStreamEvent |> Propulsion.Internal.Task.sequential ct
return ({ checkpoint = Propulsion.Feed.Position.parse page.NextPosition; items = items; isTail = page.IsEnd }: Propulsion.Feed.Core.Batch<_>) }
return ({ checkpoint = Propulsion.Feed.Position.parse page.NextPosition; items = items; isTail = page.IsEnd }: Propulsion.Feed.Batch<_>) }

let readTailPositionForTranche (store: SqlStreamStore.IStreamStore) _trancheId ct = task {
let! lastEventPos = store.ReadHeadPosition(ct)
Expand Down
9 changes: 9 additions & 0 deletions src/Propulsion/Feed.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,12 @@ type IFeedCheckpointStore =
/// Determines the starting position, and checkpointing frequency for a given tranche
abstract member Start: source: SourceId * tranche: TrancheId * establishOrigin: Func<CancellationToken, Task<Position>> option * ct: CancellationToken -> Task<Position>
abstract member Commit: source: SourceId * tranche: TrancheId * pos: Position * CancellationToken -> Task

[<NoComparison; NoEquality>]
type Batch<'F> =
{ items: Propulsion.Streams.StreamEvent<'F>[]
/// Next computed read position (inclusive). Checkpoint stores treat absence of a value as `Position.initial` (= `0`)
checkpoint: Position
/// Indicates whether the end of a feed has been reached (a batch being empty does not necessarily imply that)
/// Implies tail sleep delay. May trigger completion of `Monitor.AwaitCompletion`
isTail: bool }
10 changes: 0 additions & 10 deletions src/Propulsion/FeedReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,6 @@ open Propulsion.Feed
open Propulsion.Internal
open Serilog
open System
open System.Collections.Generic

[<NoComparison; NoEquality>]
type Batch<'F> =
{ items: Propulsion.Streams.StreamEvent<'F>[]
/// Next computed read position (inclusive). Checkpoint stores treat absence of a value as `Position.initial` (= `0`)
checkpoint: Position
/// Indicates whether the end of a feed has been reached (a batch being empty does not necessarily imply that)
/// Implies tail sleep delay. May trigger completion of `Monitor.AwaitCompletion`
isTail: bool }

module internal TimelineEvent =

Expand Down
3 changes: 1 addition & 2 deletions src/Propulsion/FeedSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ open Propulsion
open Propulsion.Feed
open Propulsion.Internal
open System
open System.Collections.Generic

/// Drives reading and checkpointing for a set of feeds (tranches) of a custom source feed
type FeedSourceBase internal
Expand Down Expand Up @@ -169,7 +168,7 @@ type FeedSource
let readTs = Stopwatch.timestamp ()
let! page = readPage.Invoke(trancheId, pos, ct)
let items' = page.items |> Array.map (fun x -> struct (streamName, x))
yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail }: Core.Batch<_>)) }
yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail }: Batch<_>)) }

member internal _.Pump(readTranches: Func<CancellationToken, Task<TrancheId[]>>,
readPage: Func<TrancheId, Position, CancellationToken, Task<Page<Propulsion.Sinks.EventBody>>>, ct): Task<unit> =
Expand Down
5 changes: 2 additions & 3 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ module Exception =
let [<return: Struct>] (|Log|_|) log (e: exn) = log e; ValueNone

type CancellationToken = System.Threading.CancellationToken
type IAsyncEnumerable<'T> = System.Collections.Generic.IAsyncEnumerable<'T>
type Task = System.Threading.Tasks.Task
type Task<'T> = System.Threading.Tasks.Task<'T>
open System.Threading.Tasks
Expand Down Expand Up @@ -156,9 +157,7 @@ module Task =
parallel_ 1 ct xs
let parallelUnlimited ct xs: Task<'t []> =
parallel_ 0 ct xs
let inline ignore<'T> (a: Task<'T>): Task<unit> = task {
let! _ = a
return () }
let inline ignore<'T> (a: Task<'T>): Task<unit> = task { let! _ = a in return () }
let ofUnitTask (x: Task): Task<unit> = task { return! x }
let periodically (f: CancellationToken -> Task<unit>) interval (ct: CancellationToken) = task {
let t = new System.Threading.PeriodicTimer(interval) // no use as ct will Dispose
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion/JsonSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type [<Sealed; AbstractClass>] JsonSource private () =
let lineNo = int64 i + 1L
try let items = if isEof then Array.empty
else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray
struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>))
struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Batch<_>))
with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) }
let source = SinglePassFeedSource(log, statsInterval, sourceId, crawl, checkpoints, sink, string)
source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] })
7 changes: 3 additions & 4 deletions src/Propulsion/PeriodicSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Propulsion.Feed
open FSharp.Control // taskSeq
open Propulsion.Internal
open System
open System.Collections.Generic

/// Int64.MaxValue = 9223372036854775807
/// ([datetimeoffset]::FromUnixTimeSeconds(9223372036854775807 / 1000000000)) is in 2262
Expand Down Expand Up @@ -55,7 +54,7 @@ type PeriodicSource
defaultArg renderPos DateTimeOffsetPosition.render, defaultArg shutdownTimeout (TimeSpan.seconds 5))

// We don't want to checkpoint for real until we know the scheduler has handled the full set of pages in the crawl.
let crawlInternal (read: Func<_, IAsyncEnumerable<struct (_ * _)>>) trancheId (_wasLast, position) ct: IAsyncEnumerable<struct (TimeSpan * Core.Batch<_>)> = taskSeq {
let crawlInternal (read: Func<_, IAsyncEnumerable<struct (_ * _)>>) trancheId (_wasLast, position) ct: IAsyncEnumerable<struct (TimeSpan * Batch<_>)> = taskSeq {
let startDate = DateTimeOffsetPosition.getDateTimeOffset position
let dueDate = startDate + refreshInterval
match dueDate - DateTimeOffset.UtcNow with
Expand Down Expand Up @@ -83,14 +82,14 @@ type PeriodicSource
let items = Array.zeroCreate ready
buffer.CopyTo(0, items, 0, ready)
buffer.RemoveRange(0, ready)
yield struct (elapsed, ({ items = items; checkpoint = position; isTail = false }: Core.Batch<_>))
yield struct (elapsed, ({ items = items; checkpoint = position; isTail = false }: Batch<_>))
elapsed <- TimeSpan.Zero
| _ -> ()
let items, checkpoint =
match buffer.ToArray() with
| [||] as noItems -> noItems, basePosition
| finalItem -> finalItem, let struct (_s, e) = Array.last finalItem in e |> Core.TimelineEvent.toCheckpointPosition
yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true }: Core.Batch<_>) }
yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true }: Batch<_>) }

member internal _.Pump(readTranches: Func<CancellationToken, Task<TrancheId[]>>,
// The <c>TaskSeq</c> is expected to manage its own resilience strategy (retries etc). <br/>
Expand Down
7 changes: 3 additions & 4 deletions src/Propulsion/SinglePassFeedSource.fs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace Propulsion.Feed

open Propulsion.Feed.Core
open Propulsion.Internal
open System
open System.Collections.Generic
open System.Threading

/// Drives reading from the Source, stopping when the Tail of each of the Tranches has been reached
type SinglePassFeedSource
Expand All @@ -12,7 +10,8 @@ type SinglePassFeedSource
crawl: Func<TrancheId, Position, CancellationToken, IAsyncEnumerable<struct (TimeSpan * Batch<_>)>>,
checkpoints: IFeedCheckpointStore, sink: Propulsion.Sinks.SinkPipeline,
?renderPos, ?logReadFailure, ?readFailureSleepInterval, ?logCommitFailure) =
inherit TailingFeedSource(log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, defaultArg renderPos string,
inherit Propulsion.Feed.Core.TailingFeedSource(
log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, defaultArg renderPos string,
crawl,
?logReadFailure = logReadFailure, ?readFailureSleepInterval = readFailureSleepInterval, ?logCommitFailure = logCommitFailure,
readersStopAtTail = true)
Expand Down
3 changes: 1 addition & 2 deletions tests/Propulsion.MessageDb.Integration/Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ open Propulsion.Internal
open Propulsion.MessageDb
open Swensen.Unquote
open System
open System.Collections.Generic
open System.Diagnostics
open Xunit

Expand Down Expand Up @@ -76,7 +75,7 @@ let ``It processes events for a category`` () = task {
let! checkpoints = makeCheckpoints consumerGroup
let stats = stats log
let mutable stop = ignore
let handled = HashSet<_>()
let handled = System.Collections.Generic.HashSet<_>()
let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
lock handled (fun _ ->
for evt in events do
Expand Down
4 changes: 2 additions & 2 deletions tests/Propulsion.Tests/SinkHealthTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Scenario(testOutput) =
[| sid "a-ok", mk 0 "EventType"
failingSid, mk 0 "EventType"
stuckSid, mk 0 "EventType" |]
let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = items; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>))
let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = items; isTail = true; checkpoint = Unchecked.defaultof<_> }: Batch<_>))

let extractHealthCheckExn (ex: Choice<_, exn>) =
trap <@ match ex with
Expand All @@ -49,7 +49,7 @@ type Scenario(testOutput) =

[<Fact>]
let run () = async {
let source = Propulsion.Feed.SinglePassFeedSource(log, TimeSpan.FromSeconds 5, SourceId.parse "sid", crawl, checkpoints, sink, string)
let source = SinglePassFeedSource(log, TimeSpan.FromSeconds 5, SourceId.parse "sid", crawl, checkpoints, sink, string)
let src = source.Start(fun _ct -> task { return [| TrancheId.parse "tid" |] })
let! monEx = src.Monitor.AwaitCompletion(propagationDelay = TimeSpan.FromSeconds 1, awaitFullyCaughtUp = true) |> Propulsion.Internal.Async.ofTask |> Async.Catch
let me = extractHealthCheckExn monEx
Expand Down
6 changes: 3 additions & 3 deletions tests/Propulsion.Tests/SourceTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Scenario(testOutput) =

[<Fact>]
let ``TailingFeedSource Stop / AwaitCompletion semantics`` () = task {
let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>))
let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Batch<_>))
let source = Propulsion.Feed.Core.TailingFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", TimeSpan.FromMinutes 1,
checkpoints, (*establishOrigin*)None, sink, string, crawl)
use src = source.Start(fun ct -> source.Pump((fun _ -> task { return [| TrancheId.parse "tid" |] }), ct))
Expand All @@ -38,8 +38,8 @@ type Scenario(testOutput) =

[<Theory; InlineData true; InlineData false>]
let SinglePassFeedSource withWait = async {
let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>))
let source = Propulsion.Feed.SinglePassFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", crawl, checkpoints, sink, string)
let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Batch<_>))
let source = SinglePassFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", crawl, checkpoints, sink, string)
use src = source.Start(fun _ct -> task { return [| TrancheId.parse "tid" |] })
// SinglePassFeedSource completion includes Waiting for Completion of all Batches on all Tranches and Flushing of Checkpoints
// Hence waiting with the Monitor is not actually necessary (though it provides progress logging which otherwise would be less thorough)
Expand Down

0 comments on commit b28f360

Please sign in to comment.