From 0df1526946ab0cb3307613d0fd173ff33da95f14 Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Wed, 25 Oct 2023 17:42:23 +0200 Subject: [PATCH] feat: implement `head` and `headOption` operators The `headOption` operator returns the first element in `Source` wrapped in `Some` or `None` in case when source is empty or failed e.g.: Source.empty[Int].headOption() // None Source.fromValues(1, 2).headOption() // Some(1) The `head` operator returns the first element in `Source` or throws `NoSuchElementException` in case when it is either empty or `receive()` operation fails without error. In case when `receive()` fails with exception then this exception is re-thrown e.g.: Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from the empty source") Source.fromValues(1, 2).head() // 1 Note that neither `head` nor `headOption` are idempotent operations. --- .../main/scala/ox/channels/SourceOps.scala | 124 +++++++++++++----- .../ox/channels/SourceOpsHeadOptionTest.scala | 30 +++++ .../scala/ox/channels/SourceOpsHeadTest.scala | 39 ++++++ 3 files changed, 162 insertions(+), 31 deletions(-) create mode 100644 core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala create mode 100644 core/src/test/scala/ox/channels/SourceOpsHeadTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 47d7be8f..f6cf5b37 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -5,6 +5,7 @@ import ox.* import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} import scala.concurrent.duration.FiniteDuration +import scala.util.Try trait SourceOps[+T] { this: Source[T] => // view ops (lazy) @@ -55,6 +56,21 @@ trait SourceOps[+T] { this: Source[T] => def intersperse[U >: T](inject: U)(using Ox, StageCapacity): Source[U] = intersperse(None, inject, None) + private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] = + val c = StageCapacity.newChannel[U] + forkDaemon { + start.foreach(c.send) + var firstEmitted = false + repeatWhile { + receive() match + case ChannelClosed.Done => end.foreach(c.send); c.done(); false + case ChannelClosed.Error(e) => c.error(e); false + case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v); true + case v: U @unchecked => c.send(inject); c.send(v); true + } + } + c + /** Intersperses this source with start, end and provided elements and forwards it to the returned channel. * * @param start @@ -80,21 +96,6 @@ trait SourceOps[+T] { this: Source[T] => def intersperse[U >: T](start: U, inject: U, end: U)(using Ox, StageCapacity): Source[U] = intersperse(Some(start), inject, Some(end)) - private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] = - val c = StageCapacity.newChannel[U] - forkDaemon { - start.foreach(c.send) - var firstEmitted = false - repeatWhile { - receive() match - case ChannelClosed.Done => end.foreach(c.send); c.done(); false - case ChannelClosed.Error(e) => c.error(e); false - case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v); true - case v: U @unchecked => c.send(inject); c.send(v); true - } - } - c - /** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At * most `parallelism` invocations of `f` are run in parallel. * @@ -366,6 +367,15 @@ trait SourceOps[+T] { this: Source[T] => def interleave[U >: T](other: Source[U], segmentSize: Int = 1, eagerComplete: Boolean = false)(using Ox, StageCapacity): Source[U] = Source.interleaveAll(List(this, other), segmentSize, eagerComplete) + /** Accumulates all elements received from the channel into a list. Blocks until the channel is done. + * @throws ChannelClosedException + * when there is an upstream error. + */ + def toList: List[T] = + val b = List.newBuilder[T] + foreach(b += _) + b.result() + /** Invokes the given function for each received element. Blocks until the channel is done. * @throws ChannelClosedException * when there is an upstream error. @@ -378,15 +388,6 @@ trait SourceOps[+T] { this: Source[T] => case t: T @unchecked => f(t); true } - /** Accumulates all elements received from the channel into a list. Blocks until the channel is done. - * @throws ChannelClosedException - * when there is an upstream error. - */ - def toList: List[T] = - val b = List.newBuilder[T] - foreach(b += _) - b.result() - /** Passes each received element from this channel to the given sink. Blocks until the channel is done. * @throws ChannelClosedException * when there is an upstream error, or when the sink is closed. @@ -513,13 +514,62 @@ trait SourceOps[+T] { this: Source[T] => } } c + + /** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation. + * Note that `headOption` is not an idempotent operation on source as it receives elements from it. + * + * @return + * A `Some(first element)` if source is not empty or None` otherwise. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].headOption() // None + * val s = Source.fromValues(1, 2) + * s.headOption() // Some(1) + * s.headOption() // Some(2) + * } + * }}} + */ + def headOption(): Option[T] = Try(head()).toOption + + /** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails + * without error. In case when the `receive()` operation fails with exception that exception is re-thrown. Note that `headOption` is not + * an idempotent operation on source as it receives elements from it. + * + * @return + * A first element if source is not empty or throws otherwise. + * @throws NoSuchElementException + * When source is empty or `receive()` failed without error. + * @throws exception + * When `receive()` failed with exception then this exception is re-thrown. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from the empty source") + * val s = Source.fromValues(1, 2) + * s.head() // 1 + * s.head() // 2 + * } + * }}} + */ + def head(): T = + supervised { + receive() match + case ChannelClosed.Done => throw new NoSuchElementException("cannot obtain head from an empty source") + case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed")) + case t: T @unchecked => t + } } trait SourceCompanionOps: def fromIterable[T](it: Iterable[T])(using Ox, StageCapacity): Source[T] = fromIterator(it.iterator) - def fromValues[T](ts: T*)(using Ox, StageCapacity): Source[T] = fromIterator(ts.iterator) - def fromIterator[T](it: => Iterator[T])(using Ox, StageCapacity): Source[T] = val c = StageCapacity.newChannel[T] forkDaemon { @@ -531,6 +581,8 @@ trait SourceCompanionOps: } c + def fromValues[T](ts: T*)(using Ox, StageCapacity): Source[T] = fromIterator(ts.iterator) + def fromFork[T](f: Fork[T])(using Ox, StageCapacity): Source[T] = val c = StageCapacity.newChannel[T] forkDaemon { @@ -643,11 +695,6 @@ trait SourceCompanionOps: } c - def empty[T]: Source[T] = - val c = DirectChannel() - c.done() - c - /** Sends a given number of elements (determined byc `segmentSize`) from each source in `sources` to the returned channel and repeats. The * order of elements in all sources is preserved. * @@ -729,6 +776,11 @@ trait SourceCompanionOps: } c + def empty[T]: Source[T] = + val c = DirectChannel() + c.done() + c + /** Creates a source that fails immediately with the given [[java.lang.Throwable]] * * @param t @@ -740,3 +792,13 @@ trait SourceCompanionOps: val c = DirectChannel[T]() c.error(t) c + + /** Creates a source that fails immediately + * + * @return + * A source that would fail immediately + */ + def failed[T](): Source[T] = + val c = DirectChannel[T]() + c.error(None) + c diff --git a/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala b/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala new file mode 100644 index 00000000..36f2278d --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala @@ -0,0 +1,30 @@ +package ox.channels + +import org.scalatest.OptionValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValues { + behavior of "Source.headOption" + + it should "return None for the empty source" in supervised { + Source.empty[Int].headOption() shouldBe None + } + + it should "return None for the failed source" in supervised { + Source + .failed(new RuntimeException("source is broken")) + .headOption() shouldBe None + } + + it should "return Some element for the non-empty source" in supervised { + Source.fromValues(1, 2).headOption().value shouldBe 1 + } + + it should "be not idempotent operation" in supervised { + val s = Source.fromValues(1, 2) + s.headOption().value shouldBe 1 + s.headOption().value shouldBe 2 + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala b/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala new file mode 100644 index 00000000..db048700 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala @@ -0,0 +1,39 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsHeadTest extends AnyFlatSpec with Matchers { + behavior of "Source.head" + + it should "throw NoSuchElementException for the empty source" in supervised { + the[NoSuchElementException] thrownBy { + Source.empty[Int].head() + } should have message "cannot obtain head from an empty source" + } + + it should "re-throw exception that was thrown during element retrieval" in supervised { + the[RuntimeException] thrownBy { + Source + .failed(new RuntimeException("source is broken")) + .head() + } should have message "source is broken" + } + + it should "throw NoSuchElementException for source failed without exception" in supervised { + the[NoSuchElementException] thrownBy { + Source.failed[Int]().head() + } should have message "getting head failed" + } + + it should "return first value from non empty source" in supervised { + Source.fromValues(1, 2).head() shouldBe 1 + } + + it should "be not idempotent operation" in supervised { + val s = Source.fromValues(1, 2) + s.head() shouldBe 1 + s.head() shouldBe 2 + } +}