Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Cats Effect v3.5.0-RC2 #3142

Merged
merged 21 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._

Global / onChangedBuildSource := ReloadOnSourceChanges

ThisBuild / tlBaseVersion := "3.6"
ThisBuild / tlBaseVersion := "3.7"

ThisBuild / organization := "co.fs2"
ThisBuild / organizationName := "Functional Streams for Scala"
Expand Down Expand Up @@ -178,7 +178,14 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readBytesFromInputStream"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.readInputStreamGeneric"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>")
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.package.<clinit>"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.io.net.Socket.forAsync"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"fs2.io.net.SocketCompanionPlatform#AsyncSocket.this"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform#AsyncSocket.this"
)
)

lazy val root = tlCrossRootProject
Expand Down Expand Up @@ -213,9 +220,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.scodec" %%% "scodec-bits" % "1.1.35",
"org.typelevel" %%% "cats-core" % "2.9.0",
"org.typelevel" %%% "cats-effect" % "3.4.7",
"org.typelevel" %%% "cats-effect-laws" % "3.4.7" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4.7" % Test,
"org.typelevel" %%% "cats-effect" % "3.5.0-RC2",
"org.typelevel" %%% "cats-effect-laws" % "3.5.0-RC2" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.0-RC2" % Test,
"org.typelevel" %%% "cats-laws" % "2.9.0" % Test,
"org.typelevel" %%% "discipline-munit" % "2.0.0-M3" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
Expand Down
31 changes: 14 additions & 17 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ object Channel {

def send(a: A) =
F.deferred[Unit].flatMap { producer =>
F.uncancelable { poll =>
state.modify {
state.flatModifyFull { case (poll, state) =>
state match {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

Expand All @@ -169,12 +169,12 @@ object Channel {
State(values, size, None, (a, producer) :: producers, false),
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
)
}.flatten
}
}
}

def trySend(a: A) =
state.modify {
state.flatModify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Boolean].pure[F])

Expand All @@ -186,22 +186,19 @@ object Channel {
)
else
(s, rightFalse.pure[F])
}.flatten
}

def close =
state
.modify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])
state.flatModify {
case s @ State(_, _, _, _, closed @ true) =>
(s, Channel.closed[Unit].pure[F])

case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting).as(rightUnit) <* signalClosure
)
}
.flatten
.uncancelable
case State(values, size, waiting, producers, closed @ false) =>
(
State(values, size, None, producers, true),
notifyStream(waiting).as(rightUnit) <* signalClosure
)
}

def isClosed = closedGate.tryGet.map(_.isDefined)

Expand Down
61 changes: 33 additions & 28 deletions core/shared/src/main/scala/fs2/concurrent/Signal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import cats.data.OptionT
import cats.kernel.Eq
import cats.effect.kernel.{Concurrent, Deferred, Ref, Resource}
import cats.effect.std.MapRef
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.{Applicative, Functor, Invariant, Monad}

