Skip to content

Commit

Permalink
allow competitor functions to self-initialize
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Leaver committed Jun 28, 2017
1 parent 9a836b0 commit 72135bc
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 34 deletions.
7 changes: 6 additions & 1 deletion Nata.Core/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,18 @@ module Core =
module Option =

let whenTrue fn x = if fn x then Some x else None
let coalesce snd fst = match fst with None -> snd | Some _ -> fst
let filter fn = Option.bind(whenTrue fn)

let coalesce snd fst = match fst with None -> snd | x -> x
let coalesceYield snd fst = match fst with None -> snd() | x -> x

let getValueOr x = function None -> x | Some x -> x
let getValueOrYield fn = function None -> fn() | Some x -> x

let tryFunction fn x = try Some <| fn x with _ -> None

let distribute = function Some(x,y) -> Some x, Some y | None -> None, None
let join = function Some (Some x) -> Some x | _ -> None

module Null =

Expand Down
28 changes: 13 additions & 15 deletions Nata.IO.EventStore/Stream.fs
Original file line number Diff line number Diff line change
Expand Up @@ -160,25 +160,23 @@ module Stream =

let compete (connection : IEventStoreConnection)
(stream : string)
(fn : Event<Data>->Event<Data>) =
(fn : Event<Data> option->Event<Data>) =
let state() =
let last =
read connection stream 1 Direction.Reverse Position.End
|> Seq.tryPick Some
match last with
| Some (e,i) -> (e,i)
| None ->
listen connection stream Position.Start
|> Seq.head
let update(e,i) =
try write connection stream (Position.At (1L+i)) e |> Some
read connection stream 1 Direction.Reverse Position.End
|> Seq.tryPick Some
let update(event,index) =
let position =
index
|> Option.map ((+) 1L >> Position.At)
|> Option.getValueOr (Position.Start)
try write connection stream position event |> Some
with :? Position.Invalid<Index> -> None
let rec apply(last) =
let rec apply(last:(Event*Index) option) =
seq {
let eventIn, indexIn =
match last with
| Some (e,i) -> (e,i)
| None -> state()
last
|> Option.coalesceYield state
|> Option.distribute
let eventOut = fn eventIn
let result =
update(eventOut, indexIn)
Expand Down
1 change: 1 addition & 0 deletions Nata.IO.Memory/Stream.fs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module Stream =

let rec readFrom index =
seq {
let index = Math.Max(0L, index)
match data.TryGetValue index with
| true, event when event.IsValueCreated ->
yield event.Value, index
Expand Down
9 changes: 2 additions & 7 deletions Nata.IO.Tests/LogStoreTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,8 @@ type LogStoreTests() as x =
tryWriter connection
match tryCompetitor, tryReaderFrom, tryWriter with
| Some compete, Some readFrom, Some write ->
write (Event.create 2)
let generation : int list =
compete (Event.map ((*) 2))
compete (Option.getValueOr (Event.create 2) >> Event.map ((*) 2))
|> Seq.take 10
|> Seq.map Event.data
|> Seq.toList
Expand All @@ -321,7 +320,6 @@ type LogStoreTests() as x =
Assert.AreEqual(expectation, generation)
let verification =
readFrom Position.Start
|> Seq.skip 1
|> Seq.take 10
|> Seq.map (fst >> Event.data)
|> Seq.toList
Expand All @@ -343,10 +341,8 @@ type LogStoreTests() as x =
tryWriter connection
match tryCompetitor, tryReaderFrom, tryWriter with
| Some compete, Some readFrom, Some write ->
write (Event.create 2)

let generation (delay:int->int) : int seq =
compete (fun e ->
compete (Option.getValueOr (Event.create 2) >> fun e ->
let input = Event.data e
let output = input * 2
Thread.Sleep(delay input)
Expand Down Expand Up @@ -374,7 +370,6 @@ type LogStoreTests() as x =

let verification =
readFrom Position.Start
|> Seq.skip 1
|> Seq.take 11
|> Seq.map (fst >> Event.data)
|> Seq.toList
Expand Down
26 changes: 15 additions & 11 deletions Nata.IO/Competitor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,38 @@

open Nata.Core

type Competitor<'Data> = (Event<'Data> -> Event<'Data>) -> seq<Event<'Data>>
type Competitor<'Data> = (Event<'Data> option -> Event<'Data>) -> seq<Event<'Data>>

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Competitor =

let mapData ((decode, encode):Codec<'DataIn,'DataOut>)
(competitor:Competitor<'DataIn>) : Competitor<'DataOut> =
fun fn ->
competitor (Event.mapData decode >> fn >> Event.mapData encode)
competitor (Option.map (Event.mapData decode) >> fn >> Event.mapData encode)
|> Seq.map (Event.mapData decode)

let map = mapData

let fallback (writeTo:WriterTo<'Data,'Index>)
(subscribeFrom:SubscriberFrom<'Data,'Index>) : Competitor<'Data> =
fun (fn:Event<'Data>->Event<'Data>) ->
(readFrom:ReaderFrom<'Data,'Index>) : Competitor<'Data> =
fun (fn:Event<'Data> option->Event<'Data>) ->
let state() =
subscribeFrom (Position.Before Position.End)
|> Seq.head
let update(e,i) =
try writeTo (Position.After (Position.At i)) e |> Some
readFrom (Position.Before Position.End)
|> Seq.tryHead
let update(event,index) =
let position =
index
|> Option.map (Position.At >> Position.After)
|> Option.getValueOr (Position.Start)
try writeTo position event |> Some
with :? Position.Invalid<'Index> -> None
let rec apply(last) =
seq {
let eventIn, indexIn =
match last with
| Some (e,i) -> (e,i)
| None -> state()
last
|> Option.coalesceYield state
|> Option.distribute

let eventOut = fn eventIn
let result =
Expand Down

0 comments on commit 72135bc

Please sign in to comment.