From 32dfa65334f3a6d84f2f45c854ca1fbaf15eec0b Mon Sep 17 00:00:00 2001 From: Jacek Centkowski Date: Tue, 31 Oct 2023 15:27:50 +0100 Subject: [PATCH] feat: implement `Source.future` operator Creates a source that emits a single value when Future completes or fails otherwise. Notes: * when Future fails with `scala.concurrent.ExecutionException` then its cause is returned as source failure. * the input Future cannot be `null` Examples: Source .future(Future.failed(new RuntimeException("future failed"))) .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed)) Source.future(Future.successful(1)).toList // List(1) --- .../main/scala/ox/channels/SourceOps.scala | 39 ++++++++++++++++++- .../ox/channels/SourceOpsFutureTest.scala | 26 +++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/ox/channels/SourceOpsFutureTest.scala diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index b9650ada..a0b90a96 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -5,7 +5,8 @@ import ox.* import java.util import java.util.concurrent.{CountDownLatch, Semaphore} import scala.collection.{IterableOnce, mutable} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{Await, ExecutionException, Future} trait SourceOps[+T] { this: Source[T] => // view ops (lazy) @@ -993,6 +994,42 @@ trait SourceCompanionOps: } c + /** Creates a source that emits a single value when `from` completes or fails otherwise. Note that when `form` fails with + * [[scala.concurrent.ExecutionException]] then its cause is returned as source failure. + * + * @param from + * A [[scala.concurrent.Future]] that returns value upon completion. It cannot be `null`. + * @return + * A source that will emit value upon a `from` [[scala.concurrent.Future]] completion. + * @example + * {{{ + * import ox.* + * import ox.channels.Source + * + * import scala.concurrent.Future + * + * supervised { + * Source + * .future(Future.failed(new RuntimeException("future failed"))) + * .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed)) + * Source.future(Future.successful(1)).toList // List(1) + * } + * }}} + */ + def future[T](from: Future[T])(using Ox, StageCapacity): Source[T] = + require(from != null, "from cannot be null") + val c = StageCapacity.newChannel[T] + forkDaemon { + try + val t = Await.result(from, Duration.Inf) + c.send(t) + c.done() + catch + case ex: ExecutionException => c.error(ex.getCause) + case t: Throwable => c.error(t) + } + c + /** Creates a source that fails immediately with the given [[java.lang.Throwable]] * * @param t diff --git a/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala b/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala new file mode 100644 index 00000000..637dbc73 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsFutureTest.scala @@ -0,0 +1,26 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +import scala.concurrent.Future + +class SourceOpsFutureTest extends AnyFlatSpec with Matchers { + behavior of "Source.future" + + it should "fail to create source from the null future" in supervised { + the[IllegalArgumentException] thrownBy { + Source.future(null) + } should have message "requirement failed: from cannot be null" + } + + it should "return report failure when future fails" in supervised { + val failure = new RuntimeException("future failed") + Source.future(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure)) + } + + it should "return future value" in supervised { + Source.future(Future.successful(1)).toList shouldBe List(1) + } +}