[SPARK-6269] [CORE] Use ScalaRunTime's array methods instead of java.lang.reflect.Array in size estimation #4972
Conversation
This patch uses a different implementation of java.lang.reflect.Array. The code is copied and pasted from https://bugs.openjdk.java.net/browse/JDK-8051447. The appropriate code is in the public domain, so we can use it. The idea is to use pure Java code in implementing the methods there, as opposed to relying on native C code which ends up being ill-performing. This greatly improves the performance of estimating the size of arrays when we are checking for spilling in Spark. Benchmark results are in the bug description and in the discussion on the linked pull request.
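As the PR title suggests, the thread below eventually settles on ScalaRunTime rather than the copied JDK code. A minimal sketch contrasting the two access paths (the object name here is illustrative only):

```scala
import scala.runtime.ScalaRunTime

object ReflectSketch {
  def main(args: Array[String]): Unit = {
    val arr: AnyRef = Array("a", "b", "c")

    // Reflective access backed by native code inside the JVM:
    val nativeLen  = java.lang.reflect.Array.getLength(arr)
    val nativeElem = java.lang.reflect.Array.get(arr, 0)

    // Pure-JVM access via ScalaRunTime, as the final patch uses:
    val scalaLen  = ScalaRunTime.array_length(arr)
    val scalaElem = ScalaRunTime.array_apply(arr, 0).asInstanceOf[AnyRef]

    assert(nativeLen == scalaLen)
    assert(nativeElem == scalaElem)
  }
}
```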
Test build #28454 has finished for PR 4972 at commit
@mccheah thanks for submitting this. Is getLength the only thing we use?
I believe we use Array.get in the visitArray method in SizeEstimator, so there's that as well.
Test build #28455 has finished for PR 4972 at commit
 * over calling to native C in many cases. Based on code from
 * <a href="https://bugs.openjdk.java.net/browse/JDK-8051447">an open JDK ticket</a>.
 */
public class ArrayReflect {
We only use getLength and get. In the name of minimizing the chunk of code that needs to be added, I suppose we can just implement those two methods?
The micro-benchmark definitely shows the alternative code is faster, although it only benchmarks the time taken in this one method. The question is how much of the overall time is consumed by a simple array length check. Your first test is more convincing, although the difference seems quite a bit larger than I'd expect. Wow, does it really get called so much, and is the native overhead that big?
I noticed that a good chunk of the time spent in the size estimation method was in Array.getLength; @justinuang (who originally proposed this idea - got to give credit where it's due!) also observed this and originally tipped me off to it. And we do call Array.getLength on every interval of size estimation if any of the RDD objects contain arrays. That will be frequent, and will happen regardless of whether spilling is actually happening.
I changed the implementation to not use the code from the ticket; instead I created a wrapper around arrays, plus a factory method that does the appropriate casting and builds the wrapper object. I was mildly concerned about the performance of this implementation, since I'm now instantiating a new object rather than purely invoking static methods, so I ran some more localized unit-test benchmarks.

With the 1000 x 1000 2D String array tests I was doing before (50,000 executions of SizeEstimator.estimate()), performance seems better after my most recent commit, but the difference is small enough that I might as well attribute it to noise. Before this PR at all: 209.275 s, 190.107 s, 185.424 s.

I was mostly concerned about the overhead of creating the wrapper object when the arrays are small, since now I'm spending extra time constructing the wrapper itself. It turns out we're still doing better even on small arrays compared to before the PR. I tested estimating the size of a 1000-String 1D array 1 million times. Before PR: 27.246 s, 32.124 s, 28.433 s.

Most importantly, we should be able to get around the licensing issues now. I also made the code only use the array get() and getLength() methods, since we never set() on arrays anywhere.
If you write this in Scala, you can use value classes to avoid the allocation, while preserving the OOP features.
I wanted to avoid Scala because we're checking instanceOf on Java arrays of primitive types, which is slightly scary to me. Do you know the behavior of converting between Scala arrays and Java arrays of primitive types? EDIT: after all, this is replacing Java's array-reflection functionality, so it seemed safer to me to keep everything in Java-land. But that's also owing to my lack of deep Scala knowledge.
Scala arrays are just Java arrays. There is no craziness there.
Maybe I'm reading too much into this... but Scala arrays are always of the "boxed" type. Does that create issues if I create a Java array of an un-boxed, i.e. primitive, type and try to check arr.isInstanceOf there? Or does the fact that the array object itself already lives in Scala-land mean that it's always going to be an array of the boxed type? EDIT: never mind, did some Scala research... we should be fine here.
Yea - Scala arrays are not always boxed. Array[Int] is the same as int[] in Java.
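A quick check confirming this: Array[Int] compiles to a JVM int[], so isInstanceOf and getComponentType behave exactly as they would for a Java primitive array. (The object name is made up for illustration.)

```scala
object ArrayIdentitySketch {
  def main(args: Array[String]): Unit = {
    val a: AnyRef = Array(1, 2, 3) // a JVM int[], not an array of boxed Integers

    assert(a.isInstanceOf[Array[Int]])
    assert(a.getClass.getComponentType == java.lang.Integer.TYPE)
    assert(a.getClass.getName == "[I") // the JVM descriptor for int[]

    // By contrast, explicitly boxed elements yield a java.lang.Integer[]:
    val boxed: AnyRef = Array(Int.box(1), Int.box(2))
    assert(boxed.getClass.getName == "[Ljava.lang.Integer;")
  }
}
```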
Test build #28489 has finished for PR 4972 at commit
I wrote it in Scala, extending AnyVal and writing a sealed trait that extends Any to provide the extension point. However, I believe that using the trait incurs the allocation overhead, which is what we were trying to avoid in moving to Scala. From the Scala docs: "A value class can only extend universal traits and cannot be extended itself. A universal trait is a trait that extends Any, only has defs as members, and does no initialization. Universal traits allow basic inheritance of methods for value classes, but they incur the overhead of allocation." http://docs.scala-lang.org/overviews/core/value-classes.html

That being said, I also didn't see any significant performance degradation in moving to Scala. My most recent numbers, estimating the size of a 1000 x 1000 2D String array 50,000 times: 153.329 s, 153.1 s, 150.67 s.

So we didn't lose any performance moving from Java to Scala (compare with the numbers I've reported above in this discussion), but we gained a ton of readability. It's odd to be using Java in this Scala-laden codebase anyway, so it seems good to keep things in Scala. And we're still better off with this than when we were relying on java.lang.reflect.
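For reference, the shape being discussed: a universal trait as the extension point, with value classes as implementations. Per the Scala docs quoted above, any call made through the trait interface still allocates the wrapper. (IntCastedArray is an illustrative stand-in for the per-type wrappers in the patch.)

```scala
// Universal trait: extends Any, has only defs, does no initialization.
sealed trait CastedArray extends Any {
  def get(i: Int): AnyRef
  def length: Int
}

// Value class implementation. Used directly it avoids allocation, but any
// call made through the CastedArray interface still allocates the wrapper.
class IntCastedArray(val arr: Array[Int]) extends AnyVal with CastedArray {
  override def get(i: Int): AnyRef = Int.box(arr(i))
  override def length: Int = arr.length
}

object CastedArraySketch {
  def main(args: Array[String]): Unit = {
    val ca: CastedArray = new IntCastedArray(Array(10, 20, 30)) // allocates here
    assert(ca.length == 3)
    assert(ca.get(1) == Int.box(20))
  }
}
```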
// make the boxing happen implicitly anyways. In practice this tends to be okay
// in terms of performance.
private class BooleanCastedArray(val arr: Array[Boolean]) extends AnyVal with CastedArray {
  override def get(i: Int): AnyRef = Boolean.box(arr(i))
Part of the reason boxing is okay is that SizeEstimator doesn't actually invoke CastedArray.get() on primitive arrays. If it gets a primitive array, size estimation just multiplies the length of the array by the size of the primitive type, so we never call get() in those situations.

Nevertheless, boxing looks ugly, and I welcome suggestions on better ways to do this.
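To illustrate the shortcut described above: for primitive arrays the size falls out of arithmetic on the component type, with no element reads at all. This is a sketch with made-up helper names, using the standard JVM primitive widths; the real SizeEstimator logic differs in detail.

```scala
object PrimitiveSizeSketch {
  // Hypothetical lookup mirroring the primitive-size table in SizeEstimator.
  def primitiveWidth(c: Class[_]): Int =
    if (c == java.lang.Byte.TYPE || c == java.lang.Boolean.TYPE) 1
    else if (c == java.lang.Short.TYPE || c == java.lang.Character.TYPE) 2
    else if (c == java.lang.Integer.TYPE || c == java.lang.Float.TYPE) 4
    else if (c == java.lang.Long.TYPE || c == java.lang.Double.TYPE) 8
    else throw new IllegalArgumentException(c + " is not a primitive type")

  // Data size of a primitive array: length * element width. No get() calls,
  // hence no boxing, ever happens on this path.
  def primitiveArrayDataSize(arr: AnyRef): Long = {
    val elemClass = arr.getClass.getComponentType
    require(elemClass != null && elemClass.isPrimitive, "not a primitive array")
    scala.runtime.ScalaRunTime.array_length(arr).toLong * primitiveWidth(elemClass)
  }

  def main(args: Array[String]): Unit = {
    assert(primitiveArrayDataSize(Array(1L, 2L, 3L)) == 24L) // 3 longs * 8 bytes
    assert(primitiveArrayDataSize(Array(true, false)) == 2L) // 2 booleans * 1 byte
  }
}
```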
Test build #28497 has finished for PR 4972 at commit
I think maybe @rxin is suggesting a bit of code like what you see in
@rxin can you confirm? However, I believe we wanted to do the cast only once and re-use the casted object on all future invocations of array.get(). That being said, my convoluted if statement in CastedArray.castAndWrap can be cleaned up with Scala code similar to ScalaRunTime.array_length.

EDIT: Actually Sean, I thought the above is what you meant by: "EDIT: on second thought, since it seems like we do two checks on the array type (once to assess length, once to find the element type), I wonder if we should just write a new method along these lines that does both at once. It'd be even faster and wouldn't involve directly copying third party code?" - to do this I would have to hold the casted array somewhere, and these classes implementing CastedArray were the way I went about doing that.

EDIT 2: After reading your comment again just now, I noticed you're not referring to the references to array.get() but to the use of Class.getComponentType(). Sorry about that! But I do think we still save by not doing the case match multiple times here.
Test build #28521 has finished for PR 4972 at commit
Test build #28524 has finished for PR 4972 at commit
I haven't thought it through at close range, but I had meant something like ...

... but if the point is merely to avoid native code, how about just using the 'equivalent' of the

I think you're taking it a step further and trying to assess the component type just once, which is probably a further win - but is it enough to matter and worth the complexity?

Hm, I don't think
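The 'equivalent' alluded to here is presumably the exhaustive case match that ScalaRunTime.array_length itself uses; roughly this shape (a paraphrase, not the library source):

```scala
object ArrayLengthSketch {
  // Roughly the shape of scala.runtime.ScalaRunTime.array_length: one
  // exhaustive match over every array type, with no native calls involved.
  def arrayLength(xs: AnyRef): Int = xs match {
    case x: Array[AnyRef]  => x.length
    case x: Array[Int]     => x.length
    case x: Array[Double]  => x.length
    case x: Array[Long]    => x.length
    case x: Array[Float]   => x.length
    case x: Array[Char]    => x.length
    case x: Array[Byte]    => x.length
    case x: Array[Short]   => x.length
    case x: Array[Boolean] => x.length
    case null => throw new NullPointerException
    case _    => throw new IllegalArgumentException("not an array: " + xs)
  }

  def main(args: Array[String]): Unit = {
    assert(arrayLength(Array("a", "b")) == 2)       // String[] matches Array[AnyRef]
    assert(arrayLength(Array(1.5, 2.5, 3.5)) == 3)  // double[] matches Array[Double]
  }
}
```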
Hm, yeah, I'm not too opinionated about saving the tiniest fraction of a second by avoiding calling instanceOf too many times - instanceOf is really cheap compared to java.lang.reflect.Array as it is. If the majority opinion is that I made it too complex, we can move to just using Scala's array methods. (I don't have much time to work on this until after the weekend, though.)
Latest just uses ScalaRunTime, which also doesn't use the native java.lang.reflect.Array. The underlying implementation is basically identical to the first Java implementation I wrote here, except I need an extra asInstanceOf[AnyRef] to convert from Any (which is what ScalaRunTime.array_apply returns) to AnyRef (which is what we want in SizeEstimator).

Times changed because I restarted my computer between batches of runs, so all my numbers became slower, but we still see the same relative (percentage-based) performance improvement. Estimating the size of a 1000 x 1000 2D String array 50,000 times: Before PR: 222.681 s, 218.34 s, 211.739 s - coming out to close to a 20% improvement on average.
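A sketch of how the final traversal reads, including the extra asInstanceOf[AnyRef] and the while loop the style review asked for. The function here is a hypothetical stand-in for the shape of SizeEstimator.visitArray, not the actual Spark code.

```scala
object VisitArraySketch {
  import scala.runtime.ScalaRunTime

  // Hypothetical traversal mirroring the shape of SizeEstimator.visitArray
  // after this PR: both the length and the element reads go through ScalaRunTime.
  def visitElements(array: AnyRef)(visit: AnyRef => Unit): Unit = {
    val length = ScalaRunTime.array_length(array)
    var i = 0
    while (i < length) {
      // array_apply returns Any; SizeEstimator wants an AnyRef to enqueue.
      val elem = ScalaRunTime.array_apply(array, i).asInstanceOf[AnyRef]
      if (elem != null) visit(elem)
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    var count = 0
    visitElements(Array("x", null, "y"))(_ => count += 1)
    assert(count == 2) // the null element is skipped
  }
}
```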
Test build #28545 has finished for PR 4972 at commit
import org.apache.spark.Logging
import org.apache.spark.util.collection.OpenHashSet

import scala.collection.mutable.ArrayBuffer
Nit: this should go just after java imports
Great - so this still gets a pretty good speed-up? Sounds like a win to me. I'll leave it for a day or so for other thoughts, but it's pretty clean.
I believe the pull request message and the commit message are out of date, because of the way this thread went.
Test build #28599 has finished for PR 4972 at commit
@mccheah yeah, are you able to go back and revise the PR title, JIRA title, and description? I can touch some of that up on merge, but boiling it down to a quick, up-to-date summary would help me.
// Arrays have object header and length field which is an integer
var arrSize: Long = alignSize(objectSize + INT_SIZE)

-    if (elementClass.isPrimitive) {
+    if (elementClass.isPrimitive()) {
Is isPrimitive defined with parentheses? If not, let's not change it here.
isPrimitive is a method on Java's Class, so I don't really know what the convention is there. I think this was a by-product of me manually reverting things, so I won't change it.
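For context on the snippet under review, the arithmetic behind "object header and length field": assuming alignSize rounds up to an 8-byte boundary (as Spark's does) and a hypothetical 16-byte object header (the real header size is JVM-dependent), an empty array shell comes to 24 bytes.

```scala
object ArrayHeaderSketch {
  // 8-byte alignment, equivalent to SizeEstimator.alignSize's rounding.
  def alignSize(size: Long): Long = (size + 7) & ~7L

  def main(args: Array[String]): Unit = {
    val objectSize = 16L // assumed header size; varies by JVM and flags
    val intSize = 4L     // the array's integer length field

    // Arrays have an object header plus a length field, aligned up:
    val arrShell = alignSize(objectSize + intSize)
    assert(arrShell == 24L) // 16 + 4 = 20, rounded up to 24
  }
}
```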
Other than the two minor things I pointed out, and the need to update the title/PR description, this change lgtm. Thanks!
I just updated the PR description and title as well. Not sure if it's necessary to amend the commit messages?
Nah, your commit messages don't matter since those get squashed. You updated the right things. LGTM. I'll merge after a pause for any more comments.
Test build #28662 has finished for PR 4972 at commit
[SPARK-6269] [CORE] Use ScalaRunTime's array methods instead of java.lang.reflect.Array in size estimation

This patch switches the usage of java.lang.reflect.Array in size estimation to using Scala's ScalaRunTime array-getter methods. The notes on https://bugs.openjdk.java.net/browse/JDK-8051447 tipped me off to the fact that using java.lang.reflect.Array was not ideal. At first, I used the code from that ticket, but it turns out that ScalaRunTime's array-related methods avoid the bottleneck of invoking native code anyways, so that was sufficient to boost performance in size estimation.

The idea is to use pure Java code in implementing the methods there, as opposed to relying on native C code which ends up being ill-performing. This improves the performance of estimating the size of arrays when we are checking for spilling in Spark.

Here's the benchmark discussion from the ticket:

I did two tests. The first, less convincing, take-with-a-block-of-salt test was a simple groupByKey operation to collect objects in a 4.0 GB text file RDD into 30,000 buckets. I ran 1 Master and 4 Spark Worker JVMs on my mac, fetching the RDD from a text file stored on disk, and saving it out to another file on local disk. The wall clock times before and after the change were:

Before: 352.195 s, 343.871 s, 359.080 s
After (using code directly from the JDK ticket, not the Scala code in this PR): 342.929583 s, 329.456623 s, 326.151481 s

So, there is a bit of an improvement after the change. I also did some YourKit profiling of the executors to get an idea of how much time was spent in size estimation before and after the change. I roughly saw that size estimation took up less of the time after my change, but YourKit's profiling can be inconsistent, and who knows if I was profiling the executors that had the same data between runs?

The more convincing test I did was to run the size-estimation logic itself in an isolated unit test. I ran the following code:

```
val bigArray = Array.fill(1000)(Array.fill(1000)(java.util.UUID.randomUUID().toString()))

test("String arrays only perf testing") {
  val startTime = System.currentTimeMillis()
  for (i <- 1 to 50000) {
    SizeEstimator.estimate(bigArray)
  }
  println("Runtime: " + (System.currentTimeMillis() - startTime) / 1000.0000)
}
```

I wanted to use a 2D array specifically because I wanted to measure the performance of repeatedly calling Array.getLength. I used UUID strings to ensure that the strings were randomized (so String object re-use doesn't happen), but that they would all be the same size. The results were as follows:

Before PR: 222.681 s, 218.34 s, 211.739 s
After latest change: 170.715 s, 176.775 s, 180.298 s

Author: mcheah <mcheah@palantir.com>
Author: Justin Uang <justin.uang@gmail.com>

Closes apache#4972 from mccheah/feature/spark-6269-reflect-array and squashes the following commits:

8527852 [mcheah] Respect CamelCase for numElementsDrawn
18d4b50 [mcheah] Addressing style comments - while loops instead of for loops
16ce534 [mcheah] Organizing imports properly
db890ea [mcheah] Removing CastedArray and just using ScalaRunTime.
cb67ce2 [mcheah] Fixing a scalastyle error - line too long
5d53c4c [mcheah] Removing unused parameter in visitArray.
6467759 [mcheah] Including primitive size information inside CastedArray.
93f4b05 [mcheah] Using Scala instead of Java for the array-reflection implementation.
a557ab8 [mcheah] Using a wrapper around arrays to do casting only once
ca063fc [mcheah] Fixing a compiler error made while refactoring style
1fe09de [Justin Uang] [SPARK-6269] Use a different implementation of java.lang.reflect.Array
This patch switches the usage of java.lang.reflect.Array in size estimation to using Scala's ScalaRunTime array-getter methods. The notes on https://bugs.openjdk.java.net/browse/JDK-8051447 tipped me off to the fact that using java.lang.reflect.Array was not ideal. At first, I used the code from that ticket, but it turns out that ScalaRunTime's array-related methods avoid the bottleneck of invoking native code anyways, so that was sufficient to boost performance in size estimation.
The idea is to use pure Java code in implementing the methods there, as opposed to relying on native C code which ends up being ill-performing. This improves the performance of estimating the size of arrays when we are checking for spilling in Spark.
Here's the benchmark discussion from the ticket:
I did two tests. The first, less convincing, take-with-a-block-of-salt test was a simple groupByKey operation to collect objects in a 4.0 GB text file RDD into 30,000 buckets. I ran 1 Master and 4 Spark Worker JVMs on my mac, fetching the RDD from a text file stored on disk, and saving it out to another file on local disk. The wall clock times before and after the change were:

Before: 352.195 s, 343.871 s, 359.080 s
After (using code directly from the JDK ticket, not the Scala code in this PR): 342.929583 s, 329.456623 s, 326.151481 s
So, there is a bit of an improvement after the change. I also did some YourKit profiling of the executors to get an idea of how much time was spent in size estimation before and after the change. I roughly saw that size estimation took up less of the time after my change, but YourKit's profiling can be inconsistent, and who knows if I was profiling the executors that had the same data between runs?
The more convincing test I did was to run the size-estimation logic itself in an isolated unit test; I ran the code shown in the squashed commit message above.
I wanted to use a 2D array specifically because I wanted to measure the performance of repeatedly calling Array.getLength. I used UUID strings to ensure that the strings were randomized (so String object re-use doesn't happen), but that they would all be the same size. The results were as follows:
Before PR: 222.681 s, 218.34 s, 211.739 s
After latest change: 170.715 s, 176.775 s, 180.298 s