Skip to content

Commit

Permalink
chore: apply comments from #26 and #27 to existing code in `SourceOps…
Browse files Browse the repository at this point in the history
….scala` (#28)

The following comments were applied:
* use supervised instead of scoped
* use toThrowable instead of ad-hoc exception creation
* don't catch exceptions in headOption
* also fixed comments about the source and receive function
  • Loading branch information
geminicaprograms authored Oct 30, 2023
1 parent 2cde6cd commit 4fc8379
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 42 deletions.
63 changes: 32 additions & 31 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ox.*
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -46,7 +45,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[String].intersperse(", ").toList // List()
* Source.fromValues("foo").intersperse(", ").toList // List(foo)
* Source.fromValues("foo", "bar").intersperse(", ").toList // List(foo, ", ", bar)
Expand All @@ -71,7 +70,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[String].intersperse("[", ", ", "]").toList // List([, ])
* Source.fromValues("foo").intersperse("[", ", ", "]").toList // List([, foo, ])
* Source.fromValues("foo", "bar").intersperse("[", ", ", "]").toList // List([, foo, ", ", bar, ])
Expand Down Expand Up @@ -210,7 +209,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].takeWhile(_ > 3).toList // List()
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
Expand All @@ -228,7 +227,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].drop(1).toList // List()
* Source.fromValues(1, 2, 3).drop(1).toList // List(2 ,3)
* Source.fromValues(1).drop(2).toList // List()
Expand Down Expand Up @@ -303,7 +302,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].zipAll(Source.empty[String], -1, "foo").toList // List()
* Source.empty[Int].zipAll(Source.fromValues("a"), -1, "foo").toList // List((-1, "a"))
* Source.fromValues(1).zipAll(Source.empty[String], -1, "foo").toList // List((1, "foo"))
Expand Down Expand Up @@ -355,7 +354,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7)
* val s2 = Source.fromValues(10, 20, 30, 40)
* s1.interleave(s2, segmentSize = 2).toList
Expand Down Expand Up @@ -433,7 +432,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s = Source.fromValues(1, 2, 3, 4, 5)
* s.mapStateful(() => 0)((sum, element) => (sum + element, sum), Some.apply)
* }
Expand Down Expand Up @@ -476,7 +475,7 @@ trait SourceOps[+T] { this: Source[T] =>
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s = Source.fromValues(1, 2, 2, 3, 2, 4, 3, 1, 5)
* // deduplicate the values
* s.mapStatefulConcat(() => Set.empty[Int])((s, e) => (s + e, Option.unless(s.contains(e))(e)))
Expand Down Expand Up @@ -515,56 +514,58 @@ trait SourceOps[+T] { this: Source[T] =>
}
c

/** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation.
* Note that `headOption` is not an idempotent operation on source as it receives elements from it.
/** Returns the first element from this source wrapped in [[Some]] or [[None]] when this source is empty. Note that `headOption` is not an
* idempotent operation on source as it receives elements from it.
*
* @return
* A `Some(first element)` if source is not empty or None` otherwise.
* A `Some(first element)` if source is not empty or `None` otherwise.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* Source.empty[Int].headOption() // None
* val s = Source.fromValues(1, 2)
* s.headOption() // Some(1)
* s.headOption() // Some(2)
* }
* }}}
*/
def headOption(): Option[T] = Try(head()).toOption
def headOption(): Option[T] =
supervised {
receive() match
case ChannelClosed.Done => None
case e: ChannelClosed.Error => throw e.toThrowable
case t: T @unchecked => Some(t)
}

/** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails
* without error. In case when the `receive()` operation fails with exception that exception is re-thrown. Note that `headOption` is not
* an idempotent operation on source as it receives elements from it.
/** Returns the first element from this source or throws [[NoSuchElementException]] when this source is empty. In case when receiving an
* element fails with exception then [[ChannelClosedException.Error]] is thrown. Note that `head` is not an idempotent operation on
* source as it receives elements from it.
*
* @return
* A first element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty or `receive()` failed without error.
* @throws exception
* When `receive()` failed with exception then this exception is re-thrown.
* When this source is empty.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from an empty source")
* supervised {
* Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head element from an empty source")
* val s = Source.fromValues(1, 2)
* s.head() // 1
* s.head() // 2
* }
* }}}
*/
def head(): T =
supervised {
receive() match
case ChannelClosed.Done => throw new NoSuchElementException("cannot obtain head from an empty source")
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
case t: T @unchecked => t
}
def head(): T = headOption().getOrElse(throw new NoSuchElementException("cannot obtain head element from an empty source"))

/** Sends elements to the returned channel limiting the throughput to specific number of elements (evenly spaced) per time unit. Note that
* the element's `receive()` time is included in the resulting throughput. For instance having `throttle(1, 1.second)` and `receive()`
Expand Down Expand Up @@ -645,7 +646,7 @@ trait SourceOps[+T] { this: Source[T] =>
* @return
* A last element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty.
* When this source is empty.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @example
Expand Down Expand Up @@ -822,7 +823,7 @@ trait SourceCompanionOps:
* import ox.*
* import ox.channels.Source
*
* scoped {
* supervised {
* val s1 = Source.fromValues(1, 2, 3, 4, 5, 6, 7, 8)
* val s2 = Source.fromValues(10, 20, 30)
* val s3 = Source.fromValues(100, 200, 300, 400, 500)
Expand Down
16 changes: 12 additions & 4 deletions core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValue
Source.empty[Int].headOption() shouldBe None
}

it should "return None for the failed source" in supervised {
Source
.failed(new RuntimeException("source is broken"))
.headOption() shouldBe None
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.headOption()
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source.failedWithoutReason[Int]().headOption()
}
}

it should "return Some element for the non-empty source" in supervised {
Expand Down
14 changes: 7 additions & 7 deletions core/src/test/scala/ox/channels/SourceOpsHeadTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ class SourceOpsHeadTest extends AnyFlatSpec with Matchers {
it should "throw NoSuchElementException for the empty source" in supervised {
the[NoSuchElementException] thrownBy {
Source.empty[Int].head()
} should have message "cannot obtain head from an empty source"
} should have message "cannot obtain head element from an empty source"
}

it should "re-throw exception that was thrown during element retrieval" in supervised {
the[RuntimeException] thrownBy {
it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.head()
} should have message "source is broken"
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw NoSuchElementException for source failed without exception" in supervised {
the[NoSuchElementException] thrownBy {
it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source.failedWithoutReason[Int]().head()
} should have message "getting head failed"
}
}

it should "return first value from non empty source" in supervised {
Expand Down

0 comments on commit 4fc8379

Please sign in to comment.