Skip to content

Commit

Permalink
Merge pull request #2720 from mpilquist/topic/chunk-perf
Browse files Browse the repository at this point in the history
Improve memory efficiency of Chunk.filter and replace internal use of mutable.Buffer with mutable.ArrayBuilder
  • Loading branch information
mpilquist authored Nov 29, 2021
2 parents 72ac3d7 + 9d853bc commit d508c4c
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 38 deletions.
37 changes: 8 additions & 29 deletions benchmark/src/main/scala/fs2/benchmark/ChunkBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,26 @@ package benchmark

import org.openjdk.jmh.annotations.{Benchmark, Param, Scope, Setup, State}

import cats.syntax.all._

@State(Scope.Thread)
class ChunkBenchmark {
@Param(Array("16", "256", "4096"))
var chunkSize: Int = _

@Param(Array("1", "20", "50", "100"))
var chunkCount: Int = _

case class Obj(dummy: Boolean)
object Obj {
def create: Obj = Obj(true)
}

var chunkSeq: Seq[Chunk[Obj]] = _
var flattened: Chunk[Obj] = _
var sizeHint: Int = _
var ints: Chunk[Int] = _

@Setup
def setup() = {
chunkSeq = Seq.range(0, chunkCount).map(_ => Chunk.seq(Seq.fill(chunkSize)(Obj.create)))
sizeHint = chunkSeq.foldLeft(0)(_ + _.size)
flattened = Chunk.seq(chunkSeq).flatten
}
def setup() =
ints = Chunk.seq((0 until chunkSize).map(_ + 1000)).compact

@Benchmark
def concat(): Unit = {
Chunk.concat(chunkSeq, sizeHint)
def map(): Unit = {
ints.map(_ + 1)
()
}

@Benchmark
def traverse(): Boolean = {
val fn: () => Chunk[Boolean] =
flattened.traverse { obj =>
if (obj.dummy) { () => true }
else { () => false }
}

fn().isEmpty
def filter(): Unit = {
ints.filter(_ % 3 == 0)
()
}

}
69 changes: 69 additions & 0 deletions benchmark/src/main/scala/fs2/benchmark/ChunksBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package benchmark

import org.openjdk.jmh.annotations.{Benchmark, Param, Scope, Setup, State}

import cats.syntax.all._

@State(Scope.Thread)
class ChunksBenchmark {
@Param(Array("16", "256", "4096"))
var chunkSize: Int = _

@Param(Array("1", "20", "50", "100"))
var chunkCount: Int = _

case class Obj(dummy: Boolean)
object Obj {
def create: Obj = Obj(true)
}

var chunkSeq: Seq[Chunk[Obj]] = _
var flattened: Chunk[Obj] = _
var sizeHint: Int = _

@Setup
def setup() = {
chunkSeq = Seq.range(0, chunkCount).map(_ => Chunk.seq(Seq.fill(chunkSize)(Obj.create)))
sizeHint = chunkSeq.foldLeft(0)(_ + _.size)
flattened = Chunk.seq(chunkSeq).flatten
}

@Benchmark
def concat(): Unit = {
Chunk.concat(chunkSeq, sizeHint)
()
}

@Benchmark
def traverse(): Boolean = {
val fn: () => Chunk[Boolean] =
flattened.traverse { obj =>
if (obj.dummy) { () => true }
else { () => false }
}

fn().isEmpty
}
}
7 changes: 5 additions & 2 deletions core/shared/src/main/scala-2.12/fs2/ChunkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

package fs2

import scala.collection.mutable.WrappedArray
import scala.collection.mutable.{ArrayBuilder, WrappedArray}
import scala.reflect.ClassTag

private[fs2] trait ChunkPlatform[+O] { self: Chunk[O] => }
private[fs2] trait ChunkPlatform[+O] { self: Chunk[O] =>
protected def makeArrayBuilder[A](implicit ct: ClassTag[A]): ArrayBuilder[A] =
ArrayBuilder.make()(ct)
}

private[fs2] trait ChunkCompanionPlatform { self: Chunk.type =>

Expand Down
4 changes: 4 additions & 0 deletions core/shared/src/main/scala-2.13/fs2/ChunkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ package fs2

import scala.collection.immutable.ArraySeq
import scala.collection.immutable
import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag

private[fs2] trait ChunkPlatform[+O] { self: Chunk[O] =>

protected def makeArrayBuilder[A](implicit ct: ClassTag[A]): ArrayBuilder[A] =
ArrayBuilder.make(ct)

def toArraySeq[O2 >: O: ClassTag]: ArraySeq[O2] = {
val array: Array[O2] = new Array[O2](size)
copyToArray(array)
Expand Down
4 changes: 4 additions & 0 deletions core/shared/src/main/scala-3/fs2/ChunkPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ package fs2

import scala.collection.immutable.ArraySeq
import scala.collection.immutable
import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag

private[fs2] trait ChunkPlatform[+O] { self: Chunk[O] =>

protected def makeArrayBuilder[A](implicit ct: ClassTag[A]): ArrayBuilder[A] =
ArrayBuilder.make(ct)

def toArraySeq[O2 >: O: ClassTag]: ArraySeq[O2] = {
val array: Array[O2] = new Array[O2](size)
copyToArray(array)
Expand Down
20 changes: 13 additions & 7 deletions core/shared/src/main/scala/fs2/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import cats.{Alternative, Applicative, Eq, Eval, Monad, Monoid, Traverse, Traver
import cats.data.{Chain, NonEmptyList}
import cats.syntax.all._

/** Strict, finite sequence of values that allows index-based random access of elements.
/** Immutable, strict, finite sequence of values that supports efficient index-based random access of elements,
* is memory efficient for all sizes, and avoids unnecessary copying.
*
* `Chunk`s can be created from a variety of collection types using methods on the `Chunk` companion
* (e.g., `Chunk.array`, `Chunk.seq`, `Chunk.vector`).
Expand Down Expand Up @@ -107,12 +108,14 @@ abstract class Chunk[+O] extends Serializable with ChunkPlatform[O] with ChunkRu
/** Drops the right-most `n` elements of this chunk queue in a way that preserves chunk structure. */
def dropRight(n: Int): Chunk[O] = if (n <= 0) this else take(size - n)

protected def thisClassTag: ClassTag[Any] = implicitly[ClassTag[Any]]

/** Returns a chunk that has only the elements that satisfy the supplied predicate. */
def filter(p: O => Boolean): Chunk[O] = {
val b = collection.mutable.Buffer.newBuilder[O]
val b = makeArrayBuilder(thisClassTag)
b.sizeHint(size)
foreach(e => if (p(e)) b += e)
Chunk.buffer(b.result())
Chunk.array(b.result()).asInstanceOf[Chunk[O]]
}

/** Returns the first element for which the predicate returns true or `None` if no elements satisfy the predicate. */
Expand Down Expand Up @@ -206,13 +209,13 @@ abstract class Chunk[+O] extends Serializable with ChunkPlatform[O] with ChunkRu

/** Maps the supplied function over each element and returns a chunk of just the defined results. */
def mapFilter[O2](f: O => Option[O2]): Chunk[O2] = {
val b = collection.mutable.Buffer.newBuilder[O2]
val b = makeArrayBuilder[Any]
b.sizeHint(size)
foreach { o =>
val o2 = f(o)
if (o2.isDefined) b += o2.get
}
Chunk.buffer(b.result())
Chunk.array(b.result()).asInstanceOf[Chunk[O2]]
}

/** False if size is zero, true otherwise. */
Expand Down Expand Up @@ -321,10 +324,10 @@ abstract class Chunk[+O] extends Serializable with ChunkPlatform[O] with ChunkRu
*/
def toIndexedChunk: Chunk[O] = this match {
case _: Chunk.Queue[_] =>
val b = collection.mutable.Buffer.newBuilder[O]
val b = makeArrayBuilder[Any]
b.sizeHint(size)
foreach(o => b += o)
Chunk.buffer(b.result())
Chunk.array(b.result()).asInstanceOf[Chunk[O]]
case other => other
}

Expand Down Expand Up @@ -683,10 +686,13 @@ object Chunk

case class ArraySlice[O](values: Array[O], offset: Int, length: Int)(implicit ct: ClassTag[O])
extends Chunk[O] {

require(
offset >= 0 && offset <= values.size && length >= 0 && length <= values.size && offset + length <= values.size
)

override protected def thisClassTag: ClassTag[Any] = ct.asInstanceOf[ClassTag[Any]]

def size = length
def apply(i: Int) = values(offset + i)

Expand Down

0 comments on commit d508c4c

Please sign in to comment.