Skip to content

Commit

Permalink
Merge pull request #26 from softwaremill/feat_lastOption_last
Browse files Browse the repository at this point in the history
feat: implement `last` and `lastOption` operators
  • Loading branch information
geminicaprograms authored Oct 30, 2023
2 parents 931591f + 4f1d858 commit 2cde6cd
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/src/main/scala/ox/channels/ChannelClosed.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ object ChannelClosed:
case class Error(reason: Option[Throwable]) extends ChannelClosed
case object Done extends ChannelClosed

enum ChannelClosedException(reason: Option[Throwable]) extends Exception:
enum ChannelClosedException(reason: Option[Throwable]) extends Exception(reason.orNull):
case Error(reason: Option[Throwable]) extends ChannelClosedException(reason)
case Done() extends ChannelClosedException(None)
57 changes: 57 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,63 @@ trait SourceOps[+T] { this: Source[T] =>
}
}
c

/** Returns the last element from this source wrapped in [[Some]] or [[None]] when this source is empty. Note that `lastOption` is a
* terminal operation leaving the source in [[ChannelClosed.Done]] state.
*
* @return
* A `Some(last element)` if source is not empty or `None` otherwise.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* supervised {
* Source.empty[Int].lastOption() // None
* val s = Source.fromValues(1, 2)
* s.lastOption() // Some(2)
* s.receive() // ChannelClosed.Done
* }
* }}}
*/
def lastOption(): Option[T] =
supervised {
var value: Option[T] = None
repeatWhile {
receive() match
case ChannelClosed.Done => false
case e: ChannelClosed.Error => throw e.toThrowable
case t: T @unchecked => value = Some(t); true
}
value
}

/** Returns the last element from this source or throws [[NoSuchElementException]] when this source is empty. In case when receiving an
* element fails then [[ChannelClosedException.Error]] exception is thrown. Note that `last` is a terminal operation leaving the source
* in [[ChannelClosed.Done]] state.
*
* @return
* A last element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* supervised {
* Source.empty[Int].last() // throws NoSuchElementException("cannot obtain last element from an empty source")
* val s = Source.fromValues(1, 2)
* s.last() // 2
* s.receive() // ChannelClosed.Done
* }
* }}}
*/
def last(): T = lastOption().getOrElse(throw new NoSuchElementException("cannot obtain last element from an empty source"))
}

trait SourceCompanionOps:
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsLastOptionTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ox.channels

import org.scalatest.OptionValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsLastOptionTest extends AnyFlatSpec with Matchers with OptionValues {
behavior of "SourceOps.lastOption"

it should "return None for the empty source" in supervised {
Source.empty[Int].lastOption() shouldBe None
}

it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.lastOption()
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source.failedWithoutReason[Int]().lastOption()
}
}

it should "return last element wrapped in Some for the non-empty source" in supervised {
Source.fromValues(1, 2).lastOption().value shouldBe 2
}

it should "drain the source" in supervised {
val s = Source.fromValues(1)
s.lastOption().value shouldBe 1
s.receive() shouldBe ChannelClosed.Done
}
}
39 changes: 39 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsLastTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsLastTest extends AnyFlatSpec with Matchers {
behavior of "SourceOps.last"

it should "throw NoSuchElementException for the empty source" in supervised {
the[NoSuchElementException] thrownBy {
Source.empty[Int].last()
} should have message "cannot obtain last element from an empty source"
}

it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.last()
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source.failedWithoutReason[Int]().last()
}
}

it should "return last element for the non-empty source" in supervised {
Source.fromValues(1, 2).last() shouldBe 2
}

it should "drain the source" in supervised {
val s = Source.fromValues(1)
s.last() shouldBe 1
s.receive() shouldBe ChannelClosed.Done
}
}

0 comments on commit 2cde6cd

Please sign in to comment.