diff --git a/README.md b/README.md index e346c54..69a20ba 100644 --- a/README.md +++ b/README.md @@ -315,3 +315,25 @@ val result: Future[Int] = retryWithBackOff(5, 5.seconds) { callThatService(): Future[Int] } ``` + +### CancelableFuture + +If you need to create a ```Future``` that you want to cancel at a later point in time then +you can use a `CancelableFuture`. + +```scala +import markatta.futiles.CancellableFuture + +val cancellableFuture = CancellableFuture { + someLongOperation() +} + +cancellableFuture.cancel() + +``` + +Note that the `.cancel` method on `CancelableFuture` is a best effort implementation and +it also does not handle cleaning up of resources (such as file handles) since further +computations deriving from `.map`/`.flatMap`/`onComplete` may not execute. If a +`CancelableFuture` was cancelled this way it will fail with a `CancellationException` +exception. diff --git a/src/main/scala-2.11/markatta/futiles/CancellableFutureImpl.scala b/src/main/scala-2.11/markatta/futiles/CancellableFutureImpl.scala new file mode 100644 index 0000000..44df1ea --- /dev/null +++ b/src/main/scala-2.11/markatta/futiles/CancellableFutureImpl.scala @@ -0,0 +1,55 @@ +package markatta.futiles + +import java.util.concurrent.{Callable, FutureTask} +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.Try + +private[futiles] final class CancellableFutureImpl[T](executionContext: ExecutionContext, block: => T) + extends CancellableFuture[T] { + private val promise = Promise[T]() + + def delegate: Future[T] = promise.future + + private val jf: FutureTask[T] = new FutureTask[T]( + new Callable[T] { + override def call(): T = block + } + ) { + override def done(): Unit = promise.complete( + Try( + try + get() + catch { + case e: ExecutionException if e.getCause != null => + // This is here to mirror the same behaviour that Scala's Future has, i.e. if you throw + // an exception in a Scala Future then then Future.failed has that same exception. Java's + // FutureTask however wraps this in an ExecutionException. + throw e.getCause + } + ) + ) + } + + override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = + delegate.onComplete(f) + + override def isCompleted: Boolean = delegate.isCompleted + + override def value: Option[Try[T]] = delegate.value + + override def transform[S](s: T => S, f: Throwable => Throwable)(implicit + executor: ExecutionContext + ): Future[S] = delegate.transform(s, f) + + override def ready(atMost: Duration)(implicit permit: CanAwait): CancellableFutureImpl.this.type = { + delegate.ready(atMost) + this + } + + override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost) + + override def cancel(): Unit = jf.cancel(true) + + executionContext.execute(jf) +} diff --git a/src/main/scala-2.12/markatta/futiles/CancellableFutureImpl.scala b/src/main/scala-2.12/markatta/futiles/CancellableFutureImpl.scala new file mode 100644 index 0000000..5def544 --- /dev/null +++ b/src/main/scala-2.12/markatta/futiles/CancellableFutureImpl.scala @@ -0,0 +1,56 @@ +package markatta.futiles + +import java.util.concurrent.{Callable, FutureTask} +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.Try + +private[futiles] final class CancellableFutureImpl[T](executionContext: ExecutionContext, block: => T) + extends CancellableFuture[T] { + private val promise = Promise[T]() + + def delegate: Future[T] = promise.future + + private val jf: FutureTask[T] = new FutureTask[T]( + new Callable[T] { + override def call(): T = block + } + ) { + override def done(): Unit = promise.complete( + Try( + try + get() + catch { + case e: ExecutionException if e.getCause != null => + // This is here to mirror the same behaviour that Scala's Future has, i.e. if you throw + // an exception in a Scala Future then then Future.failed has that same exception. Java's + // FutureTask however wraps this in an ExecutionException. + throw e.getCause + } + ) + ) + } + + override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = + delegate.onComplete(f) + + override def isCompleted: Boolean = delegate.isCompleted + + override def value: Option[Try[T]] = delegate.value + + override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = delegate.transform(f) + + override def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = + delegate.transformWith(f) + + override def ready(atMost: Duration)(implicit permit: CanAwait): CancellableFutureImpl.this.type = { + delegate.ready(atMost) + this + } + + override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost) + + override def cancel(): Unit = jf.cancel(true) + + executionContext.execute(jf) +} diff --git a/src/main/scala-2.13/markatta/futiles/CancellableFutureImpl.scala b/src/main/scala-2.13/markatta/futiles/CancellableFutureImpl.scala new file mode 100644 index 0000000..5def544 --- /dev/null +++ b/src/main/scala-2.13/markatta/futiles/CancellableFutureImpl.scala @@ -0,0 +1,56 @@ +package markatta.futiles + +import java.util.concurrent.{Callable, FutureTask} +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.Try + +private[futiles] final class CancellableFutureImpl[T](executionContext: ExecutionContext, block: => T) + extends CancellableFuture[T] { + private val promise = Promise[T]() + + def delegate: Future[T] = promise.future + + private val jf: FutureTask[T] = new FutureTask[T]( + new Callable[T] { + override def call(): T = block + } + ) { + override def done(): Unit = promise.complete( + Try( + try + get() + catch { + case e: ExecutionException if e.getCause != null => + // This is here to mirror the same behaviour that Scala's Future has, i.e. if you throw + // an exception in a Scala Future then then Future.failed has that same exception. Java's + // FutureTask however wraps this in an ExecutionException. + throw e.getCause + } + ) + ) + } + + override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = + delegate.onComplete(f) + + override def isCompleted: Boolean = delegate.isCompleted + + override def value: Option[Try[T]] = delegate.value + + override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = delegate.transform(f) + + override def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = + delegate.transformWith(f) + + override def ready(atMost: Duration)(implicit permit: CanAwait): CancellableFutureImpl.this.type = { + delegate.ready(atMost) + this + } + + override def result(atMost: Duration)(implicit permit: CanAwait): T = delegate.result(atMost) + + override def cancel(): Unit = jf.cancel(true) + + executionContext.execute(jf) +} diff --git a/src/main/scala/markatta/futiles/CancellableFuture.scala b/src/main/scala/markatta/futiles/CancellableFuture.scala new file mode 100644 index 0000000..1a0939f --- /dev/null +++ b/src/main/scala/markatta/futiles/CancellableFuture.scala @@ -0,0 +1,32 @@ +package markatta.futiles + +import scala.concurrent._ +import scala.util.Try +import java.util.concurrent.{Callable, FutureTask} +import scala.concurrent.duration.Duration + +trait CancellableFuture[T] extends Future[T] { + + /** Attempts to cancel the underlying [[scala.concurrent.Future]]. Note that this is a best effort attempt + */ + @throws[CancellationException] + def cancel(): Unit +} + +object CancellableFuture { + + /** Allows you to run a computation inside of a [[scala.concurrent.Future]] which can later be cancelled + * + * @param body + * The computation to run inside of the [[scala.concurrent.Future]] + * @param executionContext + * The [[scala.concurrent.ExecutionContext]] to run the [[scala.concurrent.Future]] on + * @return + * A [[markatta.futiles.CancellableFuture]] providing a `cancel` method allowing you to terminate the + * [[markatta.futiles.CancellableFuture]] at any time + * @see + * Adapted from https://stackoverflow.com/a/39986418/1519631 + */ + def apply[T](body: => T)(implicit executionContext: ExecutionContext): CancellableFuture[T] = + new CancellableFutureImpl[T](executionContext, body) +} diff --git a/src/test/scala/markatta/futiles/CancellableFutureSpec.scala b/src/test/scala/markatta/futiles/CancellableFutureSpec.scala new file mode 100644 index 0000000..6889ffc --- /dev/null +++ b/src/test/scala/markatta/futiles/CancellableFutureSpec.scala @@ -0,0 +1,58 @@ +package markatta.futiles + +import java.util.concurrent.atomic.AtomicBoolean +import scala.concurrent.CancellationException + +class CancellableFutureSpec extends Spec { + describe("The cancellable utility") { + + describe("without cancellation") { + + it("works as a normal Future") { + val cancellable = CancellableFuture { + () + } + + cancellable.futureValue shouldEqual () + } + + it("throws an Exception correctly") { + val cancellable = CancellableFuture { + throw new IllegalArgumentException + } + + val exception = cancellable.failed.futureValue + exception shouldBe an[IllegalArgumentException] + } + + } + + describe("with cancellation") { + + it("prevents Future from completing") { + val atomicBoolean = new AtomicBoolean(true) + + val cancellable = CancellableFuture { + Thread.sleep(100) + atomicBoolean.set(false) + } + + Thread.sleep(50) + cancellable.cancel() + Thread.sleep(100) + atomicBoolean.get() shouldEqual true + } + + it("throws a CancellationException exception") { + val cancellable = CancellableFuture { + Thread.sleep(100) + } + cancellable.cancel() + cancellable.failed.futureValue shouldBe an[CancellationException] + } + + } + + } + +}