Skip to content

Commit

Permalink
feat: implement Source.future operator
Browse files Browse the repository at this point in the history
Creates a source that emits a single value when Future completes or fails
otherwise.

Notes:
* when Future fails with `scala.concurrent.ExecutionException`
  then its cause is returned as source failure.
* the input Future cannot be `null`

Examples:

  Source
    .future(Future.failed(new RuntimeException("future failed")))
    .receive()                               // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed))
  Source.future(Future.successful(1)).toList // List(1)
  • Loading branch information
geminicaprograms committed Oct 31, 2023
1 parent 053c486 commit 32dfa65
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
39 changes: 38 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import ox.*
import java.util
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.{IterableOnce, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Await, ExecutionException, Future}

trait SourceOps[+T] { this: Source[T] =>
// view ops (lazy)
Expand Down Expand Up @@ -993,6 +994,42 @@ trait SourceCompanionOps:
}
c

/** Creates a source that emits a single value when `from` completes or fails otherwise. Note that when `form` fails with
* [[scala.concurrent.ExecutionException]] then its cause is returned as source failure.
*
* @param from
* A [[scala.concurrent.Future]] that returns value upon completion. It cannot be `null`.
* @return
* A source that will emit value upon a `from` [[scala.concurrent.Future]] completion.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* import scala.concurrent.Future
*
* supervised {
* Source
* .future(Future.failed(new RuntimeException("future failed")))
* .receive() // ChannelClosed.Error(Some(java.lang.RuntimeException: future failed))
* Source.future(Future.successful(1)).toList // List(1)
* }
* }}}
*/
def future[T](from: Future[T])(using Ox, StageCapacity): Source[T] =
require(from != null, "from cannot be null")
val c = StageCapacity.newChannel[T]
forkDaemon {
try
val t = Await.result(from, Duration.Inf)
c.send(t)
c.done()
catch
case ex: ExecutionException => c.error(ex.getCause)
case t: Throwable => c.error(t)
}
c

/** Creates a source that fails immediately with the given [[java.lang.Throwable]]
*
* @param t
Expand Down
26 changes: 26 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsFutureTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import scala.concurrent.Future

class SourceOpsFutureTest extends AnyFlatSpec with Matchers {
behavior of "Source.future"

it should "fail to create source from the null future" in supervised {
the[IllegalArgumentException] thrownBy {
Source.future(null)
} should have message "requirement failed: from cannot be null"
}

it should "return report failure when future fails" in supervised {
val failure = new RuntimeException("future failed")
Source.future(Future.failed(failure)).receive() shouldBe ChannelClosed.Error(Some(failure))
}

it should "return future value" in supervised {
Source.future(Future.successful(1)).toList shouldBe List(1)
}
}

0 comments on commit 32dfa65

Please sign in to comment.