Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment/Chunk usability #1012

Merged
merged 7 commits on Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ConcurrentBenchmark {
@GenerateN(1, 2, 4, 7, 16, 32, 64, 128, 256)
@Benchmark
def join(N: Int): Int = {
val each = Stream.segment(Chunk.seq(0 to 1000).map(i => Stream.eval(IO.pure(i)))).covary[IO]
val each = Stream.segment(Segment.seq(0 to 1000).map(i => Stream.eval(IO.pure(i)))).covary[IO]
each.join(N).runLast.unsafeRunSync.get
}
}
8 changes: 4 additions & 4 deletions core/jvm/src/main/scala/fs2/compress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ object compress {
case Some((hd,tl)) =>
deflater.setInput(hd.toArray)
val result = _deflate_collect(deflater, buffer, ArrayBuffer.empty, false).toArray
Pull.output(Chunk.bytes(result)) >> _deflate_stream(deflater, buffer)(tl)
Pull.outputChunk(Chunk.bytes(result)) >> _deflate_stream(deflater, buffer)(tl)
case None =>
deflater.setInput(Array.empty)
deflater.finish()
val result = _deflate_collect(deflater, buffer, ArrayBuffer.empty, true).toArray
deflater.end()
Pull.output(Chunk.bytes(result))
Pull.outputChunk(Chunk.bytes(result))
}

@tailrec
Expand Down Expand Up @@ -68,7 +68,7 @@ object compress {
val buffer = new Array[Byte](bufferSize)
inflater.setInput(hd.toArray)
val result = _inflate_collect(inflater, buffer, ArrayBuffer.empty).toArray
Pull.output(Chunk.bytes(result)) >> _inflate_stream(inflater, buffer)(tl)
Pull.outputChunk(Chunk.bytes(result)) >> _inflate_stream(inflater, buffer)(tl)
}.stream
}

