
Topic/interrupt scope #1019

Merged: 22 commits, Jan 3, 2018

Commits:
4f9661a  Implemented interruption via Scope#Interrupt  (pchlupacek, Dec 15, 2017)
e09177b  Fixed concurrently to not propagate Interrupt outside of its scope.  (pchlupacek, Dec 15, 2017)
a9967d9  Added more test cases to test flatMapped hung streams to be interrupted  (pchlupacek, Dec 15, 2017)
a85bead  Added resume interruption test  (pchlupacek, Dec 16, 2017)
8757ea2  Added more tests and corrected scope closure in flatMap  (pchlupacek, Dec 19, 2017)
750d062  Revereted flatMap implementation, introduced onInterrupt  (pchlupacek, Dec 20, 2017)
c206629  Moved Interrupted to fs2._ root  (pchlupacek, Dec 20, 2017)
3cc0adb  Introduce id of the scope of the intterruption  (pchlupacek, Dec 24, 2017)
a886ee6  Append based interruption  (pchlupacek, Dec 25, 2017)
2ab62d8  Introduced Algebra.Uncons  (pchlupacek, Dec 30, 2017)
a53f8fc  Fixed issues uncovered in tests  (pchlupacek, Dec 30, 2017)
91cbb9f  Merge branch 'series/0.10' of github.com:functional-streams-for-scala…  (pchlupacek, Dec 31, 2017)
3b4e0d7  Corrected concurrently spec that `drained` the failed stream before c…  (pchlupacek, Dec 31, 2017)
b2db70f  Implemented Run in terms of Segment.force.splitAt  (pchlupacek, Dec 31, 2017)
399deba  Added test to verify that #1035 is fixed  (pchlupacek, Jan 1, 2018)
b522afb  Corrected name of spec cases  (pchlupacek, Jan 1, 2018)
73c27bc  Merge branch 'series/0.10' into topic/interrupt-scope  (mpilquist, Jan 2, 2018)
dcca8ba  Merge branch 'series/0.10' into topic/interrupt-scope  (mpilquist, Jan 2, 2018)
d68d7f8  Merge branch 'series/0.10' of github.com:functional-streams-for-scala…  (pchlupacek, Jan 3, 2018)
44a249c  Addressed review comments, cleanup  (pchlupacek, Jan 3, 2018)
6f64b36  Merge branch 'topic/interrupt-scope' of github.com:functional-streams…  (pchlupacek, Jan 3, 2018)
71ee481  Addresses review comments  (pchlupacek, Jan 3, 2018)
15 changes: 14 additions & 1 deletion core/jvm/src/test/scala/fs2/ConcurrentlySpec.scala
@@ -2,6 +2,7 @@ package fs2

import scala.concurrent.duration._
import cats.effect.IO
import fs2.async.Promise

class ConcurrentlySpec extends Fs2Spec {

@@ -13,7 +14,7 @@ class ConcurrentlySpec extends Fs2Spec {

"when background stream fails, overall stream fails" in forAll { (s: PureStream[Int], f: Failure) =>
val prg = Scheduler[IO](1).flatMap(scheduler => (scheduler.sleep_[IO](25.millis) ++ s.get).concurrently(f.get))
val throws = f.get.drain.compile.drain.attempt.unsafeRunSync.isLeft
val throws = f.get.compile.drain.attempt.unsafeRunSync.isLeft
if (throws) an[Err.type] should be thrownBy runLog(prg)
else runLog(prg)
}
@@ -33,5 +34,17 @@
runLog(prg)
bgDone shouldBe true
}

"when background stream fails, primary stream fails even when hung" in forAll { (s: PureStream[Int], f: Failure) =>
val promise = Promise.unsafeCreate[IO, Unit]
val prg = Scheduler[IO](1).flatMap{ scheduler =>
(scheduler.sleep_[IO](25.millis) ++ (Stream(1) ++ s.get)).concurrently(f.get)
.flatMap { i => Stream.eval(promise.get).map { _ => i } }
}

val throws = f.get.compile.drain.attempt.unsafeRunSync.isLeft
if (throws) an[Err.type] should be thrownBy runLog(prg)
else runLog(prg)
}
}
}
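The new test asserts that a failure in the background stream fails the overall stream even when the primary is hung on a never-completed promise. A stdlib-only sketch of that semantics (hypothetical names such as `ConcurrentlySketch`; not fs2's implementation, which routes interruption through scopes) could look like:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcurrentlySketch {
  // If `background` fails, the combined computation fails promptly,
  // even when `primary` is hung and never completes.
  def concurrently[A](primary: Future[A], background: Future[Unit]): Future[A] = {
    val out = Promise[A]()
    background.failed.foreach(out.tryFailure) // background failure wins the race
    primary.onComplete(out.tryComplete)       // otherwise defer to the primary
    out.future
  }

  def interruptedByFailure(): Boolean = {
    val hung    = Promise[Int]().future // models the primary stream hung on promise.get
    val failing = Future.failed[Unit](new RuntimeException("boom"))
    scala.util.Try(Await.result(concurrently(hung, failing), 2.seconds)).isFailure
  }
}
```

The point mirrored from the spec: without the interruption path, awaiting the hung primary would block forever; with it, the background failure completes the result.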
124 changes: 122 additions & 2 deletions core/jvm/src/test/scala/fs2/Pipe2Spec.scala
@@ -1,5 +1,6 @@
package fs2


import scala.concurrent.duration._
import cats.effect.IO
import cats.implicits._
@@ -150,17 +151,70 @@ class Pipe2Spec extends Fs2Spec {

"interrupt (1)" in forAll { (s1: PureStream[Int]) =>
val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt = Stream.emit(true) ++ Stream.eval_(s.increment)
val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) }.compile.drain.attempt
// tests that termination is successful even if stream being interrupted is hung
runLog { s1.get.covary[IO].evalMap(_ => s.decrement).interruptWhen(interrupt) } shouldBe Vector()
}

"interrupt (2)" in forAll { (s1: PureStream[Int]) =>
val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt = Stream.emit(true) ++ Stream.eval_(s.increment)
// tests that termination is successful even if stream being interrupted is hung
runLog { s1.get.covary[IO].evalMap(_ => s.decrement).interruptWhen(interrupt) } shouldBe Vector()
}

"interrupt (3)" in {
// tests the interruption of the constant stream
[review thread]
mpilquist (Member): Super minor style nitpick -- these comments can be either put into the test name -- e.g. interrupt (3) - interruption of constant stream -- or wrapped in an info("interruption of constant stream").

pchlupacek (Author, Jan 3, 2018): not sure about info syntax, can you just write quick example pls?

mpilquist (Member):

    "interrupt (3)" in {
      info("tests interruption of the constant stream")
      ...
    }

It just prints the info string to the test output in test reports. In this case, it's probably better in the test name itself.

pchlupacek (Author): I have recently experimented with that, however info() causes the text to be printed at every Gen case, so perhaps 100x or so for every test. I am not sure about moving test descriptions into the names -- not sure what that will do to tools that use test names to filter execution of a single test case.

mpilquist (Member): Ouch. Moving the text description to the test name works fine -- e.g., testOnly *MergeJoinSpec -- -z interrupt will run all the tests that include the word interrupt in their name.
[end review thread]

val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
Stream.constant(true).covary[IO].interruptWhen(interrupt).compile.drain.unsafeRunSync
}

"interrupt (4)" in {
// tests the interruption of the constant stream with flatMap combinator
val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
Stream.constant(true).covary[IO].interruptWhen(interrupt).flatMap { _ => Stream.emit(1) }.compile.drain.unsafeRunSync
}

"interrupt (5)" in {
// tests the interruption of a stream that recurses infinitely
val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
def loop(i: Int): Stream[IO, Int] = Stream.emit(i).covary[IO].flatMap { i => Stream.emit(i) ++ loop(i+1) }
loop(0).interruptWhen(interrupt).compile.drain.unsafeRunSync
}

// TODO: need to resolve StackOverflowError (SoE) in flatMap
"interrupt (6)" in {
// tests the interruption of a stream that recurses infinitely and never emits
val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
def loop: Stream[IO, Int] = Stream.eval(IO{()}).flatMap { _ => loop }
loop.interruptWhen(interrupt).compile.drain.unsafeRunSync
}

"interrupt (7)" in {
// tests the interruption of a stream that recurses infinitely, is pure, and never emits
val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
def loop: Stream[IO, Int] = Stream.emit(()).covary[IO].flatMap { _ => loop }
loop.interruptWhen(interrupt).compile.drain.unsafeRunSync
}

"interrupt (8)" in {
// tests the interruption of the stream that repeatedly evaluates
val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
Stream.repeatEval(IO{()}).interruptWhen(interrupt).compile.drain.unsafeRunSync
}

"interrupt (9)" in {
// tests the interruption of the constant drained stream
val interrupt = mkScheduler.flatMap { _.sleep_[IO](1.millis) }.compile.drain.attempt
Stream.constant(true).dropWhile(! _ ).covary[IO].interruptWhen(interrupt).compile.drain.unsafeRunSync
}

"interrupt (10)" in forAll { (s1: PureStream[Int]) =>
// tests that termination is successful even if interruption stream is infinitely false
runLog { s1.get.covary[IO].interruptWhen(Stream.constant(false)) } shouldBe runLog(s1.get)
}

"interrupt (3)" in forAll { (s1: PureStream[Int]) =>
"interrupt (11)" in forAll { (s1: PureStream[Int]) =>
val barrier = async.mutable.Semaphore[IO](0).unsafeRunSync()
val enableInterrupt = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interruptedS1 = s1.get.covary[IO].evalMap { i =>
@@ -176,6 +230,72 @@ class Pipe2Spec extends Fs2Spec {
assert(out.forall(i => i % 7 != 0))
}

"interrupt (12)" in forAll { (s1: PureStream[Int]) =>
// tests interruption of stream that never terminates in flatMap
val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) }.compile.drain.attempt
// tests that termination is successful even when flatMapped stream is hung
runLog { s1.get.covary[IO].interruptWhen(interrupt).flatMap(_ => Stream.eval_(s.decrement)) } shouldBe Vector()
}

"interrupt (13)" in forAll { (s1: PureStream[Int], f: Failure) =>
// tests that failure from the interrupt signal stream will be propagated to main stream
// even when flatMap stream is hung

val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) ++ f.get.map { _ => false } }
val prg = (Stream(1) ++ s1.get).covary[IO].interruptWhen(interrupt).flatMap(_ => Stream.eval_(s.decrement))
val throws = f.get.compile.drain.attempt.unsafeRunSync.isLeft
if (throws) an[Err.type] should be thrownBy runLog(prg)
else runLog(prg)
}

"interrupt (14)" in forAll { s1: PureStream[Int] =>
// tests that when interrupted, the interruption will resume with append.
val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt = mkScheduler.flatMap { _.sleep_[IO](50.millis) }.compile.drain.attempt
val prg = (
(s1.get.covary[IO].interruptWhen(interrupt).evalMap { _ => s.decrement map { _ => None } })
++ (s1.get.map(Some(_)))
).collect { case Some(v) => v }

runLog(prg) shouldBe runLog(s1.get)
}

"interrupt (15)" in forAll { s1: PureStream[Int] =>
// tests that interruption works even when flatMap is followed by `collect`
// also tests scenario when interrupted stream is followed by other stream and both have map fusion defined
val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
val prg =
(s1.get.covary[IO].interruptWhen(interrupt).map { i => None } ++ s1.get.map(Some(_)))
.flatMap {
case None => Stream.eval(s.decrement.map { _ => None })
case Some(i) => Stream.emit(Some(i))
}
.collect { case Some(i) => i }

runLog(prg) shouldBe runLog(s1.get)

}

"nested-interrupt (1)" in forAll { s1: PureStream[Int] =>
val s = async.mutable.Semaphore[IO](0).unsafeRunSync()
val interrupt: IO[Either[Throwable, Unit]] = mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
val neverInterrupt = IO.async[Unit] { _ => () }.attempt

val prg =
(s1.get.covary[IO].interruptWhen(interrupt).map(_ => None) ++ s1.get.map(Some(_))).interruptWhen(neverInterrupt)
.flatMap {
case None => Stream.eval(s.decrement.map { _ => None })
case Some(i) => Stream.emit(Some(i))
}
.collect { case Some(i) => i }

runLog(prg) shouldBe runLog(s1.get)
}
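All of the tests above build the interrupt signal by sleeping on a scheduler and then exercise `interruptWhen` against otherwise infinite streams. A minimal stdlib-only model of the idea -- check the signal between pulls and stop once it fires -- might look like this (names such as `InterruptWhenSketch` are illustrative; fs2's real mechanism interrupts via scope state, not a flag check):

```scala
import java.util.concurrent.atomic.AtomicBoolean

object InterruptWhenSketch {
  // Minimal model of `interruptWhen`: between pulls, check whether the
  // interrupt signal has fired; once it has, stop pulling even though the
  // source itself (like `Stream.constant`) would go on forever.
  def runUntilInterrupted[A](source: Iterator[A], interrupted: AtomicBoolean): Vector[A] = {
    val out = Vector.newBuilder[A]
    while (source.hasNext && !interrupted.get) out += source.next()
    out.result()
  }

  def demo(): Vector[Int] = {
    val signal = new AtomicBoolean(false)
    // The scheduler is simulated here: the signal fires during the 5th pull.
    val source = Iterator.from(0).map { i => if (i == 4) signal.set(true); i }
    runUntilInterrupted(source, signal)
  }
}
```

`demo()` yields `Vector(0, 1, 2, 3, 4)`: the element that trips the signal is still emitted, and the next pull observes the interruption.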


"pause" in {
forAll { (s1: PureStream[Int]) =>
val pausedStream = Stream.eval(async.signalOf[IO,Boolean](false)).flatMap { pause =>
12 changes: 12 additions & 0 deletions core/jvm/src/test/scala/fs2/StreamPerformanceSpec.scala
@@ -31,12 +31,24 @@ class StreamPerformanceSpec extends Fs2Spec {
}
}}

"left-associated eval() ++ flatMap 1" - { Ns.foreach { N =>
N.toString in {
runLog((1 until N).map(emit).foldLeft(emit(0).covary[IO])((acc,a) => acc flatMap { _ => eval(IO {()}) flatMap { _ => a }})) shouldBe Vector(N-1)
}
}}

"right-associated flatMap 1" - { Ns.foreach { N =>
N.toString in {
runLog((1 until N).map(emit).reverse.foldLeft(emit(0))((acc,a) => a flatMap { _ => acc })) shouldBe Vector(0)
}
}}

"right-associated eval() ++ flatMap 1" - { Ns.foreach { N =>
N.toString in {
runLog((1 until N).map(emit).reverse.foldLeft(emit(0).covary[IO])((acc,a) => a flatMap { _ => eval(IO {()}) flatMap { _=> acc } })) shouldBe Vector(0)
}
}}
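The left- and right-associated shapes being benchmarked can be mirrored with plain `List` standing in for `Stream`, which also shows why the expected results are `Vector(N-1)` and `Vector(0)`: every element is a singleton, so each bind either replaces the accumulator or discards it. A sketch (illustrative only, not part of the spec):

```scala
object FlatMapAssocSketch {
  // Left-associated: acc.flatMap(_ => a) repeatedly replaces the
  // accumulator, so only the last element (n - 1) survives.
  def leftAssociated(n: Int): List[Int] =
    (1 until n).map(List(_)).foldLeft(List(0))((acc, a) => acc.flatMap(_ => a))

  // Right-associated: a.flatMap(_ => acc) always yields the accumulator,
  // so only the seed (0) survives.
  def rightAssociated(n: Int): List[Int] =
    (1 until n).map(List(_)).reverse.foldLeft(List(0))((acc, a) => a.flatMap(_ => acc))
}
```

The performance spec varies `N` to check that both nesting directions run in reasonable time; the `eval(IO {()})` variants insert an effect between binds, which is the case interruption-aware interpretation has to handle.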

"left-associated flatMap 2" - { Ns.foreach { N =>
N.toString in {
runLog((1 until N).map(emit).foldLeft(emit(0) ++ emit(1) ++ emit(2))(
18 changes: 18 additions & 0 deletions core/shared/src/main/scala/fs2/Interrupted.scala
@@ -0,0 +1,18 @@
package fs2

import fs2.internal.Token

/**
 * Signals interruption of the evaluation.
 *
 * @param scopeId Id of the scope that shall be the last scope interrupted by this signal.
 * @param loop In case of infinite recursion this prevents the interpreter from searching for `CloseScope` indefinitely.
 *             In each recursive iteration, this is incremented by 1 up to the limit defined in the current scope,
 *             after which the stream is interrupted without searching further for any cleanups.
 */
final case class Interrupted(private[fs2] val scopeId: Token, private[fs2] val loop: Int) extends Throwable {
override def fillInStackTrace = this

override def toString = s"Interrupted($scopeId, $loop)"
}
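The `fillInStackTrace = this` override is what makes `Interrupted` cheap to throw as a control signal: the stack is never captured when the instance is constructed. A standalone sketch of the same trick (hypothetical `ControlSignal`, not the fs2 class, with `Long` in place of `Token` to stay stdlib-only):

```scala
// A control-flow throwable in the style of `Interrupted`: returning `this`
// from fillInStackTrace skips the expensive stack capture, so throwing it
// inside an interpreter loop costs little more than an allocation.
final case class ControlSignal(scopeId: Long, loop: Int) extends Throwable {
  override def fillInStackTrace: Throwable = this
  override def toString = s"ControlSignal($scopeId, $loop)"
}

object ControlSignalSketch {
  // Because fillInStackTrace was overridden, the trace stays empty.
  def hasEmptyTrace: Boolean = ControlSignal(42L, 0).getStackTrace.isEmpty
}
```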

22 changes: 22 additions & 0 deletions core/shared/src/main/scala/fs2/Scope.scala
@@ -26,4 +26,26 @@ abstract class Scope[F[_]] {
* successfully leased.
*/
def lease: F[Option[Lease[F]]]

/**
 * Interrupts evaluation of the current scope. Only scopes previously marked with `Stream.interruptScope` may be
 * interrupted; for other scopes this will fail.
 *
 * Interruption is final and may take two forms:
 *
 * When invoked on the right side (`Right(())`), only the current scope's evaluation is interrupted, and evaluation
 * resumes when control is passed to the next scope.
 *
 * When invoked on the left side (`Left(t)`), the given throwable is injected as if it were raised by stream
 * evaluation, and the whole stream then fails with the supplied throwable, bypassing any error handling.
 */
def interrupt(cause: Either[Throwable, Unit]): F[Unit]

/**
 * Yields `true` if the scope is interrupted.
 * Note that when the scope is interrupted with an error, this yields `false`.
 */
def isInterrupted: F[Boolean]
}
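A minimal model of this contract, using an `AtomicReference` to hold the first interruption cause (illustrative only; the real `Scope` is effectful in `F` and cooperates with the interpreter rather than being polled):

```scala
import java.util.concurrent.atomic.AtomicReference

// Right(()) interrupts just this scope (evaluation resumes in the next
// scope); Left(t) records an error the whole stream fails with.
// Matching the documented contract, isInterrupted is true only for Right.
final class ScopeSketch {
  private val state =
    new AtomicReference[Option[Either[Throwable, Unit]]](None)

  def interrupt(cause: Either[Throwable, Unit]): Unit = {
    state.compareAndSet(None, Some(cause)); () // the first interruption wins
  }

  def isInterrupted: Boolean = state.get.exists(_.isRight)

  def failure: Option[Throwable] = state.get.flatMap(_.left.toOption)
}
```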
15 changes: 15 additions & 0 deletions core/shared/src/main/scala/fs2/Segment.scala
@@ -1034,6 +1034,21 @@ object Segment {
result.get
}

/**
 * Like `run` but runs `f` for each `O` as it is produced
 * while running this segment.
 * Allows efficient accumulation of `O` while running the stream.
 */
final def runForEach(f: O => Unit): R = {
[review thread]
mpilquist (Member): Instead, could we change signature of foreach to:

    def foreach(f: O => Unit): R

Similar change for foreachChunk.

pchlupacek (Author): yes. will do.
[end review thread]

def chunk(ch: Chunk[O]): Unit = ch.foreach(f)
var result: Option[R] = None
val trampoline = new Trampoline
val step = self.stage(Depth(0), trampoline.defer, f, chunk, r => { result = Some(r); throw Done }).value
try while (true) stepAll(step, trampoline)
catch { case Done => }
result.get
}
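`runForEach` escapes the interpreter loop by throwing the `Done` control exception once the final result is known. A stdlib-only sketch of that shape, with an `Iterator` standing in for the staged segment and the trampoline machinery omitted (hypothetical names):

```scala
object RunForEachSketch {
  // Cheap control exception, in the style of the interpreter's Done.
  private object Done extends Throwable { override def fillInStackTrace = this }

  // Feed every output to `f`, then break out of the loop with a control
  // exception once the final result has been recorded.
  def runForEach[O, R](outputs: Iterator[O], result: R)(f: O => Unit): R = {
    var res: Option[R] = None
    try while (true) {
      if (outputs.hasNext) f(outputs.next())
      else { res = Some(result); throw Done }
    } catch { case Done => () }
    res.get
  }
}
```

As in the real method, `res` is only read after `Done` has been caught, so `res.get` is safe by construction.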

/**
* Splits this segment at the specified index by simultaneously taking and dropping.
*