Skip to content

Commit

Permalink
feat: implement head and headOption operators
Browse files Browse the repository at this point in the history
The `headOption` operator returns the first element in `Source` wrapped
in `Some` or `None` in case when source is empty or failed e.g.:

  Source.empty[Int].headOption()       // None
  Source.fromValues(1, 2).headOption() // Some(1)

The `head` operator returns the first element in `Source` or throws
`NoSuchElementException` in case when it is either empty or `receive()`
operation fails without error. In case when `receive()` fails with
exception then this exception is re-thrown e.g.:

  Source.empty[Int].head()       // throws NoSuchElementException("cannot obtain head from the empty source")
  Source.fromValues(1, 2).head() // 1

Note that neither `head` nor `headOption` are idempotent operations.
  • Loading branch information
geminicaprograms committed Oct 26, 2023
1 parent 7d7897a commit 0df1526
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 31 deletions.
124 changes: 93 additions & 31 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import ox.*
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -55,6 +56,21 @@ trait SourceOps[+T] { this: Source[T] =>
def intersperse[U >: T](inject: U)(using Ox, StageCapacity): Source[U] =
intersperse(None, inject, None)

private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]
forkDaemon {
start.foreach(c.send)
var firstEmitted = false
repeatWhile {
receive() match
case ChannelClosed.Done => end.foreach(c.send); c.done(); false
case ChannelClosed.Error(e) => c.error(e); false
case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v); true
case v: U @unchecked => c.send(inject); c.send(v); true
}
}
c

/** Intersperses this source with start, end and provided elements and forwards it to the returned channel.
*
* @param start
Expand All @@ -80,21 +96,6 @@ trait SourceOps[+T] { this: Source[T] =>
def intersperse[U >: T](start: U, inject: U, end: U)(using Ox, StageCapacity): Source[U] =
intersperse(Some(start), inject, Some(end))

private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]
forkDaemon {
start.foreach(c.send)
var firstEmitted = false
repeatWhile {
receive() match
case ChannelClosed.Done => end.foreach(c.send); c.done(); false
case ChannelClosed.Error(e) => c.error(e); false
case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v); true
case v: U @unchecked => c.send(inject); c.send(v); true
}
}
c

/** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At
* most `parallelism` invocations of `f` are run in parallel.
*
Expand Down Expand Up @@ -366,6 +367,15 @@ trait SourceOps[+T] { this: Source[T] =>
def interleave[U >: T](other: Source[U], segmentSize: Int = 1, eagerComplete: Boolean = false)(using Ox, StageCapacity): Source[U] =
Source.interleaveAll(List(this, other), segmentSize, eagerComplete)

/** Accumulates all elements received from the channel into a list. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error.
*/
def toList: List[T] =
val b = List.newBuilder[T]
foreach(b += _)
b.result()

/** Invokes the given function for each received element. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error.
Expand All @@ -378,15 +388,6 @@ trait SourceOps[+T] { this: Source[T] =>
case t: T @unchecked => f(t); true
}

/** Accumulates all elements received from the channel into a list. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error.
*/
def toList: List[T] =
val b = List.newBuilder[T]
foreach(b += _)
b.result()

/** Passes each received element from this channel to the given sink. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error, or when the sink is closed.
Expand Down Expand Up @@ -513,13 +514,62 @@ trait SourceOps[+T] { this: Source[T] =>
}
}
c

/** Returns the first element from this source wrapped in `Some` or `None` when the source is empty or fails during the receive operation.
* Note that `headOption` is not an idempotent operation on source as it receives elements from it.
*
* @return
* A `Some(first element)` if source is not empty or None` otherwise.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].headOption() // None
* val s = Source.fromValues(1, 2)
* s.headOption() // Some(1)
* s.headOption() // Some(2)
* }
* }}}
*/
def headOption(): Option[T] = Try(head()).toOption

