Skip to content

Commit

Permalink
Specialize Chunk#toArraySlice (#3198)
Browse files Browse the repository at this point in the history
* Optimize `.toArraySlice` for `ByteVector`, buffers
* Avoid converting buffers to `readOnly`
* Implement `to{Byte,Char}Buffer` w/ `toArraySlice`
  • Loading branch information
armanbilge authored Apr 3, 2023
1 parent 8f937b5 commit aa60723
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 58 deletions.
14 changes: 11 additions & 3 deletions core/js/src/main/scala/fs2/ChunkRuntimePlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ private[fs2] trait ChunkRuntimePlatform[+O] { self: Chunk[O] =>

def toJSArrayBuffer[B >: O](implicit ev: B =:= Byte): ArrayBuffer = {
val bb = toByteBuffer[B]
if (bb.hasArrayBuffer())
if (
bb.hasArrayBuffer() &&
bb.position() == 0 &&
bb.arrayBufferOffset() == 0 &&
bb.arrayBuffer().byteLength == bb.remaining()
)
bb.arrayBuffer()
else {
val ab = new ArrayBuffer(bb.remaining())
Expand All @@ -40,8 +45,11 @@ private[fs2] trait ChunkRuntimePlatform[+O] { self: Chunk[O] =>
}

def toUint8Array[B >: O](implicit ev: B =:= Byte): Uint8Array = {
val ab = toJSArrayBuffer[B]
new Uint8Array(ab, 0, ab.byteLength)
val bb = toByteBuffer[B]
if (bb.hasArrayBuffer())
new Uint8Array(bb.arrayBuffer(), bb.position() + bb.arrayBufferOffset(), bb.remaining())
else
new Uint8Array(toJSArrayBuffer(ev))
}

}
Expand Down
124 changes: 70 additions & 54 deletions core/shared/src/main/scala/fs2/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -300,53 +300,25 @@ abstract class Chunk[+O] extends Serializable with ChunkPlatform[O] with ChunkRu

/** Converts this chunk to a `Chunk.ArraySlice`. */
def toArraySlice[O2 >: O](implicit ct: ClassTag[O2]): Chunk.ArraySlice[O2] =
this match {
case as: Chunk.ArraySlice[_] if ct.wrap.runtimeClass eq as.values.getClass =>
as.asInstanceOf[Chunk.ArraySlice[O2]]
case _ => Chunk.ArraySlice(toArray, 0, size)
}
Chunk.ArraySlice(toArray, 0, size)

/** Converts this chunk to a `java.nio.ByteBuffer`.
* @note that even "read-only" interaction with a `ByteBuffer` may increment its `position`,
* so this method should be considered as unsafely allocating mutable state.
*/
def toByteBuffer[B >: O](implicit ev: B =:= Byte): JByteBuffer =
this match {
case c: Chunk.ArraySlice[_] if c.values.isInstanceOf[Array[Byte]] =>
JByteBuffer.wrap(c.values.asInstanceOf[Array[Byte]], c.offset, c.length)
case c: Chunk.ByteBuffer =>
val b = c.buf.duplicate // share contents, independent position/limit
if (c.offset == 0 && b.position() == 0 && c.size == b.limit()) b
else {
(b: JBuffer).position(c.offset.toInt)
(b: JBuffer).limit(c.offset.toInt + c.size)
b
}
case c: Chunk.ByteVectorChunk =>
c.bv.toByteBuffer
case _ =>
JByteBuffer.wrap(this.asInstanceOf[Chunk[Byte]].toArray, 0, size)
}
def toByteBuffer[B >: O](implicit ev: B =:= Byte): JByteBuffer = {
val slice = this.asInstanceOf[Chunk[Byte]].toArraySlice
JByteBuffer.wrap(slice.values, slice.offset, slice.length)
}

/** Converts this chunk to a `java.nio.CharBuffer`.
* @note that even "read-only" interaction with a `CharBuffer` may increment its position,
* so this method should be considered as unsafely allocating mutable state.
*/
def toCharBuffer[B >: O](implicit ev: B =:= Char): JCharBuffer =
this match {
case c: Chunk.ArraySlice[_] if c.values.isInstanceOf[Array[Char]] =>
JCharBuffer.wrap(c.values.asInstanceOf[Array[Char]], c.offset, c.length)
case c: Chunk.CharBuffer =>
val b = c.buf.duplicate // share contents, independent position/limit
if (c.offset == 0 && b.position() == 0 && c.size == b.limit()) b
else {
(b: JBuffer).position(c.offset.toInt)
(b: JBuffer).limit(c.offset.toInt + c.size)
b
}
case _ =>
JCharBuffer.wrap(this.asInstanceOf[Chunk[Char]].toArray, 0, size)
}
def toCharBuffer[C >: O](implicit ev: C =:= Char): JCharBuffer = {
val slice = this.asInstanceOf[Chunk[Char]].toArraySlice
JCharBuffer.wrap(slice.values, slice.offset, slice.length)
}

/** Converts this chunk to a NonEmptyList */
def toNel: Option[NonEmptyList[O]] =
Expand Down Expand Up @@ -775,16 +747,23 @@ object Chunk
if (n <= 0) Chunk.empty
else if (n >= size) this
else ArraySlice(values, offset, n)

override def toArraySlice[O2 >: O](implicit ct: ClassTag[O2]): Chunk.ArraySlice[O2] =
if (ct.wrap.runtimeClass eq values.getClass)
asInstanceOf[Chunk.ArraySlice[O2]]
else super.toArraySlice

}
object ArraySlice {
def apply[O: ClassTag](values: Array[O]): ArraySlice[O] = ArraySlice(values, 0, values.length)
}

sealed abstract class Buffer[A <: Buffer[A, B, C], B <: JBuffer, C: ClassTag](
sealed abstract class Buffer[A <: Buffer[A, B, C], B <: JBuffer, C](
buf: B,
val offset: Int,
val size: Int
) extends Chunk[C] {
)(implicit ct: ClassTag[C])
extends Chunk[C] {
def readOnly(b: B): B
def buffer(b: B): A
def get(b: B, n: Int): C
Expand All @@ -798,7 +777,7 @@ object Chunk
if (n <= 0) this
else if (n >= size) Chunk.empty
else {
val second = readOnly(buf)
val second = duplicate(buf)
(second: JBuffer).position(n + offset)
buffer(second)
}
Expand All @@ -807,13 +786,13 @@ object Chunk
if (n <= 0) Chunk.empty
else if (n >= size) this
else {
val first = readOnly(buf)
val first = duplicate(buf)
(first: JBuffer).limit(n + offset)
buffer(first)
}

def copyToArray[O2 >: C](xs: Array[O2], start: Int): Unit = {
val b = readOnly(buf)
val b = duplicate(buf)
(b: JBuffer).position(offset)
(b: JBuffer).limit(offset + size)
val arr = new Array[C](size)
Expand All @@ -823,25 +802,26 @@ object Chunk
}

protected def splitAtChunk_(n: Int): (A, A) = {
val first = readOnly(buf)
val first = duplicate(buf)
(first: JBuffer).limit(n + offset)
val second = readOnly(buf)
val second = duplicate(buf)
(second: JBuffer).position(n + offset)
(buffer(first), buffer(second))
}

override def toArray[O2 >: C: ClassTag]: Array[O2] = {
val bs = new Array[C](size)
val b = duplicate(buf)
(b: JBuffer).position(offset)
get(b, bs, 0, size)
bs.asInstanceOf[Array[O2]]
}
override def toArray[O2 >: C](implicit o2ct: ClassTag[O2]): Array[O2] =
if (o2ct.runtimeClass == ct.runtimeClass) {
val bs = new Array[O2](size)
val b = duplicate(buf)
(b: JBuffer).position(offset)
get(b, bs.asInstanceOf[Array[C]], 0, size)
bs
} else super.toArray
}

object CharBuffer {
def apply(buf: JCharBuffer): CharBuffer =
view(buf.duplicate().asReadOnlyBuffer)
view(buf.duplicate())

def view(buf: JCharBuffer): CharBuffer =
new CharBuffer(buf, buf.position, buf.remaining)
Expand All @@ -864,14 +844,28 @@ object Chunk

override def toByteVector[B >: Char](implicit ev: B =:= Byte): ByteVector =
throw new UnsupportedOperationException

override def toArraySlice[O2 >: Char](implicit ct: ClassTag[O2]): Chunk.ArraySlice[O2] =
if (ct.runtimeClass == classOf[Char] && buf.hasArray)
Chunk
.ArraySlice(buf.array, buf.arrayOffset + offset, size)
.asInstanceOf[Chunk.ArraySlice[O2]]
else super.toArraySlice

override def toCharBuffer[C >: Char](implicit ev: C =:= Char): JCharBuffer = {
val b = buf.duplicate // share contents, independent position/limit
(b: JBuffer).position(offset.toInt)
(b: JBuffer).limit(offset.toInt + size)
b
}
}

/** Creates a chunk backed by an char buffer, bounded by the current position and limit */
def charBuffer(buf: JCharBuffer): Chunk[Char] = CharBuffer(buf)

object ByteBuffer {
def apply(buf: JByteBuffer): ByteBuffer =
view(buf.duplicate().asReadOnlyBuffer)
view(buf.duplicate())

def view(buf: JByteBuffer): ByteBuffer =
new ByteBuffer(buf, buf.position, buf.remaining)
Expand All @@ -896,11 +890,25 @@ object Chunk
def duplicate(b: JByteBuffer): JByteBuffer = b.duplicate()

override def toByteVector[B >: Byte](implicit ev: B =:= Byte): ByteVector = {
val bb = buf.asReadOnlyBuffer()
val bb = buf.duplicate()
(bb: JBuffer).position(offset)
(bb: JBuffer).limit(offset + size)
ByteVector.view(bb)
}

override def toArraySlice[O2 >: Byte](implicit ct: ClassTag[O2]): Chunk.ArraySlice[O2] =
if (ct.runtimeClass == classOf[Byte] && buf.hasArray)
Chunk
.ArraySlice(buf.array, buf.arrayOffset + offset, size)
.asInstanceOf[Chunk.ArraySlice[O2]]
else super.toArraySlice

override def toByteBuffer[B >: Byte](implicit ev: B =:= Byte): JByteBuffer = {
val b = buf.duplicate // share contents, independent position/limit
(b: JBuffer).position(offset.toInt)
(b: JBuffer).limit(offset.toInt + size)
b
}
}

/** Creates a chunk backed by an byte buffer, bounded by the current position and limit */
Expand Down Expand Up @@ -948,6 +956,14 @@ object Chunk

@deprecated("Retained for bincompat", "3.2.12")
def toByteVector() = bv

override def toArraySlice[O2 >: Byte](implicit ct: ClassTag[O2]): Chunk.ArraySlice[O2] =
if (ct.runtimeClass == classOf[Byte])
Chunk.ArraySlice[Byte](bv.toArrayUnsafe, 0, size).asInstanceOf[Chunk.ArraySlice[O2]]
else super.toArraySlice

override def toByteBuffer[B >: Byte](implicit ev: B =:= Byte): JByteBuffer =
bv.toByteBuffer
}

/** Concatenates the specified sequence of chunks in to a single chunk, avoiding boxing. */
Expand Down
38 changes: 37 additions & 1 deletion core/shared/src/test/scala/fs2/ChunkSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import cats.kernel.laws.discipline.EqTests
import cats.laws.discipline.{AlternativeTests, MonadTests, TraverseFilterTests, TraverseTests}
import org.scalacheck.{Arbitrary, Cogen, Gen, Test}
import org.scalacheck.Prop.forAll
import scodec.bits.ByteVector

import java.nio.ByteBuffer
import java.nio.CharBuffer
import scala.reflect.ClassTag

class ChunkSuite extends Fs2Suite {
Expand Down Expand Up @@ -225,13 +228,46 @@ class ChunkSuite extends Fs2Suite {
Chunk.ArraySlice(Array[Any](0.toByte)).asInstanceOf[Chunk[Byte]].toByteVector
}

test("ArraySlice does not copy when chunk is already an ArraySlice instance") {
test("toArraySlice does not copy when chunk is already an ArraySlice instance") {
val chunk: Chunk[Int] = Chunk.ArraySlice(Array(0))
assert(chunk eq chunk.toArraySlice)
val chunk2: Chunk[Any] = Chunk.ArraySlice(Array(new Object))
assert(chunk2 eq chunk2.toArraySlice)
}

test("toArraySlice does not copy when chunk is an array-backed bytevector") {
val arr = Array[Byte](0, 1, 2, 3)
val chunk: Chunk[Byte] = Chunk.byteVector(ByteVector.view(arr))
assert(chunk.toArraySlice.values eq arr)
}

test("ByteVectorChunk#toArraySlice does not throw class cast exception") {
val chunk: Chunk[Byte] = Chunk.byteVector(ByteVector.view(Array[Byte](0, 1, 2, 3)))
assertEquals(chunk.toArraySlice[Any].values.apply(0), 0)
}

test("toArraySlice does not copy when chunk is an array-backed bytebuffer") {
val bb = ByteBuffer.allocate(4)
val chunk: Chunk[Byte] = Chunk.byteBuffer(bb)
assert(chunk.toArraySlice.values eq bb.array)
}

test("ByteBuffer#toArraySlice does not throw class cast exception") {
val chunk: Chunk[Byte] = Chunk.byteBuffer(ByteBuffer.allocate(4))
assertEquals(chunk.toArraySlice[Any].values.apply(0), 0)
}

test("toArraySlice does not copy when chunk is an array-backed charbuffer") {
val cb = CharBuffer.allocate(4)
val chunk: Chunk[Char] = Chunk.charBuffer(cb)
assert(chunk.toArraySlice.values eq cb.array)
}

test("CharBuffer#toArraySlice does not throw class cast exception") {
val chunk: Chunk[Char] = Chunk.charBuffer(CharBuffer.allocate(4))
assertEquals(chunk.toArraySlice[Any].values.apply(0), 0)
}

test("compactUntagged - regression #2679") {
Chunk.Queue.singleton(Chunk.singleton(1)).traverse(x => Option(x))
}
Expand Down

0 comments on commit aa60723

Please sign in to comment.