Skip to content

Commit

Permalink
feat: implement Source.future operator
Browse files Browse the repository at this point in the history
Creates a source that emits a single value when Future completes or fails
otherwise.

Notes:
* when Future fails with `scala.concurrent.ExecutionException`
  then its cause is returned as source failure.
* the input Future cannot be `null`
* the `Future` completion is performed on the provided
  `ExecutionContext`

Examples:

  Source
    .future(Future.failed(new RuntimeException("future failed")))
    .receive()                               // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed))
  Source.future(Future.successful(1)).toList // List(1)
  • Loading branch information
geminicaprograms committed Oct 31, 2023
1 parent 053c486 commit f18c916
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
36 changes: 36 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import java.util
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -993,6 +995,40 @@ trait SourceCompanionOps:
}
c

/** Creates a source that emits a single value when `from` completes or fails otherwise. The `from` completion is performed on the
* provided [[scala.concurrent.ExecutionContext]]. Note that when `from` fails with [[scala.concurrent.ExecutionException]] then its
* cause is returned as source failure.
*
* @param from
* A [[scala.concurrent.Future]] that returns value upon completion. It cannot be `null`.
* @return
* A source that will emit value upon a `from` [[scala.concurrent.Future]] completion.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* import scala.concurrent.ExecutionContext.Implicits.global
* import scala.concurrent.Future
*
* supervised {
* Source
* .future(Future.failed(new RuntimeException("future failed")))
* .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed))
* Source.future(Future.successful(1)).toList // List(1)
* }
* }}}
*/
def future[T](from: Future[T])(using StageCapacity, ExecutionContext): Source[T] =
require(from != null, "from cannot be null")
val c = StageCapacity.newChannel[T]
from.onComplete {
case Success(value) => c.send(value); c.done()
case Failure(ex: ExecutionContext) => c.error(ex.getCause)
case Failure(ex) => c.error(ex)
}
c

/** Creates a source that fails immediately with the given [[java.lang.Throwable]]
*
* @param t
Expand Down
28 changes: 28 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsFutureTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import scala.concurrent.Future

class SourceOpsFutureTest extends AnyFlatSpec with Matchers {
import scala.concurrent.ExecutionContext.Implicits.global

behavior of "Source.future"

it should "fail to create source from the null future" in supervised {
the[IllegalArgumentException] thrownBy {
Source.future(null)
} should have message "requirement failed: from cannot be null"
}

it should "return the original future failure when future fails" in supervised {
val failure = new RuntimeException("future failed")
Source.future(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure))
}

it should "return future value" in supervised {
Source.future(Future.successful(1)).toList shouldBe List(1)
}
}

0 comments on commit f18c916

Please sign in to comment.