From c05733352a7d818795d77f4f2ea5ae693fb701a7 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 26 Jul 2021 13:04:55 +0000 Subject: [PATCH 1/4] Implement `unchunks` (inverse of `chunks`) --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++++ core/shared/src/test/scala/fs2/StreamSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 0da712afd8..9f5a39df91 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2596,6 +2596,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } } + /** Flattens a stream of chunks. Inverse of [[chunks]]. */ + def unchunks[O2](implicit ev: O <:< Chunk[O2]): Stream[F, O2] = + flatMap(Stream.chunk(_)) + /** Alias for [[filter]] * Implemented to enable filtering in for comprehensions */ diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 4326812ddb..b0649485b3 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -133,10 +133,10 @@ class StreamSuite extends Fs2Suite { } } - property("chunks.flatMap(chunk) identity") { + property("chunks.unchunks identity") { forAll { (v: Vector[Vector[Int]]) => val s = if (v.isEmpty) Stream.empty else v.map(Stream.emits).reduce(_ ++ _) - assertEquals(s.chunks.flatMap(Stream.chunk).toVector, v.flatten) + assertEquals(s.chunks.unchunks.toVector, v.flatten) } } } From a4ec8809da89e4744a8b4efe70688bef7e6b7bc1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 26 Jul 2021 15:01:16 +0000 Subject: [PATCH 2/4] Deprecate unchunk --- core/shared/src/main/scala/fs2/Stream.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 9f5a39df91..80dd79d72c 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2588,6 +2588,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * res0: List[Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4), Chunk(5), Chunk(6)) * }}} */ + @deprecated("Use .chunkLimit(1).unchunks instead.", "3.0.7") def unchunk: Stream[F, O] = this.repeatPull { _.uncons1.flatMap { From 96cc4adcc4d1a40523da10a577e29be380d2b303 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 26 Jul 2021 15:58:02 +0000 Subject: [PATCH 3/4] Replace deprecated uses of unchunk in tests --- core/jvm/src/test/scala/fs2/CompressionSuite.scala | 2 +- core/shared/src/main/scala/fs2/Stream.scala | 5 ----- core/shared/src/test/scala/fs2/Generators.scala | 4 ++-- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 +- core/shared/src/test/scala/fs2/StreamInterruptSuite.scala | 2 +- core/shared/src/test/scala/fs2/StreamSuite.scala | 4 ++-- core/shared/src/test/scala/fs2/StreamZipSuite.scala | 2 +- core/shared/src/test/scala/fs2/TextSuite.scala | 4 ++-- 8 files changed, 10 insertions(+), 15 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/CompressionSuite.scala b/core/jvm/src/test/scala/fs2/CompressionSuite.scala index c193d7d7e9..b7f7847906 100644 --- a/core/jvm/src/test/scala/fs2/CompressionSuite.scala +++ b/core/jvm/src/test/scala/fs2/CompressionSuite.scala @@ -475,7 +475,7 @@ class CompressionSuite extends Fs2Suite { Stream .chunk(Chunk.empty[Byte]) .through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString))) - .unchunk // ensure chunk sizes are less than file name and comment size soft limits + .chunkLimit(1).unchunks // ensure chunk sizes are less than file name and comment size soft limits .through(Compression[IO].gunzip(8192)) .flatMap { gunzipResult => assert( diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 80dd79d72c..bbed1c43d9 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2582,11 +2582,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, new Stream(Pull.translate[F2, G, O](underlying, u)) /** Converts the input to a stream of 1-element chunks. - * - * @example {{{ - * scala> (Stream(1,2,3) ++ Stream(4,5,6)).unchunk.chunks.toList - * res0: List[Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4), Chunk(5), Chunk(6)) - * }}} */ @deprecated("Use .chunkLimit(1).unchunks instead.", "3.0.7") def unchunk: Stream[F, O] = diff --git a/core/shared/src/test/scala/fs2/Generators.scala b/core/shared/src/test/scala/fs2/Generators.scala index 13b312a58a..8c2d5c23e8 100644 --- a/core/shared/src/test/scala/fs2/Generators.scala +++ b/core/shared/src/test/scala/fs2/Generators.scala @@ -32,7 +32,7 @@ trait Generators extends ChunkGenerators { Gen.frequency( 1 -> Gen.const(Stream.empty), 5 -> smallLists(genA).map(as => Stream.emits(as)), - 5 -> smallLists(genA).map(as => Stream.emits(as).unchunk), + 5 -> smallLists(genA).map(as => Stream.emits(as).chunkLimit(1).unchunks), 5 -> smallLists(smallLists(genA)) .map(_.foldLeft(Stream.empty.covaryOutput[A])((acc, as) => acc ++ Stream.emits(as))), 5 -> smallLists(smallLists(genA)) @@ -48,7 +48,7 @@ trait Generators extends ChunkGenerators { Arbitrary( Gen.frequency( 10 -> arbitrary[List[O]].map(os => Stream.emits(os).take(10)), - 10 -> arbitrary[List[O]].map(os => Stream.emits(os).take(10).unchunk), + 10 -> arbitrary[List[O]].map(os => Stream.emits(os).take(10).chunkLimit(1).unchunks), 5 -> arbitrary[F[O]].map(fo => Stream.eval(fo)), 1 -> (for { acquire <- arbitrary[F[O]] diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index db97ae1810..114db8db0b 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -1319,7 +1319,7 @@ class StreamCombinatorsSuite extends Fs2Suite { Stream .emits(0 until streamSize) - .unchunk + .chunkLimit(1).unchunks .evalTap(seenArr(_).complete(())) .sliding(size, step) .evalMap { chunk => diff --git a/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala b/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala index 3c1a4def9c..09f69e63b0 100644 --- a/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala @@ -313,7 +313,7 @@ class StreamInterruptSuite extends Fs2Suite { val interrupt = IO.sleep(100.millis).attempt Stream(1) .covary[IO] - .unchunk + .chunkLimit(1).unchunks .interruptWhen(interrupt) .pull .uncons diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index b0649485b3..2d2ebf1ae6 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -316,7 +316,7 @@ class StreamSuite extends Fs2Suite { .range(0, 3) .covary[SyncIO] .append(Stream.raiseError[SyncIO](new Err)) - .unchunk + .chunkLimit(1).unchunks .pull .echo .stream @@ -331,7 +331,7 @@ class StreamSuite extends Fs2Suite { Stream .range(0, 3) .covary[IO] ++ Stream.raiseError[IO](new Err) - }.unchunk.pull.echo + }.chunkLimit(1).unchunks.pull.echo .handleErrorWith(_ => Pull.eval(counter.increment)) .stream .compile diff --git a/core/shared/src/test/scala/fs2/StreamZipSuite.scala b/core/shared/src/test/scala/fs2/StreamZipSuite.scala index e7e56c4990..9b34e0fccc 100644 --- a/core/shared/src/test/scala/fs2/StreamZipSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamZipSuite.scala @@ -391,7 +391,7 @@ class StreamZipSuite extends Fs2Suite { Stream .range(0, 10000) .covary[IO] - .unchunk + .chunkLimit(1).unchunks .prefetch .flatMap(_ => Stream.empty) .mapChunks(identity) diff --git a/core/shared/src/test/scala/fs2/TextSuite.scala b/core/shared/src/test/scala/fs2/TextSuite.scala index 4c6eb49299..d2f7a4dcda 100644 --- a/core/shared/src/test/scala/fs2/TextSuite.scala +++ b/core/shared/src/test/scala/fs2/TextSuite.scala @@ -274,7 +274,7 @@ class TextSuite extends Fs2Suite { assertEquals(Stream.emits(s).through(text.lines).toList, Nil) else { assertEquals(Stream.emits(s).through(text.lines).toList, lines.toList) - assertEquals(Stream.emits(s).unchunk.through(text.lines).toList, lines.toList) + assertEquals(Stream.emits(s).chunkLimit(1).unchunks.through(text.lines).toList, lines.toList) } } } @@ -299,7 +299,7 @@ class TextSuite extends Fs2Suite { .through(text.base64.encode) .through { // Change chunk structure to validate carries - if (unchunked) _.unchunk + if (unchunked) _.chunkLimit(1).unchunks else _.rechunkRandomlyWithSeed(0.1, 2.0)(rechunkSeed) } .through { From 0bc68dcf0dda33426fb71f958205dce75949041e Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 26 Jul 2021 16:06:06 +0000 Subject: [PATCH 4/4] Formatting --- core/jvm/src/test/scala/fs2/CompressionSuite.scala | 3 ++- .../src/test/scala/fs2/StreamCombinatorsSuite.scala | 3 ++- core/shared/src/test/scala/fs2/StreamInterruptSuite.scala | 3 ++- core/shared/src/test/scala/fs2/StreamSuite.scala | 8 ++++++-- core/shared/src/test/scala/fs2/StreamZipSuite.scala | 3 ++- core/shared/src/test/scala/fs2/TextSuite.scala | 5 ++++- 6 files changed, 18 insertions(+), 7 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/CompressionSuite.scala b/core/jvm/src/test/scala/fs2/CompressionSuite.scala index b7f7847906..6339f9f081 100644 --- a/core/jvm/src/test/scala/fs2/CompressionSuite.scala +++ b/core/jvm/src/test/scala/fs2/CompressionSuite.scala @@ -475,7 +475,8 @@ class CompressionSuite extends Fs2Suite { Stream .chunk(Chunk.empty[Byte]) .through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString))) - .chunkLimit(1).unchunks // ensure chunk sizes are less than file name and comment size soft limits + .chunkLimit(1) + .unchunks // ensure chunk sizes are less than file name and comment size soft limits .through(Compression[IO].gunzip(8192)) .flatMap { gunzipResult => assert( diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 114db8db0b..87dbe7b89d 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -1319,7 +1319,8 @@ class StreamCombinatorsSuite extends Fs2Suite { Stream .emits(0 until streamSize) - .chunkLimit(1).unchunks + .chunkLimit(1) + .unchunks .evalTap(seenArr(_).complete(())) .sliding(size, step) .evalMap { chunk => diff --git a/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala b/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala index 09f69e63b0..aa3447c9ba 100644 --- a/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamInterruptSuite.scala @@ -313,7 +313,8 @@ class StreamInterruptSuite extends Fs2Suite { val interrupt = IO.sleep(100.millis).attempt Stream(1) .covary[IO] - .chunkLimit(1).unchunks + .chunkLimit(1) + .unchunks .interruptWhen(interrupt) .pull .uncons diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index 2d2ebf1ae6..7599271cad 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -316,7 +316,8 @@ class StreamSuite extends Fs2Suite { .range(0, 3) .covary[SyncIO] .append(Stream.raiseError[SyncIO](new Err)) - .chunkLimit(1).unchunks + .chunkLimit(1) + .unchunks .pull .echo .stream @@ -331,7 +332,10 @@ class StreamSuite extends Fs2Suite { Stream .range(0, 3) .covary[IO] ++ Stream.raiseError[IO](new Err) - }.chunkLimit(1).unchunks.pull.echo + }.chunkLimit(1) + .unchunks + .pull + .echo .handleErrorWith(_ => Pull.eval(counter.increment)) .stream .compile diff --git a/core/shared/src/test/scala/fs2/StreamZipSuite.scala b/core/shared/src/test/scala/fs2/StreamZipSuite.scala index 9b34e0fccc..676b55b47f 100644 --- a/core/shared/src/test/scala/fs2/StreamZipSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamZipSuite.scala @@ -391,7 +391,8 @@ class StreamZipSuite extends Fs2Suite { Stream .range(0, 10000) .covary[IO] - .chunkLimit(1).unchunks + .chunkLimit(1) + .unchunks .prefetch .flatMap(_ => Stream.empty) .mapChunks(identity) diff --git a/core/shared/src/test/scala/fs2/TextSuite.scala b/core/shared/src/test/scala/fs2/TextSuite.scala index d2f7a4dcda..3704aa8650 100644 --- a/core/shared/src/test/scala/fs2/TextSuite.scala +++ b/core/shared/src/test/scala/fs2/TextSuite.scala @@ -274,7 +274,10 @@ class TextSuite extends Fs2Suite { assertEquals(Stream.emits(s).through(text.lines).toList, Nil) else { assertEquals(Stream.emits(s).through(text.lines).toList, lines.toList) - assertEquals(Stream.emits(s).chunkLimit(1).unchunks.through(text.lines).toList, lines.toList) + assertEquals( + Stream.emits(s).chunkLimit(1).unchunks.through(text.lines).toList, + lines.toList + ) } } }