Skip to content

Commit

Permalink
feat: implement Source.futureSource operator
Browse files Browse the repository at this point in the history
Creates a source that emits elements from future source when it completes or
fails otherwise. The future completion is performed on the provided
`ExecutionContext` whereas elements are emitted through `Supervised`.
Note that when Future fails with `ExecutionException` then its cause is
returned as source failure.

Examples:
  Source
    .futureSource(Future.failed(new RuntimeException("future failed")))
    .receive()                                                           // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed))
  Source.futureSource(Future.successful(Source.fromValues(1, 2))).toList // List(1, 2)
  • Loading branch information
geminicaprograms committed Oct 31, 2023
1 parent f18c916 commit ffe22ae
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
42 changes: 42 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,48 @@ trait SourceCompanionOps:
}
c

/** Creates a source that emits elements from future source when `from` completes or fails otherwise. The `from` completion is performed
* on the provided [[scala.concurrent.ExecutionContext]] whereas elements are emitted through [[ox.supervised]]. Note that when `from`
* fails with [[scala.concurrent.ExecutionException]] then its cause is returned as source failure.
*
* @param from
* A [[scala.concurrent.Future]] that returns source upon completion. It cannot be `null`.
* @return
* A source that will emit values upon a `from` [[scala.concurrent.Future]] completion.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* import scala.concurrent.ExecutionContext.Implicits.global
* import scala.concurrent.Future
*
* supervised {
* Source
* .futureSource(Future.failed(new RuntimeException("future failed")))
* .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed))
* Source.futureSource(Future.successful(Source.fromValues(1, 2))).toList // List(1, 2)
* }
* }}}
*/
def futureSource[T](from: Future[Source[T]])(using Ox, StageCapacity, ExecutionContext): Source[T] =
require(from != null, "from cannot be null")
val c = StageCapacity.newChannel[T]
from.onComplete {
case Success(source) =>
supervised {
repeatWhile {
source.receive() match
case ChannelClosed.Done => c.done(); false
case ChannelClosed.Error(r) => c.error(r); false
case t: T @unchecked => c.send(t); true
}
}
case Failure(ex: ExecutionContext) => c.error(ex.getCause)
case Failure(ex) => c.error(ex)
}
c

/** Creates a source that fails immediately with the given [[java.lang.Throwable]]
*
* @param t
Expand Down
28 changes: 28 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsFutureSourceTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import scala.concurrent.Future

class SourceOpsFutureSourceTest extends AnyFlatSpec with Matchers {
import scala.concurrent.ExecutionContext.Implicits.global

behavior of "SourceOps.futureSource"

it should "fail to create source from the null future" in supervised {
the[IllegalArgumentException] thrownBy {
Source.futureSource(null)
} should have message "requirement failed: from cannot be null"
}

it should "return the original future failure when future fails" in supervised {
val failure = new RuntimeException("future failed")
Source.futureSource(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure))
}

it should "return future's source values" in supervised {
Source.futureSource(Future.successful(Source.fromValues(1, 2))).toList shouldBe List(1, 2)
}
}

0 comments on commit ffe22ae

Please sign in to comment.