diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index b9650ada..45d10aa1 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -5,7 +5,9 @@ import ox.* import java.util import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{Await, ExecutionContext, ExecutionException, Future} +import scala.util.{Failure, Success} trait SourceOps[+T] { this: Source[T] => // view ops (lazy) @@ -993,6 +995,40 @@ trait SourceCompanionOps: } c + /** Creates a source that emits a single value when `from` completes or fails otherwise. Note that when `from` fails with + * [[scala.concurrent.ExecutionException]] then its cause is returned as source failure. + * + * @param from + * A [[scala.concurrent.Future]] that returns value upon completion. It cannot be `null`. + * @return + * A source that will emit value upon a `from` [[scala.concurrent.Future]] completion. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * import scala.concurrent.Future + * + * supervised { + * Source + * .future(Future.failed(new RuntimeException("future failed"))) + * .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed)) + * Source.future(Future.successful(1)).toList // List(1) + * } + * }}} + */ + def future[T](from: Future[T])(using Ox, StageCapacity, ExecutionContext): Source[T] = + require(from != null, "from cannot be null") + val c = StageCapacity.newChannel[T] + forkDaemon { + from.onComplete { + case Success(value) => c.send(value); c.done() + case Failure(ex: ExecutionContext) => c.error(ex.getCause) + case Failure(ex) => c.error(ex) + } + } + c + /** Creates a source that fails immediately with the given [[java.lang.Throwable]] * * @param t diff --git a/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala b/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala new file mode 100644 index 00000000..a80af3ff --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala @@ -0,0 +1,28 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +import scala.concurrent.Future + +class SourceOpsFutureTest extends AnyFlatSpec with Matchers { + import scala.concurrent.ExecutionContext.Implicits.global + + behavior of "Source.future" + + it should "fail to create source from the null future" in supervised { + the[IllegalArgumentException] thrownBy { + Source.future(null) + } should have message "requirement failed: from cannot be null" + } + + it should "return the original future failure when future fails" in supervised { + val failure = new RuntimeException("future failed") + Source.future(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure)) + } + + it should "return future value" in supervised { + Source.future(Future.successful(1)).toList shouldBe List(1) + } +}