Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

text.lines enhancements #2758

Merged
merged 12 commits into from
Dec 22, 2021
40 changes: 38 additions & 2 deletions core/shared/src/main/scala/fs2/text.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,34 @@ object text {
def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] =
utf8.encodeC

/** Transforms a stream of `String` such that each emitted `String` is a line from the input
* @param maxLineLength maximum size to accumulate a line to; throw an error if a line is larger
* @param crsOnly separate lines that are delimited only by '\r'
* @tparam F
*/
def linesFor[F[_]: RaiseThrowable](
maxLineLength: Option[Int] = None,
crsOnly: Boolean = false
): Pipe[F, String, String] =
linesImpl[F](
maxLineLength = maxLineLength.map((_, implicitly[RaiseThrowable[F]])),
crsOnly = crsOnly
)

/** Transforms a stream of `String` such that each emitted `String` is a line from the input. */
def lines[F[_]]: Pipe[F, String, String] = {
def lines[F[_]]: Pipe[F, String, String] = linesImpl[F]()

private def linesImpl[F[_]](
maxLineLength: Option[(Int, RaiseThrowable[F])] = None,
crsOnly: Boolean = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that \r can also be a line delimiter.
So, should it be a regression? i.e. should the default behavior change from treating \n & \r\n as a line delimiter to treating \r, \r\n, \n as a line delimiter? I would expect lines to handle this kind of ambiguity not with a flag, but always. In that case we should check that \r\n doesn’t produce two lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My assumption is that other users wouldn't want the existing behavior to change. If I were the only consumer of this library I'd just have it always treat bare \rs as line separators.

I don't know about the consensus-forming process is here. But I'd be happy to go with the flow and do whatever other stakeholders agree on here.

): Pipe[F, String, String] = {
def fillBuffers(
stringBuilder: StringBuilder,
linesBuffer: ArrayBuffer[String],
string: String
): Unit = {
val l = stringBuilder.length

var i =
if (l > 0 && stringBuilder(l - 1) == '\r' && string.nonEmpty && string(0) == '\n') {
stringBuilder.deleteCharAt(l - 1)
Expand All @@ -341,6 +361,9 @@ object text {
linesBuffer += stringBuilder.result()
stringBuilder.clear()
i += 1
case '\r' if crsOnly =>
linesBuffer += stringBuilder.result()
stringBuilder.clear()
case other =>
stringBuilder.append(other)
}
Expand All @@ -360,12 +383,25 @@ object text {
chunk.foreach { string =>
fillBuffers(stringBuilder, linesBuffer, string)
}
Pull.output(Chunk.buffer(linesBuffer)) >> go(stream, stringBuilder, first = false)

maxLineLength match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is LinesBenchmark, can you, please, run it against main version and this one?

case Some((max, raiseThrowable)) if stringBuilder.length > max =>
Pull.raiseError[F](
new LineTooLongException(stringBuilder.length, max)
)(raiseThrowable)
case _ =>
Pull.output(Chunk.indexedSeq(linesBuffer)) >> go(stream, stringBuilder, first = false)
}
}

s => Stream.suspend(go(s, new StringBuilder(), first = true).stream)
}

class LineTooLongException(val length: Int, val max: Int)
extends RuntimeException(
s"Max line size is $max but $length chars have been accumulated"
)

/** Functions for working with base 64. */
object base64 {

Expand Down
31 changes: 29 additions & 2 deletions core/shared/src/test/scala/fs2/TextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class TextSuite extends Fs2Suite {
}
}

group("lines") {
group("lines / linesFor") {
def escapeCrLf(s: String): String =
s.replaceAll("\r\n", "<CRLF>").replaceAll("\n", "<LF>").replaceAll("\r", "<CR>")

Expand All @@ -256,6 +256,16 @@ class TextSuite extends Fs2Suite {
val lines = lines0.map(escapeCrLf)
assertEquals(lines.intersperse("\n").through(text.lines).toList, lines.toList)
assertEquals(lines.intersperse("\r\n").through(text.lines).toList, lines.toList)
assertEquals(
lines
.intersperse("\r")
.covary[Fallible]
.through(
text.linesFor[Fallible](crsOnly = true)
)
.toList,
Right(lines.toList)
)
}
}

Expand All @@ -269,7 +279,7 @@ class TextSuite extends Fs2Suite {
}
}

property("grouped in 3 characater chunks") {
property("grouped in 3 character chunks") {
forAll { (lines0: Stream[Pure, String]) =>
val lines = lines0.map(escapeCrLf)
val s = lines.intersperse("\r\n").toList.mkString.grouped(3).toList
Expand All @@ -284,6 +294,23 @@ class TextSuite extends Fs2Suite {
}
}
}

property("linesFor with maxLineLength") {
val line = "foo" * 100
(1 to line.length).foreach { i =>
val stream = Stream
.emits(line.toCharArray)
.chunkN(i)
.map(c => new String(c.toArray))
.covary[Fallible]

assert(stream.through(text.linesFor(maxLineLength = Some(10))).toList.isLeft)
assertEquals(
stream.through(text.linesFor(maxLineLength = Some(line.length))).toList,
Right(List(line))
)
}
}
}

property("base64.encode") {
Expand Down