Skip to content

Commit

Permalink
Merge pull request #2506 from armanbilge/unchunks
Browse files Browse the repository at this point in the history
Implement `unchunks` (inverse of `chunks`)
  • Loading branch information
mpilquist authored Jul 27, 2021
2 parents 21d36f6 + 0bc68dc commit 5d9845f
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 17 deletions.
3 changes: 2 additions & 1 deletion core/jvm/src/test/scala/fs2/CompressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ class CompressionSuite extends Fs2Suite {
Stream
.chunk(Chunk.empty[Byte])
.through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString)))
.unchunk // ensure chunk sizes are less than file name and comment size soft limits
.chunkLimit(1)
.unchunks // ensure chunk sizes are less than file name and comment size soft limits
.through(Compression[IO].gunzip(8192))
.flatMap { gunzipResult =>
assert(
Expand Down
10 changes: 5 additions & 5 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2582,12 +2582,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
new Stream(Pull.translate[F2, G, O](underlying, u))

/** Converts the input to a stream of 1-element chunks.
*
* @example {{{
* scala> (Stream(1,2,3) ++ Stream(4,5,6)).unchunk.chunks.toList
* res0: List[Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4), Chunk(5), Chunk(6))
* }}}
*/
@deprecated("Use .chunkLimit(1).unchunks instead.", "3.0.7")
def unchunk: Stream[F, O] =
this.repeatPull {
_.uncons1.flatMap {
Expand All @@ -2596,6 +2592,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
}
}

/** Flattens a stream of chunks. Inverse of [[chunks]]. */
def unchunks[O2](implicit ev: O <:< Chunk[O2]): Stream[F, O2] =
flatMap(Stream.chunk(_))

/** Alias for [[filter]]
* Implemented to enable filtering in for comprehensions
*/
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/test/scala/fs2/Generators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait Generators extends ChunkGenerators {
Gen.frequency(
1 -> Gen.const(Stream.empty),
5 -> smallLists(genA).map(as => Stream.emits(as)),
5 -> smallLists(genA).map(as => Stream.emits(as).unchunk),
5 -> smallLists(genA).map(as => Stream.emits(as).chunkLimit(1).unchunks),
5 -> smallLists(smallLists(genA))
.map(_.foldLeft(Stream.empty.covaryOutput[A])((acc, as) => acc ++ Stream.emits(as))),
5 -> smallLists(smallLists(genA))
Expand All @@ -48,7 +48,7 @@ trait Generators extends ChunkGenerators {
Arbitrary(
Gen.frequency(
10 -> arbitrary[List[O]].map(os => Stream.emits(os).take(10)),
10 -> arbitrary[List[O]].map(os => Stream.emits(os).take(10).unchunk),
10 -> arbitrary[List[O]].map(os => Stream.emits(os).take(10).chunkLimit(1).unchunks),
5 -> arbitrary[F[O]].map(fo => Stream.eval(fo)),
1 -> (for {
acquire <- arbitrary[F[O]]
Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,8 @@ class StreamCombinatorsSuite extends Fs2Suite {

Stream
.emits(0 until streamSize)
.unchunk
.chunkLimit(1)
.unchunks
.evalTap(seenArr(_).complete(()))
.sliding(size, step)
.evalMap { chunk =>
Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/test/scala/fs2/StreamInterruptSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ class StreamInterruptSuite extends Fs2Suite {
val interrupt = IO.sleep(100.millis).attempt
Stream(1)
.covary[IO]
.unchunk
.chunkLimit(1)
.unchunks
.interruptWhen(interrupt)
.pull
.uncons
Expand Down
12 changes: 8 additions & 4 deletions core/shared/src/test/scala/fs2/StreamSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ class StreamSuite extends Fs2Suite {
}
}

property("chunks.flatMap(chunk) identity") {
property("chunks.unchunks identity") {
forAll { (v: Vector[Vector[Int]]) =>
val s = if (v.isEmpty) Stream.empty else v.map(Stream.emits).reduce(_ ++ _)
assertEquals(s.chunks.flatMap(Stream.chunk).toVector, v.flatten)
assertEquals(s.chunks.unchunks.toVector, v.flatten)
}
}
}
Expand Down Expand Up @@ -316,7 +316,8 @@ class StreamSuite extends Fs2Suite {
.range(0, 3)
.covary[SyncIO]
.append(Stream.raiseError[SyncIO](new Err))
.unchunk
.chunkLimit(1)
.unchunks
.pull
.echo
.stream
Expand All @@ -331,7 +332,10 @@ class StreamSuite extends Fs2Suite {
Stream
.range(0, 3)
.covary[IO] ++ Stream.raiseError[IO](new Err)
}.unchunk.pull.echo
}.chunkLimit(1)
.unchunks
.pull
.echo
.handleErrorWith(_ => Pull.eval(counter.increment))
.stream
.compile
Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/test/scala/fs2/StreamZipSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ class StreamZipSuite extends Fs2Suite {
Stream
.range(0, 10000)
.covary[IO]
.unchunk
.chunkLimit(1)
.unchunks
.prefetch
.flatMap(_ => Stream.empty)
.mapChunks(identity)
Expand Down
7 changes: 5 additions & 2 deletions core/shared/src/test/scala/fs2/TextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,10 @@ class TextSuite extends Fs2Suite {
assertEquals(Stream.emits(s).through(text.lines).toList, Nil)
else {
assertEquals(Stream.emits(s).through(text.lines).toList, lines.toList)
assertEquals(Stream.emits(s).unchunk.through(text.lines).toList, lines.toList)
assertEquals(
Stream.emits(s).chunkLimit(1).unchunks.through(text.lines).toList,
lines.toList
)
}
}
}
Expand All @@ -299,7 +302,7 @@ class TextSuite extends Fs2Suite {
.through(text.base64.encode)
.through {
// Change chunk structure to validate carries
if (unchunked) _.unchunk
if (unchunked) _.chunkLimit(1).unchunks
else _.rechunkRandomlyWithSeed(0.1, 2.0)(rechunkSeed)
}
.through {
Expand Down

0 comments on commit 5d9845f

Please sign in to comment.