From 72135bc309b2837778062543088676112c926926 Mon Sep 17 00:00:00 2001 From: Jonathan Leaver Date: Wed, 28 Jun 2017 18:42:02 -0400 Subject: [PATCH] allow competitor functions to self-initialize --- Nata.Core/Core.fs | 7 ++++++- Nata.IO.EventStore/Stream.fs | 28 +++++++++++++--------------- Nata.IO.Memory/Stream.fs | 1 + Nata.IO.Tests/LogStoreTests.fs | 9 ++------- Nata.IO/Competitor.fs | 26 +++++++++++++++----------- 5 files changed, 37 insertions(+), 34 deletions(-) diff --git a/Nata.Core/Core.fs b/Nata.Core/Core.fs index e26af35..a443fbd 100644 --- a/Nata.Core/Core.fs +++ b/Nata.Core/Core.fs @@ -144,13 +144,18 @@ module Core = module Option = let whenTrue fn x = if fn x then Some x else None - let coalesce snd fst = match fst with None -> snd | Some _ -> fst let filter fn = Option.bind(whenTrue fn) + let coalesce snd fst = match fst with None -> snd | x -> x + let coalesceYield snd fst = match fst with None -> snd() | x -> x + let getValueOr x = function None -> x | Some x -> x let getValueOrYield fn = function None -> fn() | Some x -> x let tryFunction fn x = try Some <| fn x with _ -> None + + let distribute = function Some(x,y) -> Some x, Some y | None -> None, None + let join = function Some (Some x) -> Some x | _ -> None module Null = diff --git a/Nata.IO.EventStore/Stream.fs b/Nata.IO.EventStore/Stream.fs index 3fe312d..a571f7a 100644 --- a/Nata.IO.EventStore/Stream.fs +++ b/Nata.IO.EventStore/Stream.fs @@ -160,25 +160,23 @@ module Stream = let compete (connection : IEventStoreConnection) (stream : string) - (fn : Event->Event) = + (fn : Event option->Event) = let state() = - let last = - read connection stream 1 Direction.Reverse Position.End - |> Seq.tryPick Some - match last with - | Some (e,i) -> (e,i) - | None -> - listen connection stream Position.Start - |> Seq.head - let update(e,i) = - try write connection stream (Position.At (1L+i)) e |> Some + read connection stream 1 Direction.Reverse Position.End + |> Seq.tryPick Some + let update(event,index) = + let position = + index + |> Option.map ((+) 1L >> Position.At) + |> Option.getValueOr (Position.Start) + try write connection stream position event |> Some with :? Position.Invalid -> None - let rec apply(last) = + let rec apply(last:(Event*Index) option) = seq { let eventIn, indexIn = - match last with - | Some (e,i) -> (e,i) - | None -> state() + last + |> Option.coalesceYield state + |> Option.distribute let eventOut = fn eventIn let result = update(eventOut, indexIn) diff --git a/Nata.IO.Memory/Stream.fs b/Nata.IO.Memory/Stream.fs index 78ddff1..0c81ee3 100644 --- a/Nata.IO.Memory/Stream.fs +++ b/Nata.IO.Memory/Stream.fs @@ -55,6 +55,7 @@ module Stream = let rec readFrom index = seq { + let index = Math.Max(0L, index) match data.TryGetValue index with | true, event when event.IsValueCreated -> yield event.Value, index diff --git a/Nata.IO.Tests/LogStoreTests.fs b/Nata.IO.Tests/LogStoreTests.fs index c155de3..2e30ef3 100644 --- a/Nata.IO.Tests/LogStoreTests.fs +++ b/Nata.IO.Tests/LogStoreTests.fs @@ -310,9 +310,8 @@ type LogStoreTests() as x = tryWriter connection match tryCompetitor, tryReaderFrom, tryWriter with | Some compete, Some readFrom, Some write -> - write (Event.create 2) let generation : int list = - compete (Event.map ((*) 2)) + compete (Option.getValueOr (Event.create 2) >> Event.map ((*) 2)) |> Seq.take 10 |> Seq.map Event.data |> Seq.toList @@ -321,7 +320,6 @@ type LogStoreTests() as x = Assert.AreEqual(expectation, generation) let verification = readFrom Position.Start - |> Seq.skip 1 |> Seq.take 10 |> Seq.map (fst >> Event.data) |> Seq.toList @@ -343,10 +341,8 @@ type LogStoreTests() as x = tryWriter connection match tryCompetitor, tryReaderFrom, tryWriter with | Some compete, Some readFrom, Some write -> - write (Event.create 2) - let generation (delay:int->int) : int seq = - compete (fun e -> + compete (Option.getValueOr (Event.create 2) >> fun e -> let input = Event.data e let output = input * 2 Thread.Sleep(delay input) @@ -374,7 +370,6 @@ type LogStoreTests() as x = let verification = readFrom Position.Start - |> Seq.skip 1 |> Seq.take 11 |> Seq.map (fst >> Event.data) |> Seq.toList diff --git a/Nata.IO/Competitor.fs b/Nata.IO/Competitor.fs index 9869d55..2b6b1c5 100644 --- a/Nata.IO/Competitor.fs +++ b/Nata.IO/Competitor.fs @@ -2,7 +2,7 @@ open Nata.Core -type Competitor<'Data> = (Event<'Data> -> Event<'Data>) -> seq> +type Competitor<'Data> = (Event<'Data> option -> Event<'Data>) -> seq> [] module Competitor = @@ -10,26 +10,30 @@ module Competitor = let mapData ((decode, encode):Codec<'DataIn,'DataOut>) (competitor:Competitor<'DataIn>) : Competitor<'DataOut> = fun fn -> - competitor (Event.mapData decode >> fn >> Event.mapData encode) + competitor (Option.map (Event.mapData decode) >> fn >> Event.mapData encode) |> Seq.map (Event.mapData decode) let map = mapData let fallback (writeTo:WriterTo<'Data,'Index>) - (subscribeFrom:SubscriberFrom<'Data,'Index>) : Competitor<'Data> = - fun (fn:Event<'Data>->Event<'Data>) -> + (readFrom:ReaderFrom<'Data,'Index>) : Competitor<'Data> = + fun (fn:Event<'Data> option->Event<'Data>) -> let state() = - subscribeFrom (Position.Before Position.End) - |> Seq.head - let update(e,i) = - try writeTo (Position.After (Position.At i)) e |> Some + readFrom (Position.Before Position.End) + |> Seq.tryHead + let update(event,index) = + let position = + index + |> Option.map (Position.At >> Position.After) + |> Option.getValueOr (Position.Start) + try writeTo position event |> Some with :? Position.Invalid<'Index> -> None let rec apply(last) = seq { let eventIn, indexIn = - match last with - | Some (e,i) -> (e,i) - | None -> state() + last + |> Option.coalesceYield state + |> Option.distribute let eventOut = fn eventIn let result =