diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index b9650ada..2ea7119e 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -6,6 +6,8 @@ import java.util import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, ExecutionException, Future} +import scala.util.{Failure, Success} trait SourceOps[+T] { this: Source[T] => // view ops (lazy) @@ -993,6 +995,39 @@ trait SourceCompanionOps: } c + /** Creates a source that emits a single value when `from` completes or fails otherwise. The `from` completion is performed on the + * provided [[scala.concurrent.ExecutionContext]]. Note that when `from` fails with [[scala.concurrent.ExecutionException]] then its + * cause is returned as source failure. + * + * @param from + * A [[scala.concurrent.Future]] that returns value upon completion. + * @return + * A source that will emit value upon a `from` [[scala.concurrent.Future]] completion. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * import scala.concurrent.ExecutionContext.Implicits.global + * import scala.concurrent.Future + * + * supervised { + * Source + * .future(Future.failed(new RuntimeException("future failed"))) + * .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed)) + * Source.future(Future.successful(1)).toList // List(1) + * } + * }}} + */ + def future[T](from: Future[T])(using StageCapacity, ExecutionContext): Source[T] = + val c = StageCapacity.newChannel[T] + from.onComplete { + case Success(value) => c.send(value); c.done() + case Failure(ex: ExecutionException) => c.error(ex.getCause) + case Failure(ex) => c.error(ex) + } + c + /** Creates a source that fails immediately with the given [[java.lang.Throwable]] * * @param t diff --git a/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala b/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala new file mode 100644 index 00000000..8d7b612a --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala @@ -0,0 +1,29 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +import scala.concurrent.Future + +class SourceOpsFutureTest extends AnyFlatSpec with Matchers { + import scala.concurrent.ExecutionContext.Implicits.global + + behavior of "Source.future" + + it should "return the original future failure when future fails" in supervised { + val failure = new RuntimeException("future failed") + Source.future(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure)) + } + + it should "return the original future failure when future fails with ExecutionException" in supervised { + // according to https://docs.scala-lang.org/overviews/core/futures.html#exceptions + // the InterruptedException is one of the exceptions wrapped in ExecutionException + val failure = new InterruptedException("future interrupted") + Source.future(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure)) + } + + it should "return future value" in supervised { + Source.future(Future.successful(1)).toList shouldBe List(1) + } +}