Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement head and headOption operators #25

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import ox.*
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -513,6 +514,57 @@ trait SourceOps[+T] { this: Source[T] =>
}
}
c

/** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation.
* Note that `headOption` is not an idempotent operation on source as it receives elements from it.
*
* @return
* A `Some(first element)` if source is not empty or None` otherwise.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].headOption() // None
* val s = Source.fromValues(1, 2)
* s.headOption() // Some(1)
* s.headOption() // Some(2)
* }
* }}}
*/
def headOption(): Option[T] = Try(head()).toOption

/** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails
* without error. In case when the `receive()` operation fails with exception that exception is re-thrown. Note that `headOption` is not
* an idempotent operation on source as it receives elements from it.
*
* @return
* A first element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty or `receive()` failed without error.
* @throws exception
* When `receive()` failed with exception then this exception is re-thrown.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from an empty source")
* val s = Source.fromValues(1, 2)
* s.head() // 1
* s.head() // 2
* }
* }}}
*/
def head(): T =
supervised {
receive() match
case ChannelClosed.Done => throw new NoSuchElementException("cannot obtain head from an empty source")
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
case t: T @unchecked => t
}
}

trait SourceCompanionOps:
Expand Down Expand Up @@ -740,3 +792,13 @@ trait SourceCompanionOps:
val c = DirectChannel[T]()
c.error(t)
c

/** Creates a source that fails immediately
*
* @return
* A source that would fail immediately
*/
private[channels] def failedWithoutReason[T](): Source[T] =
val c = DirectChannel[T]()
c.error(None)
c
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala
geminicaprograms marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.channels

import org.scalatest.OptionValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValues {
geminicaprograms marked this conversation as resolved.
Show resolved Hide resolved
behavior of "Source.headOption"

it should "return None for the empty source" in supervised {
Source.empty[Int].headOption() shouldBe None
}

it should "return None for the failed source" in supervised {
Source
.failed(new RuntimeException("source is broken"))
.headOption() shouldBe None
}

it should "return Some element for the non-empty source" in supervised {
Source.fromValues(1, 2).headOption().value shouldBe 1
}

it should "be not idempotent operation" in supervised {
val s = Source.fromValues(1, 2)
s.headOption().value shouldBe 1
s.headOption().value shouldBe 2
}
}
39 changes: 39 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsHeadTest.scala
geminicaprograms marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsHeadTest extends AnyFlatSpec with Matchers {
behavior of "Source.head"

it should "throw NoSuchElementException for the empty source" in supervised {
the[NoSuchElementException] thrownBy {
Source.empty[Int].head()
} should have message "cannot obtain head from an empty source"
}

it should "re-throw exception that was thrown during element retrieval" in supervised {
the[RuntimeException] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.head()
} should have message "source is broken"
}

it should "throw NoSuchElementException for source failed without exception" in supervised {
the[NoSuchElementException] thrownBy {
Source.failedWithoutReason[Int]().head()
} should have message "getting head failed"
}

it should "return first value from non empty source" in supervised {
Source.fromValues(1, 2).head() shouldBe 1
}

it should "be not idempotent operation" in supervised {
val s = Source.fromValues(1, 2)
s.head() shouldBe 1
s.head() shouldBe 2
}
}