Expand Down Expand Up @@ -270,14 +271,16 @@ object SignallingRef {
private[this] def getAndDiscreteUpdatesImpl = {
def go(id: Long, lastSeen: Long): Stream[F, A] = {
def getNext: F[(A, Long)] =
F.deferred[(A, Long)].flatMap { wait =>
state.modify { case state @ State(value, lastUpdate, listeners) =>
if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else
state.copy(listeners = listeners + (id -> wait)) -> wait.get
}.flatten
}
F.deferred[(A, Long)]
.flatMap { wait =>
state.modify { case state @ State(value, lastUpdate, listeners) =>
if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else
state.copy(listeners = listeners + (id -> wait)) -> wait.get
}
}
.flatten // cancelable

Stream.eval(getNext).flatMap { case (a, lastUpdate) =>
Stream.emit(a) ++ go(id, lastSeen = lastUpdate)
Expand All @@ -297,10 +300,10 @@ object SignallingRef {
def update(f: A => A): F[Unit] = modify(a => (f(a), ()))

def modify[B](f: A => (A, B)): F[B] =
state.modify(updateAndNotify(_, f)).flatten
state.flatModify(updateAndNotify(_, f))

def tryModify[B](f: A => (A, B)): F[Option[B]] =
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence)
state.tryModify(updateAndNotify(_, f)).flatMap(_.sequence).uncancelable

def tryUpdate(f: A => A): F[Boolean] =
tryModify(a => (f(a), ())).map(_.isDefined)
Expand Down Expand Up @@ -529,23 +532,25 @@ object SignallingMapRef {
private[this] def getAndDiscreteUpdatesImpl = {
def go(id: Long, lastSeen: Long): Stream[F, Option[V]] = {
def getNext: F[(Option[V], Long)] =
F.deferred[(Option[V], Long)].flatMap { wait =>
state.modify { state =>
val keyState = state.keys.get(k)
val value = keyState.flatMap(_.value)
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)

if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else {
val newKeys =
state.keys
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
state.copy(keys = newKeys) -> wait.get
F.deferred[(Option[V], Long)]
.flatMap { wait =>
state.modify { state =>
val keyState = state.keys.get(k)
val value = keyState.flatMap(_.value)
val lastUpdate = keyState.fold(-1L)(_.lastUpdate)
val listeners = keyState.fold(LongMap.empty[Listener])(_.listeners)

if (lastUpdate != lastSeen)
state -> (value -> lastUpdate).pure[F]
else {
val newKeys =
state.keys
.updated(k, KeyState(value, lastUpdate, listeners.updated(id, wait)))
state.copy(keys = newKeys) -> wait.get
}
}
}.flatten
}
}
.flatten // cancelable

Stream.eval(getNext).flatMap { case (v, lastUpdate) =>
Stream.emit(v) ++ go(id, lastSeen = lastUpdate)
Expand Down Expand Up @@ -580,10 +585,10 @@ object SignallingMapRef {
def update(f: Option[V] => Option[V]): F[Unit] = modify(v => (f(v), ()))

def modify[U](f: Option[V] => (Option[V], U)): F[U] =
state.modify(updateAndNotify(_, k, f)).flatten
state.flatModify(updateAndNotify(_, k, f))

def tryModify[U](f: Option[V] => (Option[V], U)): F[Option[U]] =
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence)
state.tryModify(updateAndNotify(_, k, f)).flatMap(_.sequence).uncancelable

def tryUpdate(f: Option[V] => Option[V]): F[Boolean] =
tryModify(a => (f(a), ())).map(_.isDefined)
Expand Down
8 changes: 4 additions & 4 deletions core/shared/src/main/scala/fs2/internal/ScopedResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private[internal] object ScopedResource {
.flatMap(finalizer => finalizer.map(_(ec)).getOrElse(pru))

def acquired(finalizer: Resource.ExitCase => F[Unit]): F[Either[Throwable, Boolean]] =
state.modify { s =>
state.flatModify { s =>
if (s.isFinished)
// state is closed and there are no leases, finalizer has to be invoked right away
s -> finalizer(Resource.ExitCase.Succeeded).as(false).attempt
Expand All @@ -154,7 +154,7 @@ private[internal] object ScopedResource {
Boolean
]).pure[F]
}
}.flatten
}

def lease: F[Option[Lease[F]]] =
state.modify { s =>
Expand All @@ -173,14 +173,14 @@ private[internal] object ScopedResource {
}
.flatMap { now =>
if (now.isFinished)
state.modify { s =>
state.flatModify { s =>
// Scope is closed and this is last lease, assure finalizer is removed from the state and run
// previous finalizer shall be always present at this point, this shall invoke it
s.copy(finalizer = None) -> (s.finalizer match {
case Some(ff) => ff(Resource.ExitCase.Succeeded)
case None => pru
})
}.flatten
}
else
pru
}
Expand Down
Loading