Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split interruptible #2343

Merged
merged 20 commits into from
Oct 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform)
"cats.effect.unsafe.HelperThread.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.LocalQueue.enqueue"),
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.WorkerThread.this")
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.WorkerThread.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.LocalQueue.drain")
)
)
.jvmSettings(
Expand Down
6 changes: 5 additions & 1 deletion core/js/src/main/scala/cats/effect/IOCompanionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
def blocking[A](thunk: => A): IO[A] =
apply(thunk)

def interruptible[A](many: Boolean)(thunk: => A): IO[A] = {
private[effect] def interruptible[A](many: Boolean, thunk: => A): IO[A] = {
val _ = many
apply(thunk)
}

def interruptible[A](thunk: => A): IO[A] = interruptible(false, thunk)

def interruptibleMany[A](thunk: => A): IO[A] = interruptible(true, thunk)

def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] = {
val _ = hint
apply(thunk)
Expand Down
18 changes: 15 additions & 3 deletions core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,30 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
private[this] val TypeInterruptibleMany = Sync.Type.InterruptibleMany

def blocking[A](thunk: => A): IO[A] = {
val fn = () => thunk
val fn = Thunk.asFunction0(thunk)
Blocking(TypeBlocking, fn, Tracing.calculateTracingEvent(fn.getClass))
}

def interruptible[A](many: Boolean)(thunk: => A): IO[A] = {
val fn = () => thunk
djspiewak marked this conversation as resolved.
Show resolved Hide resolved
// this cannot be marked private[effect] because of static forwarders in Java
@deprecated("use interruptible / interruptibleMany instead", "3.3.0")
def interruptible[A](many: Boolean, thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
Blocking(
if (many) TypeInterruptibleMany else TypeInterruptibleOnce,
fn,
Tracing.calculateTracingEvent(fn.getClass))
}

def interruptible[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
Blocking(TypeInterruptibleOnce, fn, Tracing.calculateTracingEvent(fn.getClass))
}

def interruptibleMany[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
Blocking(TypeInterruptibleMany, fn, Tracing.calculateTracingEvent(fn.getClass))
}

def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
if (hint eq TypeDelay)
apply(thunk)
Expand Down
5 changes: 3 additions & 2 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1482,8 +1482,9 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {

override def blocking[A](thunk: => A): IO[A] = IO.blocking(thunk)

override def interruptible[A](many: Boolean)(thunk: => A): IO[A] =
IO.interruptible(many)(thunk)
override def interruptible[A](thunk: => A): IO[A] = IO.interruptible(thunk)

override def interruptibleMany[A](thunk: => A): IO[A] = IO.interruptibleMany(thunk)

def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
IO.suspend(hint)(thunk)
Expand Down
3 changes: 2 additions & 1 deletion docs/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ In Cats Effect, code containing side-effects should always be wrapped in one of
- **Synchronous** (`return`s or `throw`s)
+ `IO(...)` or `IO.delay(...)`
+ `IO.blocking(...)`
+ `IO.interruptible(true/false)(...)`
+ `IO.interruptible(...)`
+ `IO.interruptibleMany(...)`
- **Asynchronous** (invokes a callback)
+ `IO.async` or `IO.async_`

Expand Down
2 changes: 1 addition & 1 deletion docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ It is now possible to make the blocking task interruptible using [`Sync`](./type
```scala mdoc
// many: whether it's okay to try interrupting more than once
val programInterruptible =
Sync[IO].interruptible(many = false)(println("hello Sync blocking!"))
Sync[IO].interruptible(println("hello Sync blocking!"))
```

#### Where Does The Blocking Pool Come From?
Expand Down
2 changes: 1 addition & 1 deletion docs/typeclasses/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ blocking operation via a thread interrupt in the event on cancelation.

```scala
//true means we try thread interruption repeatedly until the blocking operation exits
val operation: F[Unit] = F.interruptible(true)(longRunningOp())
val operation: F[Unit] = F.interruptibleMany(longRunningOp())

val run: F[Unit] = operation.timeout(30.seconds)
```
21 changes: 15 additions & 6 deletions kernel/shared/src/main/scala/cats/effect/kernel/Sync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,29 @@ trait Sync[F[_]] extends MonadCancel[F, Throwable] with Clock[F] with Unique[F]
def blocking[A](thunk: => A): F[A] =
suspend(Blocking)(thunk)

private[effect] def interruptible[A](many: Boolean, thunk: => A): F[A] =
if (many) interruptibleMany(thunk) else interruptible(thunk)

/**
* Like [[Sync.blocking]] but will attempt to abort the blocking operation using thread
* interrupts in the event of cancelation.
* interrupts in the event of cancelation. The interrupt will be attempted only once.
*
* @param many
* Whether interruption via thread interrupt should be attempted once or repeatedly until
* the blocking operation completes or exits.
* @param thunk
* The side effect which is to be suspended in `F[_]` and evaluated on a blocking execution
* context
*/
def interruptible[A](thunk: => A): F[A] = suspend(InterruptibleOnce)(thunk)

/**
* Like [[Sync.blocking]] but will attempt to abort the blocking operation using thread
* interrupts in the event of cancelation. The interrupt will be attempted repeatedly until
* the blocking operation completes or exits.
*
* @param thunk
* The side effect which is to be suspended in `F[_]` and evaluated on a blocking execution
* context
*/
def interruptible[A](many: Boolean)(thunk: => A): F[A] =
suspend(if (many) InterruptibleMany else InterruptibleOnce)(thunk)
def interruptibleMany[A](thunk: => A): F[A] = suspend(InterruptibleMany)(thunk)

def suspend[A](hint: Sync.Type)(thunk: => A): F[A]
}
Expand Down
34 changes: 34 additions & 0 deletions scalafix/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,37 @@ lazy val v3_0_0_tests = project
)
.dependsOn(v3_0_0_input, rules)
.enablePlugins(ScalafixTestkitPlugin)


lazy val v3_3_0_input = project
.in(file("v3_3_0/input"))
.settings(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "3.0.0"
),
scalacOptions += "-P:semanticdb:synthetics:on"
)

lazy val v3_3_0_output = project
.in(file("v3_3_0/output"))
.settings(
libraryDependencies ++= Seq(
"io.vasilev" %% "cats-effect" % "3.3-208-b352672"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remember to update this to the mainline 3.3.0 once released

)
)

lazy val v3_3_0_tests = project
.in(file("v3_3_0/tests"))
.settings(
libraryDependencies += "ch.epfl.scala" % "scalafix-testkit" % V.scalafixVersion % Test cross CrossVersion.full,
Compile / compile :=
(Compile / compile).dependsOn(v3_3_0_input / Compile / compile).value,
scalafixTestkitOutputSourceDirectories :=
(v3_3_0_output / Compile / sourceDirectories).value,
scalafixTestkitInputSourceDirectories :=
(v3_3_0_input / Compile / sourceDirectories).value,
scalafixTestkitInputClasspath :=
(v3_3_0_input / Compile / fullClasspath).value
)
.dependsOn(v3_3_0_input, rules)
.enablePlugins(ScalafixTestkitPlugin)
47 changes: 47 additions & 0 deletions scalafix/rules/src/main/scala/fix/v3_3_0.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package fix

import scalafix.v1._

import scala.meta.Token._
import scala.meta._

class v3_3_0 extends SemanticRule("v3_3_0") {
override def fix(implicit doc: SemanticDocument): Patch = {
val IO_M = SymbolMatcher.exact("cats/effect/IO.")
val Sync_interruptible_M = SymbolMatcher.exact("cats/effect/kernel/Sync#interruptible().")

val IO_S = Symbol("cats/effect/IO.")
val Sync_S = Symbol("cats/effect/Sync#")

doc.tree.collect {
// IO.interruptible(false) -> IO.interruptible
// IO.interruptible(true) -> IO.interruptibleMany
case t @ q"${IO_M(_)}.interruptible(${Lit.Boolean(many)})" =>
replaceInterruptible(many, t, s"${IO_S.displayName}")

// Sync#interruptible(false) -> Sync#interruptible
// Sync#interruptible(true) -> Sync#interruptibleMany
case t @ q"${Sync_interruptible_M(Term.Apply(interruptible, Lit.Boolean(many) :: _))}" =>
interruptible.synthetics match {
case TypeApplyTree(_, TypeRef(_, symbol, _) :: _) :: _ =>
if (symbol.displayName == "Unit") interruptible match {
case Term.Select(typeF, _) =>
replaceInterruptible(
many,
t,
s"${Sync_S.displayName}[${typeF.symbol.displayName}]"
)
case _ => Patch.empty
}
else replaceInterruptible(many, t, s"${Sync_S.displayName}[${symbol.displayName}]")
case _ => Patch.empty
}
}.asPatch
}

def replaceInterruptible(many: Boolean, from: Tree, to: String): Patch =
if (many)
Patch.replaceTree(from, s"$to.interruptibleMany")
else
Patch.replaceTree(from, s"$to.interruptible")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
rule = "scala:fix.v3_3_0"
*/
package fix

import cats.effect.{ IO, Sync }

object InterruptibleRewrites {
IO.interruptible(true)(IO.unit)

IO.interruptible(false)(IO.unit)

Sync[IO].interruptible(true)(IO.unit)

Sync[IO].interruptible(false)(IO.unit)

def f[F[_]](implicit F: Sync[F]): F[Unit] = F.interruptible(true)(IO.unit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package fix

import cats.effect.{ IO, Sync }

object InterruptibleRewrites {
IO.interruptibleMany(IO.unit)

IO.interruptible(IO.unit)

Sync[IO].interruptibleMany(IO.unit)

Sync[IO].interruptible(IO.unit)

def f[F[_]](implicit F: Sync[F]): F[Unit] = Sync[F].interruptibleMany(IO.unit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package fix

import org.scalatest.FunSuiteLike
import scalafix.testkit.AbstractSemanticRuleSuite

class CatsEffectTests extends AbstractSemanticRuleSuite with FunSuiteLike {
runAllTests()
}
2 changes: 1 addition & 1 deletion std/shared/src/main/scala/cats/effect/std/Console.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ object Console {

private final class SyncConsole[F[_]](implicit F: Sync[F]) extends Console[F] {
def readLineWithCharset(charset: Charset): F[String] =
F.interruptible(false) {
F.interruptible {
val in = System.in
val decoder = charset
.newDecoder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ trait IOPlatformSpecification { self: BaseSpec with ScalaCheck =>
var interrupted = true
val latch = new CountDownLatch(1)

val await = IO.interruptible(false) {
val await = IO.interruptible {
latch.countDown()
Thread.sleep(15000)
interrupted = false
Expand All @@ -157,7 +157,7 @@ trait IOPlatformSpecification { self: BaseSpec with ScalaCheck =>
var interrupted = true
val latch = new CountDownLatch(1)

val await = IO.interruptible(true) {
val await = IO.interruptibleMany {
latch.countDown()

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ConsoleJVMSpec extends BaseSpec {
val release = (src: Source) => IO(src.close())
Resource
.make(acquire)(release)
.use(src => IO.interruptible(false)(src.getLines().toList))
.use(src => IO.interruptible(src.getLines().toList))
.handleErrorWith(_ => IO.pure(Nil))
}

Expand Down