diff --git a/Nata.IO.EventHub.Tests/HubPartitionTests.fs b/Nata.IO.EventHub.Tests/HubPartitionTests.fs index 8d3917c..f2902b6 100644 --- a/Nata.IO.EventHub.Tests/HubPartitionTests.fs +++ b/Nata.IO.EventHub.Tests/HubPartitionTests.fs @@ -124,4 +124,9 @@ type HubPartitionTests() = |> Seq.filter ((=) unexpected.Data) |> Seq.toList - Assert.AreEqual([], results) \ No newline at end of file + Assert.AreEqual([], results) + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.EventHub.Tests/HubTests.fs b/Nata.IO.EventHub.Tests/HubTests.fs index baae0ea..bd40075 100644 --- a/Nata.IO.EventHub.Tests/HubTests.fs +++ b/Nata.IO.EventHub.Tests/HubTests.fs @@ -77,4 +77,9 @@ type HubTests() = |> Seq.map Event.data |> Seq.toList - Assert.AreEqual([ event.Data ], results) \ No newline at end of file + Assert.AreEqual([ event.Data ], results) + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.EventStore.Tests/LogStoreTests.fs b/Nata.IO.EventStore.Tests/LogStoreTests.fs index 8f162aa..6894f0a 100644 --- a/Nata.IO.EventStore.Tests/LogStoreTests.fs +++ b/Nata.IO.EventStore.Tests/LogStoreTests.fs @@ -1,11 +1,11 @@ -namespace Nata.IO.EventStore.Tests - -open NUnit.Framework - -[] -type LogStoreTests() = - inherit Nata.IO.Tests.LogStoreTests() - - override x.Connect() = - Configuration.channel() - |> Configuration.connect() \ No newline at end of file +namespace Nata.IO.EventStore.Tests + +open NUnit.Framework + +[] +type LogStoreTests() = + inherit Nata.IO.Tests.LogStoreTests() + + override x.Connect() = + Configuration.channel() + |> Configuration.connect() diff --git a/Nata.IO.EventStore/Stream.fs b/Nata.IO.EventStore/Stream.fs index a571f7a..d5f6771 100644 --- a/Nata.IO.EventStore/Stream.fs +++ b/Nata.IO.EventStore/Stream.fs @@ -34,20 +34,31 @@ module Stream = |> Event.withIndex (resolvedEvent.Event.EventNumber |> int64), resolvedEvent.Event.EventNumber - let rec private indexOf = function - | Position.Start -> int64 StreamPosition.Start - | Position.Before x -> -1L + indexOf x + let rec index (connection : IEventStoreConnection) + (stream : string) + (size : int) + (direction : Direction) + (position : Position) : Index = + match position with | Position.At x -> x - | Position.After x -> 1L + indexOf x - | Position.End -> int64 StreamPosition.End - - let rec private read (connection : IEventStoreConnection) - (stream : string) - (size : int) - (direction : Direction) - (position : Position) : seq = + | Position.Before x -> (index connection stream size direction x) - 1L + | Position.After x -> (index connection stream size direction x) + 1L + | Position.Start -> (int64 StreamPosition.Start) + | Position.End when direction = Direction.Forward -> + read connection stream size Direction.Reverse position + |> Seq.tryPick Some + |> Option.map (snd >> (+) 1L) + |> Option.getValueOr (int64 StreamPosition.Start) + | Position.End -> (int64 StreamPosition.End) + + + and private read (connection : IEventStoreConnection) + (stream : string) + (size : int) + (direction : Direction) + (position : Position) : seq = seq { - let from = indexOf position + let from = index connection stream size direction position let sliceTask = match direction with | Direction.Forward -> connection.ReadStreamEventsForwardAsync(stream, from, size, true) @@ -67,10 +78,11 @@ module Stream = } - let rec private listen (connection : IEventStoreConnection) - (stream : string) - (position : Position) : seq = - let from = indexOf position + and private listen (connection : IEventStoreConnection) + (stream : string) + (size : int) + (position : Position) : seq = + let from = index connection stream size Direction.Forward position let queue = new BlockingCollection>(new ConcurrentQueue>()) let subscription = let settings = CatchUpSubscriptionSettings.Default @@ -96,7 +108,7 @@ module Stream = yield! traverse (index+1L) | None -> Thread.Sleep(10000) - yield! listen connection stream (Position.At last) + yield! listen connection stream size (Position.At last) } traverse from @@ -139,24 +151,6 @@ module Stream = raise exn | _ -> raise exn - let rec index (connection : IEventStoreConnection) - (stream : string) - (size : int) - (position : Position) : Index = - match position with - | Position.At x -> x - | Position.Before x -> (index connection stream size x) - 1L - | Position.After x -> (index connection stream size x) + 1L - | Position.Start -> - read connection stream size Direction.Forward position - |> Seq.tryPick Some - |> Option.map snd - |> Option.getValueOr (int64 StreamPosition.Start) - | Position.End -> - read connection stream size Direction.Reverse position - |> Seq.tryPick Some - |> Option.map (snd >> (+) 1L) - |> Option.getValueOr (int64 StreamPosition.Start) let compete (connection : IEventStoreConnection) (stream : string) @@ -196,7 +190,7 @@ module Stream = fun stream -> [ Capability.Indexer <| - index connection stream size + index connection stream size Direction.Forward Capability.Reader <| fun () -> read connection stream size Direction.Forward Position.Start |> Seq.map fst @@ -211,10 +205,10 @@ module Stream = write connection stream Capability.Subscriber <| fun () -> - listen connection stream Position.Start |> Seq.map fst + listen connection stream size Position.Start |> Seq.map fst Capability.SubscriberFrom <| - listen connection stream + listen connection stream size Capability.Competitor <| fun fn -> compete connection stream fn diff --git a/Nata.IO.KafkaNet.Tests/TopicTests.fs b/Nata.IO.KafkaNet.Tests/TopicTests.fs index dc6e945..19c7a90 100644 --- a/Nata.IO.KafkaNet.Tests/TopicTests.fs +++ b/Nata.IO.KafkaNet.Tests/TopicTests.fs @@ -26,4 +26,9 @@ type TopicTests() = override x.Connect() = Topic.connect cluster |> Source.mapIndex (Offsets.Codec.OffsetsToInt64 0) - <| guid() \ No newline at end of file + <| guid() + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.Kafunk.Tests/TopicTests.fs b/Nata.IO.Kafunk.Tests/TopicTests.fs index 7fa8fb2..e9923dc 100644 --- a/Nata.IO.Kafunk.Tests/TopicTests.fs +++ b/Nata.IO.Kafunk.Tests/TopicTests.fs @@ -52,3 +52,8 @@ type TopicTests() = |> Seq.toList Assert.AreEqual(expected, actual) + + [] + override x.TestReadFromBeforeEnd() = Assert.Ignore("Not yet supported.") + [] + override x.TestSubscribeFromBeforeEnd() = Assert.Ignore("Not yet supported.") \ No newline at end of file diff --git a/Nata.IO.Tests/LogStoreTests.fs b/Nata.IO.Tests/LogStoreTests.fs index 2e30ef3..0af5bbf 100644 --- a/Nata.IO.Tests/LogStoreTests.fs +++ b/Nata.IO.Tests/LogStoreTests.fs @@ -376,3 +376,55 @@ type LogStoreTests() as x = Assert.AreEqual(expectation, verification) | _ -> Assert.Ignore("Competitor, ReaderFrom or Writer is reported to be unsupported by this source.") + + [] + abstract member TestReadFromBeforeEnd : unit->unit + + default x.TestReadFromBeforeEnd() = + let connection = x.Connect() + let write = writer connection + let readFrom = readerFrom connection + let subscribeFrom = subscriberFrom connection + let event_0 = event("TestReadFromBeforeEnd-0") + let event_1 = event("TestReadFromBeforeEnd-1") + let event_2 = event("TestReadFromBeforeEnd-2") + write event_0 + write event_1 + write event_2 + let flush = + subscribeFrom Position.Start + |> Seq.take 3 + let take position = + readFrom position + |> Seq.map (fst >> Event.data) + |> Seq.head + Assert.AreEqual([], List.ofSeq(readFrom(Position.End))) + Assert.AreEqual(event_2.Data,take(Position.Before(Position.End))) + Assert.AreEqual(event_1.Data,take(Position.Before(Position.Before(Position.End)))) + Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End))))) + + [] + abstract member TestSubscribeFromBeforeEnd : unit->unit + + [] + default x.TestSubscribeFromBeforeEnd() = + let connection = x.Connect() + let write = writer connection + let readFrom = readerFrom connection + let subscribeFrom = subscriberFrom connection + let event_0 = event("TestSubscribeFromBeforeEnd-0") + let event_1 = event("TestSubscribeFromBeforeEnd-1") + let event_2 = event("TestSubscribeFromBeforeEnd-2") + write event_0 + write event_1 + write event_2 + let flush = + subscribeFrom Position.Start + |> Seq.take 3 + let take position = + subscribeFrom position + |> Seq.map (fst >> Event.data) + |> Seq.head + Assert.AreEqual(event_2.Data,take(Position.Before(Position.End))) + Assert.AreEqual(event_1.Data,take(Position.Before(Position.Before(Position.End)))) + Assert.AreEqual(event_0.Data,take(Position.Before(Position.Before(Position.Before(Position.End)))))