Skip to content

Commit

Permalink
Merge pull request #7 from j-alexander/improve_relative_positions_wit…
Browse files Browse the repository at this point in the history
…h_eventstore

Improve relative positions with eventstore
  • Loading branch information
j-alexander authored Aug 12, 2017
2 parents d447582 + a1474bf commit 81c8ab8
Showing 7 changed files with 118 additions and 52 deletions.
7 changes: 6 additions & 1 deletion Nata.IO.EventHub.Tests/HubPartitionTests.fs
Original file line number Diff line number Diff line change
@@ -124,4 +124,9 @@ type HubPartitionTests() =
|> Seq.filter ((=) unexpected.Data)
|> Seq.toList

Assert.AreEqual([], results)
Assert.AreEqual([], results)

[<Test>]
override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.")
[<Test>]
override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.")
7 changes: 6 additions & 1 deletion Nata.IO.EventHub.Tests/HubTests.fs
Original file line number Diff line number Diff line change
@@ -77,4 +77,9 @@ type HubTests() =
|> Seq.map Event.data
|> Seq.toList

Assert.AreEqual([ event.Data ], results)
Assert.AreEqual([ event.Data ], results)

[<Test>]
override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.")
[<Test>]
override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.")
22 changes: 11 additions & 11 deletions Nata.IO.EventStore.Tests/LogStoreTests.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
namespace Nata.IO.EventStore.Tests

open NUnit.Framework

[<TestFixture(Description="EventStore-LogStore")>]
type LogStoreTests() =
inherit Nata.IO.Tests.LogStoreTests()

override x.Connect() =
Configuration.channel()
|> Configuration.connect()
namespace Nata.IO.EventStore.Tests

open NUnit.Framework

[<TestFixture(Description="EventStore-LogStore")>]
type LogStoreTests() =
inherit Nata.IO.Tests.LogStoreTests()

override x.Connect() =
Configuration.channel()
|> Configuration.connect()
70 changes: 32 additions & 38 deletions Nata.IO.EventStore/Stream.fs
Original file line number Diff line number Diff line change
@@ -34,20 +34,31 @@ module Stream =
|> Event.withIndex (resolvedEvent.Event.EventNumber |> int64),
resolvedEvent.Event.EventNumber

let rec private indexOf = function
| Position.Start -> int64 StreamPosition.Start
| Position.Before x -> -1L + indexOf x
let rec index (connection : IEventStoreConnection)
(stream : string)
(size : int)
(direction : Direction)
(position : Position<Index>) : Index =
match position with
| Position.At x -> x
| Position.After x -> 1L + indexOf x
| Position.End -> int64 StreamPosition.End

let rec private read (connection : IEventStoreConnection)
(stream : string)
(size : int)
(direction : Direction)
(position : Position<Index>) : seq<Event*Index> =
| Position.Before x -> (index connection stream size direction x) - 1L
| Position.After x -> (index connection stream size direction x) + 1L
| Position.Start -> (int64 StreamPosition.Start)
| Position.End when direction = Direction.Forward ->
read connection stream size Direction.Reverse position
|> Seq.tryPick Some
|> Option.map (snd >> (+) 1L)
|> Option.getValueOr (int64 StreamPosition.Start)
| Position.End -> (int64 StreamPosition.End)


and private read (connection : IEventStoreConnection)
(stream : string)
(size : int)
(direction : Direction)
(position : Position<Index>) : seq<Event*Index> =
seq {
let from = indexOf position
let from = index connection stream size direction position
let sliceTask =
match direction with
| Direction.Forward -> connection.ReadStreamEventsForwardAsync(stream, from, size, true)
@@ -67,10 +78,11 @@ module Stream =
}


let rec private listen (connection : IEventStoreConnection)
(stream : string)
(position : Position<Index>) : seq<Event*Index> =
let from = indexOf position
and private listen (connection : IEventStoreConnection)
(stream : string)
(size : int)
(position : Position<Index>) : seq<Event*Index> =
let from = index connection stream size Direction.Forward position
let queue = new BlockingCollection<Option<Event*Index>>(new ConcurrentQueue<Option<Event*Index>>())
let subscription =
let settings = CatchUpSubscriptionSettings.Default
@@ -96,7 +108,7 @@ module Stream =
yield! traverse (index+1L)
| None ->
Thread.Sleep(10000)
yield! listen connection stream (Position.At last)
yield! listen connection stream size (Position.At last)
}
traverse from

