From 16e477e7bdf808686f5416c9ee4a75fbe3f08c80 Mon Sep 17 00:00:00 2001 From: Jonathan Leaver Date: Sat, 12 Aug 2017 18:26:19 -0400 Subject: [PATCH 1/2] implement position lookups when End is used in the forward direction --- Nata.IO.EventStore.Tests/LogStoreTests.fs | 20 +++---- Nata.IO.EventStore/Stream.fs | 70 +++++++++++------------ Nata.IO.Tests/LogStoreTests.fs | 47 +++++++++++++++ 3 files changed, 89 insertions(+), 48 deletions(-) diff --git a/Nata.IO.EventStore.Tests/LogStoreTests.fs b/Nata.IO.EventStore.Tests/LogStoreTests.fs index 8f162aa..d2f2f8e 100644 --- a/Nata.IO.EventStore.Tests/LogStoreTests.fs +++ b/Nata.IO.EventStore.Tests/LogStoreTests.fs @@ -1,11 +1,11 @@ -namespace Nata.IO.EventStore.Tests - -open NUnit.Framework - -[] -type LogStoreTests() = - inherit Nata.IO.Tests.LogStoreTests() - - override x.Connect() = - Configuration.channel() +namespace Nata.IO.EventStore.Tests + +open NUnit.Framework + +[] +type LogStoreTests() = + inherit Nata.IO.Tests.LogStoreTests() + + override x.Connect() = + Configuration.channel() |> Configuration.connect() \ No newline at end of file diff --git a/Nata.IO.EventStore/Stream.fs b/Nata.IO.EventStore/Stream.fs index a571f7a..d5f6771 100644 --- a/Nata.IO.EventStore/Stream.fs +++ b/Nata.IO.EventStore/Stream.fs @@ -34,20 +34,31 @@ module Stream = |> Event.withIndex (resolvedEvent.Event.EventNumber |> int64), resolvedEvent.Event.EventNumber - let rec private indexOf = function - | Position.Start -> int64 StreamPosition.Start - | Position.Before x -> -1L + indexOf x + let rec index (connection : IEventStoreConnection) + (stream : string) + (size : int) + (direction : Direction) + (position : Position) : Index = + match position with | Position.At x -> x - | Position.After x -> 1L + indexOf x - | Position.End -> int64 StreamPosition.End - - let rec private read (connection : IEventStoreConnection) - (stream : string) - (size : int) - (direction : Direction) - (position : Position) : seq = + | Position.Before x -> (index connection stream size direction x) - 1L + | Position.After x -> (index connection stream size direction x) + 1L + | Position.Start -> (int64 StreamPosition.Start) + | Position.End when direction = Direction.Forward -> + read connection stream size Direction.Reverse position + |> Seq.tryPick Some + |> Option.map (snd >> (+) 1L) + |> Option.getValueOr (int64 StreamPosition.Start) + | Position.End -> (int64 StreamPosition.End) + + + and private read (connection : IEventStoreConnection) + (stream : string) + (size : int) + (direction : Direction) + (position : Position) : seq = seq { - let from = indexOf position + let from = index connection stream size direction position let sliceTask = match direction with | Direction.Forward -> connection.ReadStreamEventsForwardAsync(stream, from, size, true) @@ -67,10 +78,11 @@ module Stream = } - let rec private listen (connection : IEventStoreConnection) - (stream : string) - (position : Position) : seq = - let from = indexOf position + and private listen (connection : IEventStoreConnection) + (stream : string) + (size : int) + (position : Position) : seq = + let from = index connection stream size Direction.Forward position let queue = new BlockingCollection>(new ConcurrentQueue>()) let subscription = let settings = CatchUpSubscriptionSettings.Default @@ -96,7 +108,7 @@ module Stream = yield! traverse (index+1L) | None -> Thread.Sleep(10000) - yield! listen connection stream (Position.At last) + yield! listen connection stream size (Position.At last) } traverse from @@ -139,24 +151,6 @@ module Stream = raise exn | _ -> raise exn - let rec index (connection : IEventStoreConnection) - (stream : string) - (size : int) - (position : Position) : Index = - match position with - | Position.At x -> x - | Position.Before x -> (index connection stream size x) - 1L - | Position.After x -> (index connection stream size x) + 1L - | Position.Start -> - read connection stream size Direction.Forward position - |> Seq.tryPick Some - |> Option.map snd - |> Option.getValueOr (int64 StreamPosition.Start) - | Position.End -> - read connection stream size Direction.Reverse position - |> Seq.tryPick Some - |> Option.map (snd >> (+) 1L) - |> Option.getValueOr (int64 StreamPosition.Start) let compete (connection : IEventStoreConnection) (stream : string) @@ -196,7 +190,7 @@ module Stream = fun stream -> [ Capability.Indexer <| - index connection stream size + index connection stream size Direction.Forward Capability.Reader <| fun () -> read connection stream size Direction.Forward Position.Start |> Seq.map fst @@ -211,10 +205,10 @@ module Stream = write connection stream Capability.Subscriber <| fun () -> - listen connection stream Position.Start |> Seq.map fst + listen connection stream size Position.Start |> Seq.map fst Capability.SubscriberFrom <| - listen connection stream + listen connection stream size Capability.Competitor <| fun fn -> compete connection stream fn diff --git a/Nata.IO.Tests/LogStoreTests.fs b/Nata.IO.Tests/LogStoreTests.fs index 2e30ef3..23b0ed0 100644 --- a/Nata.IO.Tests/LogStoreTests.fs +++ b/Nata.IO.Tests/LogStoreTests.fs @@ -376,3 +376,50 @@ type LogStoreTests() as x = Assert.AreEqual(expectation, verification) | _ -> Assert.Ignore("Competitor, ReaderFrom or Writer is reported to be unsupported by this source.") + + [] + member x.TestReadFromBeforeEnd() = + let connection = x.Connect() + let write = writer connection + let readFrom = readerFrom connection + let subscribeFrom = subscriberFrom connection + let event_0 = event("TestReadFromBeforeEnd-0") + let event_1 = event("TestReadFromBeforeEnd-1") + let event_2 = event("TestReadFromBeforeEnd-2") + write event_0 + write event_1 + write event_2 + let flush = + subscribeFrom Position.Start + |> Seq.take 3 + let take position = + readFrom position + |> Seq.map (fst >> Event.data) + |> Seq.head + Assert.AreEqual([], List.ofSeq(readFrom(Position.End))) + Assert.AreEqual(event_2.Data,take(Position.Before(Position.End))) + Assert.AreEqual(event_1.Data,take(Position.Before(Position.Before(Position.End)))) + Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End))))) + + [] + member x.TestSubscribeFromBeforeEnd() = + let connection = x.Connect() + let write = writer connection + let readFrom = readerFrom connection + let subscribeFrom = subscriberFrom connection + let event_0 = event("TestSubscribeFromBeforeEnd-0") + let event_1 = event("TestSubscribeFromBeforeEnd-1") + let event_2 = event("TestSubscribeFromBeforeEnd-2") + write event_0 + write event_1 + write event_2 + let flush = + subscribeFrom Position.Start + |> Seq.take 3 + let take position = + subscribeFrom position + |> Seq.map (fst >> Event.data) + |> Seq.head + Assert.AreEqual(event_2.Data,take(Position.Before(Position.End))) + Assert.AreEqual(event_1.Data,take(Position.Before(Position.Before(Position.End)))) + Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End))))) From a1474bf5f0199b23f9ba932d6ad294e02d3297c8 Mon Sep 17 00:00:00 2001 From: Jonathan Leaver Date: Sat, 12 Aug 2017 19:27:47 -0400 Subject: [PATCH 2/2] make it explicit that KafkaNet, EventHub and Kafunk do not support "Before End" style positions --- Nata.IO.EventHub.Tests/HubPartitionTests.fs | 7 ++++++- Nata.IO.EventHub.Tests/HubTests.fs | 7 ++++++- Nata.IO.EventStore.Tests/LogStoreTests.fs | 2 +- Nata.IO.KafkaNet.Tests/TopicTests.fs | 7 ++++++- Nata.IO.Kafunk.Tests/TopicTests.fs | 5 +++++ Nata.IO.Tests/LogStoreTests.fs | 11 ++++++++--- 6 files changed, 32 insertions(+), 7 deletions(-) diff --git a/Nata.IO.EventHub.Tests/HubPartitionTests.fs b/Nata.IO.EventHub.Tests/HubPartitionTests.fs index 8d3917c..f2902b6 100644 --- a/Nata.IO.EventHub.Tests/HubPartitionTests.fs +++ b/Nata.IO.EventHub.Tests/HubPartitionTests.fs @@ -124,4 +124,9 @@ type HubPartitionTests() = |> Seq.filter ((=) unexpected.Data) |> Seq.toList - Assert.AreEqual([], results) \ No newline at end of file + Assert.AreEqual([], results) + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.EventHub.Tests/HubTests.fs b/Nata.IO.EventHub.Tests/HubTests.fs index baae0ea..bd40075 100644 --- a/Nata.IO.EventHub.Tests/HubTests.fs +++ b/Nata.IO.EventHub.Tests/HubTests.fs @@ -77,4 +77,9 @@ type HubTests() = |> Seq.map Event.data |> Seq.toList - Assert.AreEqual([ event.Data ], results) \ No newline at end of file + Assert.AreEqual([ event.Data ], results) + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.EventStore.Tests/LogStoreTests.fs b/Nata.IO.EventStore.Tests/LogStoreTests.fs index d2f2f8e..6894f0a 100644 --- a/Nata.IO.EventStore.Tests/LogStoreTests.fs +++ b/Nata.IO.EventStore.Tests/LogStoreTests.fs @@ -8,4 +8,4 @@ type LogStoreTests() = override x.Connect() = Configuration.channel() - |> Configuration.connect() \ No newline at end of file + |> Configuration.connect() diff --git a/Nata.IO.KafkaNet.Tests/TopicTests.fs b/Nata.IO.KafkaNet.Tests/TopicTests.fs index dc6e945..19c7a90 100644 --- a/Nata.IO.KafkaNet.Tests/TopicTests.fs +++ b/Nata.IO.KafkaNet.Tests/TopicTests.fs @@ -26,4 +26,9 @@ type TopicTests() = override x.Connect() = Topic.connect cluster |> Source.mapIndex (Offsets.Codec.OffsetsToInt64 0) - <| guid() \ No newline at end of file + <| guid() + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.Kafunk.Tests/TopicTests.fs b/Nata.IO.Kafunk.Tests/TopicTests.fs index 7fa8fb2..e9923dc 100644 --- a/Nata.IO.Kafunk.Tests/TopicTests.fs +++ b/Nata.IO.Kafunk.Tests/TopicTests.fs @@ -52,3 +52,8 @@ type TopicTests() = |> Seq.toList Assert.AreEqual(expected, actual) + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.Tests/LogStoreTests.fs b/Nata.IO.Tests/LogStoreTests.fs index 23b0ed0..0af5bbf 100644 --- a/Nata.IO.Tests/LogStoreTests.fs +++ b/Nata.IO.Tests/LogStoreTests.fs @@ -376,9 +376,11 @@ type LogStoreTests() as x = Assert.AreEqual(expectation, verification) | _ -> Assert.Ignore("Competitor, ReaderFrom or Writer is reported to be unsupported by this source.") - + [] - member x.TestReadFromBeforeEnd() = + abstract member TestReadFromBeforeEnd : unit->unit + + default x.TestReadFromBeforeEnd() = let connection = x.Connect() let write = writer connection let readFrom = readerFrom connection @@ -402,7 +404,10 @@ type LogStoreTests() as x = Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End))))) [] - member x.TestSubscribeFromBeforeEnd() = + abstract member TestSubscribeFromBeforeEnd : unit->unit + + [] + default x.TestSubscribeFromBeforeEnd() = let connection = x.Connect() let write = writer connection let readFrom = readerFrom connection