Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Cats Effect v3.5.0-RC2 #3142

Merged
merged 21 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._

Global / onChangedBuildSource := ReloadOnSourceChanges

ThisBuild / tlBaseVersion := "3.6"
ThisBuild / tlBaseVersion := "3.7"

ThisBuild / organization := "co.fs2"
ThisBuild / organizationName := "Functional Streams for Scala"
Expand Down Expand Up @@ -178,7 +178,8 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readBytesFromInputStream"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readInputStreamGeneric"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.io.net.Socket.forAsync")
)

lazy val root = tlCrossRootProject
Expand Down Expand Up @@ -213,9 +214,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.scodec" %%% "scodec-bits" % "1.1.35",
"org.typelevel" %%% "cats-core" % "2.9.0",
"org.typelevel" %%% "cats-effect" % "3.4.7",
"org.typelevel" %%% "cats-effect-laws" % "3.4.7" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4.7" % Test,
"org.typelevel" %%% "cats-effect" % "3.5.0-RC1",
"org.typelevel" %%% "cats-effect-laws" % "3.5.0-RC1" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.0-RC1" % Test,
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
"org.typelevel" %%% "cats-laws" % "2.9.0" % Test,
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
Expand Down
31 changes: 14 additions & 17 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ object Channel {

def send(a: A) =
F.deferred[Unit].flatMap { producer =>
F.uncancelable { poll =>
state.modify {
state.flatModifyFull { case (poll, state) =>
state match {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

Expand All @@ -169,12 +169,12 @@ object Channel {
State(values, size, None, (a, producer) :: producers, false),
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
)
}.flatten
}
}
}

def trySend(a: A) =
state.modify {
state.flatModify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Boolean].pure[F])

Expand All @@ -186,22 +186,19 @@ object Channel {
)
else
(s, rightFalse.pure[F])
}.flatten
}

def close =
state
.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])
state.flatModify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting).as(rightUnit) <* signalClosure
)
}
.flatten
.uncancelable
case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting).as(rightUnit) <* signalClosure
)
}

def isClosed = closedGate.tryGet.map(_.isDefined)

Expand Down
61 changes: 33 additions & 28 deletions core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import cats.data.OptionT
import cats.kernel.Eq
import cats.effect.kernel.{Concurrent, Deferred, Ref, Resource}
import cats.effect.std.MapRef
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Applicative, Functor, Invariant, Monad}

