Skip to content

Commit

Permalink
Merge pull request #1148 from Daenyth/sink-either
Browse files Browse the repository at this point in the history
Add Sink.either combinator
  • Loading branch information
mpilquist authored May 23, 2018
2 parents 6b251aa + 5474f7f commit a35c269
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
50 changes: 50 additions & 0 deletions core/jvm/src/test/scala/fs2/SinkSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package fs2

import cats.effect.IO
import cats.implicits._
import fs2.async.Ref

import TestUtil._

class SinkSpec extends AsyncFs2Spec {
"Sink" - {
val s = Stream.emits(Seq(Left(1), Right("a"))).repeat.covary[IO]

"either - does not drop elements" in {
val is = Ref[IO, Vector[Int]](Vector.empty)
val as = Ref[IO, Vector[String]](Vector.empty)

val test = for {
iref <- is
aref <- as
iSink = Sink((i: Int) => iref.modify(_ :+ i).void)
aSink = Sink((a: String) => aref.modify(_ :+ a).void)
eSink = Sink.either(left = iSink, right = aSink)
_ <- s.take(10).to(eSink).compile.drain
iResult <- iref.get
aResult <- aref.get
} yield {
assert(iResult.length == 5)
assert(aResult.length == 5)
}

test.unsafeToFuture
}

"either - termination" - {
"left" in {
val left: Sink[IO, Int] = _.take(0).void
val right: Sink[IO, String] = _.void
val sink = Sink.either(left, right)
assert(runLog(s.through(sink)).length == 0)
}

"right" in {
val left: Sink[IO, Int] = _.void
val right: Sink[IO, String] = _.take(0).void
val sink = Sink.either(left, right)
assert(runLog(s.through(sink)).length == 0)
}
}
}
}
22 changes: 20 additions & 2 deletions core/shared/src/main/scala/fs2/Sink.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package fs2

import java.io.PrintStream

import cats.Show
import cats.effect.Sync
import cats.effect.{Effect, Sync}
import cats.implicits._

import java.io.PrintStream
import scala.concurrent.ExecutionContext

/** Companion for [[Sink]]. */
object Sink {
Expand All @@ -28,4 +30,20 @@ object Sink {
* using the `Show` instance for the input type.
*/
def showLinesStdOut[F[_]: Sync, I: Show]: Sink[F, I] = showLines(Console.out)

/**
* Sink that routes each element to one of two sinks.
* `Left` values get sent to the `left` sink, and likewise for `Right`
*
* If either of `left` or `right` fails, then resulting stream will fail.
* If either `halts` the evaluation will halt too.
*/
def either[F[_]: Effect, L, R](
left: Sink[F, L],
right: Sink[F, R]
)(
implicit ec: ExecutionContext
): Sink[F, Either[L, R]] =
_.observe(_.collect { case Left(l) => l } to left)
.to(_.collect { case Right(r) => r } to right)
}

0 comments on commit a35c269

Please sign in to comment.