/** Returns the first element from this source or throws `NoSuchElementException` when the source is empty or `receive()` operation fails
* without error. In case when the `receive()` operation fails with exception that exception is re-thrown. Note that `headOption` is not
* an idempotent operation on source as it receives elements from it.
*
* @return
* A first element if source is not empty or throws otherwise.
* @throws NoSuchElementException
* When source is empty or `receive()` failed without error.
* @throws exception
* When `receive()` failed with exception then this exception is re-thrown.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].head() // throws NoSuchElementException("cannot obtain head from the empty source")
* val s = Source.fromValues(1, 2)
* s.head() // 1
* s.head() // 2
* }
* }}}
*/
def head(): T =
supervised {
receive() match
case ChannelClosed.Done => throw new NoSuchElementException("cannot obtain head from an empty source")
case ChannelClosed.Error(r) => throw r.getOrElse(new NoSuchElementException("getting head failed"))
case t: T @unchecked => t
}
}

trait SourceCompanionOps:
def fromIterable[T](it: Iterable[T])(using Ox, StageCapacity): Source[T] = fromIterator(it.iterator)

def fromValues[T](ts: T*)(using Ox, StageCapacity): Source[T] = fromIterator(ts.iterator)

def fromIterator[T](it: => Iterator[T])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
forkDaemon {
Expand All @@ -531,6 +581,8 @@ trait SourceCompanionOps:
}
c

def fromValues[T](ts: T*)(using Ox, StageCapacity): Source[T] = fromIterator(ts.iterator)

def fromFork[T](f: Fork[T])(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
forkDaemon {
Expand Down Expand Up @@ -643,11 +695,6 @@ trait SourceCompanionOps:
}
c

def empty[T]: Source[T] =
val c = DirectChannel()
c.done()
c

/** Sends a given number of elements (determined byc `segmentSize`) from each source in `sources` to the returned channel and repeats. The
* order of elements in all sources is preserved.
*
Expand Down Expand Up @@ -729,6 +776,11 @@ trait SourceCompanionOps:
}
c

def empty[T]: Source[T] =
val c = DirectChannel()
c.done()
c

/** Creates a source that fails immediately with the given [[java.lang.Throwable]]
*
* @param t
Expand All @@ -740,3 +792,13 @@ trait SourceCompanionOps:
val c = DirectChannel[T]()
c.error(t)
c

/** Creates a source that fails immediately
*
* @return
* A source that would fail immediately
*/
def failed[T](): Source[T] =
val c = DirectChannel[T]()
c.error(None)
c
30 changes: 30 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsHeadOptionTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ox.channels

import org.scalatest.OptionValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsHeadOptionTest extends AnyFlatSpec with Matchers with OptionValues {
behavior of "Source.headOption"

it should "return None for the empty source" in supervised {
Source.empty[Int].headOption() shouldBe None
}

it should "return None for the failed source" in supervised {
Source
.failed(new RuntimeException("source is broken"))
.headOption() shouldBe None
}

it should "return Some element for the non-empty source" in supervised {
Source.fromValues(1, 2).headOption().value shouldBe 1
}

it should "be not idempotent operation" in supervised {
val s = Source.fromValues(1, 2)
s.headOption().value shouldBe 1
s.headOption().value shouldBe 2
}
}
39 changes: 39 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsHeadTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsHeadTest extends AnyFlatSpec with Matchers {
behavior of "Source.head"

it should "throw NoSuchElementException for the empty source" in supervised {
the[NoSuchElementException] thrownBy {
Source.empty[Int].head()
} should have message "cannot obtain head from an empty source"
}

it should "re-throw exception that was thrown during element retrieval" in supervised {
the[RuntimeException] thrownBy {
Source
.failed(new RuntimeException("source is broken"))
.head()
} should have message "source is broken"
}

it should "throw NoSuchElementException for source failed without exception" in supervised {
the[NoSuchElementException] thrownBy {
Source.failed[Int]().head()
} should have message "getting head failed"
}

it should "return first value from non empty source" in supervised {
Source.fromValues(1, 2).head() shouldBe 1
}

it should "be not idempotent operation" in supervised {
val s = Source.fromValues(1, 2)
s.head() shouldBe 1
s.head() shouldBe 2
}
}

0 comments on commit 0df1526

Please sign in to comment.