[SPARK-6269] [CORE] Use ScalaRunTime's array methods instead of java.lang.reflect.Array in size estimation

This patch switches size estimation from java.lang.reflect.Array to Scala's ScalaRunTime array-getter methods. The notes on https://bugs.openjdk.java.net/browse/JDK-8051447 tipped me off that using java.lang.reflect.Array was not ideal. At first I used the code from that ticket, but it turned out that ScalaRunTime's array methods avoid the bottleneck of invoking native code entirely, so they were sufficient to boost size-estimation performance.

The idea is to implement these methods in pure JVM code rather than relying on native C code, which performs poorly here. This improves the performance of estimating the size of arrays when Spark checks whether it needs to spill.
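To make the distinction concrete, here is a minimal standalone sketch (not code from this patch; the object name is made up) contrasting the two APIs. In the JDKs of this era, java.lang.reflect.Array's accessors are declared `native`, while ScalaRunTime's equivalents are ordinary JVM code that dispatches on the array's runtime class:

```scala
import java.lang.reflect.{Array => JArray}
import scala.runtime.ScalaRunTime

object ArrayAccessSketch {
  def main(args: Array[String]): Unit = {
    val arr: AnyRef = Array(1, 2, 3)

    // Old path: both calls are declared `native` in the JDK and cross into C code.
    val lenReflect: Int = JArray.getLength(arr)
    val elemReflect: AnyRef = JArray.get(arr, 0) // boxes the Int to java.lang.Integer

    // New path: pure-JVM dispatch on the array's runtime class; primitives are
    // still boxed on access, but no native call is involved.
    val lenRuntime: Int = ScalaRunTime.array_length(arr)
    val elemRuntime: AnyRef = ScalaRunTime.array_apply(arr, 0).asInstanceOf[AnyRef]

    println(s"$lenReflect/$elemReflect vs. $lenRuntime/$elemRuntime")
  }
}
```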

Here's the benchmark discussion from the ticket:

I did two tests. The first, less convincing, take-it-with-a-grain-of-salt test was a simple groupByKey operation collecting the objects in a 4.0 GB text-file RDD into 30,000 buckets. I ran one Master and four Spark Worker JVMs on my Mac, reading the RDD from a text file on local disk and saving the result out to another file on local disk; a rough sketch of the job follows.
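(A sketch only; the paths, key derivation, and app name below are stand-ins, not the exact code I ran.)

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyBench {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SizeEstimator groupByKey bench"))
    // Spread the ~4.0 GB of input lines across 30,000 buckets. groupByKey
    // forces a shuffle, and size-estimating the shuffle buffers (to decide
    // when to spill) is exactly the code path this patch speeds up.
    sc.textFile("/data/input-4gb.txt")
      .map(line => (math.abs(line.hashCode % 30000), line))
      .groupByKey()
      .saveAsTextFile("/data/grouped-output")
    sc.stop()
  }
}
```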

The wall-clock times I got before and after the change were:

Before: 352.195 s, 343.871 s, 359.080 s
After (using code directly from the JDK ticket, not the Scala code in this PR): 342.929583 s, 329.456623 s, 326.151481 s

So there is a modest improvement after the change. I also did some YourKit profiling of the executors to get an idea of how much time was spent in size estimation before and after the change. Size estimation did appear to take up less time after my change, but YourKit's profiling can be inconsistent, and there is no guarantee I was profiling executors holding the same data across runs.

The more convincing test I did was to run the size-estimation logic itself in an isolated unit test. I ran the following code:
```scala
// Run inside a ScalaTest suite in package org.apache.spark.util, so that the
// private[spark] SizeEstimator object is visible.
val bigArray = Array.fill(1000)(Array.fill(1000)(java.util.UUID.randomUUID().toString()))

test("String arrays only perf testing") {
  val startTime = System.currentTimeMillis()
  for (i <- 1 to 50000) {
    SizeEstimator.estimate(bigArray)
  }
  println("Runtime: " + (System.currentTimeMillis() - startTime) / 1000.0)
}
```
I used a 2D array specifically because I wanted to measure the performance of repeatedly calling Array.getLength. I used UUID strings to ensure the strings were randomized (so no String objects are re-used) while all being the same size. The results were as follows:

Before PR: 222.681 s, 218.34 s, 211.739 s
After latest change: 170.715 s, 176.775 s, 180.298 s

Author: mcheah <mcheah@palantir.com>
Author: Justin Uang <justin.uang@gmail.com>

Closes apache#4972 from mccheah/feature/spark-6269-reflect-array and squashes the following commits:

8527852 [mcheah] Respect CamelCase for numElementsDrawn
18d4b50 [mcheah] Addressing style comments - while loops instead of for loops
16ce534 [mcheah] Organizing imports properly
db890ea [mcheah] Removing CastedArray and just using ScalaRunTime.
cb67ce2 [mcheah] Fixing a scalastyle error - line too long
5d53c4c [mcheah] Removing unused parameter in visitArray.
6467759 [mcheah] Including primitive size information inside CastedArray.
93f4b05 [mcheah] Using Scala instead of Java for the array-reflection implementation.
a557ab8 [mcheah] Using a wrapper around arrays to do casting only once
ca063fc [mcheah] Fixing a compiler error made while refactoring style
1fe09de [Justin Uang] [SPARK-6269] Use a different implementation of java.lang.reflect.Array
mccheah authored and srowen committed Mar 17, 2015
1 parent 25f3580 commit 005d1c5
Showing 1 changed file with 15 additions and 13 deletions: core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
```diff
@@ -18,18 +18,16 @@
 package org.apache.spark.util
 
 import java.lang.management.ManagementFactory
-import java.lang.reflect.{Array => JArray}
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.util.IdentityHashMap
-import java.util.Random
+import java.lang.reflect.{Field, Modifier}
+import java.util.{IdentityHashMap, Random}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.ArrayBuffer
+import scala.runtime.ScalaRunTime
 
 import org.apache.spark.Logging
 import org.apache.spark.util.collection.OpenHashSet
 
 
 /**
  * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
  * memory-aware caches.
@@ -184,9 +182,9 @@ private[spark] object SizeEstimator extends Logging {
   private val ARRAY_SIZE_FOR_SAMPLING = 200
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
 
-  private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
-    val length = JArray.getLength(array)
-    val elementClass = cls.getComponentType
+  private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) {
+    val length = ScalaRunTime.array_length(array)
+    val elementClass = arrayClass.getComponentType()
 
     // Arrays have object header and length field which is an integer
     var arrSize: Long = alignSize(objectSize + INT_SIZE)
@@ -199,22 +197,26 @@ private[spark] object SizeEstimator extends Logging {
     state.size += arrSize
 
     if (length <= ARRAY_SIZE_FOR_SAMPLING) {
-      for (i <- 0 until length) {
-        state.enqueue(JArray.get(array, i))
+      var arrayIndex = 0
+      while (arrayIndex < length) {
+        state.enqueue(ScalaRunTime.array_apply(array, arrayIndex).asInstanceOf[AnyRef])
+        arrayIndex += 1
       }
     } else {
       // Estimate the size of a large array by sampling elements without replacement.
       var size = 0.0
       val rand = new Random(42)
       val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
-      for (i <- 0 until ARRAY_SAMPLE_SIZE) {
+      var numElementsDrawn = 0
+      while (numElementsDrawn < ARRAY_SAMPLE_SIZE) {
         var index = 0
         do {
           index = rand.nextInt(length)
         } while (drawn.contains(index))
         drawn.add(index)
-        val elem = JArray.get(array, index)
+        val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
         size += SizeEstimator.estimate(elem, state.visited)
+        numElementsDrawn += 1
       }
       state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
     }
```
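For clarity, the sampling branch above extrapolates from ARRAY_SAMPLE_SIZE distinct random elements to the whole array. A tiny worked example of that arithmetic (the numbers here are made up for illustration):

```scala
// Suppose a 1,000,000-element array where the 100 sampled elements
// measure 4,800 bytes in total.
val length = 1000000
val sampleSize = 100   // ARRAY_SAMPLE_SIZE in the patch
val sampledBytes = 4800.0

// Same scaling as the patch: average element size times array length.
val estimated = ((length / (sampleSize * 1.0)) * sampledBytes).toLong
println(estimated) // 48000000, i.e. ~48 MB of element payload
```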
