diff --git a/core/src/main/scala/ox/channels/ChannelClosed.scala b/core/src/main/scala/ox/channels/ChannelClosed.scala index d9a79a3f..5638c8f6 100644 --- a/core/src/main/scala/ox/channels/ChannelClosed.scala +++ b/core/src/main/scala/ox/channels/ChannelClosed.scala @@ -8,6 +8,6 @@ object ChannelClosed: case class Error(reason: Option[Throwable]) extends ChannelClosed case object Done extends ChannelClosed -enum ChannelClosedException(reason: Option[Throwable]) extends Exception: +enum ChannelClosedException(reason: Option[Throwable]) extends Exception(reason.orNull): case Error(reason: Option[Throwable]) extends ChannelClosedException(reason) case Done() extends ChannelClosedException(None) diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index 7bde4707..05dd58f7 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -605,6 +605,63 @@ trait SourceOps[+T] { this: Source[T] => } } c + + /** Returns the last element from this source wrapped in [[Some]] or [[None]] when this source is empty. Note that `lastOption` is a + * terminal operation leaving the source in [[ChannelClosed.Done]] state. + * + * @return + * A `Some(last element)` if source is not empty or `None` otherwise. + * @throws ChannelClosedException.Error + * When receiving an element from this source fails. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * supervised { + * Source.empty[Int].lastOption() // None + * val s = Source.fromValues(1, 2) + * s.lastOption() // Some(2) + * s.receive() // ChannelClosed.Done + * } + * }}} + */ + def lastOption(): Option[T] = + supervised { + var value: Option[T] = None + repeatWhile { + receive() match + case ChannelClosed.Done => false + case e: ChannelClosed.Error => throw e.toThrowable + case t: T @unchecked => value = Some(t); true + } + value + } + + /** Returns the last element from this source or throws [[NoSuchElementException]] when this source is empty. In case when receiving an + * element fails then [[ChannelClosedException.Error]] exception is thrown. Note that `last` is a terminal operation leaving the source + * in [[ChannelClosed.Done]] state. + * + * @return + * A last element if source is not empty or throws otherwise. + * @throws NoSuchElementException + * When source is empty. + * @throws ChannelClosedException.Error + * When receiving an element from this source fails. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * supervised { + * Source.empty[Int].last() // throws NoSuchElementException("cannot obtain last element from an empty source") + * val s = Source.fromValues(1, 2) + * s.last() // 2 + * s.receive() // ChannelClosed.Done + * } + * }}} + */ + def last(): T = lastOption().getOrElse(throw new NoSuchElementException("cannot obtain last element from an empty source")) } trait SourceCompanionOps: diff --git a/core/src/test/scala/ox/channels/SourceOpsLastOptionTest.scala b/core/src/test/scala/ox/channels/SourceOpsLastOptionTest.scala new file mode 100644 index 00000000..c3b1006b --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsLastOptionTest.scala @@ -0,0 +1,38 @@ +package ox.channels + +import org.scalatest.OptionValues +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsLastOptionTest extends AnyFlatSpec with Matchers with OptionValues { + behavior of "SourceOps.lastOption" + + it should "return None for the empty source" in supervised { + Source.empty[Int].lastOption() shouldBe None + } + + it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source + .failed(new RuntimeException("source is broken")) + .lastOption() + } should have message "java.lang.RuntimeException: source is broken" + } + + it should "throw ChannelClosedException.Error for source failed without exception" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source.failedWithoutReason[Int]().lastOption() + } + } + + it should "return last element wrapped in Some for the non-empty source" in supervised { + Source.fromValues(1, 2).lastOption().value shouldBe 2 + } + + it should "drain the source" in supervised { + val s = Source.fromValues(1) + s.lastOption().value shouldBe 1 + s.receive() shouldBe ChannelClosed.Done + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsLastTest.scala b/core/src/test/scala/ox/channels/SourceOpsLastTest.scala new file mode 100644 index 00000000..5627b492 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsLastTest.scala @@ -0,0 +1,39 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsLastTest extends AnyFlatSpec with Matchers { + behavior of "SourceOps.last" + + it should "throw NoSuchElementException for the empty source" in supervised { + the[NoSuchElementException] thrownBy { + Source.empty[Int].last() + } should have message "cannot obtain last element from an empty source" + } + + it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source + .failed(new RuntimeException("source is broken")) + .last() + } should have message "java.lang.RuntimeException: source is broken" + } + + it should "throw ChannelClosedException.Error for source failed without exception" in supervised { + the[ChannelClosedException.Error] thrownBy { + Source.failedWithoutReason[Int]().last() + } + } + + it should "return last element for the non-empty source" in supervised { + Source.fromValues(1, 2).last() shouldBe 2 + } + + it should "drain the source" in supervised { + val s = Source.fromValues(1) + s.last() shouldBe 1 + s.receive() shouldBe ChannelClosed.Done + } +}