diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 78a3c5f8d5..d1aea979a7 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3646,9 +3646,13 @@ object Stream extends StreamLowPriority { def getNextChunk(i: Iterator[A]): F[Option[(Chunk[A], Iterator[A])]] = F.suspend(hint) { - i.take(chunkSize).toVector - }.map { s => - if (s.isEmpty) None else Some((Chunk.from(s), i)) + val bldr = Vector.newBuilder[A] + var cnt = 0 + while (cnt < chunkSize && i.hasNext) { + bldr += i.next() + cnt += 1 + } + if (cnt == 0) None else Some((Chunk.from(bldr.result()), i)) } Stream.unfoldChunkEval(iterator)(getNextChunk) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 0a9de56988..9feb2d65d9 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -704,16 +704,18 @@ class StreamCombinatorsSuite extends Fs2Suite { } test("fromIterator") { - forAllF { (x: List[Int], cs: Int) => + // Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415 + forAllF { (x: Vector[Int], cs: Int) => val chunkSize = (cs % 4096).abs + 1 - Stream.fromIterator[IO](x.iterator, chunkSize).assertEmits(x) + Stream.fromIterator[IO](x.iterator, chunkSize).assertEmits(x.toList) } } test("fromBlockingIterator") { - forAllF { (x: List[Int], cs: Int) => + // Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415 + forAllF { (x: Vector[Int], cs: Int) => val chunkSize = (cs % 4096).abs + 1 - Stream.fromBlockingIterator[IO](x.iterator, chunkSize).assertEmits(x) + Stream.fromBlockingIterator[IO](x.iterator, chunkSize).assertEmits(x.toList) } }