Skip to content

Commit

Permalink
Workaround for memory issue of *> (#72)
Browse files Browse the repository at this point in the history
* Workaround for memory issue of `*>`. See typelevel/cats-effect#401
* Monad for Applicative and FlatMap
  • Loading branch information
barambani authored Dec 31, 2018
1 parent 006b205 commit 1fc5b21
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cli/src/main/scala/laserdisc/cli/CLI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object CLI extends IOApp.WithContext { self =>
val maybePort = Either.catchNonFatal(arg2.toInt).flatMap(Port.from).toOption

(maybeHost, maybePort) match {
case (Some(ip), Some(port)) => IO(println(logo)) *> impl.mkStream(ip, port).compile.drain.as(ExitCode.Success)
case (Some(ip), Some(port)) => IO(println(logo)) >> impl.mkStream(ip, port).compile.drain.as(ExitCode.Success)
case _ => IO(println("please supply valid host and port (space separated)")).as(ExitCode.Error)
}

Expand Down
4 changes: 2 additions & 2 deletions fs2/src/main/scala/laserdisc/fs2/RedisClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ object RedisClient {

new Connection[F] {
override final def run: F[Unit] =
logger.info("Starting connection") *>
logger.info("Starting connection") >>
runner(serverStream).interruptWhen(termSignal).compile.drain.attempt.flatMap { r =>
logger.info(s"Connection terminated: $r")
}

override final def shutdown: F[Unit] =
logger.info("Shutting down connection") *> termSignal.set(true)
logger.info("Shutting down connection") >> termSignal.set(true)

override final def send[In <: HList, Out <: HList](in: In, timeout: FiniteDuration)(
implicit protocolHandler: ProtocolHandler.Aux[F, In, Out]
Expand Down
10 changes: 5 additions & 5 deletions fs2/src/main/scala/laserdisc/fs2/RedisConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import java.nio.channels.AsynchronousChannelGroup

import _root_.fs2._
import _root_.fs2.io.tcp.Socket
import cats.Applicative
import cats.Monad
import cats.effect.{ConcurrentEffect, ContextShift, Effect}
import cats.syntax.applicative._
import cats.syntax.apply._
import cats.syntax.flatMap._
import laserdisc.protocol._
import log.effect.LogWriter
import scodec.Codec
Expand Down Expand Up @@ -38,8 +38,8 @@ object RedisConnection {

private[fs2] final object impl {

def send[F[_]: Applicative](sink: Sink[F, Byte])(implicit log: LogWriter[F]): Sink[F, RESP] =
_.evalMap(resp => log.debug(s"sending $resp") *> resp.pure)
def send[F[_]: Monad](sink: Sink[F, Byte])(implicit log: LogWriter[F]): Sink[F, RESP] =
_.evalMap(resp => log.debug(s"sending $resp") >> resp.pure)
.through(streamEncoder.encode)
.flatMap(bits => Stream.chunk(Chunk.array(bits.toByteArray)))
.to(sink)
Expand Down Expand Up @@ -71,7 +71,7 @@ object RedisConnection {

_.through(framing)
.flatMap(complete => streamDecoder.decode(complete.bits))
.evalMap(resp => log.debug(s"receiving $resp") *> resp.pure)
.evalMap(resp => log.debug(s"receiving $resp") >> resp.pure)
}

}
Expand Down

0 comments on commit 1fc5b21

Please sign in to comment.