From a99f414a1092d73503ac30212d9d53531564bcc4 Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Wed, 25 Oct 2023 17:42:23 +0200 Subject: [PATCH] feat: implement `head` and `headOption` operators The `headOption` operator returns the first element in `Source` wrapped in `Some` or `None` in case when source is empty or failed e.g.: Source.empty[Int].headOption() // None Source.fromValues(1, 2).headOption() // Some(1) The `head` operator returns the first element in `Source` or throws `NoSuchElementException` in case when it is either empty or `receive()` operation fails without error. In case when `receive()` fails with exception then this exception is re-thrown e.g.: Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from an empty source") Source.fromValues(1, 2).head() // 1 Note that neither `head` nor `headOption` are idempotent operations. --- .../main/scala/ox/channels/SourceOps.scala | 62 +++++++++++++++++++ .../ox/channels/SourceOpsHeadOptionTest.scala | 30 +++++++++ .../scala/ox/channels/SourceOpsHeadTest.scala | 39 ++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala create mode 100644 core/src/test/scala/ox/channels/SourceOpsHeadTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 47d7be8f..c2a62240 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -5,6 +5,7 @@ import ox.* import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} import scala.concurrent.duration.FiniteDuration +import scala.util.Try trait SourceOps[+T] { this: Source[T] => // view ops (lazy) @@ -513,6 +514,57 @@ trait SourceOps[+T] { this: Source[T] => } } c + + /** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation. + * Note that `headOption` is not an idempotent operation on source as it receives elements from it. + * + * @return + * A `Some(first element)` if source is not empty or None` otherwise. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].headOption() // None + * val s = Source.fromValues(1, 2) + * s.headOption() // Some(1) + * s.headOption() // Some(2) + * } + * }}} + */ + def headOption(): Option[T] = Try(head()).toOption + + /** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails + * without error. In case when the `receive()` operation fails with exception that exception is re-thrown. Note that `headOption` is not + * an idempotent operation on source as it receives elements from it. + * + * @return + * A first element if source is not empty or throws otherwise. + * @throws NoSuchElementException + * When source is empty or `receive()` failed without error. + * @throws exception + * When `receive()` failed with exception then this exception is re-thrown. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from an empty source") + * val s = Source.fromValues(1, 2) + * s.head() // 1 + * s.head() // 2 + * } + * }}} + */ + def head(): T = + supervised { + receive() match + case ChannelClosed.Done => throw new NoSuchElementException("cannot obtain head from an empty source") + case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed")) + case t: T @unchecked => t + } } trait SourceCompanionOps: @@ -740,3 +792,13 @@ trait SourceCompanionOps: val c = DirectChannel[T]() c.error(t) c + + /** Creates a source that fails immediately + * + * @return + * A source that would fail immediately + */ + private[channels] def failedWithoutReason[T](): Source[T] = + val c = DirectChannel[T]() + c.error(None) + c diff --git a/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala b/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala new file mode 100644 index 00000000..36f2278d --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala @@ -0,0 +1,30 @@ +package ox.channels + +import org.scalatest.OptionValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValues { + behavior of "Source.headOption" + + it should "return None for the empty source" in supervised { + Source.empty[Int].headOption() shouldBe None + } + + it should "return None for the failed source" in supervised { + Source + .failed(new RuntimeException("source is broken")) + .headOption() shouldBe None + } + + it should "return Some element for the non-empty source" in supervised { + Source.fromValues(1, 2).headOption().value shouldBe 1 + } + + it should "be not idempotent operation" in supervised { + val s = Source.fromValues(1, 2) + s.headOption().value shouldBe 1 + s.headOption().value shouldBe 2 + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala b/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala new file mode 100644 index 00000000..0c3015a5 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala @@ -0,0 +1,39 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsHeadTest extends AnyFlatSpec with Matchers { + behavior of "Source.head" + + it should "throw NoSuchElementException for the empty source" in supervised { + the[NoSuchElementException] thrownBy { + Source.empty[Int].head() + } should have message "cannot obtain head from an empty source" + } + + it should "re-throw exception that was thrown during element retrieval" in supervised { + the[RuntimeException] thrownBy { + Source + .failed(new RuntimeException("source is broken")) + .head() + } should have message "source is broken" + } + + it should "throw NoSuchElementException for source failed without exception" in supervised { + the[NoSuchElementException] thrownBy { + Source.failedWithoutReason[Int]().head() + } should have message "getting head failed" + } + + it should "return first value from non empty source" in supervised { + Source.fromValues(1, 2).head() shouldBe 1 + } + + it should "be not idempotent operation" in supervised { + val s = Source.fromValues(1, 2) + s.head() shouldBe 1 + s.head() shouldBe 2 + } +}