diff --git a/core/jvm/src/test/scala/fs2/ConcurrentlySpec.scala b/core/jvm/src/test/scala/fs2/ConcurrentlySpec.scala index c47f9b8f2d..43a481718e 100644 --- a/core/jvm/src/test/scala/fs2/ConcurrentlySpec.scala +++ b/core/jvm/src/test/scala/fs2/ConcurrentlySpec.scala @@ -2,6 +2,7 @@ package fs2 import scala.concurrent.duration._ import cats.effect.IO +import fs2.async.Promise class ConcurrentlySpec extends Fs2Spec { @@ -13,7 +14,7 @@ class ConcurrentlySpec extends Fs2Spec { "when background stream fails, overall stream fails" in forAll { (s: PureStream[Int], f: Failure) => val prg = Scheduler[IO](1).flatMap(scheduler => (scheduler.sleep_[IO](25.millis) ++ s.get).concurrently(f.get)) - val throws = f.get.drain.compile.drain.attempt.unsafeRunSync.isLeft + val throws = f.get.compile.drain.attempt.unsafeRunSync.isLeft if (throws) an[Err.type] should be thrownBy runLog(prg) else runLog(prg) } @@ -33,5 +34,17 @@ class ConcurrentlySpec extends Fs2Spec { runLog(prg) bgDone shouldBe true } + + "when background stream fails, primary stream fails even when hung" in forAll { (s: PureStream[Int], f: Failure) => + val promise = Promise.unsafeCreate[IO, Unit] + val prg = Scheduler[IO](1).flatMap{ scheduler => + (scheduler.sleep_[IO](25.millis) ++ (Stream(1) ++ s.get)).concurrently(f.get) + .flatMap { i => Stream.eval(promise.get).map { _ => i } } + } + + val throws = f.get.compile.drain.attempt.unsafeRunSync.isLeft + if (throws) an[Err.type] should be thrownBy runLog(prg) + else runLog(prg) + } } } diff --git a/core/jvm/src/test/scala/fs2/Pipe2Spec.scala b/core/jvm/src/test/scala/fs2/Pipe2Spec.scala index 4f4dfcfe1a..182c941dbd 100644 --- a/core/jvm/src/test/scala/fs2/Pipe2Spec.scala +++ b/core/jvm/src/test/scala/fs2/Pipe2Spec.scala @@ -1,5 +1,6 @@ package fs2 + import scala.concurrent.duration._ import cats.effect.IO import cats.implicits._ @@ -150,17 +151,70 @@ class Pipe2Spec extends Fs2Spec { "interrupt (1)" in forAll { (s1: PureStream[Int]) => val s = async.mutable.Semaphore[IO](0).unsafeRunSync() - val interrupt = Stream.emit(true) ++ Stream.eval_(s.increment) + val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) }.compile.drain.attempt // tests that termination is successful even if stream being interrupted is hung runLog { s1.get.covary[IO].evalMap(_ => s.decrement).interruptWhen(interrupt) } shouldBe Vector() } "interrupt (2)" in forAll { (s1: PureStream[Int]) => + val s = async.mutable.Semaphore[IO](0).unsafeRunSync() + val interrupt = Stream.emit(true) ++ Stream.eval_(s.increment) + // tests that termination is successful even if stream being interrupted is hung + runLog { s1.get.covary[IO].evalMap(_ => s.decrement).interruptWhen(interrupt) } shouldBe Vector() + } + + "interrupt (3)" in { + // tests the interruption of the constant stream + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + Stream.constant(true).covary[IO].interruptWhen(interrupt).compile.drain.unsafeRunSync + } + + "interrupt (4)" in { + // tests the interruption of the constant stream with flatMap combinator + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + Stream.constant(true).covary[IO].interruptWhen(interrupt).flatMap { _ => Stream.emit(1) }.compile.drain.unsafeRunSync + } + + "interrupt (5)" in { + // tests the interruption of the stream that recurses infinitelly + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + def loop(i: Int): Stream[IO, Int] = Stream.emit(i).covary[IO].flatMap { i => Stream.emit(i) ++ loop(i+1) } + loop(0).interruptWhen(interrupt).compile.drain.unsafeRunSync + } + + //todo: need to resolve SoE in flatMap + "interrupt (6)" in { + // tests the interruption of the stream that recurse infinitely and never emits + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + def loop: Stream[IO, Int] = Stream.eval(IO{()}).flatMap { _ => loop } + loop.interruptWhen(interrupt).compile.drain.unsafeRunSync + } + + "interrupt (7)" in { + // tests the interruption of the stream that recurse infinitely, is pure and never emits + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + def loop: Stream[IO, Int] = Stream.emit(()).covary[IO].flatMap { _ => loop } + loop.interruptWhen(interrupt).compile.drain.unsafeRunSync + } + + "interrupt (8)" in { + // tests the interruption of the stream that repeatedly evaluates + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + Stream.repeatEval(IO{()}).interruptWhen(interrupt).compile.drain.unsafeRunSync + } + + "interrupt (9)" in { + // tests the interruption of the constant drained stream + val interrupt = mkScheduler.flatMap { _.sleep_[IO](1.millis) }.compile.drain.attempt + Stream.constant(true).dropWhile(! _ ).covary[IO].interruptWhen(interrupt).compile.drain.unsafeRunSync + } + + "interrupt (10)" in forAll { (s1: PureStream[Int]) => // tests that termination is successful even if interruption stream is infinitely false runLog { s1.get.covary[IO].interruptWhen(Stream.constant(false)) } shouldBe runLog(s1.get) } - "interrupt (3)" in forAll { (s1: PureStream[Int]) => + "interrupt (11)" in forAll { (s1: PureStream[Int]) => val barrier = async.mutable.Semaphore[IO](0).unsafeRunSync() val enableInterrupt = async.mutable.Semaphore[IO](0).unsafeRunSync() val interruptedS1 = s1.get.covary[IO].evalMap { i => @@ -176,6 +230,72 @@ class Pipe2Spec extends Fs2Spec { assert(out.forall(i => i % 7 != 0)) } + "interrupt (12)" in forAll { (s1: PureStream[Int]) => + // tests interruption of stream that never terminates in flatMap + val s = async.mutable.Semaphore[IO](0).unsafeRunSync() + val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) }.compile.drain.attempt + // tests that termination is successful even when flatMapped stream is hung + runLog { s1.get.covary[IO].interruptWhen(interrupt).flatMap(_ => Stream.eval_(s.decrement)) } shouldBe Vector() + } + + "interrupt (13)" in forAll { (s1: PureStream[Int], f: Failure) => + // tests that failure from the interrupt signal stream will be propagated to main stream + // even when flatMap stream is hung + + val s = async.mutable.Semaphore[IO](0).unsafeRunSync() + val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) ++ f.get.map { _ => false } } + val prg = (Stream(1) ++ s1.get).covary[IO].interruptWhen(interrupt).flatMap(_ => Stream.eval_(s.decrement)) + val throws = f.get.compile.drain.attempt.unsafeRunSync.isLeft + if (throws) an[Err.type] should be thrownBy runLog(prg) + else runLog(prg) + } + + "interrupt (14)" in forAll { s1: PureStream[Int] => + // tests that when interrupted, the interruption will resume with append. + val s = async.mutable.Semaphore[IO](0).unsafeRunSync() + val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) }.compile.drain.attempt + val prg = ( + (s1.get.covary[IO].interruptWhen(interrupt).evalMap { _ => s.decrement map { _ => None } }) + ++ (s1.get.map(Some(_))) + ).collect { case Some(v) => v } + + runLog(prg) shouldBe runLog(s1.get) + } + + "interrupt (15)" in forAll { s1: PureStream[Int] => + // tests that interruption works even when flatMap is followed by `collect` + // also tests scenario when interrupted stream is followed by other stream and both have map fusion defined + val s = async.mutable.Semaphore[IO](0).unsafeRunSync() + val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + val prg = + (s1.get.covary[IO].interruptWhen(interrupt).map { i => None } ++ s1.get.map(Some(_))) + .flatMap { + case None => Stream.eval(s.decrement.map { _ => None }) + case Some(i) => Stream.emit(Some(i)) + } + .collect { case Some(i) => i } + + runLog(prg) shouldBe runLog(s1.get) + + } + + "nested-interrupt (1)" in forAll { s1: PureStream[Int] => + val s = async.mutable.Semaphore[IO](0).unsafeRunSync() + val interrupt: IO[Either[Throwable, Unit]] = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + val neverInterrupt = IO.async[Unit] { _ => () }.attempt + + val prg = + (s1.get.covary[IO].interruptWhen(interrupt).map(_ => None) ++ s1.get.map(Some(_))).interruptWhen(neverInterrupt) + .flatMap { + case None => Stream.eval(s.decrement.map { _ => None }) + case Some(i) => Stream.emit(Some(i)) + } + .collect { case Some(i) => i } + + runLog(prg) shouldBe runLog(s1.get) + } + + "pause" in { forAll { (s1: PureStream[Int]) => val pausedStream = Stream.eval(async.signalOf[IO,Boolean](false)).flatMap { pause => diff --git a/core/jvm/src/test/scala/fs2/StreamPerformanceSpec.scala b/core/jvm/src/test/scala/fs2/StreamPerformanceSpec.scala index c27aa5ce92..419c01c71f 100644 --- a/core/jvm/src/test/scala/fs2/StreamPerformanceSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamPerformanceSpec.scala @@ -31,12 +31,24 @@ class StreamPerformanceSpec extends Fs2Spec { } }} + "left-associated eval() ++ flatMap 1" - { Ns.foreach { N => + N.toString in { + runLog((1 until N).map(emit).foldLeft(emit(0).covary[IO])((acc,a) => acc flatMap { _ => eval(IO {()}) flatMap { _ => a }})) shouldBe Vector(N-1) + } + }} + "right-associated flatMap 1" - { Ns.foreach { N => N.toString in { runLog((1 until N).map(emit).reverse.foldLeft(emit(0))((acc,a) => a flatMap { _ => acc })) shouldBe Vector(0) } }} + "right-associated eval() ++ flatMap 1" - { Ns.foreach { N => + N.toString in { + runLog((1 until N).map(emit).reverse.foldLeft(emit(0).covary[IO])((acc,a) => a flatMap { _ => eval(IO {()}) flatMap { _=> acc } })) shouldBe Vector(0) + } + }} + "left-associated flatMap 2" - { Ns.foreach { N => N.toString in { runLog((1 until N).map(emit).foldLeft(emit(0) ++ emit(1) ++ emit(2))( diff --git a/core/shared/src/main/scala/fs2/Interrupted.scala b/core/shared/src/main/scala/fs2/Interrupted.scala new file mode 100644 index 0000000000..786088a14c --- /dev/null +++ b/core/shared/src/main/scala/fs2/Interrupted.scala @@ -0,0 +1,18 @@ +package fs2 + +import fs2.internal.Token + +/** + * Signals interruption of the evaluation. + * + * @param scopeId Id of the scope that shall be the last interrupted scope by this signal + * @param loop In case of infinite recursion this prevents interpreter to search for `CloseScope` indefinitely. + * In each recursive iteration, this will increment by 1 up to limit defined in current scope, + * After which this will Interrupt stream w/o searching further for any cleanups. + */ +final case class Interrupted(private[fs2] val scopeId: Token, private[fs2] val loop: Int) extends Throwable { + override def fillInStackTrace = this + + override def toString = s"Interrupted($scopeId, $loop)" +} + diff --git a/core/shared/src/main/scala/fs2/Scope.scala b/core/shared/src/main/scala/fs2/Scope.scala index cc168b4ad1..bf713ab80a 100644 --- a/core/shared/src/main/scala/fs2/Scope.scala +++ b/core/shared/src/main/scala/fs2/Scope.scala @@ -26,4 +26,26 @@ abstract class Scope[F[_]] { * successfully leased. */ def lease: F[Option[Lease[F]]] + + /** + * Interrupts evaluation of the current scope. Only scopes previously indicated wih Stream.interruptScope may be interrupted. + * For other scopes this will fail. + * + * Interruption is final and may take two forms: + * + * When invoked on right side, that will interrupt only current scope evaluation, and will resume when control is given + * to next scope. + * + * When invoked on left side, then this will inject given throwable like it will be caused by stream evaluation, + * and then, without any error handling the whole stream will fail with supplied throwable. + * + */ + def interrupt(cause: Either[Throwable, Unit]): F[Unit] + + /** + * Yields to true, if the scope is interrupted. + * Note that when scope is interrupted with error, this yields to false + * @return + */ + def isInterrupted: F[Boolean] } diff --git a/core/shared/src/main/scala/fs2/Segment.scala b/core/shared/src/main/scala/fs2/Segment.scala index 5a87664890..707eb8c791 100644 --- a/core/shared/src/main/scala/fs2/Segment.scala +++ b/core/shared/src/main/scala/fs2/Segment.scala @@ -1034,6 +1034,21 @@ object Segment { result.get } + /** + * Like `run` but allows to run `f` for each `O`. + * as they are processed when running this segment. + * Allows to perfrom efficient accumulation of `O` while running the stream. + */ + final def runForEach(f: O => Unit): R = { + def chunk(ch: Chunk[O]): Unit = ch.foreach(f) + var result: Option[R] = None + val trampoline = new Trampoline + val step = self.stage(Depth(0), trampoline.defer, f, chunk, r => { result = Some(r); throw Done }).value + try while (true) stepAll(step, trampoline) + catch { case Done => } + result.get + } + /** * Splits this segment at the specified index by simultaneously taking and dropping. * diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 224cd32128..3097731cb3 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -728,7 +728,13 @@ final class Stream[+F[_],+O] private(private val free: FreeC[Algebra[Nothing,Not * res0: List[Int] = List(1, 2, 3, 1, 2, 3, 1, 2) * }}} */ - def repeat: Stream[F,O] = this ++ repeat + def repeat: Stream[F,O] = + Stream.fromFreeC[F, O](this.get.transformWith { + case Right(_) => repeat.get + case Left(int: Interrupted) => Algebra.pure(()) + case Left(err) => Algebra.raiseError(err) + }) + /** * Converts a `Stream[F,Either[Throwable,O]]` to a `Stream[F,O]`, which emits right values and fails upon the first `Left(t)`. @@ -1455,8 +1461,13 @@ object Stream { def ++[O2>:O](s2: => Stream[F,O2]): Stream[F,O2] = self.append(s2) /** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */ - def append[O2>:O](s2: => Stream[F,O2]): Stream[F,O2] = - fromFreeC(self.get.flatMap { _ => s2.get }) + def append[O2>:O](s2: => Stream[F,O2]): Stream[F,O2] = { + fromFreeC(self.get[F, O2].transformWith { + case Right(_) => s2.get + case Left(interrupted: Interrupted) => Algebra.interruptEventually(s2.get, interrupted) + case Left(err) => Algebra.raiseError(err) + }) + } /** * Emits only elements that are distinct from their immediate predecessors, @@ -1507,21 +1518,18 @@ object Stream { * }}} */ def concurrently[O2](that: Stream[F,O2])(implicit F: Effect[F], ec: ExecutionContext): Stream[F,O] = { - Stream.eval(async.signalOf[F,Boolean](false)).flatMap { interruptLeft => - Stream.eval(async.semaphore[F](0)).flatMap { leftFinalized => - Stream.eval(async.signalOf[F,Boolean](false)).flatMap { interruptRight => - Stream.eval(async.signalOf[F,Option[Throwable]](None)).flatMap { leftError => - val left = that. - handleErrorWith(e => Stream.eval_(leftError.set(Some(e)) *> interruptRight.set(true))). - interruptWhen(interruptLeft). - onFinalize(leftFinalized.increment) - val right = self.interruptWhen(interruptRight).onFinalize( - interruptLeft.set(true) *> - leftFinalized.decrement *> - leftError.get.flatMap(_.fold(F.pure(()))(F.raiseError)) - ) - Stream.eval_(async.fork(left.compile.drain)) ++ right - }}}} + Stream.eval(async.promise[F, Unit]).flatMap { interruptR => + Stream.eval(async.promise[F, Unit]).flatMap { doneR => + Stream.eval(async.promise[F, Throwable]).flatMap { interruptL => + def runR = that.interruptWhen(interruptR.get.attempt).compile.drain.attempt flatMap { + case Right(_) | Left(_:Interrupted) => doneR.complete(()) + case Left(err) => interruptL.complete(err) *> doneR.complete(()) + } + + Stream.eval(async.fork(runR)) >> + self.interruptWhen(interruptL.get.map(Left(_):Either[Throwable, Unit])). + onFinalize { interruptR.complete(()) *> doneR.get } + }}} } /** @@ -1661,8 +1669,9 @@ object Stream { * res0: List[Int] = List(1, 2, 2, 3, 3, 3) * }}} */ - def flatMap[O2](f: O => Stream[F,O2]): Stream[F,O2] = - Stream.fromFreeC(Algebra.uncons(self.get[F,O]).flatMap { + def flatMap[O2](f: O => Stream[F,O2]): Stream[F,O2] = { + Stream.fromFreeC[F,O2](Algebra.uncons(self.get[F,O]).flatMap { + case Some((hd, tl)) => // nb: If tl is Pure, there's no need to propagate flatMap through the tail. Hence, we // check if hd has only a single element, and if so, process it directly instead of folding. @@ -1674,11 +1683,25 @@ object Stream { } only match { case None => - hd.map(f).foldRightLazy(Stream.fromFreeC(tl).flatMap(f))(_ ++ _).get - case Some(o) => f(o).get + + // specific version of ++, that in case of error or interrupt shortcuts for evaluation immediately to tail. + def fby(s1: Stream[F, O2], s2: => Stream[F, O2]): Stream[F, O2] = { + fromFreeC[F, O2](s1.get.transformWith { + case Right(()) => s2.get + case Left(int: Interrupted) => fromFreeC(Algebra.interruptEventually(tl, int)).flatMap(f).get + case Left(err) => Algebra.raiseError(err) + }) + } + + hd.map(f).foldRightLazy(Stream.fromFreeC(tl).flatMap(f))(fby(_, _)).get + + case Some(o) => + f(o).get + } case None => Stream.empty.covaryAll[F,O2].get }) + } /** Alias for `flatMap(_ => s2)`. */ def >>[O2](s2: => Stream[F,O2]): Stream[F,O2] = @@ -1732,15 +1755,49 @@ object Stream { * because `s1.interruptWhen(s2)` is never pulled for another element after the first element has been * emitted. To fix, consider `s.flatMap(_ => infiniteStream).interruptWhen(s2)`. */ - def interruptWhen(haltWhenTrue: Stream[F,Boolean])(implicit F: Effect[F], ec: ExecutionContext): Stream[F,O] = - haltWhenTrue.noneTerminate.either(self.noneTerminate). - takeWhile(_.fold(halt => halt.map(!_).getOrElse(false), o => o.isDefined)). - collect { case Right(Some(i)) => i } + def interruptWhen(haltWhenTrue: Stream[F,Boolean])(implicit F: Effect[F], ec: ExecutionContext): Stream[F,O] = { + Stream.eval(async.promise[F, Either[Throwable, Unit]]).flatMap { interruptL => + Stream.eval(async.promise[F, Unit]).flatMap { doneR => + Stream.eval(async.promise[F, Unit]).flatMap { interruptR => + def runR = haltWhenTrue.evalMap { + case false => F.pure(false) + case true => interruptL.complete(Right(())) as true + }.takeWhile(! _).interruptWhen(interruptR.get.attempt).compile.drain.attempt.flatMap { r => + interruptL.complete(r).attempt *> doneR.complete(()) + } + + Stream.eval(async.fork(runR)) >> + self.interruptWhen(interruptL.get) + .onFinalize(interruptR.complete(()) *> doneR.get) + + }}} + + } /** Alias for `interruptWhen(haltWhenTrue.discrete)`. */ def interruptWhen(haltWhenTrue: async.immutable.Signal[F,Boolean])(implicit F: Effect[F], ec: ExecutionContext): Stream[F,O] = interruptWhen(haltWhenTrue.discrete) + /** + * Interrupts the stream, when `haltOnSignal` finishes its evaluation. + */ + def interruptWhen(haltOnSignal: F[Either[Throwable, Unit]])(implicit F: Effect[F], ec: ExecutionContext): Stream[F,O] = { + Stream.getScope[F].flatMap { scope => + Stream.eval(async.fork(haltOnSignal flatMap scope.interrupt)) flatMap { _ => + self + }}.interruptScope.handleErrorWith { + case int: fs2.Interrupted => Stream.empty + case other => Stream.raiseError(other) + } + } + + /** + * Creates a scope that may be interrupted by calling scope#interrupt. + */ + def interruptScope(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O] = + Stream.fromFreeC(Algebra.interruptScope(self.get)) + + /** * Nondeterministically merges a stream of streams (`outer`) in to a single stream, * opening at most `maxOpen` streams at any point in time. @@ -1978,7 +2035,7 @@ object Stream { * }}} */ def handleErrorWith[O2>:O](h: Throwable => Stream[F,O2]): Stream[F,O2] = - Stream.fromFreeC(self.get[F,O2] handleErrorWith { e => h(e).get }) + fromFreeC(Algebra.scope(self.get[F,O2]).handleErrorWith { e => h(e).get[F, O2] } ) /** * Run the supplied effectful action at the end of this stream, regardless of how the stream terminates. diff --git a/core/shared/src/main/scala/fs2/async/Ref.scala b/core/shared/src/main/scala/fs2/async/Ref.scala index d982bdda4f..3b40385874 100644 --- a/core/shared/src/main/scala/fs2/async/Ref.scala +++ b/core/shared/src/main/scala/fs2/async/Ref.scala @@ -126,7 +126,10 @@ final class Ref[F[_], A] private[fs2] (private val ar: AtomicReference[A])(impli object Ref { /** Creates an asynchronous, concurrent mutable reference initialized to the supplied value. */ def apply[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = - F.delay(new Ref[F, A](new AtomicReference[A](a))) + F.delay(unsafeCreate(a)) + + private[fs2] def unsafeCreate[F[_]: Sync, A](a: A): Ref[F, A] = + new Ref[F, A](new AtomicReference[A](a)) /** * The result of a modification to a [[Ref]] diff --git a/core/shared/src/main/scala/fs2/internal/AcquireAfterScopeClosed.scala b/core/shared/src/main/scala/fs2/internal/AcquireAfterScopeClosed.scala new file mode 100644 index 0000000000..e040729a1b --- /dev/null +++ b/core/shared/src/main/scala/fs2/internal/AcquireAfterScopeClosed.scala @@ -0,0 +1,3 @@ +package fs2.internal + +final case object AcquireAfterScopeClosed extends Throwable { override def fillInStackTrace = this } diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index aef0739cee..8b433a0079 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -1,24 +1,31 @@ package fs2.internal -import cats.data.NonEmptyList import cats.~> -import cats.effect.Sync +import cats.effect.{Effect, Sync} import cats.implicits._ import fs2._ +import scala.concurrent.ExecutionContext + private[fs2] sealed trait Algebra[F[_],O,R] + + private[fs2] object Algebra { final case class Output[F[_],O](values: Segment[O,Unit]) extends Algebra[F,O,Unit] final case class Run[F[_],O,R](values: Segment[O,R]) extends Algebra[F,O,R] - final case class Eval[F[_],O,R](value: F[R]) extends Algebra[F,O,R] + final case class Uncons[F[_], X, O](s: FreeC[Algebra[F,X,?],Unit], chunkSize: Int, maxSteps: Long) extends Algebra[F, O, Option[(Segment[X,Unit], FreeC[Algebra[F,X,?],Unit])]] + + // Algebra types performing side effects + private[fs2] sealed trait Effectful[F[_], O, R] extends Algebra[F, O, R] - final case class Acquire[F[_],O,R](resource: F[R], release: R => F[Unit]) extends Algebra[F,O,(R,Token)] - final case class Release[F[_],O](token: Token) extends Algebra[F,O,Unit] - final case class OpenScope[F[_],O]() extends Algebra[F,O,CompileScope[F]] - final case class CloseScope[F[_],O](toClose: CompileScope[F]) extends Algebra[F,O,Unit] - final case class GetScope[F[_],O]() extends Algebra[F,O,CompileScope[F]] + final case class Eval[F[_],O,R](value: F[R]) extends Effectful[F,O,R] + final case class Acquire[F[_],O,R](resource: F[R], release: R => F[Unit]) extends Effectful[F,O,(R,Token)] + final case class Release[F[_],O](token: Token) extends Effectful[F,O,Unit] + final case class OpenScope[F[_],O](interruptible: Option[(Effect[F], ExecutionContext)]) extends Effectful[F,O,CompileScope[F]] + final case class CloseScope[F[_],O](toClose: CompileScope[F]) extends Effectful[F,O,Unit] + final case class GetScope[F[_],O]() extends Effectful[F,O,CompileScope[F]] def output[F[_],O](values: Segment[O,Unit]): FreeC[Algebra[F,O,?],Unit] = FreeC.Eval[Algebra[F,O,?],Unit](Output(values)) @@ -38,19 +45,35 @@ private[fs2] object Algebra { def release[F[_],O](token: Token): FreeC[Algebra[F,O,?],Unit] = FreeC.Eval[Algebra[F,O,?],Unit](Release(token)) - private def openScope[F[_],O]: FreeC[Algebra[F,O,?],CompileScope[F]] = - FreeC.Eval[Algebra[F,O,?],CompileScope[F]](OpenScope()) + /** + * Wraps supplied pull in new scope, that will be opened before this pull is evaluated + * and closed once this pull either finishes its evaluation or when it fails. + */ + def scope[F[_],O,R](pull: FreeC[Algebra[F,O,?],R]): FreeC[Algebra[F,O,?],R] = + scope0(pull, None) + + /** + * Like `scope` but allows this scope to be interrupted. + * Note that this may fail with `Interrupted` when interruption occurred + */ + private[fs2] def interruptScope[F[_], O, R](pull: FreeC[Algebra[F,O,?],R])(implicit effect: Effect[F], ec: ExecutionContext): FreeC[Algebra[F,O,?],R] = + scope0(pull, Some((effect, ec))) + + + private[fs2] def openScope[F[_], O](interruptible: Option[(Effect[F], ExecutionContext)]): FreeC[Algebra[F,O,?],CompileScope[F]] = + FreeC.Eval[Algebra[F,O,?],CompileScope[F]](OpenScope[F, O](interruptible)) - private def closeScope[F[_],O](toClose: CompileScope[F]): FreeC[Algebra[F,O,?],Unit] = + private[fs2] def closeScope[F[_], O](toClose: CompileScope[F]): FreeC[Algebra[F,O,?],Unit] = FreeC.Eval[Algebra[F,O,?],Unit](CloseScope(toClose)) - def scope[F[_],O,R](pull: FreeC[Algebra[F,O,?],R]): FreeC[Algebra[F,O,?],R] = - openScope flatMap { newScope => - FreeC.Bind(pull, (e: Either[Throwable,R]) => e match { - case Left(e) => closeScope(newScope) flatMap { _ => raiseError(e) } - case Right(r) => closeScope(newScope) map { _ => r } - }) + private def scope0[F[_], O, R](pull: FreeC[Algebra[F,O,?],R], interruptible: Option[(Effect[F], ExecutionContext)]): FreeC[Algebra[F,O,?],R] = { + openScope(interruptible) flatMap { scope => + pull.transformWith { + case Right(r) => closeScope(scope) map { _ => r } + case Left(e) => closeScope(scope) flatMap { _ => raiseError(e) } + } } + } def getScope[F[_],O]: FreeC[Algebra[F,O,?],CompileScope[F]] = FreeC.Eval[Algebra[F,O,?],CompileScope[F]](GetScope()) @@ -64,126 +87,140 @@ private[fs2] object Algebra { def suspend[F[_],O,R](f: => FreeC[Algebra[F,O,?],R]): FreeC[Algebra[F,O,?],R] = FreeC.suspend(f) - def uncons[F[_],X,O](s: FreeC[Algebra[F,O,?],Unit], chunkSize: Int = 1024, maxSteps: Long = 10000): FreeC[Algebra[F,X,?],Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]] = { - s.viewL.get match { - case done: FreeC.Pure[Algebra[F,O,?], Unit] => pure(None) - case failed: FreeC.Fail[Algebra[F,O,?], _] => raiseError(failed.error) - case bound: FreeC.Bind[Algebra[F,O,?],x,Unit] => - val fx = bound.fx.asInstanceOf[FreeC.Eval[Algebra[F,O,?],x]].fr - fx match { - case os: Algebra.Output[F,O] => - pure(Some((os.values, bound.f(Right(()))))) - case os: Algebra.Run[F,O,x] => - try { - def asSegment(c: Catenable[Chunk[O]]): Segment[O,Unit] = - c.uncons.flatMap { case (h1,t1) => t1.uncons.map(_ => Segment.catenated(c.map(Segment.chunk))).orElse(Some(Segment.chunk(h1))) }.getOrElse(Segment.empty) - os.values.force.splitAt(chunkSize, Some(maxSteps)) match { - case Left((r,chunks,rem)) => - pure(Some(asSegment(chunks) -> bound.f(Right(r)))) - case Right((chunks,tl)) => - pure(Some(asSegment(chunks) -> FreeC.Bind(segment(tl), bound.f))) - } - } catch { case NonFatal(e) => FreeC.suspend(uncons(bound.f(Left(e)), chunkSize)) } - case algebra => // Eval, Acquire, Release, OpenScope, CloseScope, GetScope - FreeC.Bind( - FreeC.Eval(algebra.asInstanceOf[Algebra[F,X,x]]), // O is phantom in these constructors so it is safe to case O to X - (e: Either[Throwable,x]) => uncons[F,X,O](bound.f(e), chunkSize) - ) - } - case e => sys.error("FreeC.ViewL structure must be Pure(a), Fail(e), or Bind(Eval(fx),k), was: " + e) - } - } + def uncons[F[_],X,O](s: FreeC[Algebra[F,O,?],Unit], chunkSize: Int = 1024, maxSteps: Long = 10000): FreeC[Algebra[F,X,?],Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]] = + FreeC.Eval[Algebra[F,X,?],Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]](Algebra.Uncons[F,O,X](s, chunkSize, maxSteps)) /** Left-folds the output of a stream. */ def compile[F[_],O,B](stream: FreeC[Algebra[F,O,?],Unit], init: B)(f: (B, O) => B)(implicit F: Sync[F]): F[B] = F.delay(CompileScope.newRoot).flatMap { scope => - compileScope[F,O,B](scope, stream, init)(f).attempt.flatMap { - case Left(t) => scope.close *> F.raiseError(t) - case Right(b) => scope.close as b - } - } + compileScope[F,O,B](scope, stream, init)(f).attempt.flatMap { + case Left(t) => scope.close *> F.raiseError(t) + case Right(b) => scope.close as b + }} private[fs2] def compileScope[F[_],O,B](scope: CompileScope[F], stream: FreeC[Algebra[F,O,?],Unit], init: B)(g: (B, O) => B)(implicit F: Sync[F]): F[B] = - compileLoop[F,O,B](scope, init, g, uncons(stream).viewL) + compileFoldLoop[F,O,B](scope, init, g, stream) + + private def compileUncons[F[_],X,O]( + scope: CompileScope[F] + , s: FreeC[Algebra[F,O,?],Unit] + , chunkSize: Int + , maxSteps: Long + )(implicit F: Sync[F]): F[(CompileScope[F], Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])])] = { + F.delay(s.viewL.get) flatMap { + case done: FreeC.Pure[Algebra[F,O,?], Unit] => F.pure((scope, None)) + case failed: FreeC.Fail[Algebra[F,O,?], Unit] => F.raiseError(failed.error) + case bound: FreeC.Bind[Algebra[F,O,?], x, Unit] => + val f = bound.f.asInstanceOf[Either[Throwable,Any] => FreeC[Algebra[F,O,?], Unit]] + val fx = bound.fx.asInstanceOf[FreeC.Eval[Algebra[F,O,?],x]].fr + fx match { + case output: Algebra.Output[F, O] => + F.pure((scope, Some((output.values, f(Right(())))))) + + case run: Algebra.Run[F, O, r] => + val (h, t) = + run.values.force.splitAt(chunkSize, Some(maxSteps)) match { + case Left((r, chunks, _)) => (chunks, f(Right(r))) + case Right((chunks, tail)) => (chunks, segment(tail).transformWith(f)) + } + F.pure((scope, Some((Segment.catenatedChunks(h), t)))) + + case uncons: Algebra.Uncons[F, x, O] => + F.flatMap(F.attempt(compileUncons(scope, uncons.s, uncons.chunkSize, uncons.maxSteps))) { + case Right((scope, u)) => compileUncons(scope, FreeC.suspend(f(Right(u))), chunkSize, maxSteps) + case Left(err) => compileUncons(scope, FreeC.suspend(f(Left(err))), chunkSize, maxSteps) + } + + case alg: Effectful[F, O, r] => + F.flatMap(compileShared(scope, alg)) { case (scope, r) => + compileUncons(scope, f(r), chunkSize, maxSteps) + } + } - private def compileLoop[F[_],O,B]( + case e => sys.error("FreeC.ViewL structure must be Pure(a), Fail(e), or Bind(Eval(fx),k), was (unconcs): " + e) + } + } + + private def compileFoldLoop[F[_],O,B]( scope: CompileScope[F] , acc: B , g: (B, O) => B - , v: FreeC.ViewL[Algebra[F,O,?], Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]] + , v: FreeC[Algebra[F, O, ?], Unit] )(implicit F: Sync[F]): F[B] = { + F.flatMap(F.delay { v.viewL.get }) { + case done: FreeC.Pure[Algebra[F,O,?], Unit] => + F.pure(acc) + + case failed: FreeC.Fail[Algebra[F,O,?], Unit] => + F.raiseError(failed.error) + + case bound: FreeC.Bind[Algebra[F,O,?], _, Unit] => + val f = bound.f.asInstanceOf[Either[Throwable,Any] => FreeC[Algebra[F,O,?], Unit]] + val fx = bound.fx.asInstanceOf[FreeC.Eval[Algebra[F,O,?],_]].fr + F.flatMap(scope.shallInterrupt) { + case None => + fx match { + case output: Algebra.Output[F, O] => + try { + compileFoldLoop(scope, output.values.fold(acc)(g).force.run._2, g, f(Right(()))) + } + catch { + case NonFatal(err) => compileFoldLoop(scope, acc, g, f(Left(err))) + } - v.get match { - case done: FreeC.Pure[Algebra[F,O,?], Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]] => done.r match { - case None => F.pure(acc) - case Some((hd, tl)) => - F.suspend { - try compileLoop[F,O,B](scope, hd.fold(acc)(g).force.run._2, g, uncons(tl).viewL) - catch { case NonFatal(e) => compileLoop[F,O,B](scope, acc, g, uncons(tl.asHandler(e)).viewL) } - } - } - - case failed: FreeC.Fail[Algebra[F,O,?], _] => F.raiseError(failed.error) + case run: Algebra.Run[F, O, r] => + try { + val (r, b) = run.values.fold(acc)(g).force.run + compileFoldLoop(scope, b, g, f(Right(r))) + } catch { + case NonFatal(err) => compileFoldLoop(scope, acc, g, f(Left(err))) + } - case bound: FreeC.Bind[Algebra[F,O,?],x,Option[(Segment[O,Unit],FreeC[Algebra[F,O,?],Unit])]] => - val fx = bound.fx.asInstanceOf[FreeC.Eval[Algebra[F,O,?],x]].fr - fx match { - case wrap: Algebra.Eval[F, O, _] => - F.flatMap(F.attempt(wrap.value)) { e => compileLoop(scope, acc, g, bound.f(e).viewL) } - - case acquire: Algebra.Acquire[F,_,_] => - val acquireResource = acquire.resource - val resource = Resource.create - F.flatMap(scope.register(resource)) { mayAcquire => - if (mayAcquire) { - F.flatMap(F.attempt(acquireResource)) { - case Right(r) => - val finalizer = F.suspend { acquire.release(r) } - F.flatMap(resource.acquired(finalizer)) { result => - compileLoop(scope, acc, g, bound.f(result.right.map { _ => (r, resource.id) }).viewL) - } - - case Left(err) => - F.flatMap(scope.releaseResource(resource.id)) { result => - val failedResult: Either[Throwable,x] = - result.left.toOption.map { err0 => - Left(new CompositeFailure(err, NonEmptyList.of(err0))) - }.getOrElse(Left(err)) - compileLoop(scope, acc, g, bound.f(failedResult).viewL) - } + case uncons: Algebra.Uncons[F, x, O] => + F.flatMap(F.attempt(compileUncons(scope, uncons.s, uncons.chunkSize, uncons.maxSteps))) { + case Right((scope, u)) => compileFoldLoop(scope, acc, g, + f(Right(u)) + ) + case Left(err) => compileFoldLoop(scope, acc, g, f(Left(err))) } - } else { - F.raiseError(Interrupted) // todo: do we really need to signal this as an exception ? - } - } + case alg: Effectful[F, O, _] => + F.flatMap(compileShared(scope, alg)) { case (scope, r) => + compileFoldLoop(scope, acc, g, f(r)) + } - case release: Algebra.Release[F,_] => - F.flatMap(scope.releaseResource(release.token)) { result => - compileLoop(scope, acc, g, bound.f(result).viewL) } + case Some(rsn) => compileFoldLoop(scope, acc, g, f(Left(rsn))) + } + case e => sys.error("FreeC.ViewL structure must be Pure(a), Fail(e), or Bind(Eval(fx),k), was: " + e) + } + } - case c: Algebra.CloseScope[F,_] => - F.flatMap(c.toClose.close) { result => - F.flatMap(c.toClose.openAncestor) { scopeAfterClose => - compileLoop(scopeAfterClose, acc, g, bound.f(result).viewL) - } - } + def compileShared[F[_], O]( + scope: CompileScope[F] + , eff: Effectful[F, O, _] + )(implicit F: Sync[F]): F[(CompileScope[F], Either[Throwable,Any])] = { + eff match { + case eval: Algebra.Eval[F, O, _] => + F.map(scope.interruptibleEval(eval.value)) { (scope, _) } - case o: Algebra.OpenScope[F,_] => - F.flatMap(scope.open) { innerScope => - compileLoop(innerScope, acc, g, bound.f(Right(innerScope)).viewL) - } + case acquire: Algebra.Acquire[F, O, _] => + F.map(scope.acquireResource(acquire.resource, acquire.release)) { (scope, _) } - case e: GetScope[F,_] => - F.suspend { - compileLoop(scope, acc, g, bound.f(Right(scope)).viewL) - } + case release: Algebra.Release[F,O] => + F.map(scope.releaseResource(release.token)) { (scope, _) } - case other => sys.error(s"impossible Segment or Output following uncons: $other") + case c: Algebra.CloseScope[F,O] => + F.flatMap(c.toClose.close) { result => + F.map(c.toClose.openAncestor) { scopeAfterClose => (scopeAfterClose, result) } } - case e => sys.error("FreeC.ViewL structure must be Pure(a), Fail(e), or Bind(Eval(fx),k), was: " + e) + + case o: Algebra.OpenScope[F,O] => + F.map(scope.open(o.interruptible)) { innerScope => (innerScope, Right(innerScope)) } + + case e: GetScope[F,O] => + F.delay((scope, Right(scope))) } } @@ -193,6 +230,7 @@ private[fs2] object Algebra { case o: Output[F,O2] => Output[G,O2](o.values) case Run(values) => Run[G,O2,X](values) case Eval(value) => Eval[G,O2,X](u(value)) + case un:Uncons[F,x,O2] => Uncons[G,x,O2](un.s.translate(algFtoG), un.chunkSize, un.maxSteps).asInstanceOf[Algebra[G,O2,X]] case a: Acquire[F,O2,_] => Acquire(u(a.resource), r => u(a.release(r))) case r: Release[F,O2] => Release[G,O2](r.token) case os: OpenScope[F,O2] => os.asInstanceOf[Algebra[G,O2,X]] @@ -202,4 +240,26 @@ private[fs2] object Algebra { } fr.translate[Algebra[G,O,?]](algFtoG) } + + /** + * If the current stream evaluation scope is scope defined in `interrupted` or any ancestors of current scope + * then this will continue with evaluation of `s` as interrupted stream. + * Otherwise, this continues with `s` normally, and `interrupted` is ignored. + */ + private[fs2] def interruptEventually[F[_], O](s: FreeC[Algebra[F,O,?],Unit], interrupted: Interrupted): FreeC[Algebra[F,O,?],Unit] = { + Algebra.getScope.flatMap { scope => + def loopsExceeded: Boolean = interrupted.loop >= scope.interruptible.map(_.maxInterruptDepth).getOrElse(0) + if (scope.id == interrupted.scopeId) { + if (loopsExceeded) raiseError(interrupted) + else s.asHandler(interrupted.copy(loop = interrupted.loop + 1)) + } else { + Algebra.eval(scope.hasAncestor(interrupted.scopeId)).flatMap { hasAncestor => + if (!hasAncestor) s + else if (loopsExceeded) raiseError(interrupted.copy(loop = 0)) + else s.asHandler(interrupted.copy(loop = interrupted.loop + 1)) + } + } + } + } + } diff --git a/core/shared/src/main/scala/fs2/internal/CompileScope.scala b/core/shared/src/main/scala/fs2/internal/CompileScope.scala index 5097e60904..4b704c5d74 100644 --- a/core/shared/src/main/scala/fs2/internal/CompileScope.scala +++ b/core/shared/src/main/scala/fs2/internal/CompileScope.scala @@ -1,12 +1,16 @@ package fs2.internal -import scala.annotation.tailrec +import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicReference -import fs2.{ Catenable, CompositeFailure, Lease, Scope } -import fs2.async.Ref -import cats.effect.Sync +import cats.data.NonEmptyList +import fs2.{Catenable, CompositeFailure, Interrupted, Lease, Scope} +import fs2.async.{Promise, Ref} +import cats.effect.{Effect, Sync} +import fs2.internal.CompileScope.InterruptContext + +import scala.concurrent.ExecutionContext /** * Implementation of [[Scope]] for the internal stream interpreter. @@ -50,13 +54,24 @@ import cats.effect.Sync * Resources are allocated when the interpreter interprets the `Acquire` element, which is typically constructed * via `Stream.bracket` or `Pull.acquire`. See [[Resource]] docs for more information. * - * @param id Unique identification of the scope - * @param parent If empty indicates root scope. If non-emtpy, indicates parent of this scope. + * @param id Unique identification of the scope + * @param parent If empty indicates root scope. If non-emtpy, indicates parent of this scope. + * @param interruptible If defined, allows this scope to interrupt any of its operation. Interruption + * is performed using the supplied context. + * Normally the interruption awaits next step in Algebra to be evaluated, with exception + * of Eval, that when interruption is enabled on scope will be wrapped in race, + * that eventually allows interruption while eval is evaluating. + * */ -private[internal] final class CompileScope[F[_]] private (val id: Token, private val parent: Option[CompileScope[F]])(implicit F: Sync[F]) extends Scope[F] { self => +private[fs2] final class CompileScope[F[_]] private ( + val id: Token + , private val parent: Option[CompileScope[F]] + , val interruptible: Option[InterruptContext[F]] +)(implicit F: Sync[F]) extends Scope[F] { self => private val state: Ref[F, CompileScope.State[F]] = new Ref(new AtomicReference(CompileScope.State.initial)) + /** * Registers supplied resource in this scope. * Returns false if the resource may not be registered because scope is closed already. @@ -88,11 +103,22 @@ private[internal] final class CompileScope[F[_]] private (val id: Token, private * If this scope is currently closed, then the child scope is opened on the first * open ancestor of this scope. */ - def open: F[CompileScope[F]] = { + def open(interruptible: Option[(Effect[F], ExecutionContext)]): F[CompileScope[F]] = { F.flatMap(state.modify2 { s => if (! s.open) (s, None) else { - val scope = new CompileScope[F](new Token(), Some(self)) + val newScopeId = new Token + def iCtx = interruptible.map { case (effect, ec) => + InterruptContext( + effect = effect + , ec = ec + , promise = Promise.unsafeCreate[F, Throwable](effect, ec) + , ref = Ref.unsafeCreate[F, (Option[Throwable], Boolean)]((None, false)) + , interruptScopeId = newScopeId + , maxInterruptDepth = 256 + ) + } + val scope = new CompileScope[F](newScopeId, Some(self), iCtx orElse self.interruptible) (s.copy(children = scope +: s.children), Some(scope)) } }) { @@ -101,12 +127,34 @@ private[internal] final class CompileScope[F[_]] private (val id: Token, private // This scope is already closed so try to promote the open to an ancestor; this can fail // if the root scope has already been closed, in which case, we can safely throw self.parent match { - case Some(parent) => parent.open + case Some(parent) => parent.open(interruptible) case None => F.raiseError(throw new IllegalStateException("cannot re-open root scope")) } } } + def acquireResource[R](fr: F[R], release: R => F[Unit]): F[Either[Throwable, (R, Token)]] = { + val resource = Resource.create + F.flatMap(register(resource)) { mayAcquire => + if (!mayAcquire) F.raiseError(AcquireAfterScopeClosed) + else { + F.flatMap(F.attempt(fr)) { + case Right(r) => + val finalizer = F.suspend(release(r)) + F.map(resource.acquired(finalizer)) { result => + result.right.map(_ => (r, resource.id)) + } + case Left(err) => + F.map(releaseResource(resource.id)) { result => + result.left.toOption.map { err0 => + Left(new CompositeFailure(err, NonEmptyList.of(err0))) + }.getOrElse(Left(err)) + } + } + } + } + } + /** * Unregisters the child scope identified by the supplied id. * @@ -178,6 +226,11 @@ private[internal] final class CompileScope[F[_]] private (val id: Token, private go(self, Catenable.empty) } + /** yields to true, if this scope has ancestor with given scope Id **/ + def hasAncestor(scopeId: Token): F[Boolean] = { + F.map(ancestors) { c => Catenable.instance.exists(c)(_.id == scopeId) } + } + // See docs on [[Scope#lease]] def lease: F[Option[Lease[F]]] = { val T = Catenable.instance @@ -199,11 +252,91 @@ private[internal] final class CompileScope[F[_]] private (val id: Token, private } } } + + // See docs on [[Scope#interrupt]] + def interrupt(cause: Either[Throwable, Unit]): F[Unit] = { + interruptible match { + case None => F.raiseError(new IllegalStateException("Scope#interrupt called for Scope that cannot be interrupted")) + case Some(iCtx) => + val interruptRsn = cause.left.toOption.getOrElse(Interrupted(iCtx.interruptScopeId, 0)) + F.flatMap(F.attempt(iCtx.promise.complete(interruptRsn))) { + case Right(_) => + F.map(iCtx.ref.modify({ case (interrupted, signalled) => (interrupted orElse Some(interruptRsn), signalled)})) { _ => ()} + case Left(_) => + F.unit + } + } + } + + // See docs on [[Scope#isInterrupted]] + def isInterrupted: F[Boolean] = { + interruptible match { + case None => F.pure(false) + case Some(iCtx) => + F.map(iCtx.ref.get) { case (interrupted, _) => interrupted.nonEmpty } + } + } + + /** + * If evaluates to Some(rsn) then the current step evaluation in stream shall be interrupted by + * given reason. Also sets the `interrupted` flag, so this is guaranteed to yield to interrupt only once. + * + * Used when interruption stream in pure steps between `uncons` + */ + def shallInterrupt: F[Option[Throwable]] = { + interruptible match { + case None => F.pure(None) + case Some(iCtx) => + F.flatMap(iCtx.ref.get) { case (interrupted, signalled) => + if (signalled || interrupted.isEmpty) F.pure(None) + else { + F.map(iCtx.ref.modify { case (int, _) => (int, true)}) { c => + if (c.previous._2) None + else c.previous._1 + } + } + } + } + } + + + /** + * When the stream is evaluated, there may be `Eval` that needs to be cancelled early, when asynchronous interruption + * is taking place. + * This allows to augment eval so whenever this scope is interrupted it will return on left the reason of interruption. + * If the eval completes before the scope is interrupt, then this will return `A`. + */ + private[internal] def interruptibleEval[A](f: F[A]): F[Either[Throwable, A]] = { + interruptible match { + case None => F.attempt(f) + case Some(iCtx) => + F.flatMap(iCtx.ref.get) { case (_, signalled) => + if (signalled) F.attempt(f) + else { + // we assume there will be no parallel evals/interpretation in the scope. + // the concurrent stream evaluation (like merge, join) shall have independent interruption + // for every asynchronous child scope of the stream, so this assumption shall be completely safe. + F.flatMap(iCtx.promise.cancellableGet) { case (get, cancel) => + F.flatMap(fs2.async.race(get, F.attempt(f))(iCtx.effect, iCtx.ec)) { + case Right(result) => F.map(cancel)(_ => result) + case Left(err) => + // this indicates that we have consumed signalling the interruption + // there is only one interruption allowed per scope. + // as such, we have to set interruption flag + F.map(iCtx.ref.modify { case (int, sig) => (int, true) })(_ => Left(err)) + }} + } + } + } + + } + + override def toString = s"RunFoldScope(id=$id,interruptible=${interruptible.nonEmpty})" } private[internal] object CompileScope { /** Creates a new root scope. */ - def newRoot[F[_]: Sync]: CompileScope[F] = new CompileScope[F](new Token(), None) + def newRoot[F[_]: Sync]: CompileScope[F] = new CompileScope[F](new Token(), None, None) /** * State of a scope. @@ -217,9 +350,12 @@ private[internal] object CompileScope { * @param children Children of this scope. Children may appear during the parallel pulls where one scope may * split to multiple asynchronously acquired scopes and resources. * Still, likewise for resources they are released in reverse order. + * */ final private case class State[F[_]]( - open: Boolean, resources: Catenable[Resource[F]], children: Catenable[CompileScope[F]] + open: Boolean + , resources: Catenable[Resource[F]] + , children: Catenable[CompileScope[F]] ) { self => def unregisterResource(id: Token): (State[F], Option[Resource[F]]) = { @@ -244,4 +380,30 @@ private[internal] object CompileScope { private val closed_ = State[Nothing](open = false, resources = Catenable.empty, children = Catenable.empty) def closed[F[_]]: State[F] = closed_.asInstanceOf[State[F]] } + + /** + * A context of interruption status. This is shared from the parent that was created as interruptible to all + * its children. It assures consistent view of the interruption through the stack + * @param effect Effect, used to create interruption at Eval. + * @param ec Execution context used to create promise and ref, and interruption at Eval. + * @param promise Promise signalling once the interruption to the scopes. Onlycompleed once. + * @param ref Ref guarding the interruption. Option holds Interruption cause and boolean holds wheter scope is known to be + * Interrupted already. + * @param interruptScopeId An id of the scope, that shall signal end of the interruption. Essentially thats the first + * scope in the stack that was marked as interruptible scope. Guards, that the interrupt of one stream + * won't propagate to parent stream, that may be also interrupted but not yet by this stream. + * @param maxInterruptDepth In case the stream is interrupted, this is used to prevent infinite stream from searching + * for cleanup indefinitely. This is consulted only if the stream was interrupted, and if the + * loop occurred in single given scope. + * If stream failed with error, this is not used at all. + * @tparam F + */ + final private[internal] case class InterruptContext[F[_]]( + effect: Effect[F] + , ec: ExecutionContext + , promise: Promise[F, Throwable] + , ref: Ref[F, (Option[Throwable], Boolean)] + , interruptScopeId: Token + , maxInterruptDepth: Int + ) } diff --git a/core/shared/src/main/scala/fs2/internal/FreeC.scala b/core/shared/src/main/scala/fs2/internal/FreeC.scala index 22effa8454..f8fa409312 100644 --- a/core/shared/src/main/scala/fs2/internal/FreeC.scala +++ b/core/shared/src/main/scala/fs2/internal/FreeC.scala @@ -14,6 +14,11 @@ private[fs2] sealed abstract class FreeC[F[_], +R] { case Left(e) => FreeC.Fail(e) }) + def transformWith[R2](f: Either[Throwable, R] => FreeC[F, R2]): FreeC[F, R2] = + Bind[F,R,R2](this, r => + try f(r) catch { case NonFatal(e) => FreeC.Fail(e) } + ) + def map[R2](f: R => R2): FreeC[F,R2] = Bind(this, (r: Either[Throwable,R]) => r match { case Right(r) => try FreeC.Pure(f(r)) catch { case NonFatal(e) => FreeC.Fail(e) } @@ -47,13 +52,18 @@ private[fs2] sealed abstract class FreeC[F[_], +R] { private[fs2] object FreeC { final case class Pure[F[_], R](r: R) extends FreeC[F, R] { override def translate[G[_]](f: F ~> G): FreeC[G, R] = this.asInstanceOf[FreeC[G,R]] + override def toString: String = s"FreeC.Pure($r)" } final case class Eval[F[_], R](fr: F[R]) extends FreeC[F, R] { override def translate[G[_]](f: F ~> G): FreeC[G, R] = Eval(f(fr)) + override def toString: String = s"FreeC.Eval($fr)" + } + final case class Bind[F[_], X, R](fx: FreeC[F, X], f: Either[Throwable,X] => FreeC[F, R]) extends FreeC[F, R] { + override def toString: String = s"FreeC.Bind($fx, $f)" } - final case class Bind[F[_], X, R](fx: FreeC[F, X], f: Either[Throwable,X] => FreeC[F, R]) extends FreeC[F, R] final case class Fail[F[_], R](error: Throwable) extends FreeC[F,R] { override def translate[G[_]](f: F ~> G): FreeC[G, R] = this.asInstanceOf[FreeC[G,R]] + override def toString: String = s"FreeC.Fail($error)" } private val pureContinuation_ = (e: Either[Throwable,Any]) => e match { diff --git a/core/shared/src/main/scala/fs2/internal/Interrupted.scala b/core/shared/src/main/scala/fs2/internal/Interrupted.scala deleted file mode 100644 index ab9914b602..0000000000 --- a/core/shared/src/main/scala/fs2/internal/Interrupted.scala +++ /dev/null @@ -1,3 +0,0 @@ -package fs2.internal - -private[internal] final case object Interrupted extends Throwable { override def fillInStackTrace = this }