Skip to content

Commit

Permalink
Merge pull request #1059 from functional-streams-for-scala/feature/in…
Browse files Browse the repository at this point in the history
…terrupt-with-algebra

Feature/interrupt with algebra
  • Loading branch information
mpilquist authored Jan 7, 2018
2 parents dd4e03a + 237f021 commit 9d99377
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 334 deletions.
1 change: 1 addition & 0 deletions core/jvm/src/test/scala/fs2/TestUtilPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ trait TestUtilPlatform {
def throws[A](err: Throwable)(s: Stream[IO, A]): Boolean =
s.compile.toVector.attempt.unsafeRunSync() match {
case Left(e) if e == err => true
case Left(e) => println(s"EXPECTED: $err, thrown: $e"); false
case _ => false
}
}
16 changes: 16 additions & 0 deletions core/shared/src/main/scala/fs2/CompositeFailure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,20 @@ object CompositeFailure {
case hd :: Nil => Some(hd)
case first :: second :: rest => Some(apply(first, second, rest))
}

/**
* Builds composite failure from the results supplied.
*
* - When any of the results are on left, then the Left(err) is returned
* - When both results fail, the Left(CompositeFailure(_)) is returned
* - When both results succeeds then Right(()) is returned
*
*/
def fromResults(first: Either[Throwable, Unit],
second: Either[Throwable, Unit]): Either[Throwable, Unit] =
first match {
case Right(_) => second
case Left(err) =>
Left(second.left.toOption.map(err1 => CompositeFailure(err, err1, Nil)).getOrElse(err))
}
}
18 changes: 0 additions & 18 deletions core/shared/src/main/scala/fs2/Interrupted.scala

This file was deleted.

6 changes: 0 additions & 6 deletions core/shared/src/main/scala/fs2/Scope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,4 @@ abstract class Scope[F[_]] {
*/
def interrupt(cause: Either[Throwable, Unit]): F[Unit]

/**
* Yields to true, if the scope is interrupted.
* Note that when scope is interrupted with error, this yields to false
* @return
*/
def isInterrupted: F[Boolean]
}
36 changes: 7 additions & 29 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -801,11 +801,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* }}}
*/
def repeat: Stream[F, O] =
Stream.fromFreeC[F, O](this.get.transformWith {
case Right(_) => repeat.get
case Left(int: Interrupted) => Algebra.pure(())
case Left(err) => Algebra.raiseError(err)
})
this ++ repeat

/**
* Converts a `Stream[F,Either[Throwable,O]]` to a `Stream[F,O]`, which emits right values and fails upon the first `Left(t)`.
Expand Down Expand Up @@ -1593,11 +1589,8 @@ object Stream {

/** Appends `s2` to the end of this stream. Alias for `s1 ++ s2`. */
def append[O2 >: O](s2: => Stream[F, O2]): Stream[F, O2] =
fromFreeC(self.get[F, O2].transformWith {
case Right(_) => s2.get
case Left(interrupted: Interrupted) =>
Algebra.interruptEventually(s2.get, interrupted)
case Left(err) => Algebra.raiseError(err)
fromFreeC(self.get[F, O2].flatMap { _ =>
s2.get
})

/**
Expand Down Expand Up @@ -1661,8 +1654,8 @@ object Stream {
.drain
.attempt
.flatMap {
case Right(_) | Left(_: Interrupted) => doneR.complete(())
case Left(err) => interruptL.complete(err) *> doneR.complete(())
case Right(_) => doneR.complete(())
case Left(err) => interruptL.complete(err) *> doneR.complete(())
}

Stream.eval(async.fork(runR)) >>
Expand Down Expand Up @@ -1839,19 +1832,8 @@ object Stream {
}
only match {
case None =>
// specific version of ++, that in case of error or interrupt shortcuts for evaluation immediately to tail.
def fby(s1: Stream[F, O2], s2: => Stream[F, O2]): Stream[F, O2] =
fromFreeC[F, O2](s1.get.transformWith {
case Right(()) => s2.get
case Left(int: Interrupted) =>
fromFreeC(Algebra.interruptEventually(tl, int))
.flatMap(f)
.get
case Left(err) => Algebra.raiseError(err)
})

hd.map(f)
.foldRightLazy(Stream.fromFreeC(tl).flatMap(f))(fby(_, _))
.foldRightLazy(Stream.fromFreeC(tl).flatMap(f))(_ ++ _)
.get

case Some(o) =>
Expand Down Expand Up @@ -1969,10 +1951,6 @@ object Stream {
}
}
.interruptScope
.handleErrorWith {
case int: fs2.Interrupted => Stream.empty
case other => Stream.raiseError(other)
}

/**
* Creates a scope that may be interrupted by calling scope#interrupt.
Expand Down Expand Up @@ -2793,7 +2771,7 @@ object Stream {
val runStep =
Algebra
.compileScope(
scope,
scope.asInstanceOf[fs2.internal.CompileScope[F, UO]], // todo: resolve cast
Algebra.uncons(self.get).flatMap(Algebra.output1(_)),
None: UO
)((_, uo) => uo.asInstanceOf[UO])
Expand Down
Loading

0 comments on commit 9d99377

Please sign in to comment.