Expand All @@ -77,7 +77,7 @@ object compress {
case Some((hd,tl)) =>
inflater.setInput(hd.toArray)
val result = _inflate_collect(inflater, buffer, ArrayBuffer.empty).toArray
Pull.output(Chunk.bytes(result)) >> _inflate_stream(inflater, buffer)(tl)
Pull.outputChunk(Chunk.bytes(result)) >> _inflate_stream(inflater, buffer)(tl)
case None =>
if (!inflater.finished) Pull.raiseError(new DataFormatException("Insufficient data"))
else { inflater.end(); Pull.done }
Expand Down
32 changes: 16 additions & 16 deletions core/jvm/src/test/scala/fs2/PipeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ class PipeSpec extends Fs2Spec {
val unsegmentedV = s.get.toVector
assert {
// All but last list have n0 values
segmentedV.dropRight(1).forall(_.toChunk.size == n0.get) &&
segmentedV.dropRight(1).forall(_.force.toChunk.size == n0.get) &&
// Last list has at most n0 values
segmentedV.lastOption.fold(true)(_.toChunk.size <= n0.get) &&
segmentedV.lastOption.fold(true)(_.force.toChunk.size <= n0.get) &&
// Flattened sequence is equal to vector without segmenting
segmentedV.foldLeft(Vector.empty[Int])((v, l) => v ++ l.toVector) == unsegmentedV
segmentedV.foldLeft(Vector.empty[Int])((v, l) => v ++ l.force.toVector) == unsegmentedV
}
}

Expand All @@ -71,9 +71,9 @@ class PipeSpec extends Fs2Spec {
val expectedSize = unsegmentedV.size - (unsegmentedV.size % n0.get)
assert {
// All lists have n0 values
segmentedV.forall(_.toChunk.size == n0.get) &&
segmentedV.forall(_.force.toChunk.size == n0.get) &&
// Flattened sequence is equal to vector without segmenting, minus "left over" values that could not fit in a segment
segmentedV.foldLeft(Vector.empty[Int])((v, l) => v ++ l.toVector) == unsegmentedV.take(expectedSize)
segmentedV.foldLeft(Vector.empty[Int])((v, l) => v ++ l.force.toVector) == unsegmentedV.take(expectedSize)
}
}

Expand Down Expand Up @@ -154,19 +154,19 @@ class PipeSpec extends Fs2Spec {

"filter (2)" in forAll { (s: PureStream[Double]) =>
val predicate = (i: Double) => i - i.floor < 0.5
val s2 = s.get.mapSegments(s => Chunk.doubles(s.toVector.toArray))
val s2 = s.get.mapSegments(s => Chunk.doubles(s.force.toArray).toSegment)
runLog(s2.filter(predicate)) shouldBe runLog(s2).filter(predicate)
}

"filter (3)" in forAll { (s: PureStream[Byte]) =>
val predicate = (b: Byte) => b < 0
val s2 = s.get.mapSegments(s => Chunk.bytes(s.toVector.toArray))
val s2 = s.get.mapSegments(s => Chunk.bytes(s.force.toArray).toSegment)
runLog(s2.filter(predicate)) shouldBe runLog(s2).filter(predicate)
}

"filter (4)" in forAll { (s: PureStream[Boolean]) =>
val predicate = (b: Boolean) => !b
val s2 = s.get.mapSegments(s => Chunk.booleans(s.toVector.toArray))
val s2 = s.get.mapSegments(s => Chunk.booleans(s.force.toArray).toSegment)
runLog(s2.filter(predicate)) shouldBe runLog(s2).filter(predicate)
}

Expand Down Expand Up @@ -208,9 +208,9 @@ class PipeSpec extends Fs2Spec {
val f = (i: Int) => i % n.get
val s1 = s.get.groupAdjacentBy(f)
val s2 = s.get.map(f).changes
runLog(s1.map(_._2)).flatMap(_.toVector) shouldBe runLog(s.get)
runLog(s1.map(_._2)).flatMap(_.force.toVector) shouldBe runLog(s.get)
runLog(s1.map(_._1)) shouldBe runLog(s2)
runLog(s1.map { case (k, vs) => vs.toVector.forall(f(_) == k) }) shouldBe runLog(s2.map(_ => true))
runLog(s1.map { case (k, vs) => vs.force.toVector.forall(f(_) == k) }) shouldBe runLog(s2.map(_ => true))
}

"head" in forAll { (s: PureStream[Int]) =>
Expand Down Expand Up @@ -277,15 +277,15 @@ class PipeSpec extends Fs2Spec {
"split" in forAll { (s: PureStream[Int], n: SmallPositive) =>
val s2 = s.get.map(x => if (x == Int.MinValue) x + 1 else x).map(_.abs).filter(_ != 0)
withClue(s"n = $n, s = ${s.get.toList}, s2 = " + s2.toList) {
runLog { s2.chunkLimit(n.get).intersperse(Chunk.singleton(0)).flatMap(Stream.chunk).split(_ == 0).map(_.toVector).filter(_.nonEmpty) } shouldBe
runLog { s2.chunkLimit(n.get).intersperse(Chunk.singleton(0)).flatMap(Stream.chunk).split(_ == 0).map(_.force.toVector).filter(_.nonEmpty) } shouldBe
s2.chunkLimit(n.get).filter(_.nonEmpty).map(_.toVector).toVector
}
}

"split (2)" in {
Stream(1, 2, 0, 0, 3, 0, 4).split(_ == 0).toVector.map(_.toVector) shouldBe Vector(Vector(1, 2), Vector(), Vector(3), Vector(4))
Stream(1, 2, 0, 0, 3, 0).split(_ == 0).toVector.map(_.toVector) shouldBe Vector(Vector(1, 2), Vector(), Vector(3))
Stream(1, 2, 0, 0, 3, 0, 0).split(_ == 0).toVector.map(_.toVector) shouldBe Vector(Vector(1, 2), Vector(), Vector(3), Vector())
Stream(1, 2, 0, 0, 3, 0, 4).split(_ == 0).toVector.map(_.force.toVector) shouldBe Vector(Vector(1, 2), Vector(), Vector(3), Vector(4))
Stream(1, 2, 0, 0, 3, 0).split(_ == 0).toVector.map(_.force.toVector) shouldBe Vector(Vector(1, 2), Vector(), Vector(3))
Stream(1, 2, 0, 0, 3, 0, 0).split(_ == 0).toVector.map(_.force.toVector) shouldBe Vector(Vector(1, 2), Vector(), Vector(3), Vector())
}

"take" in forAll { (s: PureStream[Int], negate: Boolean, n0: SmallNonnegative) =>
Expand Down Expand Up @@ -350,7 +350,7 @@ class PipeSpec extends Fs2Spec {

"take.segments" in {
val s = Stream(1, 2) ++ Stream(3, 4)
runLog(s.take(3).segments.map(_.toVector)) shouldBe Vector(Vector(1, 2), Vector(3))
runLog(s.take(3).segments.map(_.force.toVector)) shouldBe Vector(Vector(1, 2), Vector(3))
}

"unNone" in forAll { (s: PureStream[Option[Int]]) =>
Expand Down Expand Up @@ -492,7 +492,7 @@ class PipeSpec extends Fs2Spec {
}
case Stepper.Await(receive) =>
s.pull.uncons1.flatMap {
case Some(((i,a),s)) => go(Some(a), receive(Some(Chunk.singleton(i))), s)
case Some(((i,a),s)) => go(Some(a), receive(Some(Segment.singleton(i))), s)
case None => go(last, receive(None), s)
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/jvm/src/test/scala/fs2/StreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class StreamSpec extends Fs2Spec with Inside {

Stream(1, 2, 3, 4, 5).repartition(i => Chunk(i, i)).toList shouldBe List(1, 3, 6, 10, 15, 15)

Stream(1, 10, 100).repartition(i => Segment.from(i).map(_.toInt).take(1000).toChunk).take(4).toList shouldBe List(1, 2, 3, 4)
Stream(1, 10, 100).repartition(i => Segment.from(i).map(_.toInt).take(1000).force.toChunk).take(4).toList shouldBe List(1, 2, 3, 4)
}

"translate" in forAll { (s: PureStream[Int]) =>
Expand All @@ -149,7 +149,7 @@ class StreamSpec extends Fs2Spec with Inside {

"unfoldSegment" in {
Stream.unfoldSegment(4L) { s =>
if(s > 0) Some((Chunk.longs(Array[Long](s,s)), s-1)) else None
if(s > 0) Some((Chunk.longs(Array[Long](s,s)).toSegment, s-1)) else None
}.toList shouldBe List[Long](4,4,3,3,2,2,1,1)
}

Expand Down
2 changes: 1 addition & 1 deletion core/jvm/src/test/scala/fs2/async/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class QueueSpec extends Fs2Spec {
s.get.noneTerminate.evalMap(q.enqueue1).drain ++ q.dequeueAvailable.unNoneTerminate.segments
})
result.size should be < 2
result.flatMap(_.toVector) shouldBe s.get.toVector
result.flatMap(_.force.toVector) shouldBe s.get.toVector
}
}
}
Expand Down
26 changes: 24 additions & 2 deletions core/shared/src/main/scala/fs2/Catenable.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fs2

import cats.{Applicative, Eval, Foldable, Traverse}
import cats.{Applicative, Eval, Foldable, Monad, Traverse}
import cats.implicits._
import Catenable._

Expand Down Expand Up @@ -192,12 +192,34 @@ object Catenable {
}
}

implicit val traverseInstance: Traverse[Catenable] = new Traverse[Catenable] {
implicit val instance: Traverse[Catenable] with Monad[Catenable] = new Traverse[Catenable] with Monad[Catenable] {
def foldLeft[A, B](fa: Catenable[A], b: B)(f: (B, A) => B): B = fa.foldLeft(b)(f)
def foldRight[A, B](fa: Catenable[A], b: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = Foldable[List].foldRight(fa.toList, b)(f)
override def toList[A](fa: Catenable[A]): List[A] = fa.toList
override def isEmpty[A](fa: Catenable[A]): Boolean = fa.isEmpty
def traverse[F[_], A, B](fa: Catenable[A])(f: A => F[B])(implicit G: Applicative[F]): F[Catenable[B]] =
Traverse[List].traverse(fa.toList)(f).map(Catenable.apply)
def pure[A](a: A): Catenable[A] = Catenable.singleton(a)
def flatMap[A,B](fa: Catenable[A])(f: A => Catenable[B]): Catenable[B] = fa.flatMap(f)
def tailRecM[A,B](a: A)(f: A => Catenable[Either[A,B]]): Catenable[B] = {
var acc: Catenable[B] = Catenable.empty
@tailrec def go(rest: List[Catenable[Either[A, B]]]): Unit = rest match {
case hd :: tl =>
hd.uncons match {
case Some((hdh, hdt)) => hdh match {
case Right(b) =>
acc = acc :+ b
go(hdt :: tl)
case Left(a) =>
go(f(a) :: hdt :: tl)
}
case None =>
go(tl)
}
case _ => ()
}
go(f(a) :: Nil)
acc
}
}
}
Loading