Expand Down Expand Up @@ -270,14 +271,16 @@ object SignallingRef {
private[this] def getAndDiscreteUpdatesImpl = {
def go(id: Long, lastSeen: Long): Stream[F, A] = {
def getNext: F[(A, Long)] =
F.deferred[(A, Long)].flatMap { wait =>
state.modify { case state @ State(value, lastUpdate, listeners) =>
if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else
state.copy(listeners = listeners + (id -> wait)) -> wait.get
}.flatten
}
F.deferred[(A, Long)]
.flatMap { wait =>
state.modify { case state @ State(value, lastUpdate, listeners) =>
if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else
state.copy(listeners = listeners + (id -> wait)) -> wait.get
}
}
.flatten // cancelable

Stream.eval(getNext).flatMap { case (a, lastUpdate) =>
Stream.emit(a) ++ go(id, lastSeen = lastUpdate)
Expand All @@ -297,10 +300,10 @@ object SignallingRef {
def update(f: A => A): F[Unit] = modify(a => (f(a), ()))

def modify[B](f: A => (A, B)): F[B] =
state.modify(updateAndNotify(_, f)).flatten
state.flatModify(updateAndNotify(_, f))

def tryModify[B](f: A => (A, B)): F[Option[B]] =
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence)
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence).uncancelable

def tryUpdate(f: A => A): F[Boolean] =
tryModify(a => (f(a), ())).map(_.isDefined)
Expand Down Expand Up @@ -529,23 +532,25 @@ object SignallingMapRef {
private[this] def getAndDiscreteUpdatesImpl = {
def go(id: Long, lastSeen: Long): Stream[F, Option[V]] = {
def getNext: F[(Option[V], Long)] =
F.deferred[(Option[V], Long)].flatMap { wait =>
state.modify { state =>
val keyState = state.keys.get(k)
val value = keyState.flatMap(_.value)
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)

if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else {
val newKeys =
state.keys
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
state.copy(keys = newKeys) -> wait.get
F.deferred[(Option[V], Long)]
.flatMap { wait =>
state.modify { state =>
val keyState = state.keys.get(k)
val value = keyState.flatMap(_.value)
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)

if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else {
val newKeys =
state.keys
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
state.copy(keys = newKeys) -> wait.get
}
}
}.flatten
}
}
.flatten // cancelable

Stream.eval(getNext).flatMap { case (v, lastUpdate) =>
Stream.emit(v) ++ go(id, lastSeen = lastUpdate)
Expand Down Expand Up @@ -580,10 +585,10 @@ object SignallingMapRef {
def update(f: Option[V] => Option[V]): F[Unit] = modify(v => (f(v), ()))

def modify[U](f: Option[V] => (Option[V], U)): F[U] =
state.modify(updateAndNotify(_, k, f)).flatten
state.flatModify(updateAndNotify(_, k, f))

def tryModify[U](f: Option[V] => (Option[V], U)): F[Option[U]] =
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence)
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence).uncancelable

def tryUpdate(f: Option[V] => Option[V]): F[Boolean] =
tryModify(a => (f(a), ())).map(_.isDefined)
Expand Down
8 changes: 4 additions & 4 deletions core/shared/src/main/scala/fs2/internal/ScopedResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private[internal] object ScopedResource {
.flatMap(finalizer => finalizer.map(_(ec)).getOrElse(pru))

def acquired(finalizer: Resource.ExitCase => F[Unit]): F[Either[Throwable, Boolean]] =
state.modify { s =>
state.flatModify { s =>
if (s.isFinished)
// state is closed and there are no leases, finalizer has to be invoked right away
s -> finalizer(Resource.ExitCase.Succeeded).as(false).attempt
Expand All @@ -154,7 +154,7 @@ private[internal] object ScopedResource {
Boolean
]).pure[F]
}
}.flatten
}

def lease: F[Option[Lease[F]]] =
state.modify { s =>
Expand All @@ -173,14 +173,14 @@ private[internal] object ScopedResource {
}
.flatMap { now =>
if (now.isFinished)
state.modify { s =>
state.flatModify { s =>
// Scope is closed and this is last lease, assure finalizer is removed from the state and run
// previous finalizer shall be always present at this point, this shall invoke it
s.copy(finalizer = None) -> (s.finalizer match {
case Some(ff) => ff(Resource.ExitCase.Succeeded)
case None => pru
})
}.flatten
}
else
pru
}
Expand Down
155 changes: 76 additions & 79 deletions core/shared/src/test/scala/fs2/StreamInterruptSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,96 +56,93 @@ class StreamInterruptSuite extends Fs2Suite {
}
}

// These IO streams cannot be interrupted on JS b/c they never yield execution
if (isJVM) {
test("3 - constant stream") {
val interruptSoon = Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.constant(true)
.interruptWhen(interruptSoon)
.compile
.drain
.replicateA(interruptRepeatCount)
}
test("3 - constant stream") {
val interruptSoon = Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.constant(true)
.interruptWhen(interruptSoon)
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("4 - interruption of constant stream with a flatMap") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.constant(true)
.interruptWhen(interrupt)
.flatMap(_ => Stream.emit(1))
.compile
.drain
.replicateA(interruptRepeatCount)
}
test("4 - interruption of constant stream with a flatMap") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.constant(true)
.interruptWhen(interrupt)
.flatMap(_ => Stream.emit(1))
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("5 - interruption of an infinitely recursive stream") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt
test("5 - interruption of an infinitely recursive stream") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt

def loop(i: Int): Stream[IO, Int] =
Stream.emit(i).flatMap(i => Stream.emit(i) ++ loop(i + 1))
def loop(i: Int): Stream[IO, Int] =
Stream.emit(i).flatMap(i => Stream.emit(i) ++ loop(i + 1))

loop(0)
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}
loop(0)
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("6 - interruption of an infinitely recursive stream that never emits") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt
test("6 - interruption of an infinitely recursive stream that never emits") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt

def loop: Stream[IO, Nothing] =
Stream.eval(IO.unit) >> loop
def loop: Stream[IO, Nothing] =
Stream.eval(IO.unit) >> loop

loop
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}
loop
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("7 - interruption of an infinitely recursive stream that never emits and has no eval") {
val interrupt = Stream.sleep_[IO](20.millis).compile.drain.attempt
def loop: Stream[IO, Int] = Stream.emit(()) >> loop
loop
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}
test("7 - interruption of an infinitely recursive stream that never emits and has no eval") {
val interrupt = Stream.sleep_[IO](20.millis).compile.drain.attempt
def loop: Stream[IO, Int] = Stream.emit(()) >> loop
loop
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("8 - interruption of a stream that repeatedly evaluates") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.repeatEval(IO.unit)
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}
test("8 - interruption of a stream that repeatedly evaluates") {
val interrupt =
Stream.sleep_[IO](20.millis).compile.drain.attempt
Stream
.repeatEval(IO.unit)
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("9 - interruption of the constant drained stream") {
val interrupt =
Stream.sleep_[IO](1.millis).compile.drain.attempt
Stream
.constant(true)
.dropWhile(!_)
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}
test("9 - interruption of the constant drained stream") {
val interrupt =
Stream.sleep_[IO](1.millis).compile.drain.attempt
Stream
.constant(true)
.dropWhile(!_)
.interruptWhen(interrupt)
.compile
.drain
.replicateA(interruptRepeatCount)
}

test("10 - terminates when interruption stream is infinitely false") {
forAllF { (s: Stream[Pure, Int]) =>
val allFalse = Stream.constant(false)
s.covary[IO].interruptWhen(allFalse).assertEmitsSameAs(s)
}
test("10 - terminates when interruption stream is infinitely false") {
forAllF { (s: Stream[Pure, Int]) =>
val allFalse = Stream.constant(false)
s.covary[IO].interruptWhen(allFalse).assertEmitsSameAs(s)
}
}

Expand Down
Loading