Skip to content

Commit

Permalink
Merge pull request #2631 from mpilquist/topic/chunk-indexing
Browse files Browse the repository at this point in the history
Make indexed traversal of `Chunk.Queue` amortized logarithmic time
  • Loading branch information
mpilquist authored Sep 23, 2021
2 parents 6fe2fd6 + cc023df commit 1de1e8b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
49 changes: 37 additions & 12 deletions core/shared/src/main/scala/fs2/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import cats.syntax.all._
* Chunks can be appended via the `++` method. The returned chunk is a composite of the input
* chunks -- that is, there's no copying of the source chunks. For example, `Chunk(1, 2) ++ Chunk(3, 4) ++ Chunk(5, 6)`
* returns a `Chunk.Queue(Chunk(1, 2), Chunk(3, 4), Chunk(5, 6))`. As a result, indexed based lookup of
* an appended chunk is `O(number of underlying chunks)`. In the worse case, where each constituent chunk
* has size 1, indexed lookup is `O(size)`. To restore `O(1)` lookup, call `compact`, which copies all the underlying
* an appended chunk is `O(log2(number of underlying chunks))`. In the worst case, where each constituent chunk
* has size 1, indexed lookup is `O(log2(size))`. To restore `O(1)` lookup, call `compact`, which copies all the underlying
* chunk elements to a single array backed chunk. Note `compact` requires a `ClassTag` of the element type.
*
* Alternatively, a collection of chunks can be directly copied to a new array backed chunk via
Expand Down Expand Up @@ -922,6 +922,20 @@ object Chunk
*/
final class Queue[+O] private (val chunks: SQueue[Chunk[O]], val size: Int) extends Chunk[O] {

private[this] lazy val accumulatedLengths: (Array[Int], Array[Chunk[O]]) = {
val lens = new Array[Int](chunks.size)
val arr = new Array[Chunk[O]](chunks.size)
var accLen = 0
var i = 0
chunks.foreach { c =>
accLen += c.size
lens(i) = accLen
arr(i) = c
i += 1
}
(lens, arr)
}

override def foreach(f: O => Unit): Unit =
chunks.foreach(_.foreach(f))

Expand All @@ -945,19 +959,30 @@ object Chunk
else new Queue(chunks :+ that, size + that.size)

/** Prepends a chunk to the start of this chunk queue. */
def +:[O2 >: O](c: Chunk[O2]): Queue[O2] = new Queue(c +: chunks, c.size + size)
def +:[O2 >: O](c: Chunk[O2]): Queue[O2] =
if (c.isEmpty) this else new Queue(c +: chunks, c.size + size)

/** Appends a chunk to the end of this chunk queue. */
def :+[O2 >: O](c: Chunk[O2]): Queue[O2] = new Queue(chunks :+ c, size + c.size)
def :+[O2 >: O](c: Chunk[O2]): Queue[O2] =
if (c.isEmpty) this else new Queue(chunks :+ c, size + c.size)

def apply(i: Int): O = {
if (i < 0 || i >= size) throw new IndexOutOfBoundsException()
def go(chunks: SQueue[Chunk[O]], offset: Int): O = {
val head = chunks.head
if (offset < head.size) head(offset)
else go(chunks.tail, offset - head.size)
if (i == 0) chunks.head(0)
else if (i == size - 1) chunks.last.last.get
else {
val (lengths, chunks) = accumulatedLengths
val j = java.util.Arrays.binarySearch(lengths, i)
if (j >= 0) {
// The requested index is exactly equal to an accumulated length so the head of the next chunk is the value to return
chunks(j + 1)(0)
} else {
// The requested index is not an exact match but located in the chunk after the returned insertion point
val k = -(j + 1)
val accLenBefore = if (k == 0) 0 else lengths(k - 1)
chunks(k)(i - accLenBefore)
}
}
go(chunks, i)
}

def copyToArray[O2 >: O](xs: Array[O2], start: Int): Unit = {
Expand Down Expand Up @@ -1052,10 +1077,10 @@ object Chunk
object Queue {
private val empty_ = new Queue(collection.immutable.Queue.empty, 0)
def empty[O]: Queue[O] = empty_.asInstanceOf[Queue[O]]
def singleton[O](c: Chunk[O]): Queue[O] = new Queue(collection.immutable.Queue(c), c.size)
def singleton[O](c: Chunk[O]): Queue[O] =
if (c.isEmpty) empty else new Queue(collection.immutable.Queue(c), c.size)
def apply[O](chunks: Chunk[O]*): Queue[O] =
if (chunks.isEmpty) empty
else chunks.tail.foldLeft(singleton(chunks.head))(_ :+ _)
chunks.foldLeft(empty[O])(_ :+ _)
}

def newBuilder[O]: Collector.Builder[O, Chunk[O]] =
Expand Down
10 changes: 10 additions & 0 deletions core/shared/src/test/scala/fs2/ChunkQueueSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,14 @@ class ChunkQueueSuite extends Fs2Suite {
assertEquals(computed, queue.startsWith(items))
}
}

test("apply") {
forAll { (chunks: List[Chunk[Int]]) =>
val queue = Chunk.Queue(chunks: _*)
val flat = queue.compact
(0 until flat.size).foreach { idx =>
assertEquals(queue(idx), flat(idx))
}
}
}
}
7 changes: 4 additions & 3 deletions core/shared/src/test/scala/fs2/TextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ class TextSuite extends Fs2Suite {
Stream(l: _*).map(utf8Bytes).unchunks.through(utf8.decode).toList,
l
)
assertEquals(Stream(l0: _*).map(utf8Bytes).through(utf8.decodeC).toList, l0)
assertEquals(Stream(l: _*).map(utf8Bytes).through(utf8.decodeC).toList, l)
}
}

property("utf8Encode andThen utf8.decode = id") {
forAll(genStringNoBom) { (s: String) =>
assertEquals(Stream(s).through(utf8.encodeC).through(utf8.decodeC).toList, List(s))
if (s.nonEmpty)
if (s.nonEmpty) {
assertEquals(Stream(s).through(utf8.encodeC).through(utf8.decodeC).toList, List(s))
assertEquals(Stream(s).through(utf8.encode).through(utf8.decode).toList, List(s))
}
}
}

Expand Down

0 comments on commit 1de1e8b

Please sign in to comment.