diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 47d7be8f..36b1fb8a 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -5,6 +5,7 @@ import ox.* import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} import scala.concurrent.duration.FiniteDuration +import scala.util.{Failure, Success, Try} trait SourceOps[+T] { this: Source[T] => // view ops (lazy) @@ -513,6 +514,53 @@ trait SourceOps[+T] { this: Source[T] => } } c + + /** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation. + * + * @return + * A `Some(first element)` if source is not empty or None` otherwise. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].headOption() // None + * Source.fromValues(1, 2).headOption() // Some(1) + * } + * }}} + */ + def headOption()(using Ox): Option[T] = headTry().toOption + + /** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails + * without error. In case when the `receive()` operation fails with exception that exception is re-thrown. + * + * @return + * A first element if source is not empty or throws otherwise. + * @throws NoSuchElementException + * When source is empty or `receive()` failed without error. + * @throws exception + * When `receive()` failed with exception then this exception is re-thrown. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * scoped { + * Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from the empty source") + * Source.fromValues(1, 2).head() // 1 + * } + * }}} + */ + def head()(using Ox): T = headTry().get + + private def headTry()(using Ox): Try[T] = + supervised { + receive() match + case ChannelClosed.Done => Failure(new NoSuchElementException("cannot obtain head from the empty source")) + case ChannelClosed.Error(r) => Failure(r.getOrElse(new NoSuchElementException("getting head failed"))) + case t: T @unchecked => Success(t) + } } trait SourceCompanionOps: @@ -740,3 +788,13 @@ trait SourceCompanionOps: val c = DirectChannel[T]() c.error(t) c + + /** Creates a source that fails immediately + * + * @return + * A source that would fail immediately + */ + def failed[T](): Source[T] = + val c = DirectChannel[T]() + c.error(None) + c diff --git a/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala b/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala new file mode 100644 index 00000000..6c129c18 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala @@ -0,0 +1,22 @@ +package ox.channels +import org.scalatest.OptionValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* +class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValues { + behavior of "Source.headOption" + + it should "return None for the empty source" in supervised { + Source.empty[Int].headOption() shouldBe None + } + + it should "return None for the failed source" in supervised { + Source + .failed(new RuntimeException("source is broken")) + .headOption() shouldBe None + } + + it should "return Some element for the non-empty source" in supervised { + Source.fromValues(1, 2).headOption().value shouldBe 1 + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala b/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala new file mode 100644 index 00000000..b9ab02b6 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsHeadTest.scala @@ -0,0 +1,32 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* +class SourceOpsHeadTest extends AnyFlatSpec with Matchers { + behavior of "Source.head" + + it should "throw NoSuchElementException for the empty source" in supervised { + the[NoSuchElementException] thrownBy { + Source.empty[Int].head() + } should have message "cannot obtain head from the empty source" + } + + it should "re-throw exception that was thrown during element retrieval" in supervised { + the[RuntimeException] thrownBy { + Source + .failed(new RuntimeException("source is broken")) + .head() + } should have message "source is broken" + } + + it should "throw NoSuchElementException for source failed without exception" in supervised { + the[NoSuchElementException] thrownBy { + Source.failed[Int]().head() + } should have message "getting head failed" + } + + it should "return first value from non emptu source" in supervised { + Source.fromValues(1, 2).head() shouldBe 1 + } +}