Skip to content

Commit

Permalink
Merge pull request #2343 from alexandrustana/2268
Browse files Browse the repository at this point in the history
Split `interruptible`
  • Loading branch information
djspiewak authored Oct 10, 2021
2 parents e4f0f66 + c8a66d7 commit b117e63
Show file tree
Hide file tree
Showing 16 changed files with 171 additions and 20 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform)
"cats.effect.unsafe.HelperThread.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.LocalQueue.enqueue"),
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.WorkerThread.this")
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.WorkerThread.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.LocalQueue.drain")
)
)
.jvmSettings(
Expand Down
6 changes: 5 additions & 1 deletion core/js/src/main/scala/cats/effect/IOCompanionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
def blocking[A](thunk: => A): IO[A] =
apply(thunk)

def interruptible[A](many: Boolean)(thunk: => A): IO[A] = {
private[effect] def interruptible[A](many: Boolean, thunk: => A): IO[A] = {
val _ = many
apply(thunk)
}

def interruptible[A](thunk: => A): IO[A] = interruptible(false, thunk)

def interruptibleMany[A](thunk: => A): IO[A] = interruptible(true, thunk)

def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] = {
val _ = hint
apply(thunk)
Expand Down
18 changes: 15 additions & 3 deletions core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,30 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
private[this] val TypeInterruptibleMany = Sync.Type.InterruptibleMany

def blocking[A](thunk: => A): IO[A] = {
val fn = () => thunk
val fn = Thunk.asFunction0(thunk)
Blocking(TypeBlocking, fn, Tracing.calculateTracingEvent(fn.getClass))
}

def interruptible[A](many: Boolean)(thunk: => A): IO[A] = {
val fn = () => thunk
// this cannot be marked private[effect] because of static forwarders in Java
@deprecated("use interruptible / interruptibleMany instead", "3.3.0")
def interruptible[A](many: Boolean, thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
Blocking(
if (many) TypeInterruptibleMany else TypeInterruptibleOnce,
fn,
Tracing.calculateTracingEvent(fn.getClass))
}

def interruptible[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
Blocking(TypeInterruptibleOnce, fn, Tracing.calculateTracingEvent(fn.getClass))
}

def interruptibleMany[A](thunk: => A): IO[A] = {
val fn = Thunk.asFunction0(thunk)
Blocking(TypeInterruptibleMany, fn, Tracing.calculateTracingEvent(fn.getClass))
}

def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
if (hint eq TypeDelay)
apply(thunk)
Expand Down
5 changes: 3 additions & 2 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1482,8 +1482,9 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {

override def blocking[A](thunk: => A): IO[A] = IO.blocking(thunk)

override def interruptible[A](many: Boolean)(thunk: => A): IO[A] =
IO.interruptible(many)(thunk)
override def interruptible[A](thunk: => A): IO[A] = IO.interruptible(thunk)

override def interruptibleMany[A](thunk: => A): IO[A] = IO.interruptibleMany(thunk)

def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
IO.suspend(hint)(thunk)
Expand Down
3 changes: 2 additions & 1 deletion docs/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ In Cats Effect, code containing side-effects should always be wrapped in one of
- **Synchronous** (`return`s or `throw`s)
+ `IO(...)` or `IO.delay(...)`
+ `IO.blocking(...)`
+ `IO.interruptible(true/false)(...)`
+ `IO.interruptible(...)`
+ `IO.interruptibleMany(...)`
- **Asynchronous** (invokes a callback)
+ `IO.async` or `IO.async_`

Expand Down
2 changes: 1 addition & 1 deletion docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ It is now possible to make the blocking task interruptible using [`Sync`](./type
```scala mdoc
// many: whether it's okay to try interrupting more than once
val programInterruptible =
Sync[IO].interruptible(many = false)(println("hello Sync blocking!"))
Sync[IO].interruptible(println("hello Sync blocking!"))
```

#### Where Does The Blocking Pool Come From?
Expand Down
2 changes: 1 addition & 1 deletion docs/typeclasses/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ blocking operation via a thread interrupt in the event on cancelation.

```scala
//true means we try thread interruption repeatedly until the blocking operation exits
val operation: F[Unit] = F.interruptible(true)(longRunningOp())
val operation: F[Unit] = F.interruptibleMany(longRunningOp())

val run: F[Unit] = operation.timeout(30.seconds)
```
21 changes: 15 additions & 6 deletions kernel/shared/src/main/scala/cats/effect/kernel/Sync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,29 @@ trait Sync[F[_]] extends MonadCancel[F, Throwable] with Clock[F] with Unique[F]
def blocking[A](thunk: => A): F[A] =
suspend(Blocking)(thunk)

private[effect] def interruptible[A](many: Boolean, thunk: => A): F[A] =
if (many) interruptibleMany(thunk) else interruptible(thunk)

/**
* Like [[Sync.blocking]] but will attempt to abort the blocking operation using thread
* interrupts in the event of cancelation.
* interrupts in the event of cancelation. The interrupt will be attempted only once.
*
* @param many
* Whether interruption via thread interrupt should be attempted once or repeatedly until
* the blocking operation completes or exits.
* @param thunk
* The side effect which is to be suspended in `F[_]` and evaluated on a blocking execution
* context
*/
def interruptible[A](thunk: => A): F[A] = suspend(InterruptibleOnce)(thunk)

/**
* Like [[Sync.blocking]] but will attempt to abort the blocking operation using thread
* interrupts in the event of cancelation. The interrupt will be attempted repeatedly until
* the blocking operation completes or exits.
*
* @param thunk
* The side effect which is to be suspended in `F[_]` and evaluated on a blocking execution
* context
*/
def interruptible[A](many: Boolean)(thunk: => A): F[A] =
suspend(if (many) InterruptibleMany else InterruptibleOnce)(thunk)
def interruptibleMany[A](thunk: => A): F[A] = suspend(InterruptibleMany)(thunk)

def suspend[A](hint: Sync.Type)(thunk: => A): F[A]
}
Expand Down
34 changes: 34 additions & 0 deletions scalafix/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,37 @@ lazy val v3_0_0_tests = project
)
.dependsOn(v3_0_0_input, rules)
.enablePlugins(ScalafixTestkitPlugin)


lazy val v3_3_0_input = project
.in(file("v3_3_0/input"))
.settings(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "3.0.0"
),
scalacOptions += "-P:semanticdb:synthetics:on"
)

lazy val v3_3_0_output = project
.in(file("v3_3_0/output"))
.settings(
libraryDependencies ++= Seq(
"io.vasilev" %% "cats-effect" % "3.3-208-b352672"
)
)

lazy val v3_3_0_tests = project
.in(file("v3_3_0/tests"))
.settings(
libraryDependencies += "ch.epfl.scala" % "scalafix-testkit" % V.scalafixVersion % Test cross CrossVersion.full,
Compile / compile :=
(Compile / compile).dependsOn(v3_3_0_input / Compile / compile).value,
scalafixTestkitOutputSourceDirectories :=
(v3_3_0_output / Compile / sourceDirectories).value,
scalafixTestkitInputSourceDirectories :=
(v3_3_0_input / Compile / sourceDirectories).value,
scalafixTestkitInputClasspath :=
(v3_3_0_input / Compile / fullClasspath).value
)
.dependsOn(v3_3_0_input, rules)
.enablePlugins(ScalafixTestkitPlugin)
47 changes: 47 additions & 0 deletions scalafix/rules/src/main/scala/fix/v3_3_0.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package fix

import scalafix.v1._

import scala.meta.Token._
import scala.meta._

class v3_3_0 extends SemanticRule("v3_3_0") {
override def fix(implicit doc: SemanticDocument): Patch = {
val IO_M = SymbolMatcher.exact("cats/effect/IO.")
val Sync_interruptible_M = SymbolMatcher.exact("cats/effect/kernel/Sync#interruptible().")

val IO_S = Symbol("cats/effect/IO.")
val Sync_S = Symbol("cats/effect/Sync#")

doc.tree.collect {
// IO.interruptible(false) -> IO.interruptible
// IO.interruptible(true) -> IO.interruptibleMany
case t @ q"${IO_M(_)}.interruptible(${Lit.Boolean(many)})" =>
replaceInterruptible(many, t, s"${IO_S.displayName}")

// Sync#interruptible(false) -> Sync#interruptible
// Sync#interruptible(true) -> Sync#interruptibleMany
case t @ q"${Sync_interruptible_M(Term.Apply(interruptible, Lit.Boolean(many) :: _))}" =>
interruptible.synthetics match {
case TypeApplyTree(_, TypeRef(_, symbol, _) :: _) :: _ =>
if (symbol.displayName == "Unit") interruptible match {
case Term.Select(typeF, _) =>
replaceInterruptible(
many,
t,
s"${Sync_S.displayName}[${typeF.symbol.displayName}]"
)
case _ => Patch.empty
}
else replaceInterruptible(many, t, s"${Sync_S.displayName}[${symbol.displayName}]")
case _ => Patch.empty
}
}.asPatch
}

def replaceInterruptible(many: Boolean, from: Tree, to: String): Patch =
if (many)
Patch.replaceTree(from, s"$to.interruptibleMany")
else
Patch.replaceTree(from, s"$to.interruptible")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
rule = "scala:fix.v3_3_0"
*/
package fix

import cats.effect.{ IO, Sync }

object InterruptibleRewrites {
IO.interruptible(true)(IO.unit)

IO.interruptible(false)(IO.unit)

Sync[IO].interruptible(true)(IO.unit)

Sync[IO].interruptible(false)(IO.unit)

def f[F[_]](implicit F: Sync[F]): F[Unit] = F.interruptible(true)(IO.unit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package fix

import cats.effect.{ IO, Sync }

object InterruptibleRewrites {
IO.interruptibleMany(IO.unit)

IO.interruptible(IO.unit)

Sync[IO].interruptibleMany(IO.unit)

Sync[IO].interruptible(IO.unit)

def f[F[_]](implicit F: Sync[F]): F[Unit] = Sync[F].interruptibleMany(IO.unit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package fix

import org.scalatest.FunSuiteLike
import scalafix.testkit.AbstractSemanticRuleSuite

class CatsEffectTests extends AbstractSemanticRuleSuite with FunSuiteLike {
runAllTests()
}
2 changes: 1 addition & 1 deletion std/shared/src/main/scala/cats/effect/std/Console.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ object Console {

private final class SyncConsole[F[_]](implicit F: Sync[F]) extends Console[F] {
def readLineWithCharset(charset: Charset): F[String] =
F.interruptible(false) {
F.interruptible {
val in = System.in
val decoder = charset
.newDecoder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ trait IOPlatformSpecification { self: BaseSpec with ScalaCheck =>
var interrupted = true
val latch = new CountDownLatch(1)

val await = IO.interruptible(false) {
val await = IO.interruptible {
latch.countDown()
Thread.sleep(15000)
interrupted = false
Expand All @@ -157,7 +157,7 @@ trait IOPlatformSpecification { self: BaseSpec with ScalaCheck =>
var interrupted = true
val latch = new CountDownLatch(1)

val await = IO.interruptible(true) {
val await = IO.interruptibleMany {
latch.countDown()

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ConsoleJVMSpec extends BaseSpec {
val release = (src: Source) => IO(src.close())
Resource
.make(acquire)(release)
.use(src => IO.interruptible(false)(src.getLines().toList))
.use(src => IO.interruptible(src.getLines().toList))
.handleErrorWith(_ => IO.pure(Nil))
}

Expand Down

0 comments on commit b117e63

Please sign in to comment.