Skip to content

Commit

Permalink
Merge pull request #2758 from stephenjudkins/sdj/lines-enhancements
Browse files Browse the repository at this point in the history
`text.lines` enhancements
  • Loading branch information
mpilquist authored Dec 22, 2021
2 parents d9587c0 + e8c89f2 commit 61b593c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 9 deletions.
48 changes: 41 additions & 7 deletions core/shared/src/main/scala/fs2/text.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,20 +316,38 @@ object text {
def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] =
utf8.encodeC

/** Transforms a stream of `String` such that each emitted `String` is a line from the input
* @param maxLineLength maximum size to accumulate a line to; throw an error if a line is larger
*/
def linesLimited[F[_]: RaiseThrowable](maxLineLength: Int): Pipe[F, String, String] =
linesImpl[F](maxLineLength = Some((maxLineLength, implicitly[RaiseThrowable[F]])))

/** Transforms a stream of `String` such that each emitted `String` is a line from the input. */
def lines[F[_]]: Pipe[F, String, String] = {
def lines[F[_]]: Pipe[F, String, String] = linesImpl[F](None)

private def linesImpl[F[_]](
maxLineLength: Option[(Int, RaiseThrowable[F])]
): Pipe[F, String, String] = {
def fillBuffers(
stringBuilder: StringBuilder,
linesBuffer: ArrayBuffer[String],
string: String
): Unit = {
val l = stringBuilder.length

var i =
if (l > 0 && stringBuilder(l - 1) == '\r' && string.nonEmpty && string(0) == '\n') {
stringBuilder.deleteCharAt(l - 1)
linesBuffer += stringBuilder.result()
stringBuilder.clear()
1
if (l > 0 && stringBuilder(l - 1) == '\r') {
if (string.nonEmpty && string(0) == '\n') {
stringBuilder.deleteCharAt(l - 1)
linesBuffer += stringBuilder.result()
stringBuilder.clear()
1
} else if (stringBuilder(l - 1) == '\r') {
stringBuilder.deleteCharAt(l - 1)
linesBuffer += stringBuilder.result()
stringBuilder.clear()
0
} else 0
} else 0

while (i < string.size) {
Expand All @@ -341,6 +359,9 @@ object text {
linesBuffer += stringBuilder.result()
stringBuilder.clear()
i += 1
case '\r' if i + 1 < string.size =>
linesBuffer += stringBuilder.result()
stringBuilder.clear()
case other =>
stringBuilder.append(other)
}
Expand All @@ -360,12 +381,25 @@ object text {
chunk.foreach { string =>
fillBuffers(stringBuilder, linesBuffer, string)
}
Pull.output(Chunk.buffer(linesBuffer)) >> go(stream, stringBuilder, first = false)

maxLineLength match {
case Some((max, raiseThrowable)) if stringBuilder.length > max =>
Pull.raiseError[F](
new LineTooLongException(stringBuilder.length, max)
)(raiseThrowable)
case _ =>
Pull.output(Chunk.indexedSeq(linesBuffer)) >> go(stream, stringBuilder, first = false)
}
}

s => Stream.suspend(go(s, new StringBuilder(), first = true).stream)
}

class LineTooLongException(val length: Int, val max: Int)
extends RuntimeException(
s"Max line size is $max but $length chars have been accumulated"
)

/** Functions for working with base 64. */
object base64 {

Expand Down
23 changes: 21 additions & 2 deletions core/shared/src/test/scala/fs2/TextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package fs2

import cats.syntax.all._

import java.nio.charset.Charset
import org.scalacheck.{Arbitrary, Gen}
import org.scalacheck.Prop.forAll
Expand Down Expand Up @@ -247,7 +248,7 @@ class TextSuite extends Fs2Suite {
}
}

group("lines") {
group("lines / linesLimited") {
def escapeCrLf(s: String): String =
s.replaceAll("\r\n", "<CRLF>").replaceAll("\n", "<LF>").replaceAll("\r", "<CR>")

Expand All @@ -256,6 +257,7 @@ class TextSuite extends Fs2Suite {
val lines = lines0.map(escapeCrLf)
assertEquals(lines.intersperse("\n").through(text.lines).toList, lines.toList)
assertEquals(lines.intersperse("\r\n").through(text.lines).toList, lines.toList)
assertEquals(lines.intersperse("\r").through(text.lines).toList, lines.toList)
}
}

Expand All @@ -269,7 +271,7 @@ class TextSuite extends Fs2Suite {
}
}

property("grouped in 3 characater chunks") {
property("grouped in 3 character chunks") {
forAll { (lines0: Stream[Pure, String]) =>
val lines = lines0.map(escapeCrLf)
val s = lines.intersperse("\r\n").toList.mkString.grouped(3).toList
Expand All @@ -284,6 +286,23 @@ class TextSuite extends Fs2Suite {
}
}
}

property("linesLimited") {
val line = "foo" * 100
(1 to line.length).foreach { i =>
val stream = Stream
.emits(line.toCharArray)
.chunkN(i)
.map(c => new String(c.toArray))
.covary[Fallible]

assert(stream.through(text.linesLimited(10)).toList.isLeft)
assertEquals(
stream.through(text.linesLimited(line.length)).toList,
Right(List(line))
)
}
}
}

property("base64.encode") {
Expand Down

0 comments on commit 61b593c

Please sign in to comment.