@@ -139,24 +151,6 @@ module Stream =
raise exn
| _ -> raise exn

let rec index (connection : IEventStoreConnection)
(stream : string)
(size : int)
(position : Position<Index>) : Index =
match position with
| Position.At x -> x
| Position.Before x -> (index connection stream size x) - 1L
| Position.After x -> (index connection stream size x) + 1L
| Position.Start ->
read connection stream size Direction.Forward position
|> Seq.tryPick Some
|> Option.map snd
|> Option.getValueOr (int64 StreamPosition.Start)
| Position.End ->
read connection stream size Direction.Reverse position
|> Seq.tryPick Some
|> Option.map (snd >> (+) 1L)
|> Option.getValueOr (int64 StreamPosition.Start)

let compete (connection : IEventStoreConnection)
(stream : string)
@@ -196,7 +190,7 @@ module Stream =
fun stream ->
[
Capability.Indexer <|
index connection stream size
index connection stream size Direction.Forward

Capability.Reader <| fun () ->
read connection stream size Direction.Forward Position.Start |> Seq.map fst
@@ -211,10 +205,10 @@ module Stream =
write connection stream

Capability.Subscriber <| fun () ->
listen connection stream Position.Start |> Seq.map fst
listen connection stream size Position.Start |> Seq.map fst

Capability.SubscriberFrom <|
listen connection stream
listen connection stream size

Capability.Competitor <| fun fn ->
compete connection stream fn
7 changes: 6 additions & 1 deletion Nata.IO.KafkaNet.Tests/TopicTests.fs
Original file line number Diff line number Diff line change
@@ -26,4 +26,9 @@ type TopicTests() =
override x.Connect() =
Topic.connect cluster
|> Source.mapIndex (Offsets.Codec.OffsetsToInt64 0)
<| guid()
<| guid()

[<Test>]
override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.")
[<Test>]
override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.")
5 changes: 5 additions & 0 deletions Nata.IO.Kafunk.Tests/TopicTests.fs
Original file line number Diff line number Diff line change
@@ -52,3 +52,8 @@ type TopicTests() =
|> Seq.toList

Assert.AreEqual(expected, actual)

[<Test>]
override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.")
[<Test>]
override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.")
52 changes: 52 additions & 0 deletions Nata.IO.Tests/LogStoreTests.fs
Original file line number Diff line number Diff line change
@@ -376,3 +376,55 @@ type LogStoreTests() as x =
Assert.AreEqual(expectation, verification)
| _ ->
Assert.Ignore("Competitor, ReaderFrom or Writer is reported to be unsupported by this source.")

[<Test>]
abstract member TestReadFromBeforeEnd : unit->unit

default x.TestReadFromBeforeEnd() =
let connection = x.Connect()
let write = writer connection
let readFrom = readerFrom connection
let subscribeFrom = subscriberFrom connection
let event_0 = event("TestReadFromBeforeEnd-0")
let event_1 = event("TestReadFromBeforeEnd-1")
let event_2 = event("TestReadFromBeforeEnd-2")
write event_0
write event_1
write event_2
let flush =
subscribeFrom Position.Start
|> Seq.take 3
let take position =
readFrom position
|> Seq.map (fst >> Event.data)
|> Seq.head
Assert.AreEqual([], List.ofSeq(readFrom(Position.End)))
Assert.AreEqual(event_2.Data,take(Position.Before(Position.End)))
Assert.AreEqual(event_1.Data,take(Position.Before(Position.Before(Position.End))))
Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End)))))

[<Test>]
abstract member TestSubscribeFromBeforeEnd : unit->unit

[<Test>]
default x.TestSubscribeFromBeforeEnd() =
let connection = x.Connect()
let write = writer connection
let readFrom = readerFrom connection
let subscribeFrom = subscriberFrom connection
let event_0 = event("TestSubscribeFromBeforeEnd-0")
let event_1 = event("TestSubscribeFromBeforeEnd-1")
let event_2 = event("TestSubscribeFromBeforeEnd-2")
write event_0
write event_1
write event_2
let flush =
subscribeFrom Position.Start
|> Seq.take 3
let take position =
subscribeFrom position
|> Seq.map (fst >> Event.data)
|> Seq.head
Assert.AreEqual(event_2.Data,take(Position.Before(Position.End)))
Assert.AreEqual(event_1.Data,take(Position.Before(Position.Before(Position.End))))
Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End)))))

0 comments on commit 81c8ab8

Please sign